RxJava之转换操作符源码介绍

转载请以链接形式标明出处:
本文出自:103style的博客

转换相关的操作符 以及 官方介绍

RxJava转换操作符 官方介绍 :Transforming Observables

以下介绍我们就直接具体实现,中间流程请参考 RxJava之create操作符源码解析

buffer

  • 官方示例:
    Observable.range(0, 10)
        .buffer(4)
        .subscribe((List<Integer> buffer) -> System.out.println(buffer));
    
    输出:
    [0, 1, 2, 3]
    [4, 5, 6, 7]
    [8, 9]
    
  • 返回对象的 ObservableBuffersubscribeActual 方法:
    单参数bufferskipcount 是相等的。
    protected void subscribeActual(Observer<? super U> t) {
        if (skip == count) {
            BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
            if (bes.createBuffer()) {//1.0
                source.subscribe(bes);
            }
        } else {
            source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
        }
    }
    
    • (1.0) createBuffer即新创建了一个ArrayList对象 buffer
  • onNext(T t)onComplete()方法:
    public void onNext(T t) {
        U b = buffer;
        if (b != null) {
            b.add(t);
            if (++size >= count) {//1.0
                downstream.onNext(b);
                size = 0;
                createBuffer();
            }
        }
    }
    
    public void onComplete() {
        U b = buffer;
        if (b != null) {//2.0
            buffer = null;
            if (!b.isEmpty()) {
                downstream.onNext(b);
            }
            downstream.onComplete();
        }
    }
    
    • (1.0) 每次调用onNext 就检查缓存的事件数是否 不小于 buffer操作符设置的 值,成立则将缓存的 buffer 数组 传给观察者的 onNext
    • (2.0) onComplete 是检查缓存的事件数是否不为空,成立则将缓存的 buffer 数组 传给观察者的 onNext,再调用观察者的 onComplete

cast

  • 官方示例:
    Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);
    numbers.filter((Number x) -> x instanceof Integer)
            .cast(Integer.class)
            .subscribe((Integer x) -> System.out.println(x));
    
    输出:
    1
    7
    12
    5
    
  • cast 是通过map操作符来实现的,我们直接看map
    public final <U> Observable<U> cast(final Class<U> clazz) {
        ObjectHelper.requireNonNull(clazz, "clazz is null");
        return map(Functions.castFunction(clazz));
    }
    
    • apply方法:
      public U apply(T t) throws Exception {
          return clazz.cast(t);
      } 
      

map

  • 官方示例:
    Observable.just(1, 2, 3)
            .map(x -> x * x)
            .subscribe(System.out::println);
    
    输出:
    1
    4
    9
    
  • 返回对象的 ObservableMapsubscribeActual 方法:
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
  • 继续看 MapObserveronNext(T t)
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode != NONE) {
            downstream.onNext(null);
            return;
        }
        U v;
        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        downstream.onNext(v);
    }
    
    • 即将 map 操作符 传入Function对象的 返回值 传递给 链式调用上一步的返回对象的 onNext(T t)

concatMap

RxJava之concatMap系列转换操作符介绍


flatMap

RxJava之flatMap系列转换操作符介绍


