RXjava

RXjava

实质是一个异步操作库

1. 导入

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

compile 'io.reactivex.rxjava2:rxjava:2.1.2'

2. 概念

  1. Observable(被观察者) :
  2. Observer(观察者) :
  3. subscribe(订阅) :
  4. event(事件) :
  5. ObservableEmitter(发射器)用来发送事件,通过调用emitter的onNext(),onCompleted()和onError()分别发送不同的事件。
  6. Disposable: 调用它的dispose()方法时,将中断事件接收,上游继续发送事件,但下游不再接收事件。
  • 下游Observer的onSubscribe()方法最先调用。
  • Observable和Observer通过subscribe()方法实现订阅关系,从而Observable可以在需要时发出事件来通知Observer。
  • 与传统观察者模式不同,RxJava的事件回调方法除了普通事件onNext()之外,还定义了来年改革特殊的事件:onCompleted()和onError()。
  1. onNext():
  2. onCompleted():事件队列完结,RxJava不仅把每个事件单独处理,还会把他们看作一个队列。RxJava规定,当不会再有新的onNext()发出时,需要触发onCompleted()方法作为标志。不会阻止上游Observable发送后面的事件,但是会阻止Observer接收后面的事件。
  3. onError(): 事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许在有事件发生。
  • 在一个正确运行的事件序列中,onCompleted()和onError()有且只有一个,并且事件序列中的最后一个。onCompleted()和onError()二者互斥,在队列中只会调用其中一个。
  • Observer的重载方法:
  • 不带参数 表示下游不关心任何事件
  • 一个Consumer参数表示下游只关心onNext()事件
  1. Scheduler(线程控制):在不指定线程的情况下,Rxjava遵循的好是线程不变原则。如要切换线程需要用Scheduler
  • Schedulers.immediate():直接在当前线程运行,相当于不指定线程。
  • Schedulers.newThread():总是启用新线程,并在新线程只想能够操作
  • Schedulers.io():I/O操作(读写文件,读写数据库、网络信息交互等)使用的Scheduler。内部实现用一个无限量上线的线程池
  • Schedulers.comptation():计算所使用的Scheduler。这里指的是CPU密集型计算,即不能被I/O等操作限制性能的操作。如图形计算。固定的线程池,大小同cpu核数
  • AndroidSchedulers.mainThread():Android主线程运行。
  1. subscribeOn():指定subscribe()所发生的线程,即Observable.OnSubscribe被激活时所处的线程,或者叫事件产生的线程。
  2. observeOn():指定Subscriber所运行在的线程,或者叫事件消费的线程。
  3. Map:map是RxJava中最简单的变化操作符,作用是对上游发送的每个事件应用一个函数,使得每一个事件都按照指定的函数去变化。
  4. flatMap:将一个发送事件的上游Observable变化为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。flatMap不保证事件顺序,如果需要保证顺序则需要使用concatMap。
  5. concatMap:和flatMap功能相同,但是严格按照数序执行
  6. Zip 通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件,它按照严格的顺序应用这个函数。它只发射与发射数据项最少的哪个Observable一样多的数据。
  • 两个Observable如果不指定线程在同一个线程执行,此时会先将第一个observable的事件发送完之后在发送第二个observable的事件。
  • 若两个observable指定不同的线程就会同时发送,当遇到其中的一个onComplete时另外一个observable也停止发送事件。如果没有onComplete则两个observable会将事件全部发送
  • zip内部通过队列来存储接收observable发送的事件,然后按照顺序取出事件发送下去,当其中一个队列为空时,处于等待状态
  1. filter 过滤器 过滤出有用的事件发送
  2. sample 每隔指定时间就从上游取出一个事件发送给下游
  3. Subscriber
  4. Subscription
  5. Flowable
  • 默认有一个128的存储可以存放128个事件
  • BackpressureStrategy.ERROR
  • BackpressureStrategy.BUFFER 无限制大小
  • BackpressureStrategy.DROP 把存不下的事件丢弃
  • BackpressureStrategy.LATEST 保留最新的事件
  1. 方法设置
  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()
  • MissingBackpressureException 异常

3. 操作符(Operators)

其实实质是函数式编程中的高阶函数,是对响应式编程的各个过程拆分封装后的产物,以便操作数据流。按照作用分为以下几类:

  • 创建:创建一个可观察对象Observable并发射数据。
  • 过滤:从Observable发射的数据中取出特定的值。
  • 变换:对Observable发射的数据执行变换操作。
  • 组合:组合多个Observable。例如:{1,2,3} + {4,5,6} --> {1,2,3,4,5,6}
  • 聚合:聚合多个Observable。例如:{1,2,3}+{4,5,6}-->{[1,4],[2,5],[3,6]}

所有的操作符均可连接使用,共同对结果起作用。

3.1 创建操作

3.1.1 create

基本创建操作符,创建一个Observable。

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello World");
            }
        });
3.1.2 just

创建一个Observable,可以接受一个或多个参数,将每个参数逐一发送

Observable observable1 = Observable.just("hello World");//发送一个字符串
Observable observable2 = Observable.just(1, 2, 3, 4);//逐一发送整数
3.1.3 fromArray

