RxJava2 的4个变换操作符

前言

本文将从场景和源码剖析两个角度介绍RxJava 中的4个变换操作符 mapflatMapconcatMapzip

设定

我要为RxJava2 中几个熟悉的类做个设定,以此来更好的理解我们的场景。
子弹: T 范型
弹匣: Observable<T>
枪{扳机}: ObservableOnSubscribe<T>{ ObservableEmitter<T> }
准星{目标}: ObservableSource<T>{Observer<T>}

我们假设正在玩一款射击游戏。不同类型的子弹对应着不同类型的弹匣和枪,扣动扳机(ObservableEmitter e, e.onNext)就发射一枚子弹,准星锁定着目标,目标被打中后的事情这里不讨论。

        // 定义子弹
        class Bullet

        // 定义枪
        class Gun: ObservableOnSubscribe<Bullet>{
            override fun subscribe(e: ObservableEmitter<Bullet>) {
                //打一枪
                e.onNext(Bullet())
            }
        }

        // 造一把枪
        val gun = Gun()

        //定义一个可以被子弹打死的怪
        class Enemy: Observer<Bullet>{
            override fun onComplete() {
                TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
            }

            override fun onSubscribe(d: Disposable) {
                TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
            }

            override fun onNext(t: Bullet) {
                TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
            }

            override fun onError(e: Throwable) {
                TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
            }
        }

        // 给枪上弹匣,默认无限子弹
        val load = Observable.create(gun)

        // 刷一个怪
        val enemy = Enemy()
        
        // 瞄准
        load.subscribe(enemy)

如上我们就利用RxJava2完成了一个刷怪打怪的场景

Map

现在我们扩展一点我们的游戏。

   
        //定义银弹
        class SilverBullet
        //定义吸血鬼,只能被银弹打
        class Vampire: Observer<SilverBullet> {
            ...
        }

        //刷一个吸血鬼
        val vampire = Vampire()

我们有了一种新怪,和一种新子弹。
按照之前的讨论,我们再定义一个发射银弹的枪就可以了,但是这样我们就不能介绍我们的变换操作符了。所以我们这里设定,不能换枪,且只能用之前那把发射子弹的枪。
有了这个设定,我就可以来介绍Map这个操作符了。

        //定义一个枪口转换器,子弹转银弹
        class SilverMuzzle : Function< Bullet, SilverBullet>{
            override fun apply(t: Bullet): SilverBullet {
                return SilverBullet()
            }
        }
        //造好转换器
        val silverMuzzle = SilverMuzzle()

        load
                .map(silverMuzzle)//安装转换器
                .subscribe(vampire)//瞄准吸血鬼

首先要介绍一下枪口转换器Function

package io.reactivex.functions;

import io.reactivex.annotations.NonNull;

/**
 * A functional interface that takes a value and returns another value, possibly with a
 * different type and allows throwing a checked exception.
 *
 * @param <T> the input value type
 * @param <R> the output value type
 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}

它是一个将类型T转换成类型R的接口,我们的枪口转换器实现的就是将接受一个子弹,再将子弹转换成银弹的apply方法。
而Map的作用就是接收这样一个Function,并调用这个接口实现转化,达到转换类型的目的。
我们顺便来看看Map的内部实现,讲解会在中文注释中请阅读注释。

    /**
     * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and
     * emits the results of these function applications.
     * <p>
     * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <R> the output type
     * @param mapper
     *            a function to apply to each item emitted by the ObservableSource
     * @return an Observable that emits the items from the source ObservableSource, transformed by the specified
     *         function
     * @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //这只是一个判空抛异常的工具类,可以忽略
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //RxJavaPlugins 不在我们的讨论范围内,你没有设置过什么的话,在这里也是可以忽略的
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        //就本文的例子,你可理解这一段为 return new ObservableMap<T, R>(this, mapper) 下同
    }
/*
* 本文为了讲解具体的实现,需要贴一写必要的代码,但是RxJava的套路类似。
* 本文中只贴一下比较简单的Map的源码,便于讲解套路,对于后面比较复杂的操作符则以讲解思路为主,套路是一样的。
*/

现在的矛头就指向ObservableMap了。

package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.annotations.Nullable;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.observers.BasicFuseableObserver;