flattenAsFlowable

  • 官方示例:
    Single<Double> source = Single.just(2.0);
    Flowable<Double> flowable = source.flattenAsFlowable(x -> {
        return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3));
    });
    flowable.subscribe(x -> System.out.println("onNext: " + x));
    
    输出:
    onNext: 2.0
    onNext: 4.0
    onNext: 8.0
    
  • 我们先看Single.just(2.0)
    public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }
    
  • SingleJustsubscribeActual
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }
    
  • flattenAsFlowable返回对象的 SingleFlatMapIterableFlowablesubscribeActual 方法:
    protected void subscribeActual(Subscriber<? super R> s) {
        source.subscribe(new FlatMapIterableObserver<T, R>(s, mapper));
    }
    
  • 继续看 FlatMapIterableObserveronSubscribe(Disposable d)onSuccess(T value)
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this);//1.0
        }
    }
    public void onSuccess(T value) {
        Iterator<? extends R> iterator;
        boolean has;
        try {
            iterator = mapper.apply(value).iterator();//2.0 调用apply返回的Iterable对象的 iterator()方法。
            has = iterator.hasNext();//3.0
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            downstream.onError(ex);
            return;
        }
        if (!has) {
            downstream.onComplete();//3.1
            return;
        }
        this.it = iterator;//3.2
        drain();
    }
    
    • (1.0) 通过Flowable subscribe流程介绍 我们知道downstream.onSubscribe(this)即调用 FlowableInternalHelper.RequestMax.INSTANCEaccept方法:
      public enum RequestMax implements Consumer<Subscription> {
          INSTANCE;
          @Override
          public void accept(Subscription t) throws Exception {
              t.request(Long.MAX_VALUE);
          }
      }
      
      即:FlatMapIterableObserver.request(Long.MAX_VALUE): 即为设置变量requestedvalueLong.MAX_VALUEdrain()因为it 变量还是null,所以没做什么操作。
      public void request(long n) {
          if (SubscriptionHelper.validate(n)) {
              BackpressureHelper.add(requested, n);
              drain();
          }
      }
      
    • (2.0) 调用flattenAsFlowable传入的Functionapply返回的Iterable对象的 iterator()方法。
    • (3.0) 检查Iterable时候为空,(3.1) 为空直接onComplete()(3.2) 不为空则将 iterator()返回值赋值给当前的 it 变量,继续执行drain()
  • drain():
    void drain() {
        ...
        Subscriber<? super R> a = downstream;
        Iterator<? extends R> iterator = this.it;
        ...
        int missed = 1;
        for (; ; ) {
            if (iterator != null) {
                long r = requested.get();
                long e = 0L;
                if (r == Long.MAX_VALUE) {//1.0
                    slowPath(a, iterator);
                    return;
                }
                ...
            }
            ...
        }
    }
    
    • 因为上一步downstream.onSubscribe(this)调用了request(Long.MAX_VALUE), 所以 (1.0) 这里条件成立,执行slowPath(downstream iterator)
  • slowPath(downstream iterator)
    void slowPath(Subscriber<? super R> a, Iterator<? extends R> iterator) {
        for (;;) {
            if (cancelled) {
                return;
            }
            R v;
            try {
                v = iterator.next();//1.0
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                a.onError(ex);
                return;
            }
            a.onNext(v);//1.1
            if (cancelled) {
                return;
            }
            boolean b;
            try {
                b = iterator.hasNext();//1.2
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                a.onError(ex);
                return;
            }
            if (!b) {
                a.onComplete();//1.3
                return;
            }
        }
    }
    
    • (1.0) 获取到的元素,(1.1)传递给downstreamonNext(1.2)然后判断是否还有其他元素,如果有则循环继续,没有的话即调用 downstreamonComplete 结束。

flattenAsObservable

  • 官方示例:
    Single<Double> source = Single.just(2.0);
    Observable<Double> observable = source.flattenAsObservable(x -> {
        return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3));
    });
    observable.subscribe(x -> System.out.println("onNext: " + x));
    
    输出:
    onNext: 2.0
    onNext: 4.0
    onNext: 8.0
    

subscribeActual实现的逻辑和 flattenAsFlowable 类似,只是返回的对象为 SingleFlatMapIterableObservable,就不再赘述了。


