终于到了分析源码的部分了。很多朋友在使用过RxJava之后都会觉得这个库很玄妙,竟然能把事件发生的源不停的通过不同的操作符改变。比如说这次介绍的map就是,在抽象的概念上,我们经常要求使用者要把map操作符当成改变源stream的一个方法,也就是说map把整个事件的发射流重新构造了一次。
但是其实map操作符真的创造了一个新的流么?
这个答案是非常明显的,如果RxJava的操作符都是这样要等源发射流全部分析完再重新构造的话(一个map重新遍历一次的话),效率上肯定说不过去。所以我们可以肯定一点,即使再多个map(),我们肯定也是一边发射源stream的元素一边进行map()里面的转换的。RxJava并不是一个新的语言,java没有的它也没有,一切你们看到的操作符,只是对java现有的一些常用api的整合而已。尤其是我们经常用的subscribeOn,observeOn,map,等等,都只是把引用,线程池,ExecutorService玩耍出来的组合。
当然,我的意思不是说RxJava很简单。。。。只是作者们在Java的基础上,非常优雅的把java现有的东西完美的结合起来,使得这个库使用起来就像函数式编程的感觉。
那RxJava到底是怎么优雅的实现的呢?
在我们深入代码之前,我们先来补充一些细节:
1.使用链式调用产生新对象,优雅的让新对象持有源对象的引用的。
比如 下面一个例子:
大家可以看到line 13,这个方法generateNext()返回了一个新的Ob对象,但是把自己-this,传给了新生成的对象,新的对象就获取了源对象的引用了。于是在我不断的链式调用方法generateNext()的时候,虽然最后获取的是最后一个Ob对象,但是我的新对象已经拥有一条完整的,通向源对象的链了。不同于以往我们一般的setNext()这种方法,generateNext()返回一个新的Ob对象,所以程序可以以一个链式的结构写出来。
这里可以提前告诉大家,RxJava里面的Observable就是这样的,每一次链式的调用操作符,比如map,返回的都是一个新的Observable,而新的Observable是持有旧的Observable的引用的(其实也不是整个Observable,是Observable的一个成员对象)。
2.一个Observable所需要的元素
一个Observable需要两个最重要的元素,一个是OnSubscribe对象,一个是Subscriber。前者负责定义整个Observable所要执行的action,后者则是定义怎样处理每一个发射的元素,一般来说后者都是用户自己实现的。
比如以下例子:
这个例子很简单,我们创建了一个发射数字1和2的流,在create方法里面,我们创建了一个新的对象OnSubscribe,它定义了我们的事件发生的顺序的元素的个数种类。而subscribe()方法里面我们创建了一个新的Subscriber对象。让我们看看create()方法做了些啥。
仅仅是return了一个Observable,然后把OnSubscribe对象的引用拿到手而已。
然后每次我们call subscribe()方法的时候发生了啥呢?我们把代码跟踪到subscribe(),
看到了嘛?每次在subscribe()的时候,我们会调用我们Observable里面的onSubscribe对象的call方法,参数就是我们传进subscribe()的subscriber对象(当然严格意义上来讲并不是同一个subscriber,subscribe()方法在前面一点的地方稍微包装了一下我们传进的subscriber,但是我们可以理解为同一个)。
所以总结一句,每一个要发射元素的Observable,都必须会有一个OnSubscribe对象和Subscriber。前者定义执行的顺序、事件,后者处理。
3.分析!
ok,在了解了这些先决知识之后,我们可以深入分析map到底是怎么工作的了。
让我们重点看看图2,lift()方法生成了一个新的Observable,新的Observable是干嘛的?
1.新的Observable的OnSubscribe对象的call()方法里面,我们把用户定义的Subscriber对象,line 156-158 ,通过我们的转换操作符map,转换成了一个新的subscriber
2.生成新的subscriber,传给上一个Observable的onSubscribe对象执行call(). 大家一定要仔细看清楚,line 162的这个OnSubscribe,是新的Observable的对象还是旧的Observable的对象。
简单的用图来解释一下可能更清楚:
新生成的Observable会在用户call subscribe()的时候,把原有的,用户自己定义的subscriber修改了之后(lift()里面做的),传给上一个Observable的OnSubscribe对象处理。
那么在第一步的时候,我们对用户原有的subscriber动了什么手脚?
这是第二重要的点,用户原有的subscriber会在lift()里面,被map这个Operator改造
这个Operator就是OperatorMap。让我们看看这个Operator的call方法究竟做了些啥,让原有的subscriber变成一个怎样的新的subscriber。
乍一看!好简单。
的确,map这个操作符的确很简单,让我们来看看它把原来的,用户定义的subscriber怎么改造了。
1.line 39, 我们可以看到call()方法返回一个新的subscriber
2.line 54, 这个o对象是啥?没错,就是原来旧的subscriber,用户自己创建的subscriber。
3.line 54, transformer就是我们转换的方法,也就是map()里面的Func1对象。
也就是说,新的subscriber每次在执行onNext的时候,其实是把上一层Observable发射的元素先用map里面在转换方程转换成新的类型,然后再用下一层的subscriber去发射新的类型的元素。
比如我们把一组Integer转换成String对象,在上图里面的新的Subscriber的泛型T就是Integer,旧的Susbcriber(用户自己定义的Subscriber)的泛型就是R。
这里就很有意思了,这个过程里面发生了很多有趣的事情。
我们在subscribe()之后,会把原有的subscriber不停的包装,每map()一次就会包装一次,一路传给最顶端,最原始的Observable,然后在发射元素的时候,会不停的调用转化方程,发送给下一级的subscriber。
为什么说这个过程很有趣呢?有趣的地方在于,这个过程像在爬山一样,最开始我们要一路往上走,最下面的Observable持有上一层的Observable的引用,而在最顶端的原始Observable要发射元素的时候,subscriber已经是包装过好多层的subscriber了,执行onNext() 的时候,会一路往下调用下一层的subscriber的onNext().,所以回到我们在文章最开头讲的,每一次一个元素在发射之后,我们会即刻执行该元素的转换(也就是不停的执行下一级的onNext()),而不是重新构造一个新的流。
换句话说,Observable的执行链一路向上,Subscriber的执行链一路向下,他们产生链的方向恰好是相反的。
不过想想也是合情合理的,因为从逻辑上来讲,我们create的一个Observable的是一个原始的Observable,只拥有原始数据,但是用户却是提供的一个不同参数类型的,最终类型版本的Subscriber,原本方向就是相反的。
让我们再把思维拓展一下,发生多层map的过程是咋样的:
再看完整篇文章之后,再来体会一下这张图,是不是豁然开朗了呢?产生新的Observable向下,产生新的Subscriber向上,执行onNext向下。
三个字总结,
下上下!
创建新的Observable一路向下,创建新的Subscriber一路向上,最后每次发射元素的时候一路向下直到到达用户创建的subscriber为止。
希望大家能好好理解一下这里设计的精妙之处。下周会带来subscribeOn()操作符的源码讲解。