//AbstractObservableWithUpstream 是 Observable的一个子类,除了接受两个类型外,内部还有一个ObservableSource<T>的实例,也就是我们“枪”的准星,它可以直接找到我们瞄准的"怪"。
//同时Observable 也需要实现ObservableSource<T>这个接口,所有上一段代码中的this就是这里的ObservableSource<T>
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    //这是枪口转换器的实例
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        //枪口转换器赋值
        this.function = function;
    }

    //Observable 是一个抽象类,而subscribeActual是Observable的抽象方法,
    //所以作为子类,ObservableMap实现了subscribeActual
    //而对于这个场景,subscribeActual才是实质上的瞄准。
    @Override
    public void subscribeActual(Observer<? super U> t) {
        //我们枪口瞄准的其实不是吸血鬼,而是一个叫MapObserver的NPC
        //这个NPC接收了吸血鬼t,和枪口转换器
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    //BasicFuseableObserver是实现了Observer的一个抽象类,这里就吧MapObserver当作Observer的一个实现即可。
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            //这个NPC把吸血鬼actual传递给了父类,但是它是protected的,所以我们仍然可以使用到
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                //这个工具类我们前面提过,可以忽略,mapper.apply即调用了枪口转换器,这个v实质上是我们的银弹
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //这个actual 我们之前提过就是吸血鬼,onNext不必多说,是Observer中的一个方法,这里的效果等同于开枪。
            //这里的调用其实可以看到,这个NPC拿到了我们瞄准的吸血鬼,并把子弹转成银弹,开了枪。
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

看完上面的注释,这里我们结合场景总结一下RxJava2的套路。
我们虽然瞄准的是吸血鬼,但是子弹出去必然要经过枪口转换器,而 枪口转换器里面有一个NPC,直接堵住了我们的枪口,所以,实质上,我们开枪打的是NPC,而NPC它知道我们要打吸血鬼,还要把子弹转成银弹。它调用了我们实现的方法把子弹转成银弹,并且瞄准了吸血鬼开了枪。
这里的Map讲解完成,下面的几个操作符,不再贴完整代码,因为太长了。如果可以,你现在打开源码对照着读也许更容易理解哟(只是也许)。

FlatMap

我们再扩展一下游戏,这会儿子我们考虑一下中单后的事情。

        // 定义子弹,增加子弹类型, 原来的银弹就不用了
        class Bullet(val type: Type){
            enum class Type{
                NORMAL,
                SILVER
            }
        }
        //随机怪
        class RandomEnemy: Observer<Bullet> {
            private val r = Random()
            private val type: Bullet.Type
            init {
                //初始化能打怪的子弹类型
                type = getType()
            }
            private fun getType(): Bullet.Type{
                return if(r.nextBoolean())
                    Bullet.Type.SILVER
                else
                    Bullet.Type.NORMAL
            }

           ...

            override fun onNext(t: Bullet) {

                Log.i("isOK",if(t.type == type) "OK" else "NO")

            }

        }
        //刷一个随机怪
        val r = RandomEnemy()

这下子完了,设定我们只能用那把枪,而那把枪只能打一枪啊,刷了一个随机怪,我不知道该用什么子弹打。
这个时候 ,就需要NPC的帮助了。

        //定义一把npc的枪,但是这不是枪,它不发射子弹,只发射子弹类型的信号,是信号枪
        class NPCGun : ObservableOnSubscribe<Bullet.Type>{
            override fun subscribe(e: ObservableEmitter<Bullet.Type>) {
                e.onNext(Bullet.Type.NORMAL)
                e.onNext(Bullet.Type.SILVER)
            }
        }

        //定义一个枪口转换器,把子弹转换成信号枪
        class DoubleBullet: Function< Bullet, ObservableSource<Bullet.Type>>{
            override fun apply(t: Bullet): ObservableSource<Bullet.Type> {
                return Observable.create(NPCGun())
            }
        }

        //定义一个把信号转成子弹的转换器
        class IntMuzzle: Function<Bullet.Type,Bullet> {
            override fun apply(t: Bullet.Type): Bullet {
                return Bullet(t)
            }
        }

        load.flatMap(DoubleBullet())//装好信号转换器
                .map(IntMuzzle())//装好信号转子弹的
                .subscribe(r)//瞄准随机怪

我们先装一个flatMap的转换器,这个转换器里的NPC没有枪,我们给了它一把,这个NPC也有转化能力,把子弹转成信号,然后开了2枪。这个时候,我们再在flatMap上装一个map转换器,让信号变成子弹,然后打怪。这样虽然我只开了一枪,但是在NPC的帮助下,随机怪是被打了两枪,而且是被2种子弹打了2枪,这样必然能打死随机怪。

看到这里,不少小伙伴应该有2个疑问:
a) 既然flatMap可以发射信号弹,为什么不直接发射子弹?
b) mapflatMap 都是接收一个Function接口,flatMap是不是多余了,我也可以用map取实现。
这里我来解答一下,flatMap当然可以直接发子弹,这里a)是为了解释b)即是剧情需要。
flatMap里的NPC没有枪,而map里的NPC有枪。如果你给map的Function定义一把枪Function< Bullet, ObservableSource<Bullet>>,那么它发射的不是子弹,而是枪啊。map里的NPC的枪就会变成ObservableSource<ObservableSource<Bullet>>
而flatMap接收的Function相对较严格必须是Function<T, ObservableSource <R>>。也就是你必须给到NPC一把枪,因为它没有枪。所以map里的NPC会跟着你开枪,是听从你的意志,而flatMap里的NPC会听从枪的意志(虽然本质上也是你的意志,毕竟枪是你给的)。所以,对于开枪map是1对1 的关系,而flatMap是1对n的关系,这取决于你给他的枪是什么样的。
flatMap里的实现很鸡贼,这里我们来看一下关键代码,请阅读注释。