groupBy

  • 官方示例:

    Observable<String> animals = Observable.just(
        "Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");
    animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
        .concatMapSingle(Observable::toList)
        .subscribe(System.out::println);
    

    输出:

    [TIGER, TURTLE]
    [ELEPHANT]
    [CAT, CHAMELEON]
    [FROG, FISH, FLAMINGO]
    
  • 我们来看看返回对象的ObservableGroupBy:

    public GroupByObserver(Observer<? super GroupedObservable<K, V>> actual, 
                           Function<? super T, ? extends K> keySelector, 
                           Function<? super T, ? extends V> valueSelector, 
                           int bufferSize, boolean delayError) {
        this.downstream = actual;
        this.keySelector = keySelector;
        this.valueSelector = valueSelector;
        this.bufferSize = bufferSize;
        this.delayError = delayError;
        this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
        this.lazySet(1);
    }
    
    public void subscribeActual(Observer<? super GroupedObservable<K, V>> t) {
        source.subscribe(new GroupByObserver<T, K, V>(t, keySelector, valueSelector, bufferSize, delayError));
    } 
    
  • 继续看 GroupByObserveronNext(T t)

    public void onNext(T t) {
        K key;
        try {
            key = keySelector.apply(t);//1.0
        } catch (Throwable e) {
            ...
            return;
        }
        Object mapKey = key != null ? key : NULL_KEY;
        GroupedUnicast<K, V> group = groups.get(mapKey);
        if (group == null) {
            if (cancelled.get()) {
                return;
            }
            group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
            groups.put(mapKey, group);//2.0
            getAndIncrement();
            downstream.onNext(group);//3.0
        }
    
        V v;
        try {
            v = ObjectHelper.requireNonNull(valueSelector.apply(t), "The value supplied is null");//4.0
        } catch (Throwable e) {
            ...
            return;
        }
        group.onNext(v);//4.1
    }
    
    • (1.0) 通过keySelector.apply(t)即官方示例中的 animal.charAt(0)获取分组的 key
    • (2.0) 如果GroupedUnicast不存再这个key,则保存进去。
    • (3.0) 然后继续调用上一步操作符的 onNext方法,即官方示例中的just
    • (4.0) 通过valueSelector.apply(t)即官方示例中的 String::toUpperCase)获取值,(4.1)添加到ToListObservercollection中。
  • 最后通过 onComplete()输出:

    public void onComplete() {
        List<ObservableGroupBy.GroupedUnicast<K, V>> list = new ArrayList<ObservableGroupBy.GroupedUnicast<K, V>>(groups.values());
        groups.clear();
        for (ObservableGroupBy.GroupedUnicast<K, V> e : list) {
            e.onComplete();
        }
        downstream.onComplete();
    }
    

    ToListObserveronComplete():

    public void onComplete() {
        U c = collection;
        collection = null;
        downstream.onNext(c);
        downstream.onComplete();
    }
    

scan

  • 官方示例:
    Observable.just(5, 3, 8, 1, 7)
            .scan(0, (partialSum, x) -> partialSum + x)
            .subscribe(System.out::println);
    
    输出:
    0
    5
    8
    16
    17
    24
    
  • 我们来看看返回对象的ObservableScanSeed:
    public ObservableScanSeed(ObservableSource<T> source, Callable<R> seedSupplier, BiFunction<R, ? super T, R> accumulator) {
        super(source);
        this.accumulator = accumulator;
        this.seedSupplier = seedSupplier;
    }
    
    @Override
    public void subscribeActual(Observer<? super R> t) {
        R r;
        try {
            r = ObjectHelper.requireNonNull(seedSupplier.call(), "The seed supplied is null");//1.0
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            EmptyDisposable.error(e, t);
            return;
        }
        source.subscribe(new ScanSeedObserver<T, R>(t, accumulator, r));
    }
    
    • (1.0) seedSupplier.call()即官方示例中的 0,即设置 r 的值为0.
  • 继续看ScanSeedObserveronNext(T t):
    public void onNext(T t) {
        if (done) {
            return;
        }
        R v = value;//1.0
        R u;
        try {
            u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value");//2.0
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            upstream.dispose();
            onError(e);
            return;
        }
        value = u;//3.0
        downstream.onNext(u);//4.0
    }
    
    • (1.0) value 即为上一步设置的 r0.
    • (2.0) accumulator.apply(v, t) 即为官方示例中的 partialSum + x
    • (3.0) 更新value 的值
    • (4.0)accumulator.apply(v, t)传递给观察者的onNext

