此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著
一、什么是RxJava?
RxJava是一个非常著名的开源库,是ReactiveX(Reactive Extensions)的一种Java实现。ReactiveX是一种响应式扩展框架,有很多种实现,如RxAndroid, RxJS,RxSwift,RxRuby,RxCpp,RxGo等。目前RxJava有1.x和2.x两个主要分支,分别代表RxJava1和RxJava2。RxJava1发布的时间比较早, 使用也更广泛,所以先来了解下RxJava1。
RxJava可看作由Observable、Subscriber和Scheduler组成的。Subscriber订阅到Observable,Obervable会在默认或者指定的Scheduler上工作并产生数据流返回给Subscriber,Subscriber也会在默认或者指定的Scheduler上接收Observable发送过来的数据流。Scheduler是对线程的一种抽象,不同的Scheduler代表 了不同的线程。
1.1 Observable和Subscriber
Observable提供了subscribe方法, 当有Subscriber通过subscribe方法订阅到Observable时, Observable就可以向Subscriber发送数据流。响应式编程中的事件分为三类:普通事件 ,错误事件和结束事件,在Subscriber中有三个方法与这三件事一一对应,Observable会通过调用Subscriber的这三个方法来发送对应的事件。
- onNext:当Observable要发送普通事件时,就会调用这个方法。这个方法可被调用0~N次。
- onError: 当在Observable内部有异常或者错误产生时,就可以调用这个方法来向Subscriber发送错误事件。这个方法只能被调用一次。
- onComplete: 如果Observable已经发送完所有数据,并且没有发生错误,这时就需要调用这个方法来向Subscriber发送结束事件。这个方法只能调用一次,而且和onError是互斥的关系,也就是说调用了onError后就不能调用onComplete 反之亦然。在onError或者onComplete被调用之后, Observable就失去了作用, 不能再调用onNext来发送数据了。
Subscriber还提供了unsubscribe方法, 当Subscriber订阅到Observable之后, 可以随时调用这个方法来终止对Observable的订阅。
二、在Android工程中引入RxJava
只需要在Gradle配置文件中加入对RxJava的依赖。但是Android开发中一般会添加对RxAndroid的依赖,而RxAndroid已经依赖于RxJava,且一般难以及时更新到最新版的RxJava,所以如果想使用RxAndroid和最新版本的RxJava,则可以通过下面的配置来导入依赖。
api ('io.reactivex.rxjava2:rxandroid:2.1.0'){
exclude module:'rxjava2'
}
api 'io.reactivex.rxjava2:rxjava:2.2.1'
三、RxJava中的操作符
3.1 创建Observable的操作符
3.1.1 range
range操作符创建的Observable将会发送一个范围内的数据
Observable.range(0,4)
.subscribe(new Action1<Integer>(){
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
结果是:
0 1 2 3
3.1.2 defer和just
defer操作符,只有当Subscriber来订阅的时候才会创建一个新的Observable对象, 也就是说每次订阅都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的。
just操作符接收某个对象作为输入,然后会创建一个发送该对象的Observable。这个对象可以是一个数字,一个字符串,数组,Iterate对象等。just是一种非常快捷的创建Observable对象的方法,在后面的例子中会大量使用。
我们通过这两个操作符创建两个Observable,这两个Observable都会将Android系统中的当前时间作为数据发送。我们将这两个Observable分别保存在变量deferObservable和justObservable中。
Observable<Long> deferObservable = getDefer();
Observable<Long> justObservable = getJust();
private Observable<Long> getJust() {
return Observable.just(System.currentTimeMillis());
}
private Observable<Long> getDefer() {
return Observable.defer(new Func0<Observable<Long>>() {
@Override
public Observable<Long> call() {
return getJust();
}
});
}
现在分别对这两个Observable进行订阅
private void Just_Defer(){
for(int i = 0; i<3;i++) {
deferObservable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("defer: " + aLong);
}
});
justObservable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("just: " + aLong);
}
});
}
}
输出结果可以看到defer每次订阅都会得到Observable发送的一个全新的当前时间, 而just创建的操作符即使订阅多次也都会发出和首次订阅一样的数据。
defer: 1554532535942
just: 1554532535905
defer: 1554532535954
just: 1554532535905
defer: 1554532535954
just: 1554532535905
3.1.3 from
from操作符接收一个对象作为参数来创建Observable,这个参数对象可以是Iterable、Callable、Future和数组等。 from操作符创建的Observable将发送参数对象里的数据, 其创建方式类似于just操作符,但是just操作符创建的Observable会将整个参数对象作为数据一下子发送出去。 比如说参数对象是一个含有10个数字的数组,使用from创建的Observable就会发送10次, 每次发送一个数字,而使用just创建的Observable会一次就将整个数组发送出去。
首先创建一个数组和一个List, 分别存储0~5的整数,然后使用from操作符分别以数组和List作为输入参数创建两个Observable对象。之后分别订阅,会输出什么结果呢?
Integer[] arrays = {0,1,2,3,4,5};
List<Integer> list = new ArrayList<>();
private Observable<Integer> FromArray() {
return Observable.from(arrays);
}
private Observable<Integer> FromIterable() {
for(int i = 0;i <= 5; i++) {
list.add(i);
}
return Observable.from(list);
}
private void From(){
FromArray().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("FromArray: " + integer);
}
});
FromIterable().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("FromIterable: " + integer);
}
});
}
FromArray: 0
FromArray: 1
FromArray: 2
FromArray: 3
FromArray: 4
FromArray: 5
FromIterable: 0
FromIterable: 1
FromIterable: 2
FromIterable: 3
FromIterable: 4
FromIterable: 5
3.1.4 interval
interval所创建的Observable对象会从0开始, 每隔固定的时间发送一个数字。需要注意的是, 这个对象时运行在computation Scheduler中的,所以在Android开发中 ,如果需要在UI中显示结果, 则需要在主线程中订阅。
代码通过interval操作符创建了一个Observable, 创建的Observable将会以1秒为间隔不断地发送数据,因为我们需要更新UI,所以在主线程中进行订阅。这里的AndroidSchedulers.mainThread()属于RxAndroid库, RxAndroid是Jake Wharton在Android平台上开发的一个对RxJava的扩展。最后创建一个Subscriber对象对这个Observable对象进行订阅, 就会每秒输出一个从0开始递增的数据。
private void interval(){
Subscriber<Long> subscriber = new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("onNext:" + aLong);
}
};
Observable.interval(0,10, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
}
最后结果是,原因不知道
onError: null
最后,由于interval操作符创建的Observable对象会不停地发送数据,所以当我们不再需要它的数据时要调用unSubscribe方法进行反订阅,反订阅后Observable将会停止发送数据
subscriber.unsubscribe();
3.1.5 repeat和timer
repeat操作符可以让Observable对象发送的数据重复发送N次, 我们可以指定其发送的次数。timer操作符创建的Observable会在指定时间后发送一个数字0, 注意其默认也是运行在computation scheduler 上。
下面我们来创建一个发送整数1,2,3的源Observable, 并使用repeat操作符在源Observable的基础上创建一个新的Observable,使其重复发送数据3次。另外使用timer创建一个会在1秒后发送数据的Observable。
private void Repeat_Timer() {
Observable.just(1,2,3).repeat(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("repeat: " + integer);
}
});
Observable.timer(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("timer: " + aLong);
}
});
}
结果呢,后面的这个报空指针异常,还没找到原因;正常timer操作符创建的Observable在订阅1秒后发送一个数据0.
repeat: 1
repeat: 2
repeat: 3
repeat: 1
repeat: 2
repeat: 3
repeat: 1
repeat: 2
repeat: 3
另外还有几个非常简单的创建操作符,如naver、empty、throw等,请移步查看官方文档。