PublishSubject 源码分析
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subjects;
import java.util.*;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.Observer;
import rx.exceptions.*;
import rx.internal.operators.BackpressureUtils;
/**
* Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the
* subscriber.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt="">
* <p>
* Example usage:
* <p>
* <pre> {@code
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();
} </pre>
*
* @param <T>
* the type of items observed and emitted by the Subject
*/
public final class PublishSubject<T> extends Subject<T, T> {
/** Shared state object; it is also passed to the superclass constructor in the constructor below. */
final PublishSubjectState<T> state;
/**
* Creates and returns a new {@code PublishSubject}.
*
* @param <T> the value type
* @return the new {@code PublishSubject}
*/
public static <T> PublishSubject<T> create() {
return new PublishSubject<T>(new PublishSubjectState<T>());
}
/**
* Constructs a subject around the given state.
*
* @param state the state instance that tracks the subscribers and the terminal event
*/
protected PublishSubject(PublishSubjectState<T> state) {
super(state);
this.state = state;
}
/** Forwards the item to the shared state, which hands it to every registered producer. */
@Override
public void onNext(T v) {
state.onNext(v);
}
/** Forwards the error to the shared state, which terminates the subject. */
@Override
public void onError(Throwable e) {
state.onError(e);
}
/** Forwards the completion signal to the shared state, which terminates the subject. */
@Override
public void onCompleted() {
state.onCompleted();
}
/**
* Checks whether the current subscriber array is non-empty.
* @return true if at least one producer is currently registered
*/
@Override
public boolean hasObservers() {
return state.get().length != 0;
}
/**
* Check if the Subject has terminated with an exception.
* @return true if the subject has received a throwable through {@code onError}.
* @since 1.2
*/
public boolean hasThrowable() {
return state.get() == PublishSubjectState.TERMINATED && state.error != null;
}
/**
* Check if the Subject has terminated normally.
* @return true if the subject completed normally via {@code onCompleted}
* @since 1.2
*/
public boolean hasCompleted() {
return state.get() == PublishSubjectState.TERMINATED && state.error == null;
}
/**
* Returns the Throwable that terminated the Subject.
* @return the Throwable that terminated the Subject or {@code null} if the
* subject hasn't terminated yet or it terminated normally.
* @since 1.2
*/
public Throwable getThrowable() {
if (state.get() == PublishSubjectState.TERMINATED) {
return state.error;
}
return null;
}
/**
* Holds the array of currently registered producers in an {@link AtomicReference}
* and acts both as the subscribe callback and as the observer that fans events out.
* The array reference is compared by identity against {@link #EMPTY} and {@link #TERMINATED}.
*
* @param <T> the value type
*/
static final class PublishSubjectState<T>
extends AtomicReference<PublishSubjectProducer<T>[]>
implements OnSubscribe<T>, Observer<T> {
/** Serialization version identifier. */
private static final long serialVersionUID = -7568940796666027140L;
/** Marker array meaning "no subscribers, not terminated". */
@SuppressWarnings("rawtypes")
static final PublishSubjectProducer[] EMPTY = new PublishSubjectProducer[0];
/** Marker array meaning "terminated"; a distinct instance from {@link #EMPTY} so identity checks can tell them apart. */
@SuppressWarnings("rawtypes")
static final PublishSubjectProducer[] TERMINATED = new PublishSubjectProducer[0];
/** The error passed to {@code onError}, or null if none was received. */
Throwable error;
/** Starts with the {@link #EMPTY} marker array. */
@SuppressWarnings("unchecked")
public PublishSubjectState() {
lazySet(EMPTY);
}
/**
* Handles a new subscriber: wraps it in a producer, registers the producer, and if
* registration fails because the subject has already terminated, signals the terminal event
* to the subscriber directly.
*
* @param t the incoming subscriber
*/
@Override
public void call(Subscriber<? super T> t) {
PublishSubjectProducer<T> pp = new PublishSubjectProducer<T>(this, t);
t.add(pp);
t.setProducer(pp);
if (add(pp)) {
// the producer may already be unsubscribed at this point; take it out of the array again
if (pp.isUnsubscribed()) {
remove(pp);
}
} else {
// add() returned false, i.e. the state is TERMINATED: signal the stored outcome
Throwable ex = error;
if (ex != null) {
t.onError(ex);
} else {
t.onCompleted();
}
}
}
/**
* Appends the producer by building a new array one element longer and swapping it in
* with compare-and-set, retrying on contention.
*
* @param inner the producer to register
* @return false if the state is {@link #TERMINATED}, true once the producer was added
*/
boolean add(PublishSubjectProducer<T> inner) {
for (;;) {
PublishSubjectProducer<T>[] curr = get();
if (curr == TERMINATED) {
return false;
}
int n = curr.length;
@SuppressWarnings("unchecked")
PublishSubjectProducer<T>[] next = new PublishSubjectProducer[n + 1];
System.arraycopy(curr, 0, next, 0, n);
next[n] = inner;
if (compareAndSet(curr, next)) {
return true;
}
}
}
/**
* Removes the producer (matched by identity) by building a new array without it and
* swapping it in with compare-and-set, retrying on contention. Does nothing if the state is
* {@link #TERMINATED} or {@link #EMPTY}, or if the producer is not in the array.
*
* @param inner the producer to remove
*/
@SuppressWarnings("unchecked")
void remove(PublishSubjectProducer<T> inner) {
for (;;) {
PublishSubjectProducer<T>[] curr = get();
if (curr == TERMINATED || curr == EMPTY) {
return;
}
int n = curr.length;
// locate the producer by reference equality
int j = -1;
for (int i = 0; i < n; i++) {
if (curr[i] == inner) {
j = i;
break;
}
}
if (j < 0) {
return;
}
PublishSubjectProducer<T>[] next;
if (n == 1) {
// removing the last element: reuse the shared EMPTY marker
next = EMPTY;
} else {
// copy the elements before and after index j
next = new PublishSubjectProducer[n - 1];
System.arraycopy(curr, 0, next, 0, j);
System.arraycopy(curr, j + 1, next, j, n - j - 1);
}
if (compareAndSet(curr, next)) {
return;
}
}
}
/** Delivers the item to every producer in the array read at the start of the call. */
@Override
public void onNext(T t) {
for (PublishSubjectProducer<T> pp : get()) {
pp.onNext(t);
}
}
/**
* Stores the error, swaps in {@link #TERMINATED} and signals the error to every producer
* that was registered at that moment. Exceptions thrown by producers are collected and
* handed to {@code Exceptions.throwIfAny} after all producers were notified.
*
* @param e the terminal error
*/
@SuppressWarnings("unchecked")
@Override
public void onError(Throwable e) {
// written before the getAndSet below; presumably so that readers who observe TERMINATED also see the error - confirm
error = e;
List<Throwable> errors = null;
for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
try {
pp.onError(e);
} catch (Throwable ex) {
if (errors == null) {
errors = new ArrayList<Throwable>(1);
}
errors.add(ex);
}
}
Exceptions.throwIfAny(errors);
}
/** Swaps in {@link #TERMINATED} and signals completion to every producer that was registered at that moment. */
@SuppressWarnings("unchecked")
@Override
public void onCompleted() {
for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
pp.onCompleted();
}
}
}
/**
* Per-subscriber wrapper. The inherited {@link AtomicLong} value accumulates the amounts
* passed to {@link #request(long)}; the value {@code Long.MIN_VALUE} marks the producer as unsubscribed.
*
* @param <T> the value type
*/
static final class PublishSubjectProducer<T>
extends AtomicLong
implements Producer, Subscription, Observer<T> {
/** Serialization version identifier. */
private static final long serialVersionUID = 6451806817170721536L;
/** The state this producer is registered with; used to deregister on unsubscribe. */
final PublishSubjectState<T> parent;
/** The downstream subscriber that receives the events. */
final Subscriber<? super T> actual;
/** Number of items handed to {@link #actual} so far. */
long produced;
/**
* @param parent the owning state
* @param actual the downstream subscriber
*/
public PublishSubjectProducer(PublishSubjectState<T> parent, Subscriber<? super T> actual) {
this.parent = parent;
this.actual = actual;
}
/**
* Adds {@code n} to the accumulated request amount using {@code BackpressureUtils.addCap},
* unless the producer is already unsubscribed.
*
* @param n the requested amount; checked by {@code BackpressureUtils.validate}
*/
@Override
public void request(long n) {
if (BackpressureUtils.validate(n)) {
for (;;) {
long r = get();
if (r == Long.MIN_VALUE) {
return;
}
long u = BackpressureUtils.addCap(r, n);
if (compareAndSet(r, u)) {
return;
}
}
}
}
/** @return true if the counter holds the {@code Long.MIN_VALUE} marker */
@Override
public boolean isUnsubscribed() {
return get() == Long.MIN_VALUE;
}
/** Sets the {@code Long.MIN_VALUE} marker and, on the first such call only, deregisters from the parent. */
@Override
public void unsubscribe() {
if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
parent.remove(this);
}
}
/**
* Emits the item downstream if the requested amount differs from the produced count;
* otherwise unsubscribes and signals a {@code MissingBackpressureException} downstream.
* Does nothing when already unsubscribed.
*/
@Override
public void onNext(T t) {
long r = get();
if (r != Long.MIN_VALUE) {
long p = produced;
if (r != p) {
produced = p + 1;
actual.onNext(t);
} else {
// requested == produced: no outstanding request from the subscriber
unsubscribe();
actual.onError(new MissingBackpressureException("PublishSubject: could not emit value due to lack of requests"));
}
}
}
/** Passes the error downstream unless unsubscribed. */
@Override
public void onError(Throwable e) {
if (get() != Long.MIN_VALUE) {
actual.onError(e);
}
}
/** Passes the completion downstream unless unsubscribed. */
@Override
public void onCompleted() {
if (get() != Long.MIN_VALUE) {
actual.onCompleted();
}
}
}
}
第一步:创建Observable:PublishSubject.create();
第二步:在PublishSubject类的create()方法中,创建了观察者管理器PublishSubjectState
第三步:在PublishSubject的create方法中创建了new PublishSubject<T>(state)对象,并将管理器传入
第四步:观察者订阅时,在PublishSubjectState的call()方法中创建了new PublishSubjectProducer<T>(this, t)对象,并将管理器传入
第五步:在call()方法中通过add(pp)将该PublishSubjectProducer注册到管理器中,之后onNext()会遍历并通知所有已注册的观察者
BehaviorSubject 使用
private BehaviorSubject<String> behaviorSubject;
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_simple2);
    // Seed the subject with one default message; everything sent afterwards behaves normally.
    final String defaultMessage = "发送默认消息";
    this.behaviorSubject = BehaviorSubject.create(defaultMessage);
}
/** Click handler: subscribes a new observer to the BehaviorSubject that logs each event under the "main" tag. */
public void click(View v) {
    final Observer<String> loggingObserver = new Observer<String>() {
        @Override
        public void onNext(String message) {
            Log.i("main", "消息:" + message);
        }

        @Override
        public void onError(Throwable e) {
            Log.i("main", "异常......");
        }

        @Override
        public void onCompleted() {
            Log.i("main", "完成......");
        }
    };
    behaviorSubject.subscribe(loggingObserver);
}
/** Click handler: pushes a fixed message into the BehaviorSubject. */
public void send(View v) {
    final String message = "你好,要坚持住.....";
    behaviorSubject.onNext(message);
}
结果输出:
08-03 10:26:07.037 18544-18544/com.haocai.architect.rxjava I/main: 消息:发送默认消息
BehaviorSubject 源码
public final class BehaviorSubject<T> extends Subject<T, T> {
/** An empty array to trigger getValues() to return a new array. */
private static final Object[] EMPTY_ARRAY = new Object[0];
private final SubjectSubscriptionManager<T> state;
/**
* Creates a {@link BehaviorSubject} without a default item.
*
* @param <T>
* the type of item the Subject will emit
* @return the constructed {@link BehaviorSubject}
*/
public static <T> BehaviorSubject<T> create() {
return create(null, false);
}
/**
* Creates a {@link BehaviorSubject} that emits the last item it observed and all subsequent items to each
* {@link Observer} that subscribes to it.
*
* @param <T>
* the type of item the Subject will emit
* @param defaultValue
* the item that will be emitted first to any {@link Observer} as long as the
* {@link BehaviorSubject} has not yet observed any items from its source {@code Observable}
* @return the constructed {@link BehaviorSubject}
*/
public static <T> BehaviorSubject<T> create(T defaultValue) {
return create(defaultValue, true);
}
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
if (hasDefault) {
state.setLatest(NotificationLite.next(defaultValue));
}
state.onAdded = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
o.emitFirst(state.getLatest());
}
};
state.onTerminated = state.onAdded;
return new BehaviorSubject<T>(state, state);
}
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
super(onSubscribe);
this.state = state;
}
@Override
public void onCompleted() {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = NotificationLite.completed();
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
}
}
}
@Override
public void onError(Throwable e) {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = NotificationLite.error(e);
List<Throwable> errors = null;
for (SubjectObserver<T> bo : state.terminate(n)) {
try {
bo.emitNext(n);
} catch (Throwable e2) {
if (errors == null) {
errors = new ArrayList<Throwable>();
}
errors.add(e2);
}
}
Exceptions.throwIfAny(errors);
}
}
@Override
public void onNext(T v) {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = NotificationLite.next(v);
for (SubjectObserver<T> bo : state.next(n)) {
bo.emitNext(n);
}
}
}
/* test support */ int subscriberCount() {
return state.observers().length;
}
@Override
public boolean hasObservers() {
return state.observers().length > 0;
}
/**
* Check if the Subject has a value.
* <p>Use the {@link #getValue()} method to retrieve such a value.
* <p>Note that unless {@link #hasCompleted()} or {@link #hasThrowable()} returns true, the value
* retrieved by {@code getValue()} may get outdated.
* @return true if and only if the subject has some value and hasn't terminated yet.
* @since 1.2
*/
public boolean hasValue() {
Object o = state.getLatest();
return NotificationLite.isNext(o);
}
/**
* Check if the Subject has terminated with an exception.
* @return true if the subject has received a throwable through {@code onError}.
* @since 1.2
*/
public boolean hasThrowable() {
Object o = state.getLatest();
return NotificationLite.isError(o);
}
/**
* Check if the Subject has terminated normally.
* @return true if the subject completed normally via {@code onCompleted()}
* @since 1.2
*/
public boolean hasCompleted() {
Object o = state.getLatest();
return NotificationLite.isCompleted(o);
}
/**
* Returns the current value of the Subject if there is such a value and
* the subject hasn't terminated yet.
* <p>The method can return {@code null} for various reasons. Use {@link #hasValue()}, {@link #hasThrowable()}
* and {@link #hasCompleted()} to determine if such {@code null} is a valid value, there was an
* exception or the Subject terminated (with or without receiving any value).
* @return the current value or {@code null} if the Subject doesn't have a value,
* has terminated or has an actual {@code null} as a valid value.
* @since 1.2
*/
public T getValue() {
Object o = state.getLatest();
if (NotificationLite.isNext(o)) {
return NotificationLite.getValue(o);
}
return null;
}
/**
* Returns the Throwable that terminated the Subject.
* @return the Throwable that terminated the Subject or {@code null} if the
* subject hasn't terminated yet or it terminated normally.
* @since 1.2
*/
public Throwable getThrowable() {
Object o = state.getLatest();
if (NotificationLite.isError(o)) {
return NotificationLite.getError(o);
}
return null;
}
/**
* Returns a snapshot of the currently buffered non-terminal events into
* the provided {@code a} array or creates a new array if it has not enough capacity.
* @param a the array to fill in
* @return the array {@code a} if it had enough capacity or a new array containing the available values
* @since 1.2
*/
@SuppressWarnings("unchecked")
public T[] getValues(T[] a) {
Object o = state.getLatest();
if (NotificationLite.isNext(o)) {
if (a.length == 0) {
a = (T[])Array.newInstance(a.getClass().getComponentType(), 1);
}
a[0] = NotificationLite.getValue(o);
if (a.length > 1) {
a[1] = null;
}
} else
if (a.length > 0) {
a[0] = null;
}
return a;
}
/**
* Returns a snapshot of the currently buffered non-terminal events.
* <p>The operation is thread-safe.
*
* @return a snapshot of the currently buffered non-terminal events.
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
* @since 1.2
*/
@SuppressWarnings("unchecked")
public Object[] getValues() {
T[] r = getValues((T[])EMPTY_ARRAY);
if (r == EMPTY_ARRAY) {
return new Object[0]; // don't leak the default empty array.
}
return r;
}
}
ReplaySubject 基本使用
private ReplaySubject<String> replaySubject;
private EditText editText;
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_simple2);
    // Caches every message; as soon as a new observer subscribes, all cached messages
    // are sent to that observer right away.
    this.replaySubject = ReplaySubject.create();
}
/** Click handler: subscribes a new observer to the ReplaySubject that logs each event under the "main" tag. */
public void click(View v) {
    final Observer<String> loggingObserver = new Observer<String>() {
        @Override
        public void onNext(String message) {
            Log.i("main", "消息:" + message);
        }

        @Override
        public void onError(Throwable e) {
            Log.i("main", "异常......");
        }

        @Override
        public void onCompleted() {
            Log.i("main", "完成......");
        }
    };
    replaySubject.subscribe(loggingObserver);
}
/** Click handler: looks up the text field on first use and pushes its current text into the ReplaySubject. */
public void send(View v) {
    if (editText == null) {
        editText = (EditText) findViewById(R.id.et_text);
    }
    final String typedText = editText.getText().toString();
    replaySubject.onNext(typedText);
}
AsyncSubject 基本使用
private AsyncSubject<String> asyncSubject;
private EditText editText;
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_simple2);
    // Summary
    // Trait 1: after sending messages with onNext, onCompleted() must also be called to
    //          commit them; otherwise the observers receive nothing.
    // Trait 2: observers registered on an AsyncSubject receive only the most recent message.
    this.asyncSubject = AsyncSubject.create();
}
/** Click handler: subscribes a new observer to the AsyncSubject that logs each event under the "main" tag. */
public void click(View v) {
    final Observer<String> loggingObserver = new Observer<String>() {
        @Override
        public void onNext(String message) {
            Log.i("main", "消息:" + message);
        }

        @Override
        public void onError(Throwable e) {
            Log.i("main", "异常......");
        }

        @Override
        public void onCompleted() {
            Log.i("main", "完成......");
        }
    };
    asyncSubject.subscribe(loggingObserver);
}
/** Click handler: pushes four fixed messages into the AsyncSubject and then completes it. */
public void send(View v) {
    // if(editText==null){
    // editText = (EditText) findViewById(R.id.et_text);
    // }
    final String[] messages = { "hello world", "qwert", "asdfg", "zxcvb" };
    for (String message : messages) {
        asyncSubject.onNext(message);
    }
    asyncSubject.onCompleted();
}
结果输出:
08-03 14:33:56.457 15968-15968/com.haocai.architect.rxjava I/main: 消息:zxcvb
08-03 14:33:56.457 15968-15968/com.haocai.architect.rxjava I/main: 完成......
AsyncSubject 源码分析
public final class AsyncSubject<T> extends Subject<T, T> {
final SubjectSubscriptionManager<T> state;
volatile Object lastValue;
/**
* Creates and returns a new {@code AsyncSubject}.
* @param <T> the result value type
* @return the new {@code AsyncSubject}
*/
public static <T> AsyncSubject<T> create() {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
state.onTerminated = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
Object v = state.getLatest();
if (v == null || NotificationLite.isCompleted(v)) {
o.onCompleted();
} else
if (NotificationLite.isError(v)) {
o.onError(NotificationLite.getError(v));
} else {
o.actual.setProducer(new SingleProducer<T>(o.actual, NotificationLite.<T>getValue(v)));
}
}
};
return new AsyncSubject<T>(state, state);
}
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
super(onSubscribe);
this.state = state;
}
@Override
public void onCompleted() {
if (state.active) {
Object last = lastValue;
if (last == null) {
last = NotificationLite.completed();
}
for (SubjectObserver<T> bo : state.terminate(last)) {
if (last == NotificationLite.completed()) {
bo.onCompleted();
} else {
bo.actual.setProducer(new SingleProducer<T>(bo.actual, NotificationLite.<T>getValue(last)));
}
}
}
}
@Override
public void onError(final Throwable e) {
if (state.active) {
Object n = NotificationLite.error(e);
List<Throwable> errors = null;
for (SubjectObserver<T> bo : state.terminate(n)) {
try {
bo.onError(e);
} catch (Throwable e2) {
if (errors == null) {
errors = new ArrayList<Throwable>();
}
errors.add(e2);
}
}
Exceptions.throwIfAny(errors);
}
}
@Override
public void onNext(T v) {
lastValue = NotificationLite.next(v);
}
@Override
public boolean hasObservers() {
return state.observers().length > 0;
}
/**
* Check if the Subject has a value.
* <p>Use the {@link #getValue()} method to retrieve such a value.
* <p>Note that unless {@link #hasCompleted()} or {@link #hasThrowable()} returns true, the value
* retrieved by {@code getValue()} may get outdated.
* @return true if and only if the subject has some value but not an error
* @since 1.2
*/
public boolean hasValue() {
Object v = lastValue;
Object o = state.getLatest();
return !NotificationLite.isError(o) && NotificationLite.isNext(v);
}
/**
* Check if the Subject has terminated with an exception.
* @return true if the subject has received a throwable through {@code onError}.
* @since 1.2
*/
public boolean hasThrowable() {
Object o = state.getLatest();
return NotificationLite.isError(o);
}
/**
* Check if the Subject has terminated normally.
* @return true if the subject completed normally via {@code onCompleted()}
* @since 1.2
*/
public boolean hasCompleted() {
Object o = state.getLatest();
return o != null && !NotificationLite.isError(o);
}
/**
* Returns the current value of the Subject if there is such a value and
* the subject hasn't terminated with an exception.
* <p>The method can return {@code null} for various reasons. Use {@link #hasValue()}, {@link #hasThrowable()}
* and {@link #hasCompleted()} to determine if such {@code null} is a valid value, there was an
* exception or the Subject terminated without receiving any value.
* @return the current value or {@code null} if the Subject doesn't have a value,
* has terminated with an exception or has an actual {@code null} as a value.
* @since 1.2
*/
public T getValue() {
Object v = lastValue;
Object o = state.getLatest();
if (!NotificationLite.isError(o) && NotificationLite.isNext(v)) {
return NotificationLite.getValue(v);
}
return null;
}
/**
* Returns the Throwable that terminated the Subject.
* @return the Throwable that terminated the Subject or {@code null} if the
* subject hasn't terminated yet or it terminated normally.
* @since 1.2
*/
public Throwable getThrowable() {
Object o = state.getLatest();
if (NotificationLite.isError(o)) {
return NotificationLite.getError(o);
}
return null;
}
}
Observable中其他的一些api
第一个:just()方法
// Build three sample items and pass them to the three-argument just() overload.
AppInfo appInfo1 = new AppInfo("Michael同学", 0);
AppInfo appInfo2 = new AppInfo("Michael同学", 0);
AppInfo appInfo3 = new AppInfo("Michael同学", 0);
Observable.just(appInfo1, appInfo2, appInfo3);
//源码
/**
 * Packs the three items into an array and delegates to {@code from}.
 *
 * @param <T> the item type
 * @param t1 first item
 * @param t2 second item
 * @param t3 third item
 * @return the Observable produced by {@code from} for the three-element array
 */
