-
简述
因为Android UI组件通常会频繁性的创建销毁,在搭配异步操作时,子线程持有UI组件引用,而子线程通常用来执行耗时操作,当子线程运行期间UI组件被销毁时,UI组件实例会因为被引用而无法被释放内存占用,所以很容易导致内存泄露。
之前分析过RxJava和Retrofit结合使用的源码,其中最后调用了compose(ObservableTransFormer)方法,传入lifecycleProvider.bindToLifecycle()实现和Android UI组件生命周期绑定,以解决UI组件内存泄漏问题,这篇文章就来分析一下是如何实现的。
-
源码分析
前面分析过,ViewModel中调用subscribe的是一个compose返回的Observable,我们就从这里开始分析:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) { return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this)); }
wrap:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> wrap(ObservableSource<T> source) { ObjectHelper.requireNonNull(source, "source is null"); if (source instanceof Observable) { return RxJavaPlugins.onAssembly((Observable<T>)source); } return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source)); }
会返回一个ObservableFromUnsafeSource对象,当我们调用subscribe的时候就是调用它的subscribeActual:
@Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(observer); }
observer就是前面ViewModel传入的HttpObserver,source是((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this),composer也就是前面传入的lifecycle.bindToLifecycle(),lifecycle是啥,是LifecycleProvider接口的实例,那LifecycleProvider的实现类是什么呢?RxJava针对AndroidUI组件(Activity、Fragment)实现了继承自AppCompatActivity、Fragment等的LifecycleProvider的实现类RxAppCompatActivity、RxFragment等。
RxAppCompatActivity中的bindToLifecycle是:
private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create(); @Override @NonNull @CheckResult public final <T> LifecycleTransformer<T> bindToLifecycle() { return RxLifecycleAndroid.bindActivity(lifecycleSubject); }
RxFragment中的是:
private final BehaviorSubject<FragmentEvent> lifecycleSubject = BehaviorSubject.create(); @Override @NonNull @CheckResult public final <T> LifecycleTransformer<T> bindToLifecycle() { return RxLifecycleAndroid.bindFragment(lifecycleSubject); }
BehaviorSubject.create:
@CheckReturnValue @NonNull public static <T> BehaviorSubject<T> create() { return new BehaviorSubject<T>(); }
再看bindActivity和bindFragment:
@NonNull @CheckResult public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) { return bind(lifecycle, ACTIVITY_LIFECYCLE); } @NonNull @CheckResult public static <T> LifecycleTransformer<T> bindFragment(@NonNull final Observable<FragmentEvent> lifecycle) { return bind(lifecycle, FRAGMENT_LIFECYCLE); }
ACTIVITY_LIFECYCLE是:
private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE = new Function<ActivityEvent, ActivityEvent>() { @Override public ActivityEvent apply(ActivityEvent lastEvent) throws Exception { switch (lastEvent) { case CREATE: return ActivityEvent.DESTROY; case START: return ActivityEvent.STOP; case RESUME: return ActivityEvent.PAUSE; case PAUSE: return ActivityEvent.STOP; case STOP: return ActivityEvent.DESTROY; case DESTROY: throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it."); default: throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented"); } } };
FRAGMENT_LIFECYCLE是:
private static final Function<FragmentEvent, FragmentEvent> FRAGMENT_LIFECYCLE = new Function<FragmentEvent, FragmentEvent>() { @Override public FragmentEvent apply(FragmentEvent lastEvent) throws Exception { switch (lastEvent) { case ATTACH: return FragmentEvent.DETACH; case CREATE: return FragmentEvent.DESTROY; case CREATE_VIEW: return FragmentEvent.DESTROY_VIEW; case START: return FragmentEvent.STOP; case RESUME: return FragmentEvent.PAUSE; case PAUSE: return FragmentEvent.STOP; case STOP: return FragmentEvent.DESTROY_VIEW; case DESTROY_VIEW: return FragmentEvent.DESTROY; case DESTROY: return FragmentEvent.DETACH; case DETACH: throw new OutsideLifecycleException("Cannot bind to Fragment lifecycle when outside of it."); default: throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented"); } } };
bind方法:
@Nonnull @CheckReturnValue public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle, @Nonnull final Function<R, R> correspondingEvents) { checkNotNull(lifecycle, "lifecycle == null"); checkNotNull(correspondingEvents, "correspondingEvents == null"); return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents)); } private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle, final Function<R, R> correspondingEvents) { return Observable.combineLatest( lifecycle.take(1).map(correspondingEvents), lifecycle.skip(1), new BiFunction<R, R, Boolean>() { @Override public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception { return lifecycleEvent.equals(bindUntilEvent); } }) .onErrorReturn(Functions.RESUME_FUNCTION) .filter(Functions.SHOULD_COMPLETE); }
lifecycle.share()实际上调用的是其父类Observable的share方法:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable<T> share() { return publish().refCount(); }
最终返回的是ObservableRefCount:
@NonNull @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public Observable<T> refCount() { return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(onRefCount())); }
封装的太多了,我们直接看filter返回的:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable<T> filter(Predicate<? super T> predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate)); }
最终bind(Observable)返回:
@Nonnull @CheckReturnValue public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) { return new LifecycleTransformer<>(lifecycle); }
调用LifecycleTransformer的apply方法:
@Override public ObservableSource<T> apply(Observable<T> upstream) { return upstream.takeUntil(observable); }
upstream就是ObservableObserveOn,observable是takeUntilCorrespondingEvent返回的对象。所以看一下ObservableObserveOn的takeUntil(在父类Observable中):
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <U> Observable<T> takeUntil(ObservableSource<U> other) { ObjectHelper.requireNonNull(other, "other is null"); return RxJavaPlugins.onAssembly(new ObservableTakeUntil<T, U>(this, other)); }
所以ObservableTakeUntil就是ObservableFromUnsafeSource的source,那么在其subscribeActual中就是调用了ObservableTakeUntil的subscribeActual:
@Override public void subscribeActual(Observer<? super T> child) { TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child); child.onSubscribe(parent); other.subscribe(parent.otherObserver); source.subscribe(parent); }
child是自定义的HttpObserver,根据ObservableTakeUntil构造方法追溯到source就是ObservableObserveOn,other是takeUntilCorrespondingEvent返回的对象。
首先看other.subscribe方法,就是调用ObservableFilter的subscribeActual方法,parent.otherObserver是一个new OtherObserver():
@Override public void subscribeActual(Observer<? super T> observer) { source.subscribe(new FilterObserver<T>(observer, predicate)); }
source是调用filter方法的ObservableOnErrorReturn:
@Override public void subscribeActual(Observer<? super T> t) { source.subscribe(new OnErrorReturnObserver<T>(t, valueSupplier)); }
再往上就是:
Observable.combineLatest( lifecycle.take(1).map(correspondingEvents), lifecycle.skip(1), new BiFunction<R, R, Boolean>() { @Override public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception { return lifecycleEvent.equals(bindUntilEvent); } })
@CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); if (sources.length == 0) { return empty(); } ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); // the queue holds a pair of values so we need to double the capacity int s = bufferSize << 1; return RxJavaPlugins.onAssembly(new ObservableCombineLatest<T, R>(sources, null, combiner, s, false)); }
ObservableCombineLatest的subscribeActual:
@Override @SuppressWarnings("unchecked") public void subscribeActual(Observer<? super R> observer) { ObservableSource<? extends T>[] sources = this.sources; int count = 0; if (sources == null) { sources = new ObservableSource[8]; for (ObservableSource<? extends T> p : sourcesIterable) { if (count == sources.length) { ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)]; System.arraycopy(sources, 0, b, 0, count); sources = b; } sources[count++] = p; } } else { count = sources.length; } if (count == 0) { EmptyDisposable.complete(observer); return; } LatestCoordinator<T, R> lc = new LatestCoordinator<T, R>(observer, combiner, count, bufferSize, delayError); lc.subscribe(sources); }
整理一下它持有的属性,combiner是:
static final class Array2Func<T1, T2, R> implements Function<Object[], R> { final BiFunction<? super T1, ? super T2, ? extends R> f; Array2Func(BiFunction<? super T1, ? super T2, ? extends R> f) { this.f = f; } @SuppressWarnings("unchecked") @Override public R apply(Object[] a) throws Exception { if (a.length != 2) { throw new IllegalArgumentException("Array of size 2 expected but got " + a.length); } return f.apply((T1)a[0], (T2)a[1]); } }
它的f是:
new BiFunction<R, R, Boolean>() { @Override public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception { return lifecycleEvent.equals(bindUntilEvent); } }
sources有两个,一个是ObservableMap(通过lifecycle.take(1).map(correspondingEvents)获得),一个是ObservableSkip(通过lifecycle.skip(1)获得)。
最后会走到LatestCoordinator.subscribe:
public void subscribe(ObservableSource<? extends T>[] sources) { Observer<T>[] as = observers; int len = as.length; downstream.onSubscribe(this); for (int i = 0; i < len; i++) { if (done || cancelled) { return; } sources[i].subscribe(as[i]); } }
downstream.onSubscribe会沿着调用链最终调用HttpObserver中的onSubscribe里,这里就不贴了,主要看一下sources[i].subscribe(as[i])。as是在构造时生成的一个同sources一样长度的数组:
CombinerObserver<T, R>[] as = new CombinerObserver[count]; for (int i = 0; i < count; i++) { as[i] = new CombinerObserver<T, R>(this, i); }
先看第一个source,ObservableMap:
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
它的source是ObservableTake:
@Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(new TakeObserver<T>(observer, limit)); }
而它的source又是ObservableRefCount:
@Override protected void subscribeActual(Observer<? super T> observer) { RefConnection conn; boolean connect = false; synchronized (this) { conn = connection; if (conn == null) { conn = new RefConnection(this); connection = conn; } long c = conn.subscriberCount; if (c == 0L && conn.timer != null) { conn.timer.dispose(); } conn.subscriberCount = c + 1; if (!conn.connected && c + 1 == n) { connect = true; conn.connected = true; } } source.subscribe(new RefCountObserver<T>(observer, this, conn)); if (connect) { source.connect(conn); } }
它的source是ObservablePublish:
@Override protected void subscribeActual(Observer<? super T> observer) { onSubscribe.subscribe(observer); }
onSubscribe是PublishSource:
@Override public void subscribe(Observer<? super T> child) { // create the backpressure-managing producer for this child InnerDisposable<T> inner = new InnerDisposable<T>(child); child.onSubscribe(inner); // concurrent connection/disconnection may change the state, // we loop to be atomic while the child subscribes for (;;) { // get the current subscriber-to-source PublishObserver<T> r = curr.get(); // if there isn't one or it is disposed if (r == null || r.isDisposed()) { // create a new subscriber to source PublishObserver<T> u = new PublishObserver<T>(curr); // let's try setting it as the current subscriber-to-source if (!curr.compareAndSet(r, u)) { // didn't work, maybe someone else did it or the current subscriber // to source has just finished continue; } // we won, let's use it going onwards r = u; } /* * Try adding it to the current subscriber-to-source, add is atomic in respect * to other adds and the termination of the subscriber-to-source. */ if (r.add(inner)) { inner.setParent(r); break; // NOPMD } /* * The current PublishObserver has been terminated, try with a newer one. */ /* * Note: although technically correct, concurrent disconnects can cause * unexpected behavior such as child observers never receiving anything * (unless connected again). An alternative approach, similar to * PublishSubject would be to immediately terminate such child * observers as well: * * Object term = r.terminalEvent; * if (r.nl.isCompleted(term)) { * child.onComplete(); * } else { * child.onError(r.nl.getError(term)); * } * return; * * The original concurrent behavior was non-deterministic in this regard as well. * Allowing this behavior, however, may introduce another unexpected behavior: * after disconnecting a previous connection, one might not be able to prepare * a new connection right after a previous termination by subscribing new child * observers asynchronously before a connect call. */ } }
很遗憾,走到这里就结束了,并没有发现和我们主题相关的验证代码,我们得再回到ObservableRefCount的subscribeActual,下面会执行source.connect(conn),这里的source是ObservablePublish,它的connect方法如下:
@Override public void connect(Consumer<? super Disposable> connection) { boolean doConnect; PublishObserver<T> ps; // we loop because concurrent connect/disconnect and termination may change the state for (;;) { // retrieve the current subscriber-to-source instance ps = current.get(); // if there is none yet or the current has been disposed if (ps == null || ps.isDisposed()) { // create a new subscriber-to-source PublishObserver<T> u = new PublishObserver<T>(current); // try setting it as the current subscriber-to-source if (!current.compareAndSet(ps, u)) { // did not work, perhaps a new subscriber arrived // and created a new subscriber-to-source as well, retry continue; } ps = u; } // if connect() was called concurrently, only one of them should actually // connect to the source doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); break; // NOPMD } /* * Notify the callback that we have a (new) connection which it can dispose * but since ps is unique to a connection, multiple calls to connect() will return the * same Disposable and even if there was a connect-disconnect-connect pair, the older * references won't disconnect the newer connection. * Synchronous source consumers have the opportunity to disconnect via dispose on the * Disposable as subscribe() may never return in its own. * * Note however, that asynchronously disconnecting a running source might leave * child observers without any terminal event; PublishSubject does not have this * issue because the dispose() was always triggered by the child observers * themselves. */ try { connection.accept(ps); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); } if (doConnect) { source.subscribe(ps); } }
最终会走到source.subscribe(ps),source是调用share()的那个对象,也就是BehaviorSubject:
@Override protected void subscribeActual(Observer<? super T> observer) { BehaviorDisposable<T> bs = new BehaviorDisposable<T>(observer, this); observer.onSubscribe(bs); if (add(bs)) { if (bs.cancelled) { remove(bs); } else { bs.emitFirst(); } } else { Throwable ex = terminalEvent.get(); if (ex == ExceptionHelper.TERMINATED) { observer.onComplete(); } else { observer.onError(ex); } } }
关键代码bs.emitFirst():
void emitFirst() { if (cancelled) { return; } Object o; synchronized (this) { if (cancelled) { return; } if (next) { return; } BehaviorSubject<T> s = state; Lock lock = s.readLock; lock.lock(); index = s.index; o = s.value.get(); lock.unlock(); emitting = o != null; next = true; } if (o != null) { if (test(o)) { return; } emitLoop(); } }
关键代码test(o),往下看:
@Override public boolean test(Object o) { return cancelled || NotificationLite.accept(o, downstream); }
NotificationLite.accept(o, downstream)中:
@SuppressWarnings("unchecked") public static <T> boolean accept(Object o, Observer<? super T> observer) { if (o == COMPLETE) { observer.onComplete(); return true; } else if (o instanceof ErrorNotification) { observer.onError(((ErrorNotification)o).e); return true; } observer.onNext((T)o); return false; }
observer.onNext((T)o),observer就是ps,ps就是PublishObserver:
@Override public void onNext(T t) { for (InnerDisposable<T> inner : observers.get()) { inner.child.onNext(t); } }
observers是什么时候赋值的呢,就是在source.connect之前的subscribe的调用中:
if (r.add(inner)) { inner.setParent(r); break; // NOPMD }
最终经过RefCountObserver、TakeObserver、MapObserver会回调到MapObserver的onNext中:
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); }
根据ObservableMap的subscribeActual中MapObserver的构造方法可知mapper就是function,function就是前面构造ObservableMap时map方法的参数correspondingEvents,correspondingEvents就是ACTIVITY_LIFECYCLE或FRAGMENT_LIFECYCLE,所以把t代入到他们的apply方法中,test(o)的o是什么,发现它来自this.value,this.value在setCurrent中通过value.lazySet(o)存值,setCurrent在BehaviorSubject的onNext中调用,onNext又在哪调用呢?看一下RxAppCompatActivity:
public abstract class RxAppCompatActivity extends AppCompatActivity implements LifecycleProvider<ActivityEvent> { private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create(); @Override @NonNull @CheckResult public final Observable<ActivityEvent> lifecycle() { return lifecycleSubject.hide(); } @Override @NonNull @CheckResult public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) { return RxLifecycle.bindUntilEvent(lifecycleSubject, event); } @Override @NonNull @CheckResult public final <T> LifecycleTransformer<T> bindToLifecycle() { return RxLifecycleAndroid.bindActivity(lifecycleSubject); } @Override @CallSuper protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); lifecycleSubject.onNext(ActivityEvent.CREATE); } @Override @CallSuper protected void onStart() { super.onStart(); lifecycleSubject.onNext(ActivityEvent.START); } @Override @CallSuper protected void onResume() { super.onResume(); lifecycleSubject.onNext(ActivityEvent.RESUME); } @Override @CallSuper protected void onPause() { lifecycleSubject.onNext(ActivityEvent.PAUSE); super.onPause(); } @Override @CallSuper protected void onStop() { lifecycleSubject.onNext(ActivityEvent.STOP); super.onStop(); } @Override @CallSuper protected void onDestroy() { lifecycleSubject.onNext(ActivityEvent.DESTROY); super.onDestroy(); } }
所以伴随着Activity或者Fragment的生命周期方法回调,对应的BehaviorSubject会调用onNext方法:
@Override public void onNext(T t) { ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."); if (terminalEvent.get() != null) { return; } Object o = NotificationLite.next(t); setCurrent(o); for (BehaviorDisposable<T> bs : subscribers.get()) { bs.emitNext(o, index); } }
setCurrent的意义也就是保存当前Activity的状态,所以当发送请求的时候,会通过apply方法返回一个Event,然后调用downstream.onNext(event),downstream就是CombinerObserver,它的onNext是:
@Override public void onNext(T t) { parent.innerNext(index, t); }
构造的时候传入的parent是LatestCoordinator.this,所以看一下LatestCoordinator的innerNext:
void innerNext(int index, T item) { boolean shouldDrain = false; synchronized (this) { //初始值空数组,但不是null Object[] latest = this.latest; if (latest == null) { return; } Object o = latest[index]; int a = active; if (o == null) { active = ++a; } //若是第一次,此时active == a == 1,latest.length == 2 latest[index] = item; if (a == latest.length) { //入队列 queue.offer(latest.clone()); shouldDrain = true; } } //drain if (shouldDrain) { drain(); } }
所以因为第一次innerNext的时候latest.length是2(初始化就是和sources长度一致,这里是两个source)而a此时是1,也就是ObservableMap.subscribe的时候是不会进行drain()的,当ObservableSkip调用subscribe第二次进来的时候才会走到drain()。这里记住this.latest有了第一个值,是ACTIVITY_LIFECYCLE的apply返回的值(以Activity为例)。
ObservableSkip的subscribeActual:
@Override public void subscribeActual(Observer<? super T> observer) { source.subscribe(new SkipObserver<T>(observer, n)); }
直接调用其source(ObservableRefCount)的subscribeActual,流程和之前的ObservableMap一样,最终回到CombinerObserver,和之前相比,少了MapObserver的mapper(也就是ACTIVITY_LIFECYCLE或FRAGMENT_LIFECYCLE)的apply操作,所以此时的innerNext中的item就是生命周期组件当前的event。
drain操作:
void drain() { if (getAndIncrement() != 0) { return; } final SpscLinkedArrayQueue<Object[]> q = queue; final Observer<? super R> a = downstream; final boolean delayError = this.delayError; int missed = 1; for (;;) { for (;;) { if (cancelled) { clear(q); return; } if (!delayError && errors.get() != null) { cancelSources(); clear(q); a.onError(errors.terminate()); return; } boolean d = done; Object[] s = q.poll(); boolean empty = s == null; if (d && empty) { clear(q); Throwable ex = errors.terminate(); if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } if (empty) { break; } R v; try { v = ObjectHelper.requireNonNull(combiner.apply(s), "The combiner returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); errors.addThrowable(ex); cancelSources(); clear(q); ex = errors.terminate(); a.onError(ex); return; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }
看到前面的验证通过后有一句combiner.apply(s),combiner是前面的Functions.toFunction(combiner),Functions.toFunction(combiner)返回的是Array2Func:
static final class Array2Func<T1, T2, R> implements Function<Object[], R> { final BiFunction<? super T1, ? super T2, ? extends R> f; Array2Func(BiFunction<? super T1, ? super T2, ? extends R> f) { this.f = f; } @SuppressWarnings("unchecked") @Override public R apply(Object[] a) throws Exception { if (a.length != 2) { throw new IllegalArgumentException("Array of size 2 expected but got " + a.length); } return f.apply((T1)a[0], (T2)a[1]); } }
持有的f是Observable.combineLatest()方法的第三个参数,传入的是:
new BiFunction<R, R, Boolean>() { @Override public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception { return lifecycleEvent.equals(bindUntilEvent); } })
a.onNext(v)往下传到了FilterObserver中的onNext中:
@Override public void onNext(T t) { if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { downstream.onNext(t); } } else { downstream.onNext(null); } }
filter为:
static final Predicate<Boolean> SHOULD_COMPLETE = new Predicate<Boolean>() { @Override public boolean test(Boolean shouldComplete) throws Exception { return shouldComplete; } };
没有额外操作直接返回apply()传进来的参数。
这个时候如果apply判断为false,则说明当前状态还不是需要被取消的状态,则到此这个调用链就结束了,还记得调用开始的地方吗?就是ObservableTakeUntil的subscribeActual中的other.subscribe(parent.otherObserver):
@Override public void subscribeActual(Observer<? super T> child) { TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child); child.onSubscribe(parent); other.subscribe(parent.otherObserver); source.subscribe(parent); }
接下来就会去执行下面的source.subscribe(parent)了,就是正常订阅接收回调的过程了。
如果apply判断为true,则说明当前状态是需要被取消订阅的状态了,上面的代码也就是b是true,就会执行downstream.onNext(t)操作,这个downstream就是上面的parent.otherObserver,parent是TakeUntilMainObserver,otherOnserver在它的构造方法里创建:
TakeUntilMainObserver(Observer<? super T> downstream) { this.downstream = downstream; this.upstream = new AtomicReference<Disposable>(); this.otherObserver = new OtherObserver(); this.error = new AtomicThrowable(); }
所以downstream.onNext(t)就是OtherObserver的onNext方法:
@Override public void onNext(U t) { DisposableHelper.dispose(this); otherComplete(); }
void otherComplete() { DisposableHelper.dispose(upstream); HalfSerializer.onComplete(downstream, this, error); }
可以看到,在这里使用dispose取消了订阅。
-
总结
通过前面的分析,我们可以得到这样的结论:
- compose创建了一个ObservableFromUnsafeSource对象,我们调用它的subscribe方法开启一个异步调用;
- 它的subscribe方法实际上调用的是((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this)的这个对象的subscribe方法,也就是composer的subscribe方法;
- composer是通过lifecycle.bindToLifecycle()创建的,lifecycle对应RxAppCompatActivity,bindToLifecycle()里通过RxLifecycleAndroid.bindActivity(lifecycleSubject)得到LifecycleTransformer<>(lifecycle);
- LifecycleTransformer的apply返回一个ObservableTakeUntil对象,它的source就是compose的调用者ObservableObserveOn,other就是RxLifecycleAndroid.bindActivity的调用链最终返回的ObservableFilter对象;
- ObservableFilter的调用链是:ObservableCombineLatest->ObservableOnErrorReturn->ObservableFilter;
- ObservableFromUnsafeSource的subscribeActual中source就是ObservableTakeUntil,它的subsribeActual中先后调用了other.subscribe和source.subscribe(parent);
- other.subscribe可以理解为验证当前订阅是否还在有效生命周期范围内,如果不在则进行dispose操作;
- source.subscribe(parent)在执行过程中如果检查到已经dispose了则结束调用,从而释放UI组件引用,达到解决内存泄露的目的。
RxJava网络异步调用绑定UI生命周期
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 目录 【RxJava】- 创建操作符源码分析【RxJava】- 变换操作符源码分析【RxJava】- 过滤操作符源...
- 本文的所有分析都是基于 RxJava2 进行的。以下的 RxJava 指 RxJava2阅读本文你将会知道: Rx...
- 1、什么是响应式编程 ? 部分内容参考自:《RxJava Essentials 中文版》by yuxingxin ...