最简单的观察者列车
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("邦");
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});
观察subscribe()得知 大体流程:
1、会立即调用onStart()方法,在其它操作之前调用
subscriber.onStart();
2、之后它喜欢用SafeSubscriber吧subscriber包起来(装饰模式)
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
3、包起来后,就开始调用observable的call()方法启动整个列车了
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//可以忽略那个hook,至今没发现hook中有什么实际的代码,方法都只是返回传入的参数而已
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {//hook中仅仅返回了参数
return onSubscribe;
}
4、而我们在call()中操作的subscribe实际上是装饰者SafeSubscriber。原因是传入的参数subscriber就是包好的SafeSubscriber。
public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
5、但其实我们在call()中调用的SafeSubscriber.onNext()方法会直接调用SafeSubscriber内部被包起来的subscriber的onNext()方法
@Override
public void onNext(T args) {
try {
if (!done) {
actual.onNext(args);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}
6、结果因为这个被包起来的subscriber方法是我们写的订阅者,于是订阅者的onNext()触发了 PS: 所以仅仅是包起来,并没有其它操作
7、综上所述 调用subscribe()之前都是准备阶段,各种包裹,存储变量。一旦调用subscribe(),整个列车就启动了。
最简单的异常处理:
1、并不是全程try包起来异常处理的。
2、第一个异常检测是在subscribe()方法开始时判断订阅者与被订阅者是否为null,抛出“你是不是故意找茬”的异常,这个检测甚至在调用onStart()之前。
3、值得一提的是onStart()并没有被try包裹起来。
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
subscriber.onStart();
4、有try块包裹了列车的启动方法call()。处理的方式是 (1)手动检测抛出致命错误(这个操作挺频繁) -> (2)传递Throwable给subscriber的onError() PS: 在检测致命错误后其实还会检测是否订阅了,但因为一定是(已订阅),所以没区别(因为根本没初始化“是否订阅”这个变量)
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);//(1)手动检测抛出致命错误
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
try {
subscriber.onError(hook.onSubscribeError(e));//(2)传递Throwable给subscriber的onError()
} catch (Throwable e2) {//(3)onError都出错了、抛出 “啊,完蛋啦”
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
hook.onSubscribeError(r);
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
5、注意这里的onError()方法,它并不是我们写的onError(),而是爱包装的SafeSubscriber的onError(),在此方法内有一个唯一的标识用于让此方法只会被调用一次。
SafeSubscriber的onError()只做了一个“默认异常处理(其实就是什么都不做)”就执行了我们写的onError()来解决异常。不管我们的onError()执行成功了,还是抛出异常了,又或是根本没写onError(),它都会unsubscribe()来取消订阅。 PS“是否订阅”变量终于改变了
unsubscribe()还做了另外的操作,但这里没有看到。 PS: 这就是SafeSubscriber(安全订阅者),它代理了对subscribe的操作,当出异常时执行额外的代码。 这可能是RxJava的秘密
6、如果subscriber.onError()都报错了、就只会检测抛出致命错误后抛出错误 “你的onError()抛异常啦!,异常为 $%# ” PS: 连这个异常都包起来了
PS: 你可能认为我漏掉了onCompleted(),但这个方法无论是运行成功,还是因失败抛出异常,它都没有被调用。
最简单的泛型限定:
1、这里被限制的类型只有两处,(1)、create(OnSubscribe<T>) ; (2)、subscribe(Subscribe<T>)
它限制泛型的秘密在create()方法中,create()内创建了Observable对象,当光标选中Observable构造方法里的泛型T时,整个滚动条都绿了!
Observable拥有的泛型只有一个<T>,在构造时实现了<T>的类型,又在subscribe()中限定了<T>,导致subscribe()的参数泛型也必须一致了。
PS: subscribe()中机智的使用了<? super T>,这是为数不多的泛型父类限定,理由也很简单(父类引用子类)
<head>添加map运算符</head>
瞬间设计模式的难度以几何的倍数上升,为了清晰直观的看源码,我仿写了它的代码。
public class MyRxJava {
{//主体调用部分
MyObservable.create(new MyOnSubscribe() {
@Override
public void call(MySubscriber s) {
s.onNext();
}
}).subscribe(new MySubscriber() {
@Override
void onStart() {
}
@Override
void onNext() {
}
@Override
void onCompleted() {
}
});
}
}
class MyObservable {//RxJava的操作主体,Observable
private final MyOnSubscribe onSubscribe;
public MyObservable(MyOnSubscribe subscribe) {
this.onSubscribe = subscribe;
}
public static MyObservable create(MyOnSubscribe subscribe) {
return new MyObservable(subscribe);
}
public final MySubscriber subscribe(MySubscriber subscriber) {
subscriber.onStart();
this.onSubscribe.call(subscriber);
return subscriber;
}
}
interface MyOnSubscribe {//create时使用,被订阅者
void call(MySubscriber s);
}
abstract class MySubscriber {//订阅时使用,订阅者
private boolean isUnsubscribed;//是否被取消订阅(目前没用)
abstract void onStart();
abstract void onNext();
abstract void onCompleted();
public void unsubscribe() {
isUnsubscribed = false;
}
public boolean isUnsubscribed() {
return isUnsubscribed;
}
}