public static <T> Observable<T> just(T t1, T t2, T t3) {
    final Object[] items = new Object[] { t1, t2, t3 };
    return from((T[])items);
}
just()内部先把传入的多个对象包装成数组,再交给from()处理
第二个:repeat()方法
/**
 * Subscribe callback shared by the retry, repeat and redo factory methods below: terminal events
 * of the source are turned into notifications, passed through a control handler function, and
 * each item emitted by the handler's result schedules a new subscription to the source.
 *
 * @param <T> the item type of the source
 */
public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
/** The Observable that gets (re)subscribed. */
final Observable<T> source;
/** Maps the stream of terminal notifications to the stream whose items trigger resubscription. */
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> controlHandlerFunction;
/** When true, an onCompleted notification from the source completes the handler input instead of being forwarded to it. */
final boolean stopOnComplete;
/** When true, an onError notification from the source errors the handler input instead of being forwarded to it. */
final boolean stopOnError;
/** Scheduler whose worker runs the subscription actions. */
private final Scheduler scheduler;
/** Handler that maps every terminal notification to an onNext notification carrying null. */
static final Func1<Observable<? extends Notification<?>>, Observable<?>> REDO_INFINITE = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> ts) {
return ts.map(new Func1<Notification<?>, Notification<?>>() {
@Override
public Notification<?> call(Notification<?> terminal) {
return Notification.createOnNext(null);
}
});
}
};
/**
 * Handler that maps the first {@code count} terminal notifications to onNext notifications and
 * passes later ones through unchanged, then dematerializes the result.
 */
