Rxlifecycle 源码解析

Rxlifecycle (https://github.com/trello/RxLifecycle)是基于RxJava对Android生命周期管理的一个库。
gradle引用库,components包下结构

implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'

支持Activity、Fragment以及期子类和support包下分别对应,RxFragment和RxActivity内部原理一样只是多了几个Fragment生命周期的方法,选取RxActivity进行查看源码。

public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {
    private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
    public final Observable<ActivityEvent> lifecycle() {
        return lifecycleSubject.hide();
    }
    public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
        return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
    }
    public final <T> LifecycleTransformer<T> bindToLifecycle() {
        return RxLifecycleAndroid.bindActivity(lifecycleSubject);
    }
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        lifecycleSubject.onNext(ActivityEvent.CREATE);
    }
    protected void onStart() {
        super.onStart();
        lifecycleSubject.onNext(ActivityEvent.START);
    }
    protected void onResume() {
        super.onResume();
        lifecycleSubject.onNext(ActivityEvent.RESUME);
    }
    protected void onPause() {
        lifecycleSubject.onNext(ActivityEvent.PAUSE);
        super.onPause();
    }
    protected void onStop() {
        lifecycleSubject.onNext(ActivityEvent.STOP);
        super.onStop();
    }
    protected void onDestroy() {
        lifecycleSubject.onNext(ActivityEvent.DESTROY);
        super.onDestroy();
    }
}

RxActivity中一个BehaviorSubject对象,三个公共方法,还有根据Activity生命周期发射ActivityEvent事件。
首先分析BehaviorSubject有何作用

public final class BehaviorSubject<T> extends Subject<T> {
    ...
}
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    ...
}

BehaviorSubject继承Subject,Subject集成Observable实现Observer,Subject的官方介绍:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。也就是Subject既可以作为被观察者,也可以作为观察者。
BehaviorSubject
当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。

public void testBehavior() {
    final BehaviorSubject<Integer> lifecycleSubject = BehaviorSubject.createDefault(99);
    lifecycleSubject.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("accept()--" + integer);//此时还没有收到任何数据,所以收到默认值99
            //然后会受到后续发射的所有值0,1,2,3
        }
    });
    lifecycleSubject.onNext(0);
    lifecycleSubject.onNext(1);
    lifecycleSubject.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("after 1 accept()--" + integer);//只能收到最近发射的数据1
            //然后会受到后续发射的所有值2,3
        }
    });
    lifecycleSubject.onNext(2);
    lifecycleSubject.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("after 2 accept()--" + integer);//只能收到最近发射的数据2
            //然后会受到后续发射的所有值3
        }
    });
    lifecycleSubject.onNext(3);
}

RxActivity 中lifecycleSubject 在Activity生命周期的每个方法都发送了ActivityEvent数据,此时订阅lifecycleSubject 就可以收到Activity周期方法的回调
查看RxActivity中的三个方法,Observable<ActivityEvent> lifecycle(),LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event),LifecycleTransformer<T> bindToLifecycle()
首先来看Observable<ActivityEvent> lifecycle()方法

public final Observable<ActivityEvent> lifecycle() {
    return lifecycleSubject.hide();
}

Observable<T> hide()方法官方解释隐藏这个Observable的身份及其一次性。即lifecycle()方法返回一个Observable对象然后来看LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event)方法

public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
    //调用RxLifecycle.bindUntilEvent(lifecycleSubject, event)方法
    return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
                                                            @Nonnull final R event) {
    //先调用takeUntilEvent(lifecycle, event)
    //然后调用bind(@Nonnull final Observable<R> lifecycle) 
    return bind(takeUntilEvent(lifecycle, event));
}
private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
    //filter操作符根据ActivityEvent 进行过滤,在指定的ActivityEvent 事件时才往下游执行
    return lifecycle.filter(new Predicate<R>() {
        @Override
        public boolean test(R lifecycleEvent) throws Exception {
            return lifecycleEvent.equals(event);
        }
    });
}
public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
    //把Observable转为LifecycleTransformer,返回LifecycleTransformer对象
    return new LifecycleTransformer<>(lifecycle);
}

bindUntilEvent(@NonNull ActivityEvent event) 最终返回一个根据ActivityEvent 过滤的Observable转换为LifecycleTransformer对象,LifecycleTransformer实现了ObservableTransformer。
bindUntilEvent常用于网络请求,然后监听Activity onDestroy()时,结束网络请求,避免Activity销毁后网络请求返回成功刷新界面View空指针。模拟网络请求代码,10秒后返回数据

    BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
    Observable.timer(10, TimeUnit.SECONDS)
            .compose(RxLifecycle.bindUntilEvent(lifecycleSubject, ActivityEvent.DESTROY))
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) throws Exception {
                    //返回网络请求数据
                }
            });
    lifecycleSubject.onNext(ActivityEvent.DESTROY);

然后lifecycleSubject发射一个ActivityEvent.DESTROY事件,然后网络回调事件就不会在执行下去。那么为什么compose(LifecycleTransformer)后就能停止事件不在执行呢?首先看一下compose操作符

public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
    return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}

传入一个ObservableTransformer,然后调用ObservableTransformer.apply(Observable)方法。然后调用wrap(ObservableSource<T> source)方法,返回Observable对象

