** 会看(我在这呢,X_X!!!) =》会用** =》 会说 =》会写 =》会模仿 =》**能new **
说明:本文章是依据 RxJava 1.x (1.2.1) 源码分析学习的。
这是开头
本文主要看源码分析一个RxJava最简单的使用示例。一步俩步,摩擦......
正文在这
首先,写个最简单的RxJava使用示例,如下:
//创建一个Observable 的对象
Observable<String> mObservable = Observable.create(
new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello world!");
subscriber.onCompleted();
}
});
//创建一个Subscriber对象
Subscriber<String> mSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
//完成
}
@Override
public void onError(Throwable e) {
//异常
}
@Override
public void onNext(String s) {
//下一步
System.out.print(s);
}
};
//关联 Observable 和 Subscriber
mObservable.subscribe(mSubscriber);
我们就根据上面最简单的使用示例,按下面步骤来学习一下RxJava的源码:
- Observable(被观察者) 的创建
- Observer(观察者)的创建
- subscribe() 绑定
开始喽:
-
1.Observable(被观察者) 的创建
从 Observable.create
入手
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
//.....
//相关代码:RxJavaHooks.onCreate(f)
//.....
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
create
创建了一个Observable对象,并将OnSubscribe f
参数赋值给了Observable 对象的成员变量 onSubscribe
。当然,这中间调用了RxJavaHooks.onCreate(f)
,我们来看下它的源码。
/**
* Hook to call when an Observable is created.
* @param <T> the value type
* @param onSubscribe the original OnSubscribe logic
* @return the original or replacement OnSubscribe instance
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
这里其实也就是返回了 onSubscribe
本身,RxJavaHooks是一个代理对象, 仅仅用作调试的时候可以插入一些测试代码,大家这样理解就行了。
Ok,上面说了这么久 Observable.OnSubscribe ,那这个 Observable.OnSubscribe 主要是干嘛的呢?,,看源码:
/**
* Invoked when Observable.subscribe is called.
* @param <T> the output value type
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}.
//Action1
public interface Action1<T> extends Action {
void call(T t);
}
可以看到 Observable.OnSubscribe
是一个拥有 call
方法的接口类而已。不过,注意到 Observable.subscribe
(后面讲)被调用时,该接口方法会被调用。
好的,总结就是create()方法创建了一个Observable,且给Observable中OnSubscribe变量进行赋值。
-
2.Observer(观察者)的创建
在文中,为了方便,所有的观察者(Observer)我将用 Subscriber(是实现Observer的接口的抽象类) 来代替,这个会在下面 subscribe()
讲解中说明。先看下,Subscriber 的构造:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
...省略其他代码..
protected Subscriber() { this(null, false);}
protected Subscriber(Subscriber<?> subscriber) {
this(subscriber, true);
}
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}
...省略其他代码...
}
Subscriber
是一个抽象类,主要实现了 Observer
和Subscription
接口类。
public interface Observer<T> {
//提示观察者,当被观察者的所有事件已经结束了。
//与 onError 互斥,
void onCompleted();
//提示观察者,当有一个异常发生时。
//与 onCompleted互斥,
void onError(Throwable e);
/** 为观察者提供了一个新的项目观察
* 这个方法可能被调用 0 ~ N 次。
* 当 onCompleted 或 onError 被调用后,onNext 不会在被调用
*/
void onNext(T t);
}
.....
public interface Subscription {
//观察者与被观察者解绑,不再对它进行观察
void unsubscribe();
//检查是否已经解绑
boolean isUnsubscribed();
}
上面的代码注释已经很清楚了写明了 Subscriber 的相关方法作用。
3.subscribe() 绑定
subscribe()
实现 被观察者Observable 和 观察者**Subscriber ** 的绑定关系,也是代码开始执行的地方。现在来看下这个方法的源码,我们先找到 subscribe(final Observer<? super T> observer)
看下,如下:
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new ObserverSubscriber<T>(observer));
}
.....
//再查看下 ObserverSubscriber这个类
public final class ObserverSubscriber<T> extends Subscriber<T> {
final Observer<? super T> observer;
public ObserverSubscriber(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
}
从上面代码中,我们看到在 subscribe()中最终就是将 Observer 转成了 Subscriber 进行后续代码执行的。这也就说明了之前的用Subsciber代替Observer的原因。
好的,我们现在继续查看 subscribe()方法的源码:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
....省略无关代码...
subscriber.onStart();
...省略无关代码.....
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
}
subscriber.onStart();
(观察者Subscriber的onStart 方法可以做些预先操作,比如发起请求前,显示进度条等),最主要的是 执行了这一句hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
,ok,我们先看下 hook.onSubscribeStart
的实现:
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
竟然,就是直接返回了 onSubscribe
这个对象,什么事也没干,而我们知道这个 onSubscribe
就是我们创建 Observable时传进入的参数,所以 subscribe()
简化如下
//subscriber 就是我们的观察者
onSubscribe.call(subscriber);
然后,就是在 call() 方法中执行 Subscriber的 onNext(),onCompleted(),或 onError()相关方法了。
Observable<String> mObservable = Observable.create(
new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
//调用 subscriber 的相关方法
subscriber.onNext("Hello world!");
subscriber.onCompleted();
}
});
我们来画一个流程图,如下:
结合图,我们整理下思路:
- 首先,通过 create() 方法创建ObservableA对象,并赋值其变量OnSubscribeA。
- 创建观察者对象Subscriber。
- 然后,通过 subscribe() 绑定 ObservableA 和 Subsciber,并调用 OnSubcribeA.call()。
- 最后,就是在 call()方法中,执行调用 Subscriber 的 onNext / onCompleted / onError 的对应方法。
尾巴呢?
到这里,我们算是把RxJava最简单的部分弄清楚了,如果文中有什么差错或者不懂的地方,欢迎大家指正与交流!
接下来呢,我们将向高级部分冲刺,比如 各种操作符、lift的原理、Scheduler线程控制的原理 等等,我们将一一探索其源码的奥秘,(●'◡'●)