public static final class RedoFinite implements Func1<Observable<? extends Notification<?>>, Observable<?>> {
/** Number of terminal notifications to turn into onNext notifications. */
final long count;
public RedoFinite(long count) {
this.count = count;
}
@Override
public Observable<?> call(Observable<? extends Notification<?>> ts) {
return ts.map(new Func1<Notification<?>, Notification<?>>() {
// number of terminal notifications seen so far
int num;
@Override
public Notification<?> call(Notification<?> terminalNotification) {
if (count == 0) {
return terminalNotification;
}
num++;
if (num <= count) {
return Notification.createOnNext(num);
} else {
return terminalNotification;
}
}
}).dematerialize();
}
}
/**
 * Handler that consults a predicate with a running counter and the notification's throwable;
 * while the predicate returns true it emits an onNext notification with the incremented counter,
 * otherwise it emits the terminal notification itself.
 */
public static final class RetryWithPredicate implements Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>> {
/** Called with the current counter value and the throwable of the terminal notification. */
final Func2<Integer, Throwable, Boolean> predicate;
public RetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
this.predicate = predicate;
}
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> ts) {
// the scan is seeded with an onNext notification holding 0
return ts.scan(Notification.createOnNext(0), new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
@SuppressWarnings("unchecked")
@Override
public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
final int value = n.getValue();
if (predicate.call(value, term.getThrowable())) {
return Notification.createOnNext(value + 1);
} else {
return (Notification<Integer>) term;
}
}
});
}
}
/** Retry using the {@link #REDO_INFINITE} handler. */
public static <T> Observable<T> retry(Observable<T> source) {
return retry(source, REDO_INFINITE);
}
/**
 * Retry using a {@link RedoFinite} handler built from {@code count}.
 *
 * @param source the source Observable
 * @param count must be non-negative; 0 returns {@code source} itself
 * @return the resulting Observable
 * @throws IllegalArgumentException if {@code count} is negative
 */
