目录
【RxJava】- 创建操作符源码分析
【RxJava】- 变换操作符源码分析
【RxJava】- 过滤操作符源码分析
【RxJava】- 结合操作符源码分析
【RxJava】- 连接操作符源码分析
简介
在Activity中使用RxJava的时候,由于回调,RxJava持有Activity引用。当Activity销毁时,RxJava中的耗时任务还没有完成,如果这时候没有收到调用对应的dispose()方法,那么RxJava中持有的资源得不到释放,从而引起Activity的内存泄露。如果在Activity中手动调用,这样麻烦又不优雅,所以这时候可以使用RxLifecycle来解决。
使用
具体使用可以参考RxLifecycle,下面注意讲解RxLifecycle源码,带你一步步了解RxLifecycle实现的真相。
流程
分析之前,先用RxJava创建一个观察者模型任务。
Observable.create(emitter -> {}).compose(bindToLifecycle()).subscribe();
这里只分析bindToLifecycle()方法中的内容,其它都是RxJava中的操作符。
跟踪到
public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
}
- ACTIVITY_LIFECYCLE:Function实例,对Activity生命周期时间做映射。
- lifecycle:是新创建的BehaviorSubject实例。
下一步
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
@Nonnull final Function<R, R> correspondingEvents) {
...
return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents)方法构建一个新的Observable实例,即ObservableFilter对象。·
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
参数lifecycle.share()
public final Observable<T> share() {return publish().refCount();}
public Observable<T> refCount() {
return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(onRefCount()));
}
private ConnectableObservable<T> onRefCount() {
if (this instanceof ObservablePublishClassic) {
return RxJavaPlugins.onAssembly(
new ObservablePublishAlt<T>(((ObservablePublishClassic<T>)this).publishSource())
);
}
return this;
}
最后调用bind(@Nonnull final Observable<R> lifecycle)返回一个持有上面创建的Observable实例的LifecycleTransformer对象。
- subscribe()
订阅
发射Activity生命周期事件
发射Activity生命周期事件,封装RxLifecycle中的RxAppCompatActivity类里面(我继承的是RxAppCompatActivity)。比如onCreate:
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
this.lifecycleSubject.onNext(ActivityEvent.CREATE);
}
onNext方法
public void onNext(T t) {
...
Object o = NotificationLite.next(t);
setCurrent(o);
for (BehaviorDisposable<T> bs : subscribers.get()) {
bs.emitNext(o, index);
}
}
subscribers中的值是一个BehaviorDisposable数组,在subscribeActual方法中进行添加。
protected void subscribeActual(Observer<? super T> observer) {
BehaviorDisposable<T> bs = new BehaviorDisposable<T>(observer, this);
if (add(bs)) {...};
...
}
emitNext方法中调用 test(value)来发射数据,如果已经有数据处于发射中,者将数据保存起来,然后返回。
public boolean test(Object o) {
return cancelled || NotificationLite.accept(o, downstream);
}
public static <T> boolean accept(Object o, Observer<? super T> observer) {
if (o == COMPLETE) {
observer.onComplete();
return true;
} elseif (o instanceof ErrorNotification) {
observer.onError(((ErrorNotification)o).e);
return true;
}
observer.onNext((T)o);
return false;
}
调用
Observable.create(emitter -> {}).compose(bindToLifecycle()).subscribe();
.subscribe()
执行.compose中返回实例的subscribeActual(Observer<? super T> observer)方法,observer是.subscribe()传入的订阅实例,即观察者。-
.compose(bindToLifecycle())
bindToLifecycle()返回的是LifecycleTransformer实例。看一下compose(bindToLifecycle())实现。
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) { return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this)); }
public static <T> Observable<T> wrap(ObservableSource<T> source) { ObjectHelper.requireNonNull(source, "source is null"); if (source instanceof Observable) { return RxJavaPlugins.onAssembly((Observable<T>)source); } return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source)); }
首先调用LifecycleTransformer的apply方法,传入被观察者对象,即.create返回的实例。
apply方法做了什么public ObservableSource<T> apply(Observable<T> upstream) { return upstream.takeUntil(observable); }
对被观察者使用takeUntil操作符,传入值为takeUntilCorrespondingEvent返回的ObservableFilter实例,新创建一个ObservableTakeUntil实例并返回。
由于ObservableTakeUntil是Observable的子类,所以.compose返回ObservableTakeUntil对象。
-
ObservableTakeUntil
接下来就执行到ObservableTakeUntil中的subscribeActual(Observer<? super T> child)方法。ObservableTakeUntil中的两个参数:
- other
takeUntilCorrespondingEvent返回的ObservableFilter实例 - source
被观察者,即.create返回的实例。
public void subscribeActual(Observer<? super T> child) { TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child); child.onSubscribe(parent); other.subscribe(parent.otherObserver); source.subscribe(parent); }
child是观察者实例,即.subscribe()中的订阅者。
调用观察者的onSubscribe方法,传入持有观察者实例TakeUntilMainObserver实例。
调用ObservableFilter的subscribe方法,调用被观察者(即.create返回的实例)中的subscribe方法。前者传入OtherObserver实例,后者传入TakeUntilMainObserver实例。
看一下takeUntilCorrespondingEvent方法:
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle, final Function<R, R> correspondingEvents) { return Observable.combineLatest( lifecycle.take(1).map(correspondingEvents), lifecycle.skip(1), new BiFunction<R, R, Boolean>() { @Override public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception { return lifecycleEvent.equals(bindUntilEvent); } }) .onErrorReturn(Functions.RESUME_FUNCTION) .filter(Functions.SHOULD_COMPLETE); }
这里面的逻辑什么触发呢,当在Activity生命周期发射事件时,也就是上面讲的RxAppCompatActivity生命周期发射事件。
.filter返回的是ObservableFilter对象。而ObservableFilter中的downstream是ObservableTakeUntil中的OtherObserver实例。看一下ObservableFilter中的onNext方法。
public void onNext(T t) { if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { downstream.onNext(t); } } else { downstream.onNext(null); } }
boolean b = filter.test(t)是使用combineLatest操作符返回的值,规则是,当需要观察的生命周期事件(即ifecycle.take(1))和在Activity中发射的生命周期事件相等时返回true,否则返回false。
比如观察的是onStop生命周期,那么观察的事件就是ActivityEvent.STOP,当Activity调用onStop时,那么filter.test(t)返回true。
downstream.onNext(t)调用ObservableTakeUntil中的onNext方法。
public void onNext(U t) { DisposableHelper.dispose(this); otherComplete(); }
这里基本上就是去清理RxJava的相关数据了。其它情况类似。
- other
bindUntilEvent(@NonNull ActivityEvent event)
指定观察那个生命周期事件。
其它
// If you want pre-written support preference Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle3:rxlifecycle-components-preference:3.1.0'
// If you want to use Android Lifecycle for providers
implementation 'com.trello.rxlifecycle3:rxlifecycle-android-lifecycle:3.1.0'
这两个是在其它情况下事情 ,具体自己看库里面的代码,很少。