Rxjava2.2.1(2) create-map-subscribe 事件变换-源码分析

rxjava代码

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("有情况");
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) throws Exception {
        if("有情况".equals(s)){
            return 70;
        }
        return 10;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.e("rxjava","危险等级" + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

上面代码我们可以做个设定,比如被观察者是个间谍,他发现有情况,然后向观察者报信,但是如果直接报信太危险,于是乎他们密码本,把发现的情况转变成密码再给观察者,为了更方便看代码,我把rxjava的内部类给抽出来

ObservableOnSubscribe<String> observableOnSubscribe = new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("有情况");
    }
};
Function<String,Integer> function = new Function<String, Integer>() {
    @Override
    public Integer apply(String s) throws Exception {
        if("有情况".equals(s)){
            return 70;
        }
        return 10;
    }
};
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.e("rxjava","危险等级" + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};
Observable.create(observableOnSubscribe).map(function).subscribe(observer);

角色

observableOnSubscribe :被观察者、间谍、有情况报信
function :密码本,负责把间谍信息转换成密码(这里把string转换成integer)
observer : 观察者,负责拿到密码

好,下面开始看代码

1、首先看Observable.create方法

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    //就是判空
    ObjectHelper.requireNonNull(source, "source is null");
    //这个地方返回Observable本身
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

2、再看RxJavaPlugins.onAssembly方法

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    //这个地方onObservableAssembly我没有做过赋值,所以为空,所以原样返回source
    return source;
}

3、那么就直接看1步骤里的new ObservableCreate<T>(source)),进入代码

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

这里发现就是做一个赋值操作,把我们创建的被观察者给了source
(其实1、2、3步骤照抄上一篇文章://www.greatytc.com/p/9e0a3ee4e0b3
4、再看Observable的map方法

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //这里原样返回 new ObservableMap<T, R>(this, mapper)
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

首先看这里返回的是ObservableMap,这里的角色要分清楚,即Observable.create(observableOnSubscribe).map(function).subscribe(observer);中调用subscribe方法的不再是create的那个observable,而是这个new的ObservableMap,这里创建ObservableMap传的入参,第一个this就是发送String的Observal,mapper就是密码本,就是我们自己new的那个function ,负责把String转变成integer。
我们进入它的构造函数

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);
    this.function = function;
}

再进入这个super(source)

AbstractObservableWithUpstream(ObservableSource<T> source) {
    this.source = source;
}

我们发现,这个地方就是个赋值的操作,最后ObservableMap拥有了发送String数据的Observable的引用,同时还拥有了转换器(密码本也行,就是我们定义的那个function)的引用,自此,map的代码我们告一段落

5、我们再看Observable.create(observableOnSubscribe).map(function).subscribe(observer)中的subscribe方法,我们上面说了,因为map返回的是我们new的ObservableMap对象,所以这里调用subscribe方法的是这个ObservableMap,我们进入ObservableMap中的subscribe方法(其实还是进入了它的父类Observable的subscribe方法中)。

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

从代码中我们看到,还是调用抽象方法subscribeActual,
6、这个时候再回到ObservableMap的subscribeActual方法

public void subscribeActual(Observer<? super U> t) {
   source.subscribe(new MapObserver<T, U>(t, function));
}

好,这个地方注意了,我们先看他们的角色
参数t:就是我们自己new的那个 Integer 的观察者
source:就是我们步骤4最后面赋值的那个source,就是Observable.create的那个Observable,它里面放的被观察者发送的数据是String
function:转换器(也就是我们自己说的那个密码本)
然后看这里的操作
source调用它的subscribe方法,直接放了一个new的MapObserver对象,我们进入它的构造方法

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
    super(actual);
    this.mapper = mapper;
}

再看一下super(actual);

public BasicFuseableObserver(Observer<? super R> downstream) {
    this.downstream = downstream;
}

到这里为止,我们发现,我们自己new的那个 Integer 的观察者,赋值给了downstream,我们自己写的转换器(密码本、function)赋值给了mapper
好,以上相当于初始化工作都已经结束啦,下面再把发送数据的流程代码说一遍
7、这个时候,我们的间谍,用于发送String数据的被观察者发送了数据emitter.onNext("有情况")
我们进入onNext的方法里

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

看,其实还是调用了observer.onNext(t),我们需要清楚的是,这个observer是谁??我们看看这个observer是怎么来的
首先CreateEmitter的构造函数传进来的,那又是哪里创建的CreateEmitter对象呢,是在 ObservableCreate 的 subscribeActual 方法里,而哪里调用了这个 subscribeActual 方法呢?我们知道subscribeActual 是抽象函数,是在subscribe里调用的,那哪里调用了这个 subscribe方法呢?其实前面已经说过了,就在步骤6那里,不错,就是这个source.subscribe(new MapObserver<T, U>(t, function))
好了,饶了这么一大圈,其实emitter.onNext("有情况"),然后调用了observer.onNext(t),这个observer就是我们上面new的那个MapObserver
好,我们此时进入MapObserver的onNext方法

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != NONE) {
        downstream.onNext(null);
        return;
    }

    U v;

    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}

看关键代码
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.")
这里需要注意,t是间谍发送的“有情况”字符串
mapper.apply()就是我们自己定义的转换器(密码本,就是那个function)
然后这里就把这个String数据转换成了v,v就是我们观察者需要的integer呀
再看关键代码
downstream.onNext(v);
这里的downstream是谁??看步骤6的最后,这个downstream就是我们自己new的那个Integer的观察者呀,然后它就拿到信息了呀
一套流程打完收工!!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容

  • RxJava2源码分析 RxJava的鼎鼎大名相信Android开发的同学都非常熟悉了,其实不仅仅有RxJava,...
    BlackFlag阅读 260评论 0 3
  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,022评论 1 9
  • RXJava是什么 RxJava 在 GitHub 主页上官方解释为:RxJava is a Java VM im...
    向洋xyang阅读 527评论 0 7
  • 2018年12月1日,再过一个月就是2019年,每一条路都有利弊的存在,每一个决定都需要很大的勇气,依旧很相信自己...
    Daisy倾夕阅读 175评论 0 0
  • 独自凭栏观心房,春风拂面,苍白显凄凉。 独笺心事无处诉,身若浮萍意彷徨。 欲弃红尘世外游,日月做伴,潇洒不再愁。 ...
    读书不负我魏霞阅读 436评论 9 18