RxJava源码分析

最简单的观察者列车

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("邦");
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

            }
        });

        观察subscribe()得知  大体流程:

         1、会立即调用onStart()方法,在其它操作之前调用
subscriber.onStart();

         2、之后它喜欢用SafeSubscriber吧subscriber包起来(装饰模式)
if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }

         3、包起来后,就开始调用observable的call()方法启动整个列车了
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//可以忽略那个hook,至今没发现hook中有什么实际的代码,方法都只是返回传入的参数而已
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {//hook中仅仅返回了参数
        return onSubscribe;
    }

         4、而我们在call()中操作的subscribe实际上是装饰者SafeSubscriber。原因是传入的参数subscriber就是包好的SafeSubscriber。
public class SafeSubscriber<T> extends Subscriber<T> {

    private final Subscriber<? super T> actual;

    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

         5、但其实我们在call()中调用的SafeSubscriber.onNext()方法会直接调用SafeSubscriber内部被包起来的subscriber的onNext()方法
@Override
    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }

         6、结果因为这个被包起来的subscriber方法是我们写的订阅者,于是订阅者的onNext()触发了 PS: 所以仅仅是包起来,并没有其它操作
         7、综上所述 调用subscribe()之前都是准备阶段,各种包裹,存储变量。一旦调用subscribe(),整个列车就启动了。
    最简单的异常处理:
         1、并不是全程try包起来异常处理的。
         2、第一个异常检测是在subscribe()方法开始时判断订阅者与被订阅者是否为null,抛出“你是不是故意找茬”的异常,这个检测甚至在调用onStart()之前。
         3、值得一提的是onStart()并没有被try包裹起来。
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        
        subscriber.onStart();

         4、有try块包裹了列车的启动方法call()。处理的方式是 (1)手动检测抛出致命错误(这个操作挺频繁) -> (2)传递Throwable给subscriber的onError()  PS: 在检测致命错误后其实还会检测是否订阅了,但因为一定是(已订阅),所以没区别(因为根本没初始化“是否订阅”这个变量)
        try {
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);//(1)手动检测抛出致命错误
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                try {
                    subscriber.onError(hook.onSubscribeError(e));//(2)传递Throwable给subscriber的onError()
                } catch (Throwable e2) {//(3)onError都出错了、抛出 “啊,完蛋啦”
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    hook.onSubscribeError(r);
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

         5、注意这里的onError()方法,它并不是我们写的onError(),而是爱包装的SafeSubscriber的onError(),在此方法内有一个唯一的标识用于让此方法只会被调用一次。
         SafeSubscriber的onError()只做了一个“默认异常处理(其实就是什么都不做)”就执行了我们写的onError()来解决异常。不管我们的onError()执行成功了,还是抛出异常了,又或是根本没写onError(),它都会unsubscribe()来取消订阅。 PS“是否订阅”变量终于改变了
         unsubscribe()还做了另外的操作,但这里没有看到。 PS: 这就是SafeSubscriber(安全订阅者),它代理了对subscribe的操作,当出异常时执行额外的代码。 这可能是RxJava的秘密

         6、如果subscriber.onError()都报错了、就只会检测抛出致命错误后抛出错误 “你的onError()抛异常啦!,异常为 $%# ” PS: 连这个异常都包起来了

        PS: 你可能认为我漏掉了onCompleted(),但这个方法无论是运行成功,还是因失败抛出异常,它都没有被调用。
    最简单的泛型限定:
     1、这里被限制的类型只有两处,(1)、create(OnSubscribe<T>) ; (2)、subscribe(Subscribe<T>)
     它限制泛型的秘密在create()方法中,create()内创建了Observable对象,当光标选中Observable构造方法里的泛型T时,整个滚动条都绿了!
     Observable拥有的泛型只有一个<T>,在构造时实现了<T>的类型,又在subscribe()中限定了<T>,导致subscribe()的参数泛型也必须一致了。
     PS: subscribe()中机智的使用了<? super T>,这是为数不多的泛型父类限定,理由也很简单(父类引用子类)

<head>添加map运算符</head>
瞬间设计模式的难度以几何的倍数上升,为了清晰直观的看源码,我仿写了它的代码。
public class MyRxJava {
    {//主体调用部分
        MyObservable.create(new MyOnSubscribe() {
            @Override
            public void call(MySubscriber s) {
                s.onNext();
            }
        }).subscribe(new MySubscriber() {
            @Override
            void onStart() {

            }

            @Override
            void onNext() {

            }

            @Override
            void onCompleted() {

            }
        });
    }
}

class MyObservable {//RxJava的操作主体,Observable
    private final MyOnSubscribe onSubscribe;

    public MyObservable(MyOnSubscribe subscribe) {
        this.onSubscribe = subscribe;
    }

    public static MyObservable create(MyOnSubscribe subscribe) {
        return new MyObservable(subscribe);
    }

    public final MySubscriber subscribe(MySubscriber subscriber) {
        subscriber.onStart();
        this.onSubscribe.call(subscriber);
        return subscriber;
    }
}

interface MyOnSubscribe {//create时使用,被订阅者

    void call(MySubscriber s);
}

abstract class MySubscriber {//订阅时使用,订阅者
    private boolean isUnsubscribed;//是否被取消订阅(目前没用)

    abstract void onStart();

    abstract void onNext();

    abstract void onCompleted();

    public void unsubscribe() {
        isUnsubscribed = false;
    }

    public boolean isUnsubscribed() {
        return isUnsubscribed;
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容

  • 引言 简单阐述RxJava流程源码,RxJava有以下三种流程,向下递增。 Observable->Observe...
    伍零一阅读 756评论 0 8
  • RxJava强大的地方之一是他的链式调用,轻松地在线程之间进行切换。这几天也大概分析了一下RxJava的线程切换的...
    TripleZhao阅读 1,224评论 2 8
  • RxJava源码分析(1) Rxjava相信大家都不陌生,是现在很流行的一种解决异步通信的框架,分析源码,不会对R...
    JCJIE阅读 294评论 0 0
  • RxJava 简单来说 , 是一个很灵活切换线程的裤子 . 简单试例 源码解读试例 变换思想图解 变换思想总结 1...
    Justson阅读 1,396评论 0 1
  • 情人节前后,理应高调大声秀恩爱、歌颂伟大的爱情。 但看多了励志文章和心灵鸡汤,我现在对喊口号式的寄语无感,甚至有些...
    颗粒crown阅读 941评论 0 3