RxJava 相信各位已经使用了很久,大部分人在刚学习 RxJava 感叹切换线程的方便,调用逻辑清晰的同时,并不知道其中的原理,主要是靠记住运行的顺序。
随着我们设计出的 RxJava流 越来越复杂,一些复杂的问题并不能靠着记住的运行顺序就能解决。
下面,就通过最常用的操作符的源码来看看所谓的流
是什么运行的。
首先我们用Single
举例,设计一个最基本的 RxJava 流,只有一个 Observable(ColdObservable)
和Obsever
:
Disposable disposable = Single.just("wtf")
.subscribe(it -> Log.i("subscribe", it));
上游发送一个"wtf"
,下游接受时将其打印出来。上游发送端使用 Single.just
作为创建方法,
看一下 just()
方法里做了什么。
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "value is null");
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
其中 ObjectHelper.requireNonNull
只是空检查。
RxJavaPlugins.onAssembly
方法,这个方法其实就是通过一个全局的变量 onSingleAssembly
来对方法进行 Hook ,这一系列xxxAssembly
全局变量默认为空,所以实际上当我们没有设置的时候其实 just
方法是直接返回了一个 新实例化的SingleJust
对象。
再看看SingleJust
内部:
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
实例化的时候只是将值保存了下来,没有其它操作。
下一步调用subscribe()
来启动这个流(ColdObservable)
,然后看看subscribe
中做了什么:
public final void subscribe(SingleObserver<? super T> subscriber) {
ObjectHelper.requireNonNull(subscriber, "subscriber is null");
subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
try {
//核心逻辑
subscribeActual(subscriber);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
同样 RxJavaPlugins.onSubscribe
默认没有作用,实际的核心逻辑是调用了subscribeActual(SingleObserver)
。
对于我们上面设计的流,则是调用了 SingleJust 中的 subscribeActual(SingleObserver)
回顾上面 SingleJust
中 subscribeActual(SingleObserver)
的实现:
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
得到两个信息
- 首先调用下游观察者
SingleObserver
的OnSubscribe
方法并传递用于取消操作的Disposable
- 调用
OnSuccess
方法并传递之前保存下来的value
Map 操作符
现在我们加入一个常用且重要的Map
操作到流中
Disposable disposable = Single.just("wtf")
.map(it-> 0)
.subscribe(it -> Log.i("subscribe", String.of(it)));
上面这个流包括了三种典型的操作 创建Creation
操作符Transformation
和 订阅Subscribe
。
依然先检查map()
方法,可以看到其中实例化了一个SingleMap
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
再看看 SingleMap
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
类中信息稍微复杂一些:
- 首先我们关注在
SingleMap
实例化的时候也是只做了保存数据的操作,而没有实际逻辑:将流的上游保存为source
将数据转换的方法保存为mapper
- 第二步我们知道下游观察者
SingleObserver
会调用核心逻辑subscribeActual
方法来启动流 - 在这里的
subscribeActual
方法中可以看到几个重要的信息-
MapSingleObserver
是一个观察者 -
MapSingleObserver
保存了下游的观察者SingleObserver
以及mapper
- 上游
source
被MapSingleObserver
订阅
-
由此可以看出在SingleMap
被下游观察者订阅了之后,实例化了一个新的观察者MapSingleObserver
并保存下游观察者SingleObserver
的信息,再去订阅上游SingleJust
。
这种模式创建了一个装饰类,用来包装原有的类,并在保持类方法签名完整性的前提下,提供了额外的功能的设计模式称为装饰者模式
。
总结上面的执行顺序:
- 在
Rx流
的最后一步调用subscribe
启动流(ColdObservable)
- 首先执行
SingleMap
中的subscribeActual
方法,其中包括生成新的MapSingleObserver
并订阅SingleJust
- 执行
SingleJust
中的subscribeActual
:调用下游MapSingleObserver
的onSubscribe
onSuccess
方法 -
MapSingleObserver
中的onSubsribe
onSuccess
方法也很简单,分别调用下游Observer
的onSubsribe``onSuccess(异常时 onError)
方法
observeOn 操作符
Rxjava首先被大家津津乐道之处是可以方便的切换线程,避免Callback Hell
,现在来看看线程切换操作符。
我们加入线程切换操作符 observeOn
Disposable disposable = Single.just("wtf")
.map(it-> 0)
.observeOn(Schedulers.io())
.subscribe(it -> Log.i("subscribe", String.of(it)));
同样的,在 observeOn
方法中实例化了一个SingleObserveOn
public final Single<T> observeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
}
继续看SingleObserveOn
类中信息
public final class SingleObserveOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
}
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> actual;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.actual = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
actual.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onSuccess(value);
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
类似的
- 构造函数中保存了上游和线程切换的信息
-
subscribeActual
实例化了一个新的观察者ObserveOnSingleObserver
不同的
-
ObserveOnSingleObserver
还继承了AtomicReference<Disposable>
、实现了Disposable``Runnable
接口 -
onSuccess``onError
中都没有直接调用下游的onSuccess
onError
方法,而是调用了Disposable d = scheduler.scheduleDirect(this);
来执行run
方法中的逻辑,而run
方法中的逻辑则是调用下游的onSuccess
onError
方法
查看schedulerDirect
内部信息
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
创建了一个对应线程的Worker
和一个可用于取消的DisposeTask
并执行,对于IoScheduler
则是创建了EventLoopWorker
,再看看EventLoopWorker
中的信息。
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
EventLoopWorker
中则是维护了一套包含相应的线程池
、可取消的CompositeDisposable
、以及用于运行Runable
的ThreadWorker
。总的来说就是一套可以在相应线程运行且可取消的类和逻辑。
- 上面则解释了为什么
observeOn
可以切换下游的线程(onSuccess
onError
) - 同样解释了为什么不会改变
onSubsribe
的调用线程,因为可以看到onSubscribe
方法中直接调用了下游的onSucsribe
,并没有受到线程切换的影响。
SubscribeOn
实际上,subsribeOn
是 RxJava2.x 中比较复杂也是相较于 RxJava1.x 改动比较大的一个操作符,它甚至会影响流的执行顺序。(可以参见唐雪茂写的 Rxjava流的设计 中的1
2
两个流)
我们现在设计两个Rx流
Disposable disposable = Single.just("wtf")
.doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
.subscribe(it -> Log.i("subscribe", 4);
Disposable disposable2 = Single.just("wtf")
.doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
.subscribeOn(Schedulers.io())
.doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
.subscribe(it -> Log.i("subscribe", 4);
你可能已经知道并记住了两个流的打印的顺序分别是 01234
23014
,但是为什么doOnSubsribe
方法和RxJava1
中调用顺序完全不一样,为什么通过subscribeOn
切换线程会影响执行顺序?
先找到 SingleSubscribeOn
类
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
//直接调用下游 onSubscribe
s.onSubscribe(parent);
//再执行订阅上游的方法
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> actual;
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.actual = actual;
this.source = source;
this.task = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable d) {
//没有继续调用下游的 onSubscribe 方法
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
actual.onSuccess(value);
}
@Override
public void onError(Throwable e) {
actual.onError(e);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
task.dispose();
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public void run() {
source.subscribe(this);
}
}
}
同样的直接看subscribeActual
方法及onSubscribe
方法,发现事情并没有那么简单,和之前的操作符的逻辑区别很大:
-
SubscribeOnObserver
同样还继承了AtomicReference<Disposable>
,实现了Disposable``Runnable
接口 - 并没有直接调用
subscribe
订阅上游,而是执行了其它操作符在onSubscribe
中订阅下游的操作 - 然后再结合
Disposable f = scheduler.scheduleDirect(parent);
和run
方法可以知道在新的线程中执行了订阅上游的操作source.subscribe(this);
-
onSubsribe
中并没有再继续调用下游的onSubsribe
综合起来可以知道,本来应该在整个流从下至上订阅完成后按照从上至下的顺序执行 onSubscribe
的流,在使用subsribeOn
操作符的后,在订阅的时(执行subscribeActual
),就开始执行下游的onSubscribe
且在当前线程!然后才在指定的io
线程执行之下而上的操作,这也是为什么subsribeOn
影响的是上游的线程。
小结:
我认为实际上 Rx 使用了很多优秀的设计将我们各种常用的操作进行了封装,让我们自由组合使用,其本身并没有用什么黑科技。例如切换线程本质上则是帮我们启用了一个新的线程并把接下来的代码放进去执行。
当然,其中还有很多更深入的内容需要我们继续发现和学习。