public static <T> Observable<T> retry(Observable<T> source, final long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 expected");
}
if (count == 0) {
return source;
}
return retry(source, new RedoFinite(count));
}
/** Retry with a custom handler on the trampoline scheduler (stopOnComplete = true, stopOnError = false). */
public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
}
/** Retry with a custom handler on the given scheduler (stopOnComplete = true, stopOnError = false). */
public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
}
/** Repeat using the {@link #REDO_INFINITE} handler on the trampoline scheduler. */
public static <T> Observable<T> repeat(Observable<T> source) {
return repeat(source, Schedulers.trampoline());
}
/** Repeat using the {@link #REDO_INFINITE} handler on the given scheduler. */
public static <T> Observable<T> repeat(Observable<T> source, Scheduler scheduler) {
return repeat(source, REDO_INFINITE, scheduler);
}
/** Repeat with a count on the trampoline scheduler; see the three-argument overload. */
public static <T> Observable<T> repeat(Observable<T> source, final long count) {
return repeat(source, count, Schedulers.trampoline());
}
/**
 * Repeat using a {@link RedoFinite} handler built from {@code count - 1}.
 *
 * @param source the source Observable
 * @param count must be non-negative; 0 returns an empty Observable
 * @param scheduler the scheduler passed on to the handler-based overload
 * @return the resulting Observable
 * @throws IllegalArgumentException if {@code count} is negative
 */
