本篇文章已授权微信公众号 YYGeeker
独家发布转载请标明出处
CSDN学院课程地址
- RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10036
- RxJava2从入门到精通-中级篇:https://edu.csdn.net/course/detail/10037
- RxJava2从入门到精通-进阶篇:https://edu.csdn.net/course/detail/10038
- RxJava2从入门到精通-源码分析篇:https://edu.csdn.net/course/detail/10138
2. RxJava基本使用
在RxJava使用之前记得在Gradle中添加依赖引入
implementation "io.reactivex.rxjava2:rxjava:2.1.12"
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
2.1 事件发射
在上面的例子中,我们可以注意到被观察者中有个Emitter(发射器),发射器的位置位于subscribe回调参数ObservableEmitter<String> e
中,通过名字我们可以知道,RxJava的事件通知就是通过它来进行发送的,所以它是一个事件发射器,发射器能发送的事件有onNext()
,onComplete()
,onError()
,在观察者的回调中,分别对应着相同方法名进行回调,这里对观察者的回调方法进行简单介绍
- onSubscribe():当观察者被订阅时回调
- onNext():当观察者收到onNext事件时回调
- onComplete():当观察者收到onComplete事件时回调
- onError():当观察者收到onError事件时回调
2.2 链式调用
人类就喜欢酷炫的东西,炫耀自身的优点,当然RxJava也少不了人类这种心理,就是链式编程,下面这段代码可以完美替代例子上面的所有代码,其效果是和上面一样的。这里需要注意的是,创建Observer过程中,会将所有的接收方法都创建出来,如果此时程序发生异常,RxJava默认会将异常信息try-catch
public static void main(String[] args) {
//创建被观察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
//默认在主线程里执行该方法
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但还是要保持微笑");
e.onComplete();
}
})
//创建观察者并订阅
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
2.3 选择调用
久而久之,人类喜欢简洁,喜欢定制服务,巧了,RxJava也给你满足了,下面这段代码中,实现的方法跟上面的实现方法是对应起来的,大家看参数就知道哪个对应哪个了,你可以通过创建Consumer,不需要实现的方法你可以不写,看上去更简洁,这里我为了方便大家看,都new出来了,Consumer就是消费者的意思,可以理解为消费了onNext等等等事件。这里需要注意的是,创建Consumer过程中,如果没有将第二个Throwable的Consumer创建出来,如果此时程序发生异常,程序将会崩溃
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但还是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//对应onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//对应onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//对应onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//对应onSubscribe()
}
});
}
2.4 线程切换
当然Rxjava的使用不仅仅这么简单的事件发送,他还能完成一些业务上的逻辑。比如注册登录操作,正常的逻辑是通过注册去获取用户的Token,然后通过Token进行登录,这个过程涉及到注册需要在子线程去进行网络请求,然后在UI线程中更新界面提示,然后再切换到子线程进行登录操作,最后又得切换到UI线程去更新界面,这一系列的操作,也是可以通过RxJava的线程切换来进行实现,在RxJava中的线程切换特别简单,只要下面这两句话就能自由的在子线程和UI线程中自由切换
public static void main(String[] args) {
//创建被观察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
//默认在主线程里执行该方法
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但还是要保持微笑");
e.onComplete();
}
})
//将被观察者切换到子线程
.subscribeOn(Schedulers.io())
//将观察者切换到主线程 需要在Android环境下运行
.observeOn(AndroidSchedulers.mainThread())
//创建观察者并订阅
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
2.5 线程调度器
说到线程切换,就必须不得不说的是RxJava的线程调度器,其调度器就是Schedulers,在调度器中封装了各式各样的线程提供给我们使用,下面举例其现有的调度器列表
调度器类型 | 效果 |
---|---|
Schedulers.computation() | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate() | 在当前线程立即开始执行任务 |
Schedulers.io() | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io()默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread() | 为每个任务创建一个新线程 |
Schedulers.trampoline() | 当其它排队的任务完成后,在当前线程排队开始执行 |
2.6 事件调度器
RxJava事件发出去并不是置之不顾,要有合理的管理者来管理它们,在合适的时机要进行释放事件,这样才不会导致内存泄漏,这里的管理者我们称为事件调度器(或事件管理者)CompositeDisposable
。Rxjava的事件流发出去后,会返回Disposable
类型的对象,我们可以将该对象添加到事件调度器上,然后进行相关操作,这里的事件调度器我们可以简单的理解为事件的容器
public class Main {
private static CompositeDisposable mRxEvent = new CompositeDisposable();
public static void main(String[] args) {
Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帅");
e.onNext("你值得拥有");
e.onNext("取消关注");
e.onNext("但还是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//对应onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//对应onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//对应onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//对应onSubscribe()
}
});
mRxEvent.add(subscribe);
mRxEvent.clear();
}
}
CompositeDisposable
提供的方法中,都是对事件的管理
- dispose():释放所有事件
- clear():释放所有事件,实现同dispose()
- add():增加某个事件
- addAll():增加所有事件
- remove():移除某个事件并释放
- delete():移除某个事件
2.7 "冷"与"热"
RxJava的事件发射分为冷与热,一个"热"的Observable可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列
2.8 小结
RxJava可以简单的归结为三步骤
- 创建:Rx可以创建事件流和数据流(.create的过程)
- 组合:Rx可以通过操作符的组合变换数据流(.subscribeOn过程和.observeOn过程)
- 监听:Rx可以订阅任何可观察的数据流并执行操作(.subscribe过程)