什么是RxJava?
要知道什么是RxJava首先要去了解下Rx。Rx全称Reactive Extensions,直译过来就是响应式扩展。
RxJava是基于观察者模式,他是一种编程模型,目标是提供一致的编程接口,帮助开发者方便的处理异步数据流。
ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。Rx已经渗透到了各个语言中,有了Rx所以才有了 RxJava,Rx.NET、RxJS、RxSwift、Rx.rb、RxPHP等等,更详细的可以去这里看看languages
那么RxJava到底是什么呢?我对他的理解就是对于java语言的异步的响应式编程库。
简单说下什么是响应式编程
举个简单的栗子;
A=B+C
A被赋值为B和C的和,这是如果B的值发生变化,A的值并不会发生变化。而如果我们运用一种机制,当B或者C发送变化,A的值也随之变化,这就是响应式。
关于响应式的更多更多了解点击这里传送门
聊聊RxJava的观察者模式
刚刚已经了解了RxJava是有RX衍生过来的,所以RxJava也是基于观察者模式。就好比一个害羞的小男孩,暗恋一个小女孩,女生的一举一动他都认真仔细的观察,女孩饿了,渴了。。。最后终于看着女孩被别人牵走啦。所以不能只说观察,也要行动啊。
上例中的女孩就是被观察者,男孩则是观察者。对应RxJava中,有这几个基本概念:
- Observable (可观察者,即被观察者)
- Observer (观察者)
- subscribe (订阅)
- 事件
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext()
(相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
RxJava API的使用
(1) 创建 Observer
(2) 创建 Observable
也可以利用just或者from快速创建observable
- just(T...): 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted(); - from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
(3) Subscribe (订阅)
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
(4)线程控制Scheduler
在调用Scheduler的时候有这两个方法
- subscribeOn() : 设置的是被观察者所做的一些操作的线程(发送事件的线程)
- observeOn() : 设置的是观察者被通知之后在哪个线程操作(接收事件的线程)
多次指定发送事件(上游)的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
多次指定接收(下游)的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
RxJava 操作符
- map
Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。
例:得到多个Student对象中的name,保存到nameList中
Observable.just(student1, student2, student2)
//使用map进行转换,参数1:转换前的类型,参数2:转换后的类型
.map(new Func1<Student, String>() {
@Override
public String call(Student i) {
String name = i.getName();//获取Student对象中的name
return name;//返回name
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
nameList.add(s);
}
});
上例,map将原来的student数据源转化为String name 发送出去了,观察者拿到的就是name
FlatMap
FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的ObservableconcatMap
它类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。
例如,上面是打印每个学生的名字,这边我们需要打印出每个学生的课程名称,一个学生有多个课程
/**
* 学生类
*/
class Student {
private String name;//姓名
private List<Course> coursesList;//所修的课程
...
}
/**
* 课程类
*/
class Course {
private String name;//课程名
private String id;
...
}
利用flatMap变化一下:
List<Student> students = new ArrayList<Student>();
students.add...
...
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCoursesList());
}
})
.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
Log.i(TAG, course.getName());
}
});
上述例子中,数据源转化是这样的:
student -> course1,course2... -> 利用from()将所有的course放入一个observerble中发送出去
接收消息的数据源就变成 List<Course>课程的列表,直接打印名称
- zip
通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。
使用场景:
比如一个界面需要展示用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了之后才能进行展示, 这个时候就可以用Zip了:
Compose操作符是将源Observable按照自定义的方式转化成另外一个新的Observable。可以这么说compose是对Observable进行操作的而lift是对Subscriber进行操作的,作用点是不同的。
(这里的lift类似map,将数据类型转化)
- fliter
只发射通过了谓词测试的数据项