RxJava 知识梳理(1) - RxJava 基本思想

一、基础概述

RxJava的关键是异步,即使随着程序的逻辑变得复杂,它依然能够保持简洁。

二、API介绍和原理剖析

观察者模式面向的需求是:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间做出反应,观察者采用注册Register或者订阅Subscribe的方式,告诉观察者,我需要你的某某状态,并在它变化的时候通知我,在RxJava当中,Observable是被观察者,Observer就是观察者。

RxJava有四个基本概念:

  • Observable:被观察者。
  • Observer:观察者。
  • Subscribe:订阅。
  • Event:事件。

ObservableObserver通过subscribe方法实现订阅关系,Observable可以在需要的时候发出事件来通知Observer

RxJava有以下三种事件:

  • onNext:普通事件。
  • onCompletedRxJava不仅把每个事件单独处理,还会把它们看作一个队列,当不会再有新的onNext事件发出时,需要触发onCompleted事件作为标志。
  • onErroronCompleted和有且仅有一个,并且是事件序列中的最后一个。

三、基本实现

RxJava的基本实现有以下三点:
1)创建观察者 - Observer

Observer<String> observer = new Observer<String>() {

    @Override
    public void onCompleted() {
        Log.d(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "onNext");
    }
};

除了Observer接口之外,RxJava还内置了一个实现了Observer的抽象类:Subscriber,它对Observer接口进行了一些扩展,实质上在RxJavasubscribe过程中,Observer也总是被转换成为一个Subscriber再使用,他们的区别在与:

  • onStart:这是新增的方法,它会在subscribe刚开始,而事件还未发送之前被调用,它总是在subscribe所发生的线程被调用。
  • unsubscribe:这是它实现的另一个接口Subscription的方法,用于取消订阅,在这个方法被调用后,Subscriber将不再接收事件,一般在调用这个方法前,可以使用isUnsubscribed判断一下状态,Observable在订阅之后会持有Subscriber的引用,因此不释放会有内存泄漏的危险。

2)创建被观察者 - Observable
RxJavacreate方法来创建一个observable

rx.Observable observable = rx.Observable.create(new rx.Observable.OnSubscribe<String>() {

    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello World!");
        subscriber.onCompleted();
    }
});

这里传入了一个Observable.OnSubscribe<T>对象作为参数,它会被存储在返回的Observable对象当中,它的作用相当于一个计划表,当Observable被订阅的时候,OnSubscribecall方法会自动被调用,事件序列被依次触发。
createRxJava最基本的创造事件序列的方法,基于这个方法,还提供了一些快捷方法来创建事件队列:

  • just(T...)
Observable observable = Observable.just("Hello", "Hi", "Aloha");
  • from(T[]) / from(Iterable<? extends T>)
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);

3)订阅 - subscribe

observable.subscribe(observer);
observable.subscribe(subscriber);

其内部核心的代码类似于:

public Subscription subscribe(Subscriber subscriber) {
    //准备方法。
    subscriber.onStart();
    //事件发送的逻辑开始执行,这个onSubscribe就是创建Observable时新建的OnSubscribe对象。
    onSubscribe.call(subscriber);
    //把传入的Subscriber转换为Subscription并返回,方便unsubscribe。
    return subscriber;
}

Observable.subscribe方法除了支持传入ObserverSubscriber,还支持传入Action0Action1这样不完整定义的回调,RxJava会自动根据定义创建出Subscriber

四、线程控制

在不指定线程的情况下,RxJava遵循这样的原则,在哪个线程调用subscribe,就在哪个线程产生事件,在哪个线程产生事件,就在哪个线程消费事件,如果需要消费线程,那么就需要用到SchedulerRxJava内置了几个Scheduler

  • Schedulers.immediate:直接在当前线程运行。
  • Schedulers.newThread:总是启用新线程,并在线程执行操作。
  • Schedulers.io:其内部实现是一个无数量上限的的线程池,可以重用空闲的线程,不要把计算工作放在io,可以避免创建不必要的线程。
  • Schedulers.computation:使用固定的线程池,大小为CPU核数。
  • AndroidSchedulers.mainThread:指定的操作将在Android主线程中运行。

对线程控制有以下两个方法:

  • subscribeOn:指定subscribe发生的线程,即Observable.OnSubscribe被激活时所处的线程,也就是call方法执行时所处的线程。
  • observeOn:指定Subscriber所运行在的线程。

observeOn指定的是Subscriber的线程,而这个Subscriber并不一定是subscribe()参数中的Subscriber,而是observeOn执行时的当前Observable所对应的Subscriber,即它的直接下级Subscriber,也就是它之后的操作所在的线程,因此,如果有多次切换线程的要求,只要在每个想要切换线程的位置调用依次observeOn即可。
observeOn不同,subscribeOn只能调用一次,下面我们来分析一下它的内部实现,首先是subscribeOn的原理:
subscribeOnObserveOn都做了线程切换的工作:

  • subscribeOn的线程切换发生在OnSubscribe中,即在它通知上一级OnSubscribe时,这时事件还没有发送,因此subscribeOn的线程控制可以从事件发出的开端造成影响。