public static <T> Observable<T> wrap(ObservableSource<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    if (source instanceof Observable) {
        return RxJavaPlugins.onAssembly((Observable<T>)source);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}

wrap方法最终返回真实ObservableFromUnsafeSource对象
此时来分析观察者对象Observer事件为何不会执行下去。订阅事件最终会调用subscribeActual(Observer)抽象方法,此方法的实现在真实Observable类型中。从wrap方法得知真实的Observable即为ObservableFromUnsafeSource。

public final class ObservableFromUnsafeSource<T> extends Observable<T> {
    final ObservableSource<T> source;
    public ObservableFromUnsafeSource(ObservableSource<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        source.subscribe(observer);
    }
}

ObservableFromUnsafeSource中的subscribeActual方法,又调用了ObservableSource.subscribe方法,那么ObservableSource对象又是什么呢?再来看wrap方法传入的参数即ObservableTransformer.apply(Observable)方法返回的对象。此ObservableTransformer对象即LifecycleTransformer对象,那么我们看一下LifecycleTransformer源码。

//精简之后的代码
public final class LifecycleTransformer<T> implements ObservableTransformer<T, T> {
    final Observable<?> observable;
    LifecycleTransformer(Observable<?> observable) {
        checkNotNull(observable, "observable == null");
        this.observable = observable;
    }
    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        return upstream.takeUntil(observable);
    }
}

这里边有两个Observable对象,回想一下ObservableTransformer.apply(Observable)中Observable传入的是this,即最原始的被观察者对象,也就是Observable<T> upstream,Observable<?> observable对象就是BehaviorSubject<ActivityEvent>。此时监听Activity生命周期BehaviorSubject对象就和原始被观察者对象(即网络请求Observable)就被关联在一起了,然后用原始被观察者对象调用takeUntil()方法,传入的对象为BehaviorSubject。我们来看一下takeUntil()操作的官方解释
TakeUntil是使用一个标志Observable是否发射数据来判断,当标志Observable没有发射数据时,正常发射数据,而一旦标志Observable发射过了数据则后面的数据都会被丢弃。
此时分析,网络请求Observable对象使用TakeUntil操作符关联BehaviorSubject对象,BehaviorSubject对象发射了数据,然后网络请求Observable则丢失数据,即不在调用观察者Observer,BehaviorSubject过滤了ActivityEvent事件,只有事件为ActivityEvent.DESTROY时,BehaviorSubject才发射,所以当Activity关闭时,BehaviorSubject发射ActivityEvent.DESTROY事件,网络请求Observable丢失数据,不在调用观察者Observer,整个网络请求和Activity生命周期绑定完成。


最后来看RxActivity最后一个LifecycleTransformer<T> bindToLifecycle()方法

public final <T> LifecycleTransformer<T> bindToLifecycle() {
    return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) {
    return bind(lifecycle, ACTIVITY_LIFECYCLE);
}

然后看一下ACTIVITY_LIFECYCLE

private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE =
    new Function<ActivityEvent, ActivityEvent>() {
        @Override
        public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
            switch (lastEvent) {
                case CREATE:
                    return ActivityEvent.DESTROY;
                case START:
                    return ActivityEvent.STOP;
                case RESUME:
                    return ActivityEvent.PAUSE;
                case PAUSE:
                    return ActivityEvent.STOP;
                case STOP:
                    return ActivityEvent.DESTROY;
                case DESTROY:
                    throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
                default:
                    throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
            }
        }
    };

Function只是一个接口,调用apply(T)方法,返回泛型R,是一个类型转换的方法

public interface Function<T, R> {
    @NonNull
    R apply(@NonNull T t) throws Exception;
}

ACTIVITY_LIFECYCLE 中根据Activity生命周期一一对应做了ActivityEvent 转换。

public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                                                  @Nonnull final Function<R, R> correspondingEvents) {
    return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
                                                                   final Function<R, R> correspondingEvents) {
    return Observable.combineLatest(
        lifecycle.take(1).map(correspondingEvents),
        lifecycle.skip(1),
        new BiFunction<R, R, Boolean>() {
            @Override
            public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
                return lifecycleEvent.equals(bindUntilEvent);
            }
        })
        .onErrorReturn(Functions.RESUME_FUNCTION)
        .filter(Functions.SHOULD_COMPLETE);
}

我们来看map操作符,最终调用MapObserver

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
    final Function<? super T, ? extends U> mapper;
    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }
  @Override
    public void onNext(T t) {
           U v;
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
         actual.onNext(v);
    }
}

其中mapper为Function对象,onNext方法中调用mapper.apply(t),做了一个转换,即上面的ACTIVITY_LIFECYCLE ,ActivityEvent.CREATE和ActivityEvent.DESTROY的一个转换。
take()只发射前面的N项数据,忽略剩余的数据,take(1)即只发射前面一项数据
skip()抑制Observable发射的前N项数据,只保留之后的数据,skip(1)即忽略发射的前1项数据
combineLatest()操作符在原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
最终调用 bind(@Nonnull final Observable<R> lifecycle)方法

public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
    return new LifecycleTransformer<>(lifecycle);
}

LifecycleTransformer<T> bindToLifecycle()方法作用和LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event)方法是一样的,只是根据ActivityEvent事件做了一个转换。
加入网络请求在onStart调用,会在onStop停止。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容