RxJava2源码学习
Rxjava最引以为傲的链式操作,每个方法都是产生一个Obserable,这样才能链式调用。每个方法产生的Obserable内部都有三个东西,代理Observer,下游Observer,上游Obserable。
1.事件的创建:
这是一段没有任何操作符和线程调度的代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
进去看看create操作符到底干了什么,怎么创建的一个Obserable。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//检查是否为null
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
--->onAssembly,这里创建了一个ObservableCreate传了进去
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
//f默认为null
if (f != null) {
return apply(f, source);
}
return source;
}
这个方法接受一个Obserable,方法里的f默认为null,需要我们预先设置,这个有关hook,一般用不到,后面所有有关hook的代码都先忽略。所以实际上就是返回了我们刚刚传进来的suorce。这样create方法实际返回了ObservableCreate对象,就是我们需要的Obserable了。
--->ObservableCreate,构造方法的代码,可以看到继承自Obserable
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//构造方法传进来了我们在外面的回调
this.source = source;
}
}
创建完成了,再看订阅:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//有关hook,忽略
observer = RxJavaPlugins.onSubscribe(this, observer);
//检查observer是否为null
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//这是实际订阅方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
--->subscribeActual,这是个Obserable的抽象方法,需要子类具体Obserable去实现。我们去刚刚使用create方法创建的ObservableCreate看看实现。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//创建一个发射器,同时也是一个开关,数据将由它源源不断的发射到下游
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//调用下游观察者的onSubscribe将开关传递下去,用来控制事件发射
observer.onSubscribe(parent);
try {
//suorce就是我们的回调类,在subscribe里面我们操纵e.onNext(),e.onComplete(),e.onError()发射事件
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这里可以看到上游的Obserable已经可以将事件发射出去了,那到底怎么传递到下游的,既然是通过e.onNext(),e.onComplete(),e.onError()发射出去的,看看呗
//继承了发射器ObservableEmitter,实现了Disposable开关,同时还发现它还继承了AtomicReference<Disposable>,AtomicXXX系列的类是线程安全的原子操作类,不用加锁,Rxjava里面的Disposable开关的控制就是通过它来保证线程安全的。
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
//构造方法,将下游Observer保存
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
//重点,数据传递
@Override
public void onNext(T t) {
//null判断,因此Rxjava2不能发射null了。
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//检查控制订阅关系的开关Dispose,不为false才发送数据
if (!isDisposed()) {
observer.onNext(t);
}
}
//异常事件
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
//同样检查开关
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
//这是finally块,因为产生onError或complete,就意味着订阅关系已终止,必须解除
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
//同样检查开关
if (!isDisposed()) {
try {
//这是finally块,因为产生onError或complete,就意味着订阅关系已终止,必须解除
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
//解除订阅关系
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
//是否已解除订阅关系
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
2.操作符map:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//RxJavaPlugins.onAssembly有关hook,实际就是返回了ObservableMap对象,上面讲过
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
--->ObservableMap
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//保存上游Obserable
super(source);
//保存提供实际转换操作的外部回调对象
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//这里创建了一个Observer,用于订阅上游,这样数据才能链式传递,但是它只是一个中间代理,用于接受上游数据,但是还需要转换并且传递到下游。所以传进去下游Observer t和回调的 function对象。
source.subscribe(new MapObserver<T, U>(t, function));
}
}
从第一个创建的Obserable说起,它要做的工作是调用下游的onNext等等方法传递事件。那么这需要一个Observer对象,但是现在我们做了中间操作,事件需要经过处理,因此就需要在本节点Obserable内部维护一个代理Observer用于订阅上游的事件,然后完成特定的操作如map数据类型转换。再继续调用下游的Observer.onNext等方法,将事件传递下去。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//保存下游observer
super(actual);
//保存外部转换操作的回调对象
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//当onComplete或onError事件发生后,done为true,是开关
if (done) {
return;
}
//sourceMode默认为NONE
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//数据转换
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//将数据传递给调用下游
actual.onNext(v);
}
}
在中间插了个map操作符的Rx链子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return null;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Obserable--->map--->Observer
Obserable链子的产生是从上游到下游,每个方法都是产生一个Obserable,每个下游的Obserable在创建时就保存了上游的Obserable。事件订阅动作肯定是发生在最后一个Obserable。每次Obserable的subcribe动作都是直接调用的subscribeActual方法
map Obserable订阅observer:
ObservableMap.subcribe(observer);
再到-->
@Override
public void subscribeActual(Observer<? super U> t) {
//这个suorce哪来的,不就是ObservableMap创建时传进来的上游Obserable吗
source.subscribe(new MapObserver<T, U>(t, function));
}
订阅到这里还不够啊,因为数据源在最顶部d的Obserable,于是必须要创建中间代理Observer订阅上游Obserable,接受上游的事件。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//将下游Observer保存
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//开关也是这里传递下去的
observer.onSubscribe(parent);
try {
//最终订阅到了这里,就是我们数据发射的源头,外部回调对象
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
于是就这样,订阅动作从链子最底部传到了最顶部的Obserable。接着画风一转,订阅流程结束,开始事件发射流程:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//数据发射
e.onNext(1);
}
}).
而在发射器的onNext方法里面:
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//开始向下游传递事件了,这个Observer显然就是ObservableMap里面的代理Observer。
observer.onNext(t);
}
--->ObservableMap.onNext,事件已经从上一个Obserable传递到了ObservableMap
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//做完数据类型转换
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//继续调用下游onNext向下游发送事件,这里显然就是最终的Observer
actual.onNext(v);
}
总结:先从下往上订阅,再从上往下发送。
3.线程调度
- subcribeOn():
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//创建一个代理Observer,同时又是一个Disposed开关
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//开关传递到下游
s.onSubscribe(parent);
//将线程调度返回一个Disposed开关,方便对线程进行控制管理
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
//这里好像并没有suorce.subcribe(s);
}
}
--->SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//订阅在这里,注意,是将subscrible操作放到了线程当中哦,想想后面哪些操作在这个线程中。
source.subscribe(parent);
}
}
}
--->scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
再到
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//这里由选择的线程决定,会获得不同的Worker,subscribeOn(Schedulers.io())
final Worker w = createWorker();
//先不管
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//将线程加入Disposed控制,以便在取消订阅时,能及时关闭线程
DisposeTask task = new DisposeTask(decoratedRun, w);
//任务开始执行
w.schedule(task, delay, unit);
return task;
}
--->DisposeTask
//就是个包装过后的任务,实现了Disposed接口,实现了开关管理的控制
static final class DisposeTask implements Runnable, Disposable {
//要实施的任务
final Runnable decoratedRun;
//线程工作
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
//运行完了必须切断
dispose();
runner = null;
}
}
//关闭线程
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
选中Worker,Ctrl+H查看他的继承树。随便找个具体看看。
看看NewThreadWorker关键源码:
//线程调度的最终方法
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//包装任务
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//一般为null,忽略
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//加入线程池
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
//记得之前在创建worker时就将它加入了Disposed管理。这里控制了线程调度
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
再来看看:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
@Override
public void onNext(T t) {
//代码执行到这里,一定是在某个调度的线程当中,但是不一定就是在这次调度的线程,因为它不一定就是离顶部最近的subscribeOn,因为线程任务都会加入Disposed管理,因此这里不需要判断了
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
前面知道了整个事件链子是先从下往上订阅,再从上往下发射。source.subscribe(parent);这句代码发生在调度线程中。因此在后面的所有操作都是发生在了这个调度线程当总中,这也就解释了为什么多个subscribeOn()只有第一个有效,因为subscribeOn()是在订阅的链子上,所有的发射操作都是在订阅的后面,自然发射操作也就只受离顶部最近的subscribeOn的影响了。而下面的subscribeOn只是影响了一些订阅操作而已,但是这我们察觉不出来,并不关心。
- ObserveOn():
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//这里好像没有RxJavaPlugins.onAssembly()那家伙了。。。bufferSize()是一个常量,缓冲池的大小
return observeOn(scheduler, false, bufferSize());
}
再看
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//又来了。。。
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
//保存上游Obserable
super(source);
//保存Scheduler
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//特殊调度器,暂不做考虑
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//直接创建Worker
Scheduler.Worker w = scheduler.createWorker();
//订阅
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
--->ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
//缓冲队列,上游发射过来的事件都会先存到这里,然后在这里取事件发射给下游
SimpleQueue<T> queue;
Disposable s;
//存储异常
Throwable error;
//是否已完成
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
//判断是否终止
if (done) {
return;
}
//true
if (sourceMode != QueueDisposable.ASYNC) {
//将事件加入队列
queue.offer(t);
}
//调度线程,注意啊,onNext方法是在发射链子上的,因此可以想到,ObserveOn()影响发射过程,且只影响后面的发射操作
schedule();
}
void schedule() {
//原子操作,自增然后返回原值
if (getAndIncrement() == 0) {
//把自己当做任务,传了进去
worker.schedule(this);
}
}
}
--->run
@Override
public void run() {
//默认false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
--->drainNormal
void drainNormal() {
int missed = 1;
//事件缓冲队列
final SimpleQueue<T> q = queue;
//下游
final Observer<? super T> a = actual;
for (;;) {
//检查是否已终结
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//事件出列
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
//再次检查是否终结
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//将事件发射给下游
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
总结:subscribeOn在订阅链子上执行,observeOn在发射链子上执行,影响的操作
但是有些操作或方法比较特别点:doOnSubscribe()和onSubscribe()
doOnSubscribe是指在onSubscribe()发生之前调用。
看看ObserableCreate:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//在这里将Dispose开关传递下去,
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
一般的操作符也是规规矩矩的将这个Dispose开关一样的传下去,例如ObserableMap里面的代理Observer:
@SuppressWarnings("unchecked")
@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
this.qs = (QueueDisposable<T>)s;
}
if (beforeDownstream()) {
//直接传递
actual.onSubscribe(this);
afterDownstream();
}
}
}
但是,看看subscribeOn里面的代理Observer的代码:
@Override
public void onSubscribe(Disposable s) {
//没有直接传递dispose开关,只是对上游的开关做了设置
DisposableHelper.setOnce(this.s, s);
}
那么我们的下游需要的开关在哪里呢?
ObservableSubscribeOn
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//自己创建了一个dispose传给了下游,并且在下面的线程调度之前执行
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到,在订阅阶段,并且在线程调度之前执行。
因此我们可以得出结论:
如果有subscribeOn在doOnSubscribe()的下面,那么doOnSubscribe()和onSubscribe()都执行在下面最近的subscribeOn指定的线程里,否则执行在默认线程里面。ObserveOn不对doOnSubscribe()和onSubscribe()造成任何影响,因为前面说过,ObserveOn只对订阅之后的发射阶段可能造成影响。