Paste_Image.png
  • observeOn的线程切换则发生在它内建的Subscriber中,即发生在它即将给下一级Subscriber发送事件时,因此控制的是它后面的线程。
Paste_Image.png

五、变换

变换,就是将事件序列中的对象或整个序列进行加工处理,转换不同的事件或者序列。

5.1 map()

通过FuncX,把参数中的Integer转换成为String,是最常用的变换,这个变换是发生在subscribeOn所指定的线程当中的。

Subscriber<String> subscriber = new Subscriber<String>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        long nextId = Thread.currentThread().getId();
        Log.d(TAG, "onNext:" + s + ", threadId=" + nextId);
    }
};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        long callId = Thread.currentThread().getId();
        subscriber.onNext(5);
        subscriber.onCompleted();
    }
});
observable.map(new Func1<Integer, String>() {

    @Override
    public String call(Integer integer) {
        long mapId = Thread.currentThread().getId();
        return "My Number is:" + integer;
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);

其示意图类似于:


Paste_Image.png

5.2 flatMap

它和map有一个共同点,就是把传入的参数转化之后返回另一个对象,但是和map不同的是,flatMap返回的是一个Observable对象,而且它并不直接把这个对象传给Subscriber,而是通过这个新建的Observable来发送事件,其整个的调用过程:

  • 使用传入的事件对象创建一个Observable
  • 激活这个Observable,通过它来发送事件。
  • 每一个创建出来的Observable发送的事件,被汇入同一个Observable,它复杂将这些事件同一交给Subscriber的回调方法。
Subscriber<String> subscriber = new Subscriber<String>() {

    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onNext(String s) {
        Log.d(TAG, "onNext, s=" + s);
    }
};
Observable<List<String>> observable = Observable.create(new Observable.OnSubscribe<List<String>>() {

    @Override
    public void call(Subscriber<? super List<String>> subscriber) {
        List<String> list = new ArrayList<>();
        list.add("First");
        list.add("Second");
        list.add("Third");
        subscriber.onNext(list);
    }
});
observable.flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> strings) {
        return Observable.from(strings);
    }
}).subscribe(subscriber);

其示意图:


Paste_Image.png

六、变换的原理

变换的实质是针对事件序列的处理和再发送,在RxJava的内部,它们是基于同一个基础的变换方法lift(operator)

//生成了一个新的Observable并返回。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    //构造新的Observable时,同时新建了一个OnSubscribe对象。
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            //原始的onSubscribe。
            onSubscribe.call(newSubscriber);
        }
    });
}

示意图:

Paste_Image.png
  • lift创建了一个Observable后,加上之前的原始Observable,有两个Observable
  • 新的Observable里的OnSubscribe加上原始的,共有两个OnSubscribe
  • 当用户通过调用lift/map创建的Observable对象的subscribe方法时,于是它触发了上面的call方法中的内容。
  • 在这个新的OnSubscribecall方法中,传入了目标的Subscriber,同时其外部类中还持有了原始的OnSubscribe。我们先通过operator.call(oldSubscriber)方法,生成了新的Subscriber(new Subscriber),然后利用这个新的Subscriber向原始的Observable进行订阅。

下面我们以前面map实现的例子来分析一下源码,上面的例子通过map操作符把Integer类型的ObservableString类型的Subscriber生成了订阅关系。

  • map方法,它通过lift方法返回了一个String类型的Observable
//其中T=Integer,R=String。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
}
  • 下面看下OperatorMap这个对象,这个对象实现了operator<R,T>接口,而这个接口继承于Func1<Subscriber<? super R>, Subscriber<? super T>>,在它实现的call方法中传入了String类型的Subscriber(目标Subscriber),并返回了Integer类型的Subscriber(代理Subscriber),当它的方法被回调时,会调用目标Subscriber的对应方法,其中在调用onNext时,就用上了外部传入的Func1函数:
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }
  • 接着再回过头来看lift方法:
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    //返回一个Integer类型的Subscriber。
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        st.onStart();
                        //关键方法:Integer类型的OnSubscribe调用对应的Subscribe,这个call方法里面写了我们的逻辑,当它调用onNext(Integer integer)时,实际上调用的是onNext(String str)。
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    o.onError(e);
                }
            }
        });
    }
  • 最后就是调用subscribe方法。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容

  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,451评论 7 62
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,158评论 6 151
  • 文章转自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物线在正...
    xpengb阅读 7,017评论 9 73
  • 前言 今天,没有风和日丽,没有太阳高照夏夏苦逼的坐着公交车去面试了一把。无论结果如何,总要涨一波经验的说于是乎夏夏...
    __夏至未至阅读 524评论 6 6
  • 作者/胄宁 空山暮雨渲染着迟夏的凄悲, 劫后重生仍旧残留着的余味, 你楚楚的眼神在向他传达着暧昧, 在这暧昧的背后...
    胄宁阅读 515评论 8 6