public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
if (count == 0) {
return Observable.empty();
}
if (count < 0) {
throw new IllegalArgumentException("count >= 0 expected");
}
return repeat(source, new RedoFinite(count - 1), scheduler);
}
/** Repeat with a custom handler on the trampoline scheduler (stopOnComplete = false, stopOnError = true). */
public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
}
/** Repeat with a custom handler on the given scheduler (stopOnComplete = false, stopOnError = true). */
public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
}
/** Redo with a custom handler on the given scheduler (stopOnComplete = false, stopOnError = false). */
public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
}
/** Stores the arguments; instances are only created through the static factory methods above. */
private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
Scheduler scheduler) {
this.source = source;
this.controlHandlerFunction = f;
this.stopOnComplete = stopOnComplete;
this.stopOnError = stopOnError;
this.scheduler = scheduler;
}
/**
 * Wires up the redo machinery for one child subscriber: a subject collecting the source's
 * terminal notifications, the action that subscribes to the source, the handler output that
 * schedules that action, and the producer that tracks the child's requests.
 *
 * @param child the downstream subscriber
 */
@Override
public void call(final Subscriber<? super T> child) {
// when true is a marker to say we are ready to resubscribe to source
final AtomicBoolean resumeBoundary = new AtomicBoolean(true);
// incremented when requests are made, decremented when requests are fulfilled
final AtomicLong consumerCapacity = new AtomicLong();
final Scheduler.Worker worker = scheduler.createWorker();
child.add(worker);
final SerialSubscription sourceSubscriptions = new SerialSubscription();
child.add(sourceSubscriptions);
// use a subject to receive terminals (onCompleted and onError signals) from
// the source observable. We use a BehaviorSubject because subscribeToSource
// may emit a terminal before the restarts observable (transformed terminals)
// is subscribed
final Subject<Notification<?>, Notification<?>> terminals = BehaviorSubject.<Notification<?>>create().toSerialized();
final Subscriber<Notification<?>> dummySubscriber = Subscribers.empty();
// subscribe immediately so the last emission will be replayed to the next
// subscriber (which is the one we care about)
terminals.subscribe(dummySubscriber);
final ProducerArbiter arbiter = new ProducerArbiter();
final Action0 subscribeToSource = new Action0() {
@Override
public void call() {
if (child.isUnsubscribed()) {
return;
}
Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
// set once a terminal event was seen; later events are ignored
boolean done;
@Override
public void onCompleted() {
if (!done) {
done = true;
unsubscribe();
terminals.onNext(Notification.createOnCompleted());
}
}
@Override
public void onError(Throwable e) {
if (!done) {
done = true;
unsubscribe();
terminals.onNext(Notification.createOnError(e));
}
}
@Override
public void onNext(T v) {
if (!done) {
child.onNext(v);
decrementConsumerCapacity();
arbiter.produced(1);
}
}
private void decrementConsumerCapacity() {
// use a CAS loop because we don't want to decrement the
// value if it is Long.MAX_VALUE
while (true) {
long cc = consumerCapacity.get();
if (cc != Long.MAX_VALUE) {
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
break;
}
} else {
break;
}
}
}
@Override
public void setProducer(Producer producer) {
arbiter.setProducer(producer);
}
};
// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
sourceSubscriptions.set(terminalDelegatingSubscriber);
source.unsafeSubscribe(terminalDelegatingSubscriber);
}
};
// the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat'
// type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert
// the retry/repeat relevant values to the control handler
final Observable<?> restarts = controlHandlerFunction.call(
terminals.lift(new Operator<Notification<?>, Notification<?>>() {
@Override
public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
return new Subscriber<Notification<?>>(filteredTerminals) {
@Override
public void onCompleted() {
filteredTerminals.onCompleted();
}
@Override
public void onError(Throwable e) {
filteredTerminals.onError(e);
}
@Override
public void onNext(Notification<?> t) {
// depending on the stop flags, a terminal notification ends the handler input; otherwise it is forwarded as an item
if (t.isOnCompleted() && stopOnComplete) {
filteredTerminals.onCompleted();
} else if (t.isOnError() && stopOnError) {
filteredTerminals.onError(t.getThrowable());
} else {
filteredTerminals.onNext(t);
}
}
@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
};
}
}));
// subscribe to the restarts observable to know when to schedule the next redo.
worker.schedule(new Action0() {
@Override
public void call() {
restarts.unsafeSubscribe(new Subscriber<Object>(child) {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Object t) {
if (!child.isUnsubscribed()) {
// perform a best endeavours check on consumerCapacity
// with the intent of only resubscribing immediately
// if there is outstanding capacity
if (consumerCapacity.get() > 0) {
worker.schedule(subscribeToSource);
} else {
// set this to true so that on next request
// subscribeToSource will be scheduled
resumeBoundary.compareAndSet(false, true);
}
}
}
@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
});
}
});
child.setProducer(new Producer() {
@Override
public void request(final long n) {
if (n > 0) {
// record the request, pass it to the arbiter, and start the source if the resume marker is set
BackpressureUtils.getAndAddRequest(consumerCapacity, n);
arbiter.request(n);
if (resumeBoundary.compareAndSet(true, false)) {
worker.schedule(subscribeToSource);
}
}
}
});
}
}
数据重复操作
第三个:defer()方法
只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅时调用Func0创建Observable。(懒加载)
private Observable<AppInfo> observable;
private AppInfoAdapter appInfoAdapter;
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_simple9);
    // Build the deferred Observable first, then set up the list view.
    this.observable = getApps();
    initView();
    // registerObserver();
}
/** Looks up the list view, creates the adapter and attaches it to the list. */
private void initView() {
    final ListView appNameList = (ListView) findViewById(R.id.lv_app_name);
    this.appInfoAdapter = new AppInfoAdapter(this);
    appNameList.setAdapter(this.appInfoAdapter);
}
/**
 * Creates the deferred Observable.
 *
 * @return an Observable built by {@code Observable.defer} around the factory below
 */
