前言:
Android技能树系列:
Android基础知识
Android技能树 — Android存储路径及IO操作小结
数据结构基础知识
算法基础知识
Rx系列相关
Android技能树 — Rxjava取消订阅小结(1):自带方式
Android技能树 — Rxjava取消订阅小结(2):RxLifeCycle
现在很多项目都在使用Rxjava了,对于RxJava的使用,估计都很熟悉了,但是很多人在使用RxJava的时候容易产生内存泄漏问题,比如我们在用RxJava配合Retrofit的时候,发出请求出去,拿到数据后我们可能会去刷新界面,但是如果这时候网络比较差,返回比较慢,而我们的Activity这时候关闭了,那RxJava当拿到返回的数据的时候去刷新界面就会报空指针异常了。所以我们当Activity关闭的时候,我们这时候如果RxJava还没执行完,我们应该取消订阅。
常用的主要三种方式:(按照⭐️推荐从低到高来介绍)
- 自带取消订阅方式(⭐️)
- RxLifeCycle(⭐️⭐️)
- AutoDispose(⭐️⭐️⭐️)
本文主要讲解RxLifeCycle方式。
RxLifeCycle简述
这里肯定不会简单的介绍如何使用RxLifeCycle,github上面已经写得很清楚了,RxLifecycle github链接,我们主要是看具体的实现原理。
简单使用:
假设我们的Activity是集成RxActivity (PS: 不是一定要继承的,只是说明一种使用情况,具体可以看GitHub)
//手动设定解除订阅的时间:(ps:这里设为onPause的时候解除订阅)
myObservable
.compose(this.bindUntilEvent(ActivityEvent.PAUSE))
.subscrbe();
/**
自动设定解除订阅的时间:
(ps:比如你是在onStart时候订阅,则自动会在onPause时候解除,
如果在onCreate时候订阅,则会自动在onDestory时候解除)
*/
myObservable
.compose(this.bindToLifecycle())
.subscribe();
在介绍RxLifeCycle之前,先介绍一些基础知识,加深大家的理解。
1 基础知识:
1.1 Subject
我们知道在RxBus中我们使用的是Subject ,因为它既可以是观察者又是被观察者。而Subject有很多种类:子类有PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject、SerializedSubject。
具体每种的区别可以看:RxJava中常见的几种Subject
这里我们主要讲解BehaviorSubject。
1.1.1 BehaviorSubject
Subject that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.
大意是BehaviorSubject会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值(如果有的话)。
1.2 Hot Observable 和 Cold Observable
正好上面讲到了Subject,顺带提一下冷热Observable。和RxLifeCycle关系不大,但是可以当了解,不想看的可以跳过 1. 2 基础知识。
所谓的冷热和我们单例模式中的饿汉式和饱汉式有一点点像,冷Observable需要有订阅者的时候才开始发射数据(有点像饱汉式),热Observable并不是一定需要订阅者才开始发射数据(有点像饿汉式)。
PS: 大家也可以具体参考文章拥抱 RxJava(三):关于 Observable 的冷热,常见的封装方式以及误区,一些图片及说明我这边也直接引用该文章。
1.2.1 Cold Observable :
我们常见的工厂方法提供的都是Cold Observable,包括just(),fromXX,create(),interval(),defer()。 他们有订阅者的时候才会发射数据,并且他们的共同点是当你有多个Subscriber的时候,他们的事件是独立的。
Observable interval = Observable.interval(1,TimeUnit.SECONDS);
1.2.2 Hot Observable
不同于Cold Observable, Hot Observable是共享数据的。对于Hot Observable的所有subscriber,他们会在同一时刻收到相同的数据。我们通常使用publish()操作符来将Cold Observable变为Hot。或者我们在RxBus中常常用到的Subjects 也是Hot Observable。
而Hot Observable不需要有订阅者,只需要调用connect()
方法就会开始发送数据,这时候当其他订阅这个Observable的时候,并不会从头开始接受数据。
而常用的Hot Observable 是 ConnectableObservable。
1.3 takeUntil操作符
我们可以看到takeUtil操作符的功能:
在第二个Observable发射一个内容或终止后放弃第一个Observable发射的内容。
所以我们马上就可以想到假设第一个是我们的网络请求接口的Observable , 然后通过takeUntil绑定了一个其他的Observable , 比如我们是要在onDestory时候取消订阅,那只需要在onDestory方法里面使第二个Observable发送一个内容即可。
1.4 Filter操作符
就如同字面意思,起到过滤作用,你写一个条件,只有符合条件的发送信息才会被接收到。
observable.filter(new Predicate<R>() {
@Override
public boolean test(R r) throws Exception {
//根据过滤条件,来决定返回是false/true
return false/true;
}
});
1.5 hide()方法
这个方法在Rxjava 1 里面叫做asObservable() 。可能很多人没用过,主要还是用在Subject。
比如你写了一个Subject,你想暴露出去一个接口让别人使用,你可能会这么写:
public class Test {
//假设是BehaviorSubject
private BehaviorSubject subject = BehaviorSubject.create();
//把Observable这块方面通过方法分享出去,但是又不想整个Subject都分享出去。
public Observable getObservable(){
return ((Observable) subject);
}
//比如你调用play方法,按照要求只能发送1,2,3
public void play(){
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
}
}
但是这么写没啥卵用,只要获取后强制转换就可以:
//又可以发送相关数据
((BehaviorSubject) getObservable()).onNext(99999);
所以这时候需要使用asObservable方法了:这实际上只是将您的主题封装在一个可观察的对象中,这使得消费代码无法将其转换回主题,asObservable是隐藏实现细节的一种防御机制。
//改变暴露的方法:
public Observable getObservable(){
return subject.asObservable();
}
//这时候就算你强转也没用,会报错,因为这时候通过asObservable获取到的对象已经不是Subject对象了。
((BehaviorSubject) getObservable()).onNext(99999);
而在Rxjava 2 中只是把这个asObservable 方法改成了 hide方法而已。用法相同。
1.6 Transformer
Tramsformer有很多种:ObservableTransformer,FlowableTransformer,SingleTransformer,MaybeTransformer,CompletableTransformer。
我们这里已ObservableTransformer为例:
ObservableTransformer其实可以理解为Observable 转换器:可以通过它将一种类型的Observable转换成另一种类型的Observable。
比如平常时候每个observable我们都需要写上这段代码::
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(XXXXX);
明明知道大部分的observable要使用的是
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
所以有些人就会想到我写一个方法:
public Observable defaultSet(Observable ob){
return ob.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
当然这么写是没问题,但是你每个请求都要求用defaultSet方法包住了:
defaultSet(observable)
.subScribe(XXXXXX);
没有了链式调用的清爽了,所以这时候ObservableTransformer 就出现了:
public class Transformer {
public static <T> ObservableTransformer<T, T> switchSchedulers() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
}
这时候我们只需要调用:
observable.compose(Transformer.<Object>switchSchedulers()).subScribe(XXXX);
所以我们知道了,我们想把一个Observable转变成另外一个Observable可以使用ObservableTransformer。
1.7 combineLatest 操作符
两个Observable发射,合并每个Observable发射的最新内容,然后发出去,看下面的图片就很清楚。
1.7 take 与 skip
take操作符:
只发出Observable发出的前n个item。
skip操作符:
压制Observable发出的前n个item。
1.8 map
通过对每个item应用函数来转换Observable发出的item
1.9 catch
在Observable发射数据时,有时发送onError通知,导致观察者不能正常接收数据。可是,有时我们希望对Observable发射的onError通知做出响应或者从错误中恢复。具体主要有三种不同操作符来实现:
- onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止。
- onErrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列。
- onExceptionResumeNext:让Observable在遇到错误时继续发射后面的数据项。
具体描述可以参考:RxJava之错误处理
2 RxLife源码解析
我们已Activity中取消订阅为例:
RxActivity.java(代码说明具体查看源码里面的备注):
public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {
//创建一个BehaviorSubject,用来做takeUntil中的第二个Observable,让其在核实的生命周期发送信息。
private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
@Override
@NonNull
@CheckResult
public final Observable<ActivityEvent> lifecycle() {
//使用hide()方法把这个subject暴露出去
return lifecycleSubject.hide();
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
@Override
@CallSuper
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifecycleSubject.onNext(ActivityEvent.CREATE);
}
@Override
@CallSuper
protected void onStart() {
super.onStart();
//在onStart时候发送信息
lifecycleSubject.onNext(ActivityEvent.START);
}
@Override
@CallSuper
protected void onResume() {
super.onResume();
//在onResume时候发送信息
lifecycleSubject.onNext(ActivityEvent.RESUME);
}
@Override
@CallSuper
protected void onPause() {
//在onPause时候发送信息
lifecycleSubject.onNext(ActivityEvent.PAUSE);
super.onPause();
}
@Override
@CallSuper
protected void onStop() {
//在onStop时候发送信息
lifecycleSubject.onNext(ActivityEvent.STOP);
super.onStop();
}
@Override
@CallSuper
protected void onDestroy() {
//在onDestroy时候发送信息
lifecycleSubject.onNext(ActivityEvent.DESTROY);
super.onDestroy();
}
}
同时我们也注意到一个小细节:
在onCreate , onStart , onResume的时候,都是先调用super.XXX, 然后再用subject 发送相关Event;但是在 onPause , onStop , onDestory 里面却是先用subject 发送相关Event,然后再调用super.XXXX。为啥会有这个区别。因为一般取消订阅都是在onPause,onStop,onDestory情形下,所以优先先取消订阅,再去执行系统自己的操作。比如onDestory,先去取消订阅,再去执行super.onDestory方法。
2.1 手动设定取消订阅时间
我们先来讲解手动设定某个生命周期作为取消订阅,我们知道主要是使用:
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
我们通过上面的基础知识,应该知道我们的目的是把我们自己的Obsevable和RxActivity里面的BehaviorSubject通过takeUntil绑定在一起,因为RxActivity里面所有的生命周期都发送了相应的ActivityEvent事件,所以我们需要使用filter来过滤掉不是我们关心的生命周期事件 ,最后通过ObservableTransformer来把我们的Observable进行转换成这个合成好的《Observable & BehaviorSubject》。
所以我们整体思路知道了,我们来具体看bindUntilEvent源码:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle, @Nonnull final R event) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(event, "event == null");
return bind(takeUntilEvent(lifecycle, event));
}
可以看到主要是bind和 takeUntilEvent二个方法,我们先看takeUntilEvent方法:
private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
return lifecycle.filter(new Predicate<R>() {
@Override
public boolean test(R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(event);
}
});
}
因为我们前面提过,我们在生命周期中都会让subject发送相应的ActivityEvent事件,所以我们这里只是把这个subject通过filter过滤,然后只发送我们指定的生命周期。
我们再来看bind方法,这时候就知道bind方法的目的是为了帮我们的Observable和这个已经使用过filter的subject进行绑定并返回:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
return new LifecycleTransformer<>(lifecycle);
}
返回了LifecycleTransformer:
public final class LifecycleTransformer<T> implements
ObservableTransformer<T, T>,
FlowableTransformer<T, T>,
SingleTransformer<T, T>,
MaybeTransformer<T, T>,
CompletableTransformer
我们可以看到LifecycleTransformer实现了很多Transformer,因为这样我们使用Observable或者Single等都可以来进行转换。比如我们是Observable,那我们就会调用LifecycleTransformer里面实现的的ObservableTransformer对应的apply方法:
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.takeUntil(observable);
}
我们看到果然调用了takeUntil,把我们的Observable通过takeUntil与已经处理好指定ActivityEvent的subject进行绑定。
最终我们只需要:
myObservable.compose(bindUntilEvent(ActivityEvent.PAUSE));
2.1 自动设定取消订阅时间
自动取消订阅代码:
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
我们可以看到,大致其实和手动指定生命周期的是一样的,唯一的区别就是我们要根据我们设置订阅事件的生命周期推算出相对于的取消订阅生命周期。
我们来看bindActivity源码:
@NonNull
@CheckResult
public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) {
return bind(lifecycle, ACTIVITY_LIFECYCLE);
}
还是老样子,bind最后肯定是返回一个LifecycleTransformer:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
@Nonnull final Function<R, R> correspondingEvents) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(correspondingEvents, "correspondingEvents == null");
return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
先看takeUntilCorrespondingEvent方法:
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
我们先来看combineLatest里面的二个Observable:
-
lifecycle.take(1).map(correspondingEvents):
比如我们在oncreate里面注册了订阅,我们这时候就要告诉系统我们要在onDestory里面进行取消订阅,所以我们要先take(1)获取第一个(因为onstart,onresume等都会发送相应的ActivityEvent),然后通过map操作符来转换成相对的ActivityEvent:
private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE =
new Function<ActivityEvent, ActivityEvent>() {
@Override
public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
switch (lastEvent) {
case CREATE:
return ActivityEvent.DESTROY;
case START:
return ActivityEvent.STOP;
case RESUME:
return ActivityEvent.PAUSE;
case PAUSE:
return ActivityEvent.STOP;
case STOP:
return ActivityEvent.DESTROY;
case DESTROY:
throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
default:
throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
}
}
};
所以总结就是第一个Observable用来记录我们等会要在那个生命周期去取消订阅。
- lifecycle.skip(1):
既然我们一个Observable是记录了要取消订阅的事件,那我们第二个Observable就是在不同生命周期发送不同事件,当二个事件相同时候就说明要取消订阅了。但是我们第一次要跳过,因为第一个事件是在订阅时候发生的 。
所以总结第二个Observable用来实时发送生命周期的事件。
然后通过combineLatest把二个绑定一起,这时候就会在指定的生命周期时候就会发送true,其余时候发送false,最后配合filter操作符,只有在true的时候才能发送即可。这样最终通过takeUntil再把我们的Observable绑定在一起,然后这时候这里发送true的时候,我们的Observable就会取消订阅了。
事后谈RxLifeCycle:
有些人会问,为什么我使用了RxLifeCycle,就算到了相应生命周期了,还是会调用onComplete方法,因为有些人可能在这个方法里面有相应逻辑处理代码。因为RxLifeCycle主要使用的是takeUntil,所以最后还是会执行onComplete,如果想取消订阅的时候不调用这个,还是可以直接使用原生的Disposable来进行取消订阅。
Why Not RxLifecycle?。这文章的作者就是 RxLifeCycle 的作者 ,说了使用RxLifeCycle会遇到一些窘境 ,而是推荐了AutoDispose: Automatic binding+disposal of RxJava 2 streams.,这是Uber公司的开源Rxjava取消订阅。而RxLifeCycle作者也参与其中,所以一些设计方式也很像,AutoDipose主要是配合了Android的LifeCycle组件。
总结:
emmmmmm.......请多多指教。