【RxJava】- 创建操作符源码分析

【RxJava】- 变换操作符源码分析
【RxJava】- 过滤操作符源码分析
【RxJava】- 结合操作符源码分析
【RxJava】- 连接操作符源码分析

简介

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。简单一点就是创建一个事件,注册一个观察者,当事件发生改变时,及时通知观察者。同时RxJava可以把一序列的异步事件按照一定规则组合成新的事件序列。

RxAndroid里面就几个类,是在RxJava基础上针对Android开发封装了一些使用方法而已。

参考

GitHub:RxJava
GitHub:RxAndroid
ReactiveX:reactivex
中文文档地址:ReactiveX/RxJava文档中文版

版本

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'

订阅

  • subscribe(@NonNull Observer<? super T> observer)
    执行被观察者subscribeActual方法并传入观察者实例。

调度器

下面将大体介绍一下RaJava里面的调度器(Scheduler)。Schedulers类用于返回标准Scheduler实例的静态工厂方法。

Observable

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) { return apply(f, source);}
   return source;
 }

下面基本都是在变量“f”等于null情况下分析。

Creating Observables

创建操作

create

返回ObservableCreate实例

Observable.create((ObservableOnSubscribe<Integer>) emitter -> {            
    try {
        if (!emitter.isDisposed()) {
              for (int i = 1; i < 5; i++) {emitter.onNext(i);}
              emitter.onComplete();
         }
    } catch (Exception e) {
                emitter.onError(e);
    }}.subscribe(new Observer<Integer>() {...}

看一下create方法

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
   Objects.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

看一下onAssembly方法

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

onObservableAssembly默认是null,所以create最终返回的是持有被观察者的source(ObservableCreate类型)。

然后调用ObservableCreate对象的subscribe方法,并传入观察者实例,在subscribe方法中继续调用ObservableCreate的subscribeActual方法并传入观察者实例observer。

看一下subscribeActual方法做了什么

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    observer.onSubscribe(parent);
    try {
       source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

创建事件发射器CreateEmitter并传入观察者实例observer,调用onSubscribe方法,通知观察者,观察关系已经准备好,并传入事件发射器,以便观察者可以主动选择放弃对事件通知的接受。

在被观察者中,通过调用CreateEmitter中的onNext,onError,onComplete等方法,如果事件观察没有被取消,那么会调用观察者(observer)中对应的方法来通知观察者。

  • 发射器
    在ObservableCreate有那个发射器,CreateEmitter和SerializedEmitter,一般的发射器和序列发射器。

    SerializedEmitter维护了一个队列数组,保证有序的调用onNext, onError 和 onComplete,调用onNext时,会把传入的值插入到队列,然后循环从队列获取,然后执行观察者的onNext方法。

    至于有序调用,我查看源码后,我的理解是这样的:有序指的是在执行完所以队列里面的事件(即队列全部值都被取出并通知观察者),然后才会执行onError或者onComplete方法,onError和onComplete只有其中一个方法被执行了,另一个不会再被执行。

    如果执行了onError和onComplete其中一个,那么剩余队列里面的事件将不会被执行。
    当调用onError时异常为ExceptionHelper.TERMINATED时,观察者的onError不会被调用,更多请直接查看源码。

defer

返回ObservableDefer实例

延期创建被观察者对象,直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable。

调用defer传入的实例(Supplier类型)的get()方法,获取被观察者对象(ObservableSource类型)

Observable.defer((Supplier<ObservableSource<Integer>>) () -> observer -> observer.onNext(1)).subscribe(...)

具体的可以看一下ObservableDefer这个类,至于subscribe方法怎么传参数,大致看一下调用逻辑就明白了。

Empty/Never/Throw
  • Empty
    返回ObservableEmpty实例,创建一个不发射任何数据,但是正常终止的Observable,会调用观察者onSubscribe和onComplete方法

  • Never
    返回ObservableNever实例,创建一个不发射数据也不终止的Observable,调用观察者onSubscribe方法。

  • Throw
    用error方法实现,返回ObservableError实例,参数为持有异常实例的JustValue实例。

    创建一个不发射数据以一个错误终止的Observable,调用观察者onSubscribe和onError方法。

From

数据转换,实现有:fromAction,fromArray,fromCallable,fromCompletable,fromFuture,fromFuture,fromIterable,fromMaybe,fromPublisher,fromRunnable,fromSingle,fromSupplier。对应Observable实现请自己查看源码,里面代码不多,阅读完基本知道怎么用。

Interval

创建一个按固定时间间隔发射整数序列的Observable,返回ObservableInterval实例,同时会默认传入一个ComputationScheduler实例作为调度器。从0开始自加,每发射一次加1。

interval(long initialDelay, long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler)
  • initialDelay
    延迟发射第一个值,即“0”,后面按period间隔时间正常发射。
  • period
    两次发射直接的间隔时间
  • unit
    initialDelay和period时间单位
  • scheduler
    调度器
    如果想在主线程接收事件,需要用observeOn(AndroidSchedulers.mainThread())转换。AndroidSchedulers就是RxAndroid里面的类了。
Just

创建一个发射指定值的Observable,可以传入多个参数,如果传入一个参数返回ObservableJust实例,多个返回ObservableFromArray实例,前者把传入的一个参数原封不动发射后就完成,后者需要把数组中的数据一个一个原封不动发射后完成,前者完成可以看成是后者的一种特殊情况。

Range

创建一个发射特定整数序列的Observable,参数如果是int返回ObservableRange实例,如果是long返回ObservableRangeLong实例。

这个和上面的interval差不多,不同的interval从0开始,如果不取消,就一直发射,而Range只发射自定参数范围的整数,发射完就停止,发射一次自加1。比如range(1,3)将收到1,2,3.

Repeat

创建一个发射特定数据重复多次的Observable,返回ObservableRepeat实例。比如对just操作做重复操作:

Observable.just(1,3).repeat().subscribe(...)

repeat可接收一个参数用作重复次数。

  • repeatUntil
    返回ObservableRepeatUntil实例,需要传入BooleanSupplier类型参数。

    每个操作完成后都会调用ObservableRepeatUntil中的onComplete方法,而在该方法中又会调用BooleanSupplier的getAsBoolean方法来判断是否需要再次执行操作,如果不需要者操作终止。

  • repeatWhen
    返回ObservableRepeatUntil实例,需要传入Function类型参数。有条件的重新订阅和发射原来的Observable。比如:

    Observable.just("A", "B").repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
          @Override
          public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
              return Observable.timer(10000, TimeUnit.MILLISECONDS);
          }
      }).subscribe(new Consumer<String>() {...}
    

    代码中设置了下次执行的时间是10秒之后,这种方式只能执行两次

Start

返回一个Observable,它发射一个类似于函数声明的值。

Timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,返回ObservableTimer实例。

Transforming Observables

转换操作

Buffer
Observable.create(emitter -> {}).buffer(1)

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

  • ObservableBuffer

    buffer(int count),buffer(int count, int skip)
    buffer(int count, @NonNull Supplier<U> bufferSupplier)
    buffer(int count, int skip, @NonNull Supplier<U> bufferSupplier)
    

    都返回ObservableBuffer实例,bufferSupplier默认ArrayListSupplier实例。

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
         for (int i = 0 ; i < 10 ; i++){
              emitter.onNext(i);
         }
     }).buffer(5).subscribe(...)
    

    当count == skip时,收集数据策略由BufferExactObserver完成,当count != skip由BufferSkipObserver完成。

    • BufferExactObserver
      当buffer收集的数据到达count时发射一次

    • BufferSkipObserver
      就有点意思了,BufferSkipObserver中有个队列数组buffers,每采集一次数据时,当if (index++ % skip == 0)时,会创建一个新的数据收集数组,然后放到buffers中。

      然后遍历buffers中的数据,并将数据放入数据收集数组里面,当前一个数据收集数组收集到的数据个数等于count便会发射一次通知观察者,然后从buffers移除这个数据收集数组,如果buffers还存在数据收集数组,那么接下来的数据便会插到这个数组中。

  • ObservableBufferTimed

    buffer(long timespan, long timeskip, @NonNull TimeUnit unit)
    buffer(long timespan, long timeskip, @NonNull TimeUnit unit, @NonNull Scheduler scheduler)
    buffer(long timespan, @NonNull TimeUnit unit) 
    buffer(long timespan, @NonNull TimeUnit unit, int count)
    buffer(long timespan, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, int count)
    

    都返回ObservableBufferTimed实例,bufferSupplier默认ArrayListSupplier实例。

    1. 当timespan == timeskip && maxSize == Integer.MAX_VALUE,收集策略BufferExactUnboundedObserver完成。
    2. timespan == timeskip,BufferExactBoundedObserver完成。
    3. 其它情况BufferSkipBoundedObserver完成。

    定期以List的形式发射新的数据,采集数据的时候会把数据放到集合中,达到count停止收集,当间隔时间达到timespan或者timeskip时候,发射一次数据,具体算法可以查看上面收集策略实现类。

  • ObservableBufferBoundary
    不贴方法了,自己看源码。监视这个叫openingIndicator的Observable(它发射bufferOpen对象),每当openingIndicator发射了一个数据时,它就创建一个新的List开始收集原始Observable的数据,并将openingIndicator传递给closingIndicator函数。这个函数返回一个Observable。buffer监视这个Observable,当它检测到一个来自这个Observable的数据时,就关闭List并且发射它自己的数据(之前的那个List)。

  • ObservableBufferExactBoundary
    不贴方法了,自己看源码。和ObservableBufferBoundary差不多。

总结

首先需要了解,RxJava被观察者和观察者之间的调度流程,个一定要清楚,这样对分析RxJava的操作符源码很有帮助,否则你将会陷入代码无尽的调用中。

Observable的subscribeActual就是做中转作用,调用到下一个ObservableOnSubscribe的subscribe方法中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容