先来个RxAndroid的github地址
官方例子
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())吧
.subscribe(/* an Observer */);
- - -
###简化例子
```java
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(LOG_TAG,"[onSubscribe] " + Thread.currentThread().getId());
}
@Override
public void onNext(String value) {
Log.i(LOG_TAG,"[onNext] "+value + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.i(LOG_TAG,"[onError] "+e);
}
@Override
public void onComplete() {
Log.i(LOG_TAG,"[onComplete] "+Thread.currentThread().getId());
}
};
Observable.just("next -- > 1","next --> 2")
.subscribe(observer);
- 接下来看看Observable.just()方法的实现
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
/*
ObjectHelper.requireNonNull(item1, "The first item is null");
这个方法仅仅是判断item1是不是null
public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}
*/
return fromArray(item1, item2);
}
- 可以看到just方法最后调用了 fromArray() 方法 接下来看看fromArray方法的实现
public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE); } else if (items.length == 1) { return return RxJavaPlugins.onAssembly(new ObservableJust<T>(items[0])); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }
- 可以看到fromArray方法就是调用了RxJavaPlugins.onAssembly这个方法,根据items的长度不同传递了不同的参数
- 先分析items.length == 1的情况,这个情况下传入的实例是:new ObservableJust<T>(items[0])),对应上面的例子就是new ObservableJust<String>("one"));
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> s) { ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); s.onSubscribe(sd); sd.run(); } @Override public T call() { return value; }
}
- 这里有些疑问。subscribeActual()这个方法是干什么的?以及里面的**ScalarDisposable**又是做什么的?先放一放后面再说
- 再来看看**RxJavaPlugins.onAssembly**这个方法
```java
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
- 这里有一个非空的判断(以后再说这个),可以先理解成就直接吧source返回
- 小结一哈just方法,首先是判断传入的item是不是空,如果不是空就调用了fromArray方法,在fromArray里面构造了一个Observable对象,然后直接返回。
- just方法构造完了以后就调用了subscribe()方法并传入了一个Observer对象。看看suubscribe的核心代码(就两句话)
public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); }
- 先来看看RxJavaPlugins.onSubscribe(this, observer)这个究竟做了什么事~~
public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) { BiFunction<Observable, Observer, Observer> f = onObservableSubscribe; if (f != null) { return apply(f, source, observer); } return observer; }
- 这里面又有一个非空的判断,不管这个非空判断,也就是直接返回了传入的observer对象
- 现在代码就比较清晰了,其实就是直接调用了subscribeActual(observer)这个方法
-
小结一哈,Observable.just()根据参数的长度构造了一个特定的Observable对象并返回,然后调用了该对象的subscribeActual方法并传入observer
- 接下来再来看前面留下的问题,fromArray方法里面有根据items的长度进行实例化不同的Observable
- item长度为1的时候 --> ObservableJust
- 它的subscribeActual()方法
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}- 这里又引入了一个新的类**ScalarDisposable**,来看看这个又是做什么的 ```java public static final class ScalarDisposable<T> extends AtomicInteger //这个类是用来原子操作的类,java里面i++都不是线程安全的~~ implements QueueDisposable<T>, Runnable { private static final long serialVersionUID = 3880992722410194083L; final Observer<? super T> observer; final T value; static final int START = 0; static final int FUSED = 1; static final int ON_NEXT = 2; static final int ON_COMPLETE = 3; public ScalarDisposable(Observer<? super T> observer, T value) { this.observer = observer; this.value = value; } //中间省略了一大堆方法~~ @Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { //上面就是比较和赋值原子操作 observer.onNext(value);//在这里可以看到调用了onNext() if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } } }
- 它的subscribeActual()方法
- 当items的长度大于1的时候 --> ObservableFromArray
- 它的subscribeActual()方法
public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { return; } d.run(); }
- 这里又出现了一个新的Disposable --> FromArrayDisposable,但是不管怎么样儿,最后都调用了d.run()方法
- item长度为1的时候 --> ObservableJust
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> actual;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {//for循环调用撒~~
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);//调用onNext方法
}
if (!isDisposed()) {
actual.onComplete();
}
}
}
```
小结:
调用just的时候构造了一个Observable对象,并根据不同的参数实例化不同的Observable,不同的Observable有不同的subscribeActual()方法实现,subscribeActual方法里面都有一个Disposable对象,最后都调用了Disposable的run(该方法调用了onNext()方法)方法,最后在subscribe的时候实际上就是调用了Observable的subscribeActual方法。
线程切换分析
eg:
Observable.just("next -- > 1","next --> 2")
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.newThread())
.subscribe(observer);
-
subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
- 可以看到这里也是调用了RxJavaPlugins.onAsswmbly()方法,只是这里的参数变成了ObservableSubscribeOn的实例。
- ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new Runnable() { @Override public void run() { source.subscribe(parent); } })); } ....省略其他代码
- 可以看到里面主要是调用了,Scheduler的schedulerDirect()方法,并在这个里面调用了,source.subscribe()
- 这里我们就仅仅去看看Scheduler.newThread()的实现
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
//省略部分代码....
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
return sr;
}
```
- 这里就可以看出来实际上就是用的线程池来做的~~
- observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
- 这个也是调用了RxJavaPlugins的onAssembly方法,传入的对象是ObservableObserveOn的实例。
- ObservableObserveOn
//仅仅提出了核心代码哈 protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
- 这里对传入的Scheduler进行了判断,如果是TrampolineScheduler类型就直接调用了,Source的subscribe方法,这个Scource其实就是调用observeOn方法的Observable
- 先来看看当Scheduler是newThreadScheduler的时候,可以看到实例化了一个ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer<? super T> actual; final Scheduler.Worker worker; final boolean delayError; final int bufferSize; SimpleQueue<T> queue; Disposable s; Throwable error; volatile boolean done; volatile boolean cancelled; int sourceMode; boolean outputFused; ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } //省略了很多代码.... @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } ``` - 从源码可以看出来,**ObserveOnObserver**其实就是对Observer的一个包装 - 在**onNext**方法中可以看到线程切换的代码
小结:
其实搞了半天就是一个线程池在里面切换,对对象的各种包装。subscribeOn就是对Observable的包装,切换了线程来调用source.subscribe()方法,而observeOn则是对Observer的包装,并重写了里面的回调方法,在回调的时候会自动切换线程。
AndroidSchedulers.mainThread()这个Scheduler的分析
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
- 可以看到最后都是实例化了HandlerScheduler,不同的是Looper的不同,
- 再来看看HandlerScheduler的实现(仅仅贴出了主要的两个方法)
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
```
- 最后都是用的handler.postDelayed方法来做的线程切换,so android上面的Schulder其实就是用了,Handler机制~~
RxAndroid使用不当会有内存泄漏的哦~~
Nothing is certain in this life. The only thing i know for sure is that. I love you and my life. That is the only thing i know. have a good day