RXjava
实质是一个异步操作库
1. 导入
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.2'
2. 概念
- Observable(被观察者) :
- Observer(观察者) :
- subscribe(订阅) :
- event(事件) :
- ObservableEmitter(发射器)用来发送事件,通过调用emitter的onNext(),onCompleted()和onError()分别发送不同的事件。
- Disposable: 调用它的dispose()方法时,将中断事件接收,上游继续发送事件,但下游不再接收事件。
- 下游Observer的onSubscribe()方法最先调用。
- Observable和Observer通过subscribe()方法实现订阅关系,从而Observable可以在需要时发出事件来通知Observer。
- 与传统观察者模式不同,RxJava的事件回调方法除了普通事件onNext()之外,还定义了来年改革特殊的事件:onCompleted()和onError()。
- onNext():
- onCompleted():事件队列完结,RxJava不仅把每个事件单独处理,还会把他们看作一个队列。RxJava规定,当不会再有新的onNext()发出时,需要触发onCompleted()方法作为标志。不会阻止上游Observable发送后面的事件,但是会阻止Observer接收后面的事件。
- onError(): 事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许在有事件发生。
- 在一个正确运行的事件序列中,onCompleted()和onError()有且只有一个,并且事件序列中的最后一个。onCompleted()和onError()二者互斥,在队列中只会调用其中一个。
- Observer的重载方法:
- 不带参数 表示下游不关心任何事件
- 一个Consumer参数表示下游只关心onNext()事件
- Scheduler(线程控制):在不指定线程的情况下,Rxjava遵循的好是线程不变原则。如要切换线程需要用Scheduler
- Schedulers.immediate():直接在当前线程运行,相当于不指定线程。
- Schedulers.newThread():总是启用新线程,并在新线程只想能够操作
- Schedulers.io():I/O操作(读写文件,读写数据库、网络信息交互等)使用的Scheduler。内部实现用一个无限量上线的线程池
- Schedulers.comptation():计算所使用的Scheduler。这里指的是CPU密集型计算,即不能被I/O等操作限制性能的操作。如图形计算。固定的线程池,大小同cpu核数
- AndroidSchedulers.mainThread():Android主线程运行。
- subscribeOn():指定subscribe()所发生的线程,即Observable.OnSubscribe被激活时所处的线程,或者叫事件产生的线程。
- observeOn():指定Subscriber所运行在的线程,或者叫事件消费的线程。
- Map:map是RxJava中最简单的变化操作符,作用是对上游发送的每个事件应用一个函数,使得每一个事件都按照指定的函数去变化。
- flatMap:将一个发送事件的上游Observable变化为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。flatMap不保证事件顺序,如果需要保证顺序则需要使用concatMap。
- concatMap:和flatMap功能相同,但是严格按照数序执行
- Zip 通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件,它按照严格的顺序应用这个函数。它只发射与发射数据项最少的哪个Observable一样多的数据。
- 两个Observable如果不指定线程在同一个线程执行,此时会先将第一个observable的事件发送完之后在发送第二个observable的事件。
- 若两个observable指定不同的线程就会同时发送,当遇到其中的一个onComplete时另外一个observable也停止发送事件。如果没有onComplete则两个observable会将事件全部发送
- zip内部通过队列来存储接收observable发送的事件,然后按照顺序取出事件发送下去,当其中一个队列为空时,处于等待状态
- filter 过滤器 过滤出有用的事件发送
- sample 每隔指定时间就从上游取出一个事件发送给下游
- Subscriber
- Subscription
- Flowable
- 默认有一个128的存储可以存放128个事件
- BackpressureStrategy.ERROR
- BackpressureStrategy.BUFFER 无限制大小
- BackpressureStrategy.DROP 把存不下的事件丢弃
- BackpressureStrategy.LATEST 保留最新的事件
- 方法设置
- onBackpressureBuffer()
- onBackpressureDrop()
- onBackpressureLatest()
- MissingBackpressureException 异常
3. 操作符(Operators)
其实实质是函数式编程中的高阶函数,是对响应式编程的各个过程拆分封装后的产物,以便操作数据流。按照作用分为以下几类:
- 创建:创建一个可观察对象Observable并发射数据。
- 过滤:从Observable发射的数据中取出特定的值。
- 变换:对Observable发射的数据执行变换操作。
- 组合:组合多个Observable。例如:{1,2,3} + {4,5,6} --> {1,2,3,4,5,6}
- 聚合:聚合多个Observable。例如:{1,2,3}+{4,5,6}-->{[1,4],[2,5],[3,6]}
所有的操作符均可连接使用,共同对结果起作用。
3.1 创建操作
3.1.1 create
基本创建操作符,创建一个Observable。
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello World");
}
});
3.1.2 just
创建一个Observable,可以接受一个或多个参数,将每个参数逐一发送
Observable observable1 = Observable.just("hello World");//发送一个字符串
Observable observable2 = Observable.just(1, 2, 3, 4);//逐一发送整数
3.1.3 fromArray
创建一个Observable,接受一个数组,并将数组中的数据逐一发送
String[] strings = {"a", "b", "c", "d"};
Observable observable = Observable.fromArray(strings);
3.1.4 formIterable
创建一个Observable,接受一个可迭代的对象,并将可迭代对象中的数据逐一发送
List<String>stringList = Arrays.asList("a","b","c","d");
Observable observable4 = Observable.fromIterable(stringList);
3.1.5 range
创建一个Observable,发送一个范围内的整数序列
Observable observable = Observable.range(0,5);
3.2 过滤操作符
3.2.1 filter
使用Predicate函数接口传入条件值,判断Observable发射的没一个值是否满足这个条件,如果满足,则继续向下传递,如果不满足,则过滤掉。
//过滤掉奇数,只发送偶数。
Observable observable = Observable.range(0, 10)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
});
3.2.2 distinct
过滤掉重复的数据项,即只允许还没有发射过的数据项通过。
Observable observable = Observable.just(1, 4, 4, 5, 6, 3, 1, 3, 5)
.distinct();
3.3 变换操作符
3.3.1 map
对Observable发射的每一项数据应用一个函数,执行变化操作。需要接收一个函数接口Funcation<T,R>的实例化对象,实现接口内apply(T t)方法,在此方法内对接收到的数据t进行变换并返回。
Observable observable = Observable.range(0, 5)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "this is a num" + integer;
}
});
3.3.2 flatMap
将一个Observable变成成多个Observable,然后将多个Observable发射的数据合并到一个Observable中进行发射。
Integer nums1[] = new Integer[]{1, 2, 3};
Integer nums2[] = new Integer[]{4, 5, 6};
Integer nums3[] = new Integer[]{7, 8};
Observable observable9 = Observable.just(nums1, nums2, nums3)
.flatMap(new Function<Integer[], Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer[] integers) throws Exception {
return Observable.fromArray(integers);
}
});
3.4 组合操作
3.4.1 mergeWith/merge
合并多个Observable发射的数据,可能会让Observable发射的数据交错。
Integer nums1[] = new Integer[]{1, 2, 3};
Integer nums2[] = new Integer[]{4, 5, 6};
Observable observable1 = Observable.fromArray(nums1);
Observable observable2 = Observable.fromArray(nums2);
Observable.merge(observable1, observable2);
//或者
Observable.fromArray(nums1)
.mergeWith(Observable.fromArray(nums2));
3.4.2 concatWith/concat
合并多个Observable发射的数据,不会让Observable发射的数据交错。
Integer nums1[] = new Integer[]{1, 2, 3};
Integer nums2[] = new Integer[]{4, 5, 6};
Observable observable1 = Observable.fromArray(nums1);
Observable observable2 = Observable.fromArray(nums2);
Observable.concat(observable1,observable2);
//或者
Observable.fromArray(nums1)
.concatWith(Observable.fromArray(nums2));
3.5 聚合操作
3.5.1 zipWith/zip
将多个Observable发射的数据,通过一个函数BitFunction对对应位置的数据处理后放到一个新的Observable中发射,所发射的个数与最少的Observable中的一样多。
Integer nums1[] = new Integer[]{1, 2, 3};
Integer nums2[] = new Integer[]{4, 5, 6};
Observable.fromArray(nums1)
.zipWith(Observable.fromArray(nums2), new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return integer + " + " + integer2 + "=" + (integer + integer2);
}
});
Observable observable1 = Observable.fromArray(nums1);
Observable observable2 = Observable.fromArray(nums2);
Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return integer + " + " + integer2 + "=" + (integer + integer2);
}
});
4. 线程调控 Scheduler
4.1 subscribeOn
subscribeOn通过接收一个Scheduler参数,来指定上游发射器所在的线程,若多次设定,则只有一次起作用。
4.2 observeOn
observeOn通过接收一个Scheduler参数,来指定下游操作所在的线程,若多次指定则每次均起作用。
4.3 Scheduler种类
4.3.1 Schedulers.io()
用于IO密集型的操作,例如读取SD卡文件、查询数据库、访问网络等,具有线程缓存机制。
4.3.2 Schedulers.newThread()
在每执行一次任务时创建一个新的线程,不具有线程缓存机制,效率比Scheduler.io()地。
4.3.3 Schedulers.computation()
用于CPU密集型计算任务,即不会被I/O等操作限制性能的耗时操作,例如xml,json文件解析,Bitmap图片压缩取样等。具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间浪费CPU。
4.3.4 Schedulers.trampoline()
在当前线程立即执行任务,如果当前线程有任务在这执行,则将其停止,等插入进来的任务执行完成之后,在将未执行完成的任务接着执行。
4.3.5 Schedulers.single()
拥有一个线程单例,所有任务都在一个线程中执行,在此线程中有任务执行时,其他任务按照先进先出的顺序依次执行。
4.3.6 Schedulers.from(@NonNull Executor executor)
指定一个线程调度器,由此调度器来控制任务的执行策略。
4.3.7 AndroidSchedulers.mainThread()
在Android UI线程中执行任务。
4.4 实例
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())//设置可观察对象在Schedulers.io()的线程中发射数据
.observeOn(Schedulers.newThread()) //设置map操作符在Schedulers.newThread()线程中处理数据
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "this number is " + integer;
}
})
.observeOn(AndroidSchedulers.mainThread())//设置观察者在AndroidSchedulers.mainThread()的线程中处理数据
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Logger.d(s);
}
});
5. 背压
当上下游处于不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射的速度快于下游处理数据的速度,这样没有处理的数据就会造成积压,这些数据不会丢失,也不会被垃圾回收机制收回,而是存放在一个异步缓存池中,如果缓存池中数据一直得不到处理,越积越多,最后就造成内存溢出,这就是响应式编程中的背压问题。
5.1 Flowable
为解决背压问题,Rxjava2中,用Flowable来解决。在使用Flowable的时候,可观察对象不再是Observable,而是Flowable,观察者不再是Observer,而是Subscriber.Flowable与Subscriber之间依然通过subscribe()进行订阅。
因为Flowable效率要比Observable低,因此,只有在需要处理背压问题时,才需要使用Flowable。
一定不会出现背压问题:
- 上下游运行在同一个线程中。
- 上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度。
- 上限由工作在不同的线程中,但是数据流中只有一条数据。
Flowable与Observable不同点:
- create()方法多了一个BackpressureStrategy类型的参数。
- 订阅器Subscriber中,方法onSubscribe回调的参数不是Disposable而是Subscription,多个一行代码:
- Flowable通过自身特有的异步缓存池,来缓存没来得及处理的数据,缓存池的容量上限128。Observable缓存没有容量限制,对于没有来得及处理的数据可以一直向里面添加,数据过多就会产生内存溢出(OOM)。
s.request(Long.MAX_VALUE)
- Flowable发射数据时,使用特有的发射器FlowableEmitter,不同于Observable的ObservableEmitter
5.2 BackpressureStrategy背压策略
5.2.1 ERROR
在此策略下,如果放入Flowable的异步缓存池中的数据超限,则会抛出MissingBackpressureException异常。
5.2.2 DROP
在此策略下,如果Flowable的异步缓存池满了,会丢掉上游发送的数据。缓存池中数据的清理,并不是subscriber接受一条,便清理一条,而是存在一个延迟,等累积一段时间后统一清理一次。
5.2.3 LATEST
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察值接收到完成通知之前,能够接收到FLowable最新发射的一条数据。(加入发射500个数据,最后一定能接收到最后一条数据。)
5.2.4 BUFFER
默认策略。在其构造方法中可以发现,其内部维护了一个缓存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默认的异步缓存池满了,会通过此缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM。
5.2.5 MISSING
通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。
5.3 onBackpressureXXX背压操作符
Flowable除了通过create创建的时候指定背压策略,也可以在通过其它创建操作符just,fromArray等创建后通过背压操作符指定背压策略。
onBackpressureBuffer()对应BackpressureStrategy.BUFFER
onBackpressureDrop()对应BackpressureStrategy.DROP
onBackpressureLatest()对应BackpressureStrategy.LATEST
5.4 Subscription
Flowable使用的是响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。默认下游需求量为零。Subscription.request(1)拉取数据。数据会先全部发送到缓存池中,然后根据拉取发送数据。
5.5 FlowableEmitter
FlowableEmitter.requested()获取一个动态的值,会随着下哟偶已经接收的数据的数量而递减,上游通过e.requested()获取的值也就变成了0,如果此时,再发送数据的话,则会根据BackpressureStrategy背压策略的不同,抛出MissingBackpressureException异常,或者丢掉这条数据。
Flowable
.create(new FlowableOnSubscribe<Integer>() {
private Subscription subscription;
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
int i = 0;
while (true) {
if (emitter.requested() == 0) continue;
System.out.println("发送 --->" + i);
emitter.onNext(i);
i++;
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(500);
System.out.println("接受 --->" + integer);
subscription.request(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
6. 简化版Observable
6.1 Single
只发射一条单一的数据,或者一条异常通知,不能发射完成通知,其中数据与通知只能发射一个。
6.2 Completable
只发射一条完成通知,或者一条异常通知,不能发射数据,其中完成通知与异常通知只能发射一个
6.3 Maybe
可发射一条单一的数据,以及发射一条完成通知,或者一条异常通知,其中完成通知和异常通知只能发射一个,发射数据只能在发射完成通知或者异常通知之前,否则发射数据无效。