private Observable<AppInfo> getApps() {
    // As soon as an observer subscribes, the Func0's call method is invoked to create
    // the Observable (lazy creation).
    final Func0<Observable<AppInfo>> factory = new Func0<Observable<AppInfo>>() {
        // @Override
        // public Observable<AppInfo> call() {
        // return Observable.create(new Observable.OnSubscribe<AppInfo>() {
        //
        // @Override
        // public void call(Subscriber<? super AppInfo> t) {
        // t.onNext(new AppInfo("梦想同学", 0));
        // t.onCompleted();
        // }
        // });
        // }
        @Override
        public Observable<AppInfo> call() {
            Log.i("main", "创建了Observable......");
            AppInfo first = new AppInfo("Michael同学", 0);
            AppInfo second = new AppInfo("Michael同学", 0);
            AppInfo third = new AppInfo("Michael同学", 0);
            Observable<AppInfo> threeItems = Observable.just(first, second, third);
            return threeItems.repeat(3);
        }
    };
    return Observable.defer(factory);
}
/** Click handler: subscribes an observer that feeds the adapter and refreshes it on completion. */
public void click(View v) {
    final Observer<AppInfo> adapterFeeder = new Observer<AppInfo>() {
        @Override
        public void onNext(AppInfo t) {
            // append the received item
            appInfoAdapter.addAppInfo(t);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onCompleted() {
            // refresh the UI once everything was delivered
            appInfoAdapter.notifyDataSetChanged();
        }
    };
    observable.subscribe(adapterFeeder);
}
点击onClick 加载9条数据
第四个:range()方法
private Observable<Integer> observable;
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_simple9);
    // Prepare the range Observable up front; it is subscribed from click().
    this.observable = getApps();
}
/**
 * Creates the Observable.
 *
 * @return the Observable returned by {@code Observable.range(5, 5)}
 */
