rxjava代码
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("有情况");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
if("有情况".equals(s)){
return 70;
}
return 10;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("rxjava","危险等级" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上面代码我们可以做个设定,比如被观察者是个间谍,他发现有情况,然后向观察者报信,但是如果直接报信太危险,于是乎他们密码本,把发现的情况转变成密码再给观察者,为了更方便看代码,我把rxjava的内部类给抽出来
ObservableOnSubscribe<String> observableOnSubscribe = new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("有情况");
}
};
Function<String,Integer> function = new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
if("有情况".equals(s)){
return 70;
}
return 10;
}
};
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("rxjava","危险等级" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
Observable.create(observableOnSubscribe).map(function).subscribe(observer);
角色
observableOnSubscribe :被观察者、间谍、有情况报信
function :密码本,负责把间谍信息转换成密码(这里把string转换成integer)
observer : 观察者,负责拿到密码
好,下面开始看代码
1、首先看Observable.create方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//就是判空
ObjectHelper.requireNonNull(source, "source is null");
//这个地方返回Observable本身
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
2、再看RxJavaPlugins.onAssembly方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
//这个地方onObservableAssembly我没有做过赋值,所以为空,所以原样返回source
return source;
}
3、那么就直接看1步骤里的new ObservableCreate<T>(source)),进入代码
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
这里发现就是做一个赋值操作,把我们创建的被观察者给了source
(其实1、2、3步骤照抄上一篇文章://www.greatytc.com/p/9e0a3ee4e0b3)
4、再看Observable的map方法
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//这里原样返回 new ObservableMap<T, R>(this, mapper)
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
首先看这里返回的是ObservableMap,这里的角色要分清楚,即Observable.create(observableOnSubscribe).map(function).subscribe(observer);中调用subscribe方法的不再是create的那个observable,而是这个new的ObservableMap,这里创建ObservableMap传的入参,第一个this就是发送String的Observal,mapper就是密码本,就是我们自己new的那个function ,负责把String转变成integer。
我们进入它的构造函数
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
再进入这个super(source)
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
我们发现,这个地方就是个赋值的操作,最后ObservableMap拥有了发送String数据的Observable的引用,同时还拥有了转换器(密码本也行,就是我们定义的那个function)的引用,自此,map的代码我们告一段落
5、我们再看Observable.create(observableOnSubscribe).map(function).subscribe(observer)中的subscribe方法,我们上面说了,因为map返回的是我们new的ObservableMap对象,所以这里调用subscribe方法的是这个ObservableMap,我们进入ObservableMap中的subscribe方法(其实还是进入了它的父类Observable的subscribe方法中)。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
从代码中我们看到,还是调用抽象方法subscribeActual,
6、这个时候再回到ObservableMap的subscribeActual方法
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
好,这个地方注意了,我们先看他们的角色
参数t:就是我们自己new的那个 Integer 的观察者
source:就是我们步骤4最后面赋值的那个source,就是Observable.create的那个Observable,它里面放的被观察者发送的数据是String
function:转换器(也就是我们自己说的那个密码本)
然后看这里的操作
source调用它的subscribe方法,直接放了一个new的MapObserver对象,我们进入它的构造方法
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
再看一下super(actual);
public BasicFuseableObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
到这里为止,我们发现,我们自己new的那个 Integer 的观察者,赋值给了downstream,我们自己写的转换器(密码本、function)赋值给了mapper
好,以上相当于初始化工作都已经结束啦,下面再把发送数据的流程代码说一遍
7、这个时候,我们的间谍,用于发送String数据的被观察者发送了数据emitter.onNext("有情况")
我们进入onNext的方法里
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
看,其实还是调用了observer.onNext(t),我们需要清楚的是,这个observer是谁??我们看看这个observer是怎么来的
首先CreateEmitter的构造函数传进来的,那又是哪里创建的CreateEmitter对象呢,是在 ObservableCreate 的 subscribeActual 方法里,而哪里调用了这个 subscribeActual 方法呢?我们知道subscribeActual 是抽象函数,是在subscribe里调用的,那哪里调用了这个 subscribe方法呢?其实前面已经说过了,就在步骤6那里,不错,就是这个source.subscribe(new MapObserver<T, U>(t, function))
好了,饶了这么一大圈,其实emitter.onNext("有情况"),然后调用了observer.onNext(t),这个observer就是我们上面new的那个MapObserver
好,我们此时进入MapObserver的onNext方法
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
看关键代码
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.")
这里需要注意,t是间谍发送的“有情况”字符串
mapper.apply()就是我们自己定义的转换器(密码本,就是那个function)
然后这里就把这个String数据转换成了v,v就是我们观察者需要的integer呀
再看关键代码
downstream.onNext(v);
这里的downstream是谁??看步骤6的最后,这个downstream就是我们自己new的那个Integer的观察者呀,然后它就拿到信息了呀
一套流程打完收工!!