创建一个Observable,接受一个数组,并将数组中的数据逐一发送

String[] strings = {"a", "b", "c", "d"};
Observable observable = Observable.fromArray(strings);
3.1.4 formIterable

创建一个Observable,接受一个可迭代的对象,并将可迭代对象中的数据逐一发送

List<String>stringList = Arrays.asList("a","b","c","d");
Observable observable4 = Observable.fromIterable(stringList);
3.1.5 range

创建一个Observable,发送一个范围内的整数序列

Observable observable = Observable.range(0,5);

3.2 过滤操作符

3.2.1 filter

使用Predicate函数接口传入条件值,判断Observable发射的没一个值是否满足这个条件,如果满足,则继续向下传递,如果不满足,则过滤掉。

//过滤掉奇数,只发送偶数。
Observable observable = Observable.range(0, 10)
    .filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer % 2 == 0;
    }
});
3.2.2 distinct

过滤掉重复的数据项,即只允许还没有发射过的数据项通过。

Observable observable = Observable.just(1, 4, 4, 5, 6, 3, 1, 3, 5)
    .distinct();

3.3 变换操作符

3.3.1 map

对Observable发射的每一项数据应用一个函数,执行变化操作。需要接收一个函数接口Funcation<T,R>的实例化对象,实现接口内apply(T t)方法,在此方法内对接收到的数据t进行变换并返回。

Observable observable = Observable.range(0, 5)
    .map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "this is a num" + integer;
        }
    });
3.3.2 flatMap

将一个Observable变成成多个Observable,然后将多个Observable发射的数据合并到一个Observable中进行发射。

Integer nums1[] = new Integer[]{1, 2, 3};
Integer nums2[] = new Integer[]{4, 5, 6};
Integer nums3[] = new Integer[]{7, 8};
Observable observable9 = Observable.just(nums1, nums2, nums3)
    .flatMap(new Function<Integer[], Observable<Integer>>() {
        @Override
        public Observable<Integer> apply(Integer[] integers) throws Exception {
            return Observable.fromArray(integers);
        }
    });

3.4 组合操作

3.4.1 mergeWith/merge

合并多个Observable发射的数据,可能会让Observable发射的数据交错。

Integer nums1[] = new Integer[]{1, 2, 3};
Integer nums2[] = new Integer[]{4, 5, 6};
Observable observable1 = Observable.fromArray(nums1);
Observable observable2 = Observable.fromArray(nums2);
Observable.merge(observable1, observable2);

//或者 
Observable.fromArray(nums1)
    .mergeWith(Observable.fromArray(nums2));
3.4.2 concatWith/concat

合并多个Observable发射的数据,不会让Observable发射的数据交错。

Integer nums1[] = new Integer[]{1, 2, 3};
Integer nums2[] = new Integer[]{4, 5, 6};
Observable observable1 = Observable.fromArray(nums1);
Observable observable2 = Observable.fromArray(nums2);
Observable.concat(observable1,observable2);

//或者
Observable.fromArray(nums1)
    .concatWith(Observable.fromArray(nums2));

3.5 聚合操作

3.5.1 zipWith/zip

将多个Observable发射的数据,通过一个函数BitFunction对对应位置的数据处理后放到一个新的Observable中发射,所发射的个数与最少的Observable中的一样多。

Integer nums1[] = new Integer[]{1, 2, 3};
Integer nums2[] = new Integer[]{4, 5, 6};
Observable.fromArray(nums1)
    .zipWith(Observable.fromArray(nums2), new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer integer, Integer integer2) throws Exception {
            return integer + " + " + integer2 + "=" + (integer + integer2);
        }
    });

Observable observable1 = Observable.fromArray(nums1);
Observable observable2 = Observable.fromArray(nums2);
Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
    @Override
    public String apply(Integer integer, Integer integer2) throws Exception {
        return integer + " + " + integer2 + "=" + (integer + integer2);
    }
});

4. 线程调控 Scheduler

4.1 subscribeOn

subscribeOn通过接收一个Scheduler参数,来指定上游发射器所在的线程,若多次设定,则只有一次起作用。

4.2 observeOn

observeOn通过接收一个Scheduler参数,来指定下游操作所在的线程,若多次指定则每次均起作用。

4.3 Scheduler种类

4.3.1 Schedulers.io()

用于IO密集型的操作,例如读取SD卡文件、查询数据库、访问网络等,具有线程缓存机制。

4.3.2 Schedulers.newThread()

在每执行一次任务时创建一个新的线程,不具有线程缓存机制,效率比Scheduler.io()地。

4.3.3 Schedulers.computation()

用于CPU密集型计算任务,即不会被I/O等操作限制性能的耗时操作,例如xml,json文件解析,Bitmap图片压缩取样等。具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间浪费CPU。

4.3.4 Schedulers.trampoline()

在当前线程立即执行任务,如果当前线程有任务在这执行,则将其停止,等插入进来的任务执行完成之后,在将未执行完成的任务接着执行。

4.3.5 Schedulers.single()