//这里我们删去了太多废代码和与本场景无关的代码,主要看思路
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
   
    ...

    @Override
    public void subscribeActual(Observer<? super U> t) {

        ...

        //这里的NPC叫 MergeObserver,至于它的参数,你只要知道t是我们的目标“随机怪”,mapper是我们的Function转换函数,把子弹转成信号枪的。
        //其他参数和本场景无关
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

    static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {

        ...

        @Override
        public void onNext(T t) {
            // safeguard against misbehaving sources
            if (done) {
                return;
            }
            //准备拿枪了
            ObservableSource<? extends U> p;
            try {
                //这里的t是我们发射出去的子弹,被NPC拿到了
                //而NPC利用我们实现的接口,也就是mapper,把这个子弹转换成了枪
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }
            ...
            //这个函数就是鸡贼的地方,请认真阅读注释
            subscribeInner(p);
        }
        
        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
              //这个无限循环在本场景中无意义
              for (;;) {
                //这个if判断就是段废代码,这个p从出生到现在和callable没有半毛钱关系,请直接看else
                if (p instanceof Callable) {
                    tryEmitScalar(((Callable<? extends U>)p));

                    if (maxConcurrency != Integer.MAX_VALUE) {
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                break;
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //NPC自己造了一个怪
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    //这个addInner也和本场景无关,它只是把这个怪加到一个线性表中。
                    if (addInner(inner)) {
                        //关键在于这个订阅也就是"瞄准"
                        //我们给NPC的枪它并不瞄准我们指定的怪,而是瞄准了它自己造的怪
                        //而inner再中弹后调用了tryEmit,在tryEmit中再开枪打我们指定的怪
                        //这里可以停下来想想为什么,然后继续阅读下面的注释
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }

       ...

        void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                //真正的射击我们指定的怪
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

        ...

    }

    static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

        ...

        @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                //调用tryEmit,这里的parent再初始化的时候已经指向了我们的NPC
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }
       ...
    }
}

flatMap中的NPC,它拿到枪是打自己造的怪,再自己的怪中枪后会告诉NPC,我中枪了,这个时候NPC才会打我们指定的怪。
在前的map中提到过,onNext虽然可以等效成开枪射击,但是,它并不是真正意思上的开枪,因为我们的怪Observer已经订阅了我们的枪,这里NPC其实也是个Observer,但是它持有怪的Observer接口也就是actual,这里只是调用actual.onNext来模拟打枪,但实际上订阅的行为actual已经交给我们一开始定义的枪了,只是枪把接口给了NPC,让它可以把子弹投递到怪的脑袋里。这里你也许可以回头看看map的源码。会更好的理解。
而flatMap里的NPC它转化的不是子弹,而是枪,把枪投递到怪的脑子里没有用,只有子弹才有用,所以这把枪里的子弹要打出去。这才有了NPC自己造怪的事情,而NPC自己造的怪中弹后调用了parent.tryEmit,同时把自己中的弹吐出来给NPC,这个时候NPC就有子弹了,就可以调用actual.onNext来模拟打枪。

由于突然有了比较忙的工作安排,所以剩下2个变换操作符先留着下回再讲吧。也不知道我讲的大家看不看得懂。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354