基于 RxJava 2.1.6 RxJava github 地址
为什么要用 RxJava
简洁!简洁!简洁!(重要的事情说三遍)
RxJava 最大的优点就是简洁。简洁的代码能让人心旷神怡,减少 bug 。
RxJava 是一种新的编程模式 响应式编程
响应式编程是一种基于异步数据流概念的编程模式。
数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。
以上是 RxJava Essentials 中文翻译版 对响应式编程的介绍。使用 RxJava 可以让我们在 java 语言中体验什么是响应式编程。响应式编程有两个重要概念
1. 基于异步
2. 数据流
如何使用 RxJava
RxJava 的设计理念基于 观察者模式
在 RxJava 中首先明白有两个对象观察者和被观察者。
观察者和被观察者可以存在不同的线程之中,所以存在观测线程和被观测线程*
观察者和被观察者通过 subscribe 发生『订阅』关系。
RxJava 提供一些列的链式调用,使用起来如下:
Observable
.create(...)
.observeOn(...)
.subscribeOn(...)
.subscribe(...)
subscribe() 方法为链式调用的最后一层,create() 和 subscribe() 方式之前可以任意设置其他操作。
上面提到『响应式编程』中的数据流就像是一条河。
create() 方法可以比喻为河流的『上游发源地』,subscribe() 则为河流的『入海口』。
在这两个方法之间我们可以添加观察、过滤等操作。
在链式调用中增加一些数据处理
Observable
.create(...)
.observeOn(...)
.subscribeOn(...)
.map(...)
.filter(...)
.subscribe(...)
在 RxJava2 中提供一系列可观测对象(也就是上面链式调用的 Observable 等同功能)
io.reactivex.Flowable
io.reactivex.Observable
io.reactivex.Single
io.reactivex.Completable
io.reactivex.Maybe
这里我们写一个例子
Observable.create((ObservableOnSubscribe<String>) e -> {
for (int i = 0; i < 5; i++) {
e.onNext(i + "");
}
})
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.map(s -> {
System.out.println("map:" + s);
return s + "_map";
})
.filter(o -> {
System.out.println("flat:" + o);
if (o.compareTo("3") < 0) {
return true;
}
return false;
})
.subscribe(o -> System.out.println("subscribe:" + o));
输出结果如下:
map:0
flat:0_map
subscribe:0_map
map:1
flat:1_map
subscribe:1_map
map:2
flat:2_map
subscribe:2_map
map:3
flat:3_map
map:4
flat:4_map
上面的例子中,我们在被观察者中发射了 5 个数据源,观察者和被观察着都在同一个线程中,通过 map 对象给每个发射的对象拼接一个 『_map』字符串,通过 filter 过滤了比『3』字符串小的对象。
所以最终在观察者中接收到了 3 次消息。
源码分析 RxJava 中的核心代码
订阅关系的产生
创建被观察着
『被观测者』是事件产生的一方,创建方式也有很多种。这里列举一下 Observable 创建的方式
-
通过 create() 方法创建
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
-
通过 just() 方法创建
public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }
-
通过 fromArray() 方法创建
public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }
其中 ObservableFromArray、ObservableJust、ObservableCreate 都是 Observable 的子类,而 Observable 本身是一个抽象。
这些子类主要实现 Observe 的抽象方法
protected abstract void subscribeActual(Observer<? super T> observer);
再看一下 RxJavaPlugins.onAssembly() 方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
这里需要说明一下 RxJavaPlugins.onAssembly()是一个 Hock,如果不做任何 hock 处理,RxJavaPlugins.onAssembly()会直接返回传入的对象。onObservableAssembly 静态成员变量为 null
我们用 ObservableCreate
举例
ObservableCreate 的构造方法需要传入一个 ObservableOnSubscribe 对象
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
并重载 Observable 的 subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual()
方法传入了一个 Observer
对象并且包装到 CreateEmitter
对象中,然后调用
observer.onSubscribe(parent);
和 source.subscribe(parent);
创建观察者
观察者比较简单,需要实现 4 个方法
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
订阅
通常情况下,我们不需要重载 Observer 的每一个方法,RxJava 内部提供了另一个 LambdaObserver 把 Observer 的四个方法拆分为 4 个部分。
Observable.subscribe() 可以只传入一个 Consumer 对象。
内部会把 Consumer 包裹在 LambdaObserver 中,并且返回 LambdaObserver
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
然后调用接收 Observer 的 subscribe() 方法
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} ……
}
这里调用了 Observable 的 subscribeActual(observer)
方法。
这里就完成了 观察者
和 被观察着
之间的订阅关系
如下一段代码
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 5; i++) {
System.out.println("subscribe:" + i);
e.onNext(i + "");
}
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept:" + s);
}
});
调用的时序图如下
线程调度原理分析
上部分分析的订阅关系
的创建,都是在当前线程之中。RxJava 可以指定 观察线程
和 观察者线程
observeOn 原理分析
Observable 的 observeOn 方法有三个
- Observable<T> observeOn(Scheduler scheduler)
- Observable<T> observeOn(Scheduler scheduler, boolean delayError)
- Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
这三个方法中,前两个方法都会再次调用第三个方法
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
我们看到这里创建了一个 ObservableObserveOn
对象,ObservableObserveOn
是 Observable
的子类。
我们看下 ObservableObserveOn
的 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里我们假设传入的 Scheduler
是 Schedulers.io()
进一步跟踪分析会发现 Schedulers.io()
返回的是 IoScheduler
所以会走上面代码的 else
分支。
先忽略 scheduler.createWorker()
过程,先看下 ObserveOnObserver
这里的 source
是 ObservableCreate
,而 source.subscribe()
会调用 ObservableCreate.subscribeActual(observer)
然后调用 ObserveOnObserver.onSubscribe()
方法
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
……
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
这里传入的 Disposable 是 CreateEmitter
对象,所以不会走 if
分支。
然后创建了一个 SpscLinkedArrayQueue 对象,
紧接着执行 actual.onSubscribe()
也就是 LambdaObserver.onSubscribe()
后面应该执行的是 ObserveOnObserver.onNext()
方法
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
看到会把 onNext(T t)
传入参数放入队列之中,然后执行 schedule
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
worker.schedule()
接收的是一个 Runnable 对象,所以我们从这里可以看出 Observer 的 onNext()
、onComplete()
、onError()
等方法都是在线程之中执行。
接下来我们看下线程的创建
从 ObservableObserveOn
的 subscribeActual()
方法中的
Scheduler.Worker w = scheduler.createWorker();
Schedulers.io()
的跟踪过程比较简单,最终会得到一个 IoScheduler
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
直接看 createWorker()
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
再看 EventLoopWorker 的构造函数
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
继续看下 CachedWorkerPool 的构造方法
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
终于我们找到了线程池相关的代码
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
pool.get()
方法
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
这里返回了一个 ThreadWorker
看下 ThreadWorker
的创建过程
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
看下父类的构造函数
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
再跟下去
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
这里又出现了一个线程池
下面开始看 worker.schedule(this)
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
继续跟踪下去
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
终于我们找到了执行线程的方法
总结一下真个流程
subscribeOn 流程分析
subscribeOn
方法只有一个
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
和 observeOn 类似,这里把 Observable 包装到 ObservableSubscribeOn 对象中。
直接看subscribeActual
方法
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这里把 Observe 包裹在 SubscribeOnObserver 中,并执行
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
这只设置了 Disposable 只能被执行一次。
重点看下 scheduler.scheduleDirect(new SubscribeTask(parent))
先看一些 SubscribeTask 类
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
这里看出 SubscribeTask 也是一个 Runnable 在 run 方法中执行 source.subscribe(parent),
所以 SubscribeTask 是观察者线程的关键
继续依照 IoScheduler
为例
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
这里和 subscribeOn
原理一样了,通过 Worker
对象把观察者的行为设置在线程之中。
操作符原理分析
RxJava 里面有很多操作符,这里找一个 map 操作进行分析。其他更复杂的操作不一一分析。
先看一个例子
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 5; i++) {
System.out.println("subscribe:" + i);
e.onNext(i + "");
}
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
System.out.println("map:" + s);
return s + "_map";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept:" + s);
}
});
输出结果如下:
subscribe:0
map:0
accept:0_map
subscribe:1
map:1
accept:1_map
subscribe:2
map:2
accept:2_map
subscribe:3
map:3
accept:3_map
subscribe:4
map:4
accept:4_map
可以看 ObservableOnSubscribe
里面每次发送的数据都会到 Function.apply()
方法进行『过滤』,把每个发送的 String
转换一下后再发送给 Consumer
看下 map()
方法
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
根据上面的经验,我们直接看 ObservableMap
对象的 subscribeActual()
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
然后继续看 MapObserver
对象的 onNext
,其中 function 就是我们在 map() 方法中传入的 Function 对象。
先看构造函数
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
在看 onNext()
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
可以看出,在执行 actual.onNext(v)
之前,会执行 mapper.apply(t)
从而完成转换。
其他更复杂的操作符,基本都是在各种特定的 XXXObserver 中的 onNext(T t)
方法中做特殊处理。