拥有一个线程单例,所有任务都在一个线程中执行,在此线程中有任务执行时,其他任务按照先进先出的顺序依次执行。

4.3.6 Schedulers.from(@NonNull Executor executor)

指定一个线程调度器,由此调度器来控制任务的执行策略。

4.3.7 AndroidSchedulers.mainThread()

在Android UI线程中执行任务。

4.4 实例

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
})
    .subscribeOn(Schedulers.io())//设置可观察对象在Schedulers.io()的线程中发射数据
    .observeOn(Schedulers.newThread()) //设置map操作符在Schedulers.newThread()线程中处理数据
    .map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "this number is  " + integer;
        }
    })
    .observeOn(AndroidSchedulers.mainThread())//设置观察者在AndroidSchedulers.mainThread()的线程中处理数据
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Logger.d(s);
        }
    });

5. 背压

当上下游处于不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射的速度快于下游处理数据的速度,这样没有处理的数据就会造成积压,这些数据不会丢失,也不会被垃圾回收机制收回,而是存放在一个异步缓存池中,如果缓存池中数据一直得不到处理,越积越多,最后就造成内存溢出,这就是响应式编程中的背压问题。

5.1 Flowable

为解决背压问题,Rxjava2中,用Flowable来解决。在使用Flowable的时候,可观察对象不再是Observable,而是Flowable,观察者不再是Observer,而是Subscriber.Flowable与Subscriber之间依然通过subscribe()进行订阅。
因为Flowable效率要比Observable低,因此,只有在需要处理背压问题时,才需要使用Flowable。
一定不会出现背压问题:

  1. 上下游运行在同一个线程中。
  2. 上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度。
  3. 上限由工作在不同的线程中,但是数据流中只有一条数据。
Flowable与Observable不同点:
  • create()方法多了一个BackpressureStrategy类型的参数。
  • 订阅器Subscriber中,方法onSubscribe回调的参数不是Disposable而是Subscription,多个一行代码:
  • Flowable通过自身特有的异步缓存池,来缓存没来得及处理的数据,缓存池的容量上限128。Observable缓存没有容量限制,对于没有来得及处理的数据可以一直向里面添加,数据过多就会产生内存溢出(OOM)。
s.request(Long.MAX_VALUE)
  • Flowable发射数据时,使用特有的发射器FlowableEmitter,不同于Observable的ObservableEmitter

5.2 BackpressureStrategy背压策略

5.2.1 ERROR

在此策略下,如果放入Flowable的异步缓存池中的数据超限,则会抛出MissingBackpressureException异常。

5.2.2 DROP

在此策略下,如果Flowable的异步缓存池满了,会丢掉上游发送的数据。缓存池中数据的清理,并不是subscriber接受一条,便清理一条,而是存在一个延迟,等累积一段时间后统一清理一次。

5.2.3 LATEST

与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察值接收到完成通知之前,能够接收到FLowable最新发射的一条数据。(加入发射500个数据,最后一定能接收到最后一条数据。)

5.2.4 BUFFER

默认策略。在其构造方法中可以发现,其内部维护了一个缓存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默认的异步缓存池满了,会通过此缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM。

5.2.5 MISSING

通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。

5.3 onBackpressureXXX背压操作符

Flowable除了通过create创建的时候指定背压策略,也可以在通过其它创建操作符just,fromArray等创建后通过背压操作符指定背压策略。
onBackpressureBuffer()对应BackpressureStrategy.BUFFER
onBackpressureDrop()对应BackpressureStrategy.DROP
onBackpressureLatest()对应BackpressureStrategy.LATEST

5.4 Subscription

Flowable使用的是响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。默认下游需求量为零。Subscription.request(1)拉取数据。数据会先全部发送到缓存池中,然后根据拉取发送数据。

5.5 FlowableEmitter

FlowableEmitter.requested()获取一个动态的值,会随着下哟偶已经接收的数据的数量而递减,上游通过e.requested()获取的值也就变成了0,如果此时,再发送数据的话,则会根据BackpressureStrategy背压策略的不同,抛出MissingBackpressureException异常,或者丢掉这条数据。

Flowable
    .create(new FlowableOnSubscribe<Integer>() {
        private Subscription subscription;

        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            int i = 0;
            while (true) {
                if (emitter.requested() == 0) continue;
                System.out.println("发送 --->" + i);
                emitter.onNext(i);
                i++;
            }
        }
    }, BackpressureStrategy.MISSING)
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.newThread())
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer integer) {
            try {
                Thread.sleep(500);
                System.out.println("接受 --->" + integer);
                subscription.request(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    });

6. 简化版Observable

6.1 Single

只发射一条单一的数据,或者一条异常通知,不能发射完成通知,其中数据与通知只能发射一个。

6.2 Completable

只发射一条完成通知,或者一条异常通知,不能发射数据,其中完成通知与异常通知只能发射一个

6.3 Maybe

可发射一条单一的数据,以及发射一条完成通知,或者一条异常通知,其中完成通知和异常通知只能发射一个,发射数据只能在发射完成通知或者异常通知之前,否则发射数据无效。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容