RxJava——目前最热门的响应式函数编程框架。
笔者也是初涉Rx,所以打算通过这篇文章来理解Rx的操作流程,加深自己对Rx的理解。
本文不涉及RxJava的入门使用,如有需有:
关于RxJava的入门推荐:抛物线大佬的精品——给 Android 开发者的 RxJava 详解
[笔者仍为Android初学者。如有解释错误的地方,欢迎评论区指正探讨]
本文主要根据RxJava2的源码解析整个流程。
引入
首先简单的看一下关于RxJava的一般使用:
前提:定义了一个login接口,返回值为 { isSuccess, UserInfo}
Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
@Override
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
}) //调用登录接口
.map(new Function<LoginApiBean, UserInfoBean>() {
@Override
protected UserInfoBean decode(LoginApiBean loginApiBean) {
//处理登录结果,返回UserInfo
if (loginApiBean.isSuccess()) {
return loginApiBean.getUserInfoBean();
} else {
throw new RequestFailException("获取网络请求失败");
}
}
})
.doOnNext(new Consumer<UserInfoBean>() { //保存登录结果UserInfo
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {
saveUserInfo(bean);
}
})
.subscribeOn(Schedulers.io()) //调度线程
.observeOn(AndroidSchedulers.mainThread()) //调度线程
.subscribe(new Consumer<UserInfoBean>() {
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
//整个请求成功,根据获取的UserInfo更新对应的View
showSuccessView(bean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//请求失败,显示对应的View
showFailView();
}
});
为了便于理解,上述逻辑没有对应的进行封装,简单的展示 RxJava 的几个重要流程。
按着代码的顺序我们理一下步骤:
首先是通过create方法,生成一个Observable对象,并传入一个ObservableOnSubscribe对象,在其回调方法中调用login接口返回LoginApiResult,并执行onNext
然后通过map方法将LoginApiResult转换成UserInfoBean
紧接着是通过doOnNext方法进行保存saveUserInfo操作
然后是线程的调度,分别通过subscribeOn和observeOn将上面提到的步骤都执行在IO线程,下面的步骤都执行在主(UI)线程中
最后是通过Consumer根据执行结果完成(成功或抛异常),执行对应的UI更新方法。
按着顺序,我们一步一步的跟进看看RxJava到底是如何实现这些操作的。
任务链的构建
入口类——Observable
既然要了解RxJava,那么比不可少的我们应该先来看看他的入口类,也就是Observable:
public abstract class Observable<T> implements ObservableSource<T> {
@Override //交由子类实现的出现方法
protected abstract void subscribeActual(Observer observer) ;
@Override //实现了ObservableSource的方法
public final void subscribe(Observer<? super T> observer) {
//省略一堆判空等处理
subscribeActual(observer);
}
省略了一堆静态方法之后,我们可以看到,Observable是一个抽象类,实现了ObservableSource接口,并留了subscribeActual这个抽象方法。
ObservableSource接口只定义了subscribe一个方法,可以看到这个方法做了一些基础判断之后直接跳转到子类的subscribeActual方法。
所以一个被观察者被subscribe的逻辑其实是交由Observable子类来实现的,每个不同的被观察者可以根据自己的需求实现 "被订阅" 后的操作
(贼拗口- -md,总觉得这里用subscribe这个命名很奇怪,还是setSubscriber好懂)
(换而言之,每个子类可以实现各自被setSubscriber后的动作)
Create
接下来是如何生成一个Obserable对象,我们看到create方法。
create方法便是Obserable其中一个关键的静态方法。
我们跟进看一下源码:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
首先第一句代码是对传入的对象进行判空,内部内部实现是如果传入null,会抛异常。
接着是生成一个ObservableCreate对象,然后将这个对象传入RxJavaPlugins进行组装。
RxJavaPlugins提供了一系列的Hook function,通过钩子函数这种方法对RxJava的标准操作进行加工,当我们没有进行配置时,默认是直接返回原来的对象,也就是返回ObservableCreate对象。
(为了方便讲解,后续将直接忽视判空和RxJavaPlugins的代码)
分析后可以看到,这里其实直接返回一个ObservableCreate对象。
我们跟进去看一下这个对象的一些基本信息:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
可以简单的看到,这个类继承了Observalbe类,并存储了我们刚才传进去的ObservableOnSubscribe对象。当然这个类也实现了刚才说的subscribeActual方法,我们待会再看。
map
往下,我们调用了Obserable的map方法:
我们跟进:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<T, R>(this, mapper);
}
可以看到其实是返回了一个ObservableMap对象,接受了两个参数,一个是this,在这里指的也就是刚才的ObservableCreate ,还有一个Function对象,
我们再跟进去看一下ObservableMap的基础信息:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
可以看到其实构造方法和刚才的ObservableCreate一样,将传入的对象进行了存储。
不过可以发现- -这个类并不是继承自Observable,而是AbstractObservableWithUpstream,我们再跟进看看:
// Base class for operators with a source consumable.
// 带有source的operator的基类
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
可以看到这个父类其实继承了Observable,看到官方的注释可以知道,这个类是所有接受上一级输入的操作符(operator 如map)的基类,这里的逻辑并复杂,其实只是简单的封装了一下上一级的输入source和输出先下一级的数据。
分析之后可以看到,调用了map方法其实也是返回了一个Observable对象。
doOnNext
接着往下是doOnNext,- -看到这里可以猜测也是简单的返回一个Observable对象吧。。
不管怎么说,先进入源码看一看:
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
return new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate);
}
可以看到跳转到doOnEach方法,传入的参数除了我们传进来的Consumer之外,其实都是传了空实现的Consumer对象。
可以看到- -真的是简单的返回一个Observable对象。
老规矩,先看一下ObservableDoOnEach的基础信息:
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source);
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}
同样的对所有信息进行了保存。可以看到这个类也是继承了AbstractObservableWithUpstream,可以接受上一层的输入,并向下一层输出数据。
subscribeOn & observeOn
= =接着是线程调度,其实不看也猜得出。。。这里也是直接返回对应的Observable对象。
首先看一下subscribeOn:
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>(this, scheduler);
}
再看一下ObserveOn:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
可以看到,这里分别返回了0ObservableSubscribeOn和ObservableObserveOn对象。照旧我们先看看这两个个类的基础信息:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
怎么样- -一路看到这里,也能知道他这里的基础信息是什么了吧。
再看看另外一个:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
同样的保存了传进去的基础信息,我们发现其中共有的都保存了Scheduler对象,我们先稍微看一下Scheduler:
public abstract class Scheduler {
@NonNull
public abstract Worker createWorker();
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
w.schedule(new Runnable() {
@Override
public void run() {
try {
run.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
public abstract static class Worker implements Disposable {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
可以看到,Scheduler对外暴露了scheduleDirect方法,这个方法通过调用抽象方法createWorker得到worker对象,然后调用worker对象的schedule方法,执行runnable。
看到这里大致就能猜出Scheduler对应的逻辑啦,内部的worker对象维护自己的线程池,然后每次执行schedule方法时把runnable对象提交到线程池中。先这样理解,最后我们再深入一下。
subscribe
终于来到最后这个方法了- -md。。。前面全都是直接返回对象,难道所有逻辑都在最后实现吗?- -进去看一下先。
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
因为subscribe的重载方法很多- -这里只挑最终的两个,其中LambdaObserver其实就是把传进来的Consumer包装成一个Observer(看清不是Observable!Observer是订阅者),内部就是简单的在各个阶段调用我们传进去的Consumer的accpet方法。
Observer其实只是个接口,里面定义了接收到被观察者(Observable)发出的事件时,订阅者(Observer)应该执行的方法:
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
接着就是直接调用了subscribeActual方法。刚才我们在上述的步骤也说了,这个方法是Observable的抽象方法。
其实到这里我们可以看出,整个步骤通过对象的嵌套,形成了一条完整的链。
逆向逐级订阅
跟踪subscribe
按照我们刚才的案例,到最后subscribe
方法的调用关系应该是这样的:
ObservableObserveOn.subscribe(LambdaObserver)。
所以我们跟进看一下ObservableObserveOn.subscribe方法的实现:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//省略部分代码
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
可以看到,这里通过Scheduler创建了一个wroker对象,然后调用了source(上一级)的subscribe方法,并通过已有的observer对象生成一个ObserveOnObserver(注意是Observer)对象作为传参。
看到这里也大概知道套路了= =和刚才一样,会一直沿着整条链返回,一个一个订阅对应Observable并生成新的嵌套的Observer。
我们依旧跟着看看,ObservableObserveOn.subscribe之后是ObservableSubscribeOn.subscribe:
@Override
public void subscribeActual(final Observer<? super T> s) {
//将上一级传进来的订阅者包装为线程安全的原子变量
//SubscribeOnObserver只是简单的包装,这里不展开
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//先在当前线程执行订阅者的onSubscribe方法
s.onSubscribe(parent);
//然后在指定的线程中执行source(上一级)的subscribe
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
根据我们最开始的业务逻辑,我们这里的scheduler应该对应IO线程,也就是说往下执行的subscribe操作都是执行再IO线程中的。(现在是逆向遍历刚才建立的observable链。)
紧接着ObservableDoOnEach.subscribe:
@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new ObservableDoOnEach.DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}
可以看到,这里也是封装了我们传进去的Consumer参数,直接调用了上一级的source.subscribe方法。
= =那么就接着往下看。应该来到了ObservableMap.subscribe方法了。
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new ObservableMap.MapObserver<T, U>(t, function));
}
可以看到也是封装了我们传进去的Function参数,然后调用上一级source.subscribe,也就是ObservableCreate.subscribe,也就到了链的一开始。
我们跟进看看ObservableCreate.subscribe:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//首先是创建了CreateEmitter对象,这个类有没有觉得特别眼熟- -
ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter<T>(observer);
//然后调用了订阅者observer的onSubscribe方法
//这里的订阅者来自刚才的map操作
observer.onSubscribe(parent);
try {
//调用上一级source的subscribe方法
//显然- -没有上一级了,这里的source就是我们一开始创建的observer对象,调用的subscribe方法也就是我们调用的login()方法的地方
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//捕获异常
parent.onError(ex);
}
}
终于回到了第一级,可以看到,一样的封装了observer订阅者,(这里的订阅者来自map操作),然后调用了source.subscribe方法,(- -看到这里不知道你们还记不记得source来自哪- -看下面代码)这个source来自我们一开始调用Observable.create时传进来的参数,而subscribe方法就是我们一开始执行login()方法的地方。
Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
@Override
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
}) //调用登录接口
……省略
也就是说,在刚才所有的逆序遍历过程中,下一级的Observable会生成的对应的Observer订阅上一级的source。
执行任务链
接下来就是激动人心的执行我们定义的任务了。(md终于- -)
= =在分析前,先重新看一下我们刚才的业务逻辑:
Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
@Override
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
}) //调用登录接口
.map(new Function<LoginApiBean, UserInfoBean>() {
@Override
protected UserInfoBean decode(LoginApiBean loginApiBean) {
//处理登录结果,返回UserInfo
if (loginApiBean.isSuccess()) {
return loginApiBean.getUserInfoBean();
} else {
throw new RequestFailException("获取网络请求失败");
}
}
})
.doOnNext(new Consumer<UserInfoBean>() { //保存登录结果UserInfo
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {
saveUserInfo(bean);
}
})
.subscribeOn(Schedulers.io()) //调度线程
.observeOn(AndroidSchedulers.mainThread()) //调度线程
.subscribe(new Consumer<UserInfoBean>() {
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
//整个请求成功,根据获取的UserInfo更新对应的View
showSuccessView(bean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//请求失败,显示对应的View
showFailView();
}
});
一趟创建,一趟逆向订阅,我们又回到了最开始的地方。我们刚才分析到,ObservableCreate会执行我们定义的方法。
所以就来到了这段代码:
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
行- -就是login,就是调用ObservableEmitter.onNext方法。我们跟进:
public final class ObservableCreate<T> extends Observable<T> {
protected void subscribeActual(Observer<? super T> observer) {
//可以看到,这里传入的Observer参数是来自下一级的订阅者
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//省略一堆- -
}
//省略继承关系
static final class CreateEmitter<T> {
//保存订阅者
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//省略判空
if (!isDisposed()) {
//调用订阅者的onNext方法
observer.onNext(t);
}
}
}
}
可以看到吧,简单的执行一些判断后,就调用了订阅者的onNext方法,而通过上面的代码,我们可以看到observer来自于subscribe时调用构造函数的传参,而通过上述的分析,我们知道,这里的订阅者来自下一级,也就是map操作生成的订阅者。这里很自然的进入了map操作。
(后面不再贴出observer的来源)
我们再往下看到MapObserver:
@Override
public void onNext(T t) {
//省略一些细节上的判断
U v;
//mapper就是我们new 的function对象
v = mapper.apply(t)
actual.onNext(v);
}
可以看到,这里调用了我们定义的apply方法,获得了新的对象,然后调用了下一级订阅者的onNext方法。
嘿嘿嘿看到这里大概就知道执行任务链的套路了。嵌套的调用下一级的onNext方法。
我们先继续往下看,来到了DoOnEachObserver中:
@Override
public void onNext(T t) {
onNext.accept(t);
actual.onNext(t);
}
bingo!基本上和我们猜想的一样~accept方法就是我们定义的doOnNext的操作啊~
再接着往下来到SubscribeOnObserver:
@Override
public void onNext(T t) {
actual.onNext(t);
}
这货更直接。。直接就调过去了- -(这里涉及到Scheduler的线程调度,后面再补充)
快到重点了,再看一下ObserveOnObserver:
@Override
public void onNext(T t) {
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
擦,这货逻辑贼复杂。。毕竟在这里进行了线程调度。暂时不深入。
只需要知道:这货把任务提交给了Scheduler中的worker。等到任务结束获取到结果后会调用下一级的onNext方法。
强行来到最后一层了~
这里的Observer就是我们调用subscribe时传入的Observer啦~
那就是调用:
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
//整个请求成功,根据获取的UserInfo更新对应的View
showSuccessView(bean);
}
行了- -走完整个流程了。。相信看到这里就能大致理解Rx的流程怎么走了(也没有想象的那么复杂嘛~)
在刚才的遍历订阅后,每一步操作都会通知对应的Observer,从而完成整调任务链。
总结
总结一下:
- 创建任务链,每一步都会返回对应的Observable对象。
- 逆向逐级订阅。每一步都会生成对应的Observer对上一步生成的Observable进行订阅
- 执行任务链。执行任务链之后,每一步都会通知对应的Observer,从而完成整调任务链。
嘿嘿嘿,感觉整个流程也没有想想中的难~对Rx的理解又更上一层了。
= =呃。写到这发现篇幅略长。Scheduler的解析还是另起一篇文章吧,挖个坑先。
[笔者仍为Android初学者。如有解释错误的地方,欢迎评论区指正探讨]