操作符:
1.map(new Func1(...)) :用于变换Observable,返回新的Observable
调度器:
1.调用observeOn(...)后之后的操作符map等会立马改变成在observeOn所设置的线程运行
observeOn是设置订阅者的运行线程,即
2.调用subscribeOn(...)后,到调用observeOn()为止,中间的操作符均在subscribeOn设置的线程运行。(subscribeOn不管在哪调用,都和开头调用一样效果,,而且调用多次,只有第一次有用)。
3.默认情况下, Observable.doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
原理
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
});
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
首先,调用Observable.create(```),其实就是调用构造函数,将 onSubscribe保存为成员变量,调用subscribe(Subscriber subscriber)其实就是调用onSubscribe.call(subscriber)。
这个call()函数就是我们自己重写的那个函数。
而调用lift()操作符后,新返回一个Observable。
这里,我们梳理一下代码流程:
- 首先,调用Observable.create(···),生成Observable对象,记为ObservableA,它里面又一个成员变量OnSubscribe对象,我们记为OnSubscribeA,OnSubscribeA里面有我们重写的call()方法。
- 接下来,我们调用lift()操作符进行变换,又生成一个Observable对象,这个对象我们记为ObservableB,ObservableB对象是由lift()操作符生成的,与我们无关,ObservableB的成员变量OnSubscribe对象也是新生成的,并且是由lift()自己生成的,与我们无关,这个OnSubscribe对象我们记为OnSubscribeB,OnSubscribeB的call()函数也是lift()生成的,与我们无关。此步骤结束后返回ObservableB
- 然后,我们调用subscribe(···)函数,传入Subscriber对象,调用ObservableB.OnSubscribeB.call(subscriber)。ok,接下来我们就看看
ObservableB.OnSubscribeB.call(···)都干啥了。
ObservableB.OnSubscribeB.call(subscriber)分析:
- 首先调用operator.call(subscriber),这个operator返回一个新的Subscriber,这步很关键,就是这步实现转换,并且是逆向转换。而且新生成的Subscriber包含旧的(这个旧的是指我们自己继承重写的Subscriber,就是subscribe(subscriber)中的参数subscriber),这个新生成的Subscriber用来接收最初ObservableA发出的数据,即作为ObservableA.OnSubscribeA(···)函数的参数。
- 接下来调用新的Subscriber.onStart();
- 然后调用ObservableA.OnSubscribeA.call(新的Subscriber),这个call()就是我们自己重写的那个。