switchMap

  • 官方示例:
    Observable.interval(0, 1, TimeUnit.SECONDS)
            .switchMap(x -> {
                return Observable.interval(0, 750, TimeUnit.MILLISECONDS)
                        .map(y -> x);
            })
            .takeWhile(x -> x < 3)
            .blockingSubscribe(System.out::print);
    
    
    输出:
    001122
    
  • 我们来看看返回对象的ObservableSwitchMap:
    public ObservableSwitchMap(ObservableSource<T> source,
                               Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize,
                                       boolean delayErrors) {
        super(source);
        this.mapper = mapper;
        this.bufferSize = bufferSize;
        this.delayErrors = delayErrors;
    }
    
    @Override
    public void subscribeActual(Observer<? super R> t) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new SwitchMapObserver<T, R>(t, mapper, bufferSize, delayErrors));
    }
    
  • 继续看SwitchMapObserveronNext:
    public void onNext(T t) {
        long c = unique + 1;
        unique = c;
        SwitchMapInnerObserver<T, R> inner = active.get();
        if (inner != null) {
            inner.cancel();
        }
        ObservableSource<? extends R> p;
        try {
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The ObservableSource returned is null");//1.0
        } catch (Throwable e) {
            ...
            return;
        }
        SwitchMapInnerObserver<T, R> nextInner = new SwitchMapInnerObserver<T, R>(this, c, bufferSize);//2.0
        for (;;) {
            inner = active.get();
            if (inner == CANCELLED) {
                break;
            }
            if (active.compareAndSet(inner, nextInner)) {
                p.subscribe(nextInner);//3.0
                break;
            }
        }
    }
    
    • (1.0) 通过mapper.apply(t)即官方示例中的 Observable.interval(0, 750, TimeUnit.MILLISECONDS).map(y -> x)返回的ObservableMap对象。
    • (2.0) 构建SwitchMapInnerObserver对象
    • (3.0) 用返回的ObservableMap订阅SwitchMapInnerObserver对象

window

  • 官方示例:
    Observable.range(1, 10)
            // Create windows containing at most 2 items, and skip 3 items before starting a new window.
            .window(2, 3)
            .flatMapSingle(window -> {
                return window.map(String::valueOf)
                        .reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
            })
            .subscribe(System.out::println);
    
    输出:
    [1, 2]
    [4, 5]
    [7, 8]
    [10]
    
  • 我们来看看返回对象的ObservableWindow:
    public ObservableWindow(ObservableSource<T> source, long count, long skip, int capacityHint) {
        super(source);
        this.count = count;
        this.skip = skip;
        this.capacityHint = capacityHint;
    }
    
    @Override
    public void subscribeActual(Observer<? super Observable<T>> t) {
        if (count == skip) {
            source.subscribe(new WindowExactObserver<T>(t, count, capacityHint));
        } else {
            source.subscribe(new WindowSkipObserver<T>(t, count, skip, capacityHint));
        }
    }
    
  • 继续看WindowSkipObserveronNext:
    public void onNext(T t) {
        final ArrayDeque<UnicastSubject<T>> ws = windows;
        long i = index;
        long s = skip;
        if (i % s == 0 && !cancelled) {//3.0
            wip.getAndIncrement();
            UnicastSubject<T> w = UnicastSubject.create(capacityHint, this);
            ws.offer(w);
            downstream.onNext(w);
        }
        long c = firstEmission + 1;
        for (UnicastSubject<T> w : ws) {
            w.onNext(t);//1.0
        }
        if (c >= count) {
            ws.poll().onComplete();//2.0
            if (ws.isEmpty() && cancelled) {
                this.upstream.dispose();
                return;
            }
            firstEmission = c - s;
        } else {
            firstEmission = c;
        }
        index = i + 1;
    }
    
    • (1.0) 将元素存入 queue
    • (2.0) 当元素个数到达count时,就之前的元素全部输出
    • (3.0) 当元素个数到达skip时,就重新创建一个UnicastSubject来存储元素

以上

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容

  • 怎么如此平静, 感觉像是走错了片场.为什么呢, 因为上下游工作在同一个线程呀骚年们! 这个时候上游每次调用emit...
    Young1657阅读 1,466评论 2 1
  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,020评论 1 9
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 927评论 0 3
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,190评论 2 8
  • 分针每一步走的都有一些沉重,我有时害怕这种声音,在床上蜷作一团;有时在睡梦中依靠朦胧感壮起胆子将头探出被子外面,...
    一张大油饼阅读 152评论 0 0