能看到这篇文章,真的不会亏的,尤其对于不了解其原理的朋友们。
Part 1:使用观察者模式
RxJava 主要运用了观察者模式,只不过在观察者模式中,一个被观察者可以有多个观察者。在 RxJava 中是使用的是变种观察者模式,一个被观察者只有一个观察者。
既然是观察者模式,首先需要有个观察者和被观察者。
有个抽象的观察者接口 Observer,接口很简单,我们常见的四个方法。
public interface Observer<T> {
//接收消息 update
void onNext(T t);
// 建立关联时
void onSubscribe();
// 接收异常消息
void onError(Throwable e);
// 接收消息完成
void onComplete();
}
还有个抽象的被观察者类 ObservableSource,里面只有一个方法,就是用来订阅被观察者 Observer。
public interface ObservableSource<T> {
// 绑定 Observable 与 Observer, 订阅
void subscribeObserver(Observer<T> observer);
}
创建具体的被观察者 Observable,实现了 ObservableSource。到此,一个观察者模式的框架就完成了。
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public void subscribeObserver(Observer<T> observer) {
observer.onNext(msg);
}
}
到这里,我们看到源码中并没有直接使用 onNext,而是使用了模板方法,把具体的订阅逻辑交给子类处理。Observable 做下改进,抽象出一个 subscribeActual
方法。
一下。
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public void subscribeObserver(Observer<T> observer) {
// observer.onNext(msg); 不这样使用,而是把逻辑交给框架使用者
// 把功能留给不同的 Observable 处理
// map flatmap ……
subscribeActual(observer);
}
// 模板方法 把订阅过程的逻辑交给子类去做
protected abstract void subscribeActual(Observer<T> observer);
}
Part 2: 使用发射器发送消息
写到这里,还缺少一个发送消息的操作,也就是模式中的 pushMessage
。那么先打包一个发射器,在里面实现 observer.onNext 等操作。为了便于扩展,也将发射器定义成接口,可以扩展其实现类。
public interface Emitter<T> {
//接收消息 update
void onNext(T t);
// 接收异常消息
void onError(Throwable e);
// 接收消息完成
void onComplete();
}
现在已经有观察者和被观察者了,他们通过 subscribeActual
建立了绑定关系,现在也有发送消息的发射器,但Observable 和 Emitter 还没有建立联系。
现在要实现:Observable 使用 发射器发送消息。因此这里使用一个接口 ObservableOnSubscribe 来绑定发射器,里面只有一个方法就是绑定一个发射器,被观察者通过这个接口就实现了与发射器的绑定,同时观察者和发射器也实现了分离。
public interface ObservableOnSubscribe<T> {
void subscribe(Emitter<T> emitter);
}
Part 3: 具体的被观察者
到现在,基本的框架已经搭建差不多了,现在可以来写下被观察者的具体实现
- 是被观察者那就要继承 Observable
- 通过 ObservableOnSubscribe 绑定 被观察者和发射器,将 ObservableOnSubscribe 作为入参
- 订阅观察者的操作需要实现:首先订阅时触发下游的被订阅操作
observer.onSubscribe()
同时也绑定一个真实的发射器。 - 发射器那就要继承 Emitter,一旦发射消息,就调用观察者的 对应 API。
public class ObservableCreate<T> extends Observable<T> {
ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<T> observer) {
observer.onSubscribe();
CreateEmitter createEmitter = new CreateEmitter(observer);
source.subscribe(createEmitter);
// observable 与 emitter 通过 ObservableOnSubscribe 绑定
// emitter 与 observer 通过 observable 绑定
}
// 发射器实现类
static final class CreateEmitter<T> implements Emitter<T> {
Observer<T> observer;
public CreateEmitter(Observer<T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
}
现在这个壳子就搭好了,RxJava 在创建被观察者的时,是这样使用的 Observable.create()
,所以我们也在 Observable 中创建一个 create()
方法,返回 Observable 对象。
Observable.create(new ObservableOnSubscrib
@Override
public void subscribe(Emitter<Integer> emitter) {
emitter.onNext(1);
}
})
.subscribeObserver(new Observer<Integer>() {
@Override
public void onNext(Integer o) {
log("onNext: " + o);
}
@Override
public void onSubscribe() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Run 一下,结果必须是可以的 666
怎么样,RxJava 就是这么处理的,这里涉及到了一些类,看一下类图
左侧绿色框中用来发送消息,中间蓝色框是被观察者,后边是观察者,这三个部分是相对隔离的。
这三部分的关系其实很清晰
当 Emitter 有动作的时候,比如调用 onNext,就会触发 observer.onNext 的调用。
知道真相的我眼泪掉下来。
了解基本原理之后再去使用相关 API 就会少了很多问号了。