RxJava lift()原理

lift()方法是RxJava中所有操作符的基础,可以通过它做各种各样的变化。弄清楚它的原理,也方便我们理解其他操作符。首先先看几个相关接口。

Func1 接口

public interface Func1<T, R> extends Function {
    R call(T t);
}

Func1接口会按照泛型参数的顺序传入T,并返回R

Operator 接口

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>

按照Func1接口的定义,Operator接口会传入一个Subscriber<? super R>参数,并返回一个Subscriber<? super T>

关于Operator和lift()中泛型顺序的问题

也许有人(is me)第一眼看到Observable<T>Operator<R, T>Func1<T, R>这几个类的泛型参数,头都大了,关键是Operator的泛型参数顺序为什么是R, T,而不是T, R

其实这里不需要关心顺序是什么,只需要记住Operator<R, T>是按照泛型参数的顺序,传入一个Subscriber<R>参数,并返回一个Subscriber<T>,写成Operator<A, B>或者Operator<M, N>是没有任何区别的。

lift()调用流程

首先需要记住lift()方法是在一个已有Observable上调用的。

lift()方法核心代码:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = operator.call(o);
            st.onStart();
            // 这里的onSubscribe是调用lift方法的Observable中的onSubscribe
            onSubscribe.call(st);
        }
    });
}

根据代码的调用流程来分析:

1、假设已有一个Observable<T>,调用lift()方法,生成一个Observable<R>,此时就有了两个Observable和两个OnSubscribe对象。

2、然后调用Observable<R>subscribe()方法,传入一个Subscriber<R>对象,此时触发Observable<R>.onSubscribe.call()方法,也就是上面lift()方法中的call()方法。

3、在该方法中会调用onSubscribe.call()方法,注意这个onSubscribeObservable<T>中的那个OnSubscribe<T>对象,它需要传入一个Subscriber<T>对象,这个对象是通过operator.call()方法生成的。正是这个Operator对象将两个Subscriber对象关联起来,OnSubscribe<T>在执行Subscriber<T>.onNext(T t)方法的时候也会执行Subscriber<R>.onNext(R r),而这里从T变成R,正好用到了传到Operator中的参数Func1<T, R>

4、如果具体化一点,上面的Observable<T>就是事件源,对它进行lift()变换得到新的Observable<R>,这个新的Observable的回调已经固定,相当于是一个模板(也就是上面lift()方法中的call()方法)。这时调用subscribe(),传入的Subscriber<R>是用户定义的事件监听者,但它监听的是新的Observable<R>,这个Observable的回调是固定的,它并不能产生新事件,所以得靠事件源Observable<T>。这个时候Operator生成一个中间的Subscriber<T>对象,该对象的作用就是接收事件源的事件,并将事件转给用户定义的Subscriber。这个Subscriber<T>并没有消耗事件,而是起着一个代理的作用。所以Operator可以看做是一个生成代理的工具类。在这个转发过程中有一个数据类型的变化过程,也是通过Operator转换器Func1实现的,想怎样转换数据,也是用户定义后传到Operator中的。

小结

1、我们需要把Observable的调用看做一条

2、对于Observable<T> -> Observable<R>这个变化,订阅者为Subscriber<R>,在subscriber()方法调用后,流的顺序为倒序的,即从Observable<R> -> Observable<T>,因为我们始终需要调用最开始的事件源。为了满足这个需求,会通过Operator<R, T>这个代理工具生成一个代理Subscriber<T>,这也解释了为什么在声明Operator时泛型参数的顺序写为R, T,正好可以和这一变化对应起来,用相同的泛型参数更便于理解。这样准备工作就都做好了。

3、Observable<T>开始向Subscriber<T>发送事件,发送的参数类型为T,这时候通过转换器Func1T变成R,这样就能顺利的通过代理Subscriber<T>将事件发送给Subscriber<R>了。

4、所以流的路线为Observable<R> -> Observable<T> -> Subscriber<T> -> Subscriber<R>。一条线分成两部分,前半部分为准备工作,后半部分为执行操作。

下图是lift()的过程,其中虚线箭头代表生成,实线箭头代表调用。也可以参考 扔物线 - 给 Android 开发者的 RxJava 详解 中的配图。

Image.png

map()方法

map()方法是RxJava中使用lift()最简单的方法,如果上面lift()方法过于抽象,可以通过该方法来加深理解。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}
public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

}

看到OperatorMap.call()方法,它直接生成一个新的Subscriber,通过上面的分析可以知道,这是一个代理Subscriber,所以它的onNext()等方法都只是直接调用了外部传进来的Subscriber

举个例子:

Observable.just(1.34f, 8.3453f, -534.34f, 392.99f)
        .map(new Func1<Float, Integer>() {
            @Override
            public Integer call(Float aFloat) {
                return Math.round(aFloat);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return Integer.toBinaryString(integer);
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                log("2 map onNext->" + s);
            }
        });

// outputs
// 2 map onNext->1
// 2 map onNext->1000
// 2 map onNext->11111111111111111111110111101010
// 2 map onNext->110001001

该例子是一个Float->Integer->String的转换。我们按上面的流程来分析。

1、生成一个Observable<Float>

2、调用map()生成一个Observable<Integer>

3、再调用map()生成一个Observable<String>

4、subscribe()一个Subscriber<String>。至此流的前半部分完成。

5、执行开始,Observable<String>发送事件,先生成一个Subscriber<Integer>传给Observable<Integer>Observable<Integer>.onSubscribe.call())。

6、Observable<Integer>开始发送事件,同样的生成一个Subscriber<Float>传给Observable<Float>Observable<Float>.onSubscribe.call())。

7、真正发送事件开始,Observable<Float>调用Subscriber<Float>.onNext(Float)等方法,同时Subscriber<Integer>.onNext(Integer)被调用,同时Subscriber<String>.onNext(String)被调用,事件发送完成。

8、虽然是流的模型,但其实是一堆内部类和外部类的嵌套关系。

参考资料

给 Android 开发者的 RxJava 详解 - 变换的原理:lift()

谜之RxJava (二) —— Magic Lift

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容