private Observable<Integer> getApps() {
    // range() takes two numbers: the first is the starting value, the second is how many
    // numbers to emit. Emission starts at the given value and counts upward from there.
    final int start = 5;
    final int count = 5;
    return Observable.range(start, count);
}
/** Click handler: subscribes an observer that logs every received number under the "main" tag. */
public void click(View v) {
    final Observer<Integer> numberLogger = new Observer<Integer>() {
        @Override
        public void onNext(Integer t) {
            // log the received value
            Log.i("main", "接收到的数据: " + t);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onCompleted() {
            // place to refresh the UI after completion
        }
    };
    observable.subscribe(numberLogger);
}
结果输出:
08-04 05:49:20.818 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 5
08-04 05:49:20.819 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 6
08-04 05:49:20.819 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 7
08-04 05:49:20.819 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 8
08-04 05:49:20.819 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 9
第五个:interval()方法
interval():创建一个按固定时间间隔发射整数序列的Observable
这个比较好理解,interval()也是用来创建Observable的,并且也可以延迟发送。但interval()是按周期执行的,所以可以这么认为:interval()是一个可以指定线程的TimerTask(威力加强版……)
第六个:timer()方法
private Observable<Long> observable;
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_simple9);
    // Prepare the timer Observable up front; it is subscribed from click().
    this.observable = getApps();
}
/**
 * Creates the Observable.
 *
 * @return the Observable returned by {@code Observable.timer(3, TimeUnit.SECONDS)}
 */
private Observable<Long> getApps() {
    // timer() arguments:
    // first:  the amount of time to wait before the item is sent
    // second: the time unit (milliseconds, seconds, minutes, hours, ...)
    final long delay = 3;
    final TimeUnit unit = TimeUnit.SECONDS;
    return Observable.timer(delay, unit);
}
/** Click handler: subscribes an observer that logs the value delivered by the timer Observable. */
public void click(View v) {
    final Observer<Long> tickLogger = new Observer<Long>() {
        @Override
        public void onNext(Long t) {
            // log the received value
            Log.i("main", "接收到的数据: " + t);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onCompleted() {
            // place to refresh the UI after completion
        }
    };
    observable.subscribe(tickLogger);
}
输出结果
08-04 08:25:14.754 24259-24357/com.haocai.architect.rxjava I/main: 接收到的数据: 0
总结:
使用Observable步骤:
准备观察者、准备Observable,注册观察者(observable.subscribe(new Observer()))、取消订阅(subscription.unsubscribe()) 以及中间一些逻辑处理