FluxJava 最初的设计就是以 Add-on 的方式来提供对于 RxJava 的支持,所以这次增加 RxJava2 的部份也依照相同的模式,在 Project 中加上了 fluxjava-rx2 的 Module。新的 Module 功能上与 fluxjava-rx 大致上相同,只是原本以 RxJava 规格运作的部份,改为使用 RxJava2。
由于 RxJava 与 RxJava2 不太有机会共存在同一个 Module 里,所以 fluxjava-rx2 沿用了 fluxjava-rx 的 Package 名称,在使用上这二个 Add-on 必须要择一引用。不过也带来了一个额外的好处,如果想要由 fluxjava-rx 升级到 fluxjava-rx2 时,只要修改成 RxJava2 的调用规格,不用再特别调整 Import 的内容。
配合 RxJava2 的更新,按照惯例增加了一篇文章手把手教你如何一起用 FluxJava 与 RxJava2 做为使用上的参考。更多深入的文章请参阅简书 //www.greatytc.com/u/fea63707e07f 或 wznote.blogspot.com。
在 fluxjava-rx2 中与 fluxjava-rx 最大的差异,主要是因应 RxJava2 把原本的 Observable 分成了有背压版本的 Flowable 与没有背压版本的 Observable。因此 RxBus 与 RxStore 中都分别再提供了 toFlowable
的 Method 来取得 Flowable,不过 Observable 本来就可以再转换为 Flowable,此处的功能只是为了增加便利性、简化源代码之用。
同时,利用这篇文章补充说明一下一些使用上的技术细节。在 RxStore 中使用的是没有背压版本的 Observable 来接收外部传来的信息,只是接收到之后就会被分派到不同的 Thread 上去处理后续的工作,所以在这个部份是不大有机会遇上 MissingBackpressureException 问题的。但是,这并不代表背压所造成的情况就不存在了,只是瓶颈移到了 ThreadPool 的承受能力或是执行的环境可以产生 Thread 的数量上。
就算是使用有背压功能的 Flowable,也不代表就可以高枕无忧了。说穿了,背压本身不是什么神奇的黑科技,原理上只是在上下游中间加了个水池,让下游有喘息的空间。水池毕竟还是有一定的物理限制,没有控制好,依旧会让水池承载不下而出现错误。
因此,不论使用哪一种方法,前端发送的数量仍然应该要被谨慎地控制,避免海量的信息把接收端给淹没了。像是把 RecyclerView 滚动时产生位移的 Event 毫无选择地往 RxStore 送。
在发送端就要筛选信息,除了要减少 MissingBackpressureException 可能会出现的机会外,还有一个目的是要节省执行成本。当信息数量大到无法处理时,能做的只有挑选值得处理的部份来进行。当挑选的工作被移到发送后,RxJava 的传送机制表面上看起来就是简单地转了一手,但是如果去追踪其源代码,就可以发现其实底下做了不少的工作。而每一次传送都要运行这些内容,可以想见在数量到达一定的程度之后,就会显现出可观的效能差距。
由于 Observable 在定义上所形成的限制使然,同一个发送源无法把的信息分配至不同的 Thread 上送出。RxStore 为了要让每个资料处理要求可以独立、同步地进行,所以才会在接收到信息后,以不同的 Thread 进行后续的工作。
在 RxStore 里提供了一个 getExecutor
Method,可以使用 ThreadPool 来做为背压的替代方案,但是并没有像背压一样有可以控制上游的功能。在实作 getExecutor
时要注意,不要直接在回传时 New 一个 ThreadPool 的 Instance。因为 getExecutor
是在每一次收到信息后调用一次。以上的做法,也等同于每一次收到信息就拿到一个 ThreadPool 的 Instance,每一个 ThreadPool 都只会产生一个 Thread,这就失去了使用 ThreadPool 的用意。
最容易出现以上问题的情况是使用 Executors 来取得 ThreadPool,一般的情况下很容易忽略 Executors 其实是每次调用就产生一个 Instance。所以当以 return Executors.newFixedThreadPool(poolSize);
的方式回传 getExecutor
时,一开始也许显示不出问题而被略过,但是一但信息爆量后,就会因为产生 Thread 的数量到达上限而中止运作。
以上,是这次补充的内容,让使用 FluxJava 的朋友做为参考。