前言
本文将从场景和源码剖析两个角度介绍RxJava 中的4个变换操作符 map
、flatMap
、concatMap
、zip
。
设定
我要为RxJava2 中几个熟悉的类做个设定,以此来更好的理解我们的场景。
子弹: T 范型
弹匣: Observable<T>
枪{扳机}: ObservableOnSubscribe<T>{ ObservableEmitter<T> }
准星{目标}: ObservableSource<T>{Observer<T>}
我们假设正在玩一款射击游戏。不同类型的子弹对应着不同类型的弹匣和枪,扣动扳机(ObservableEmitter e, e.onNext)就发射一枚子弹,准星锁定着目标,目标被打中后的事情这里不讨论。
// 定义子弹
class Bullet
// 定义枪
class Gun: ObservableOnSubscribe<Bullet>{
override fun subscribe(e: ObservableEmitter<Bullet>) {
//打一枪
e.onNext(Bullet())
}
}
// 造一把枪
val gun = Gun()
//定义一个可以被子弹打死的怪
class Enemy: Observer<Bullet>{
override fun onComplete() {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun onSubscribe(d: Disposable) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun onNext(t: Bullet) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun onError(e: Throwable) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
}
// 给枪上弹匣,默认无限子弹
val load = Observable.create(gun)
// 刷一个怪
val enemy = Enemy()
// 瞄准
load.subscribe(enemy)
如上我们就利用RxJava2完成了一个刷怪打怪的场景
Map
现在我们扩展一点我们的游戏。
//定义银弹
class SilverBullet
//定义吸血鬼,只能被银弹打
class Vampire: Observer<SilverBullet> {
...
}
//刷一个吸血鬼
val vampire = Vampire()
我们有了一种新怪,和一种新子弹。
按照之前的讨论,我们再定义一个发射银弹的枪就可以了,但是这样我们就不能介绍我们的变换操作符了。所以我们这里设定,不能换枪,且只能用之前那把发射子弹的枪。
有了这个设定,我就可以来介绍Map
这个操作符了。
//定义一个枪口转换器,子弹转银弹
class SilverMuzzle : Function< Bullet, SilverBullet>{
override fun apply(t: Bullet): SilverBullet {
return SilverBullet()
}
}
//造好转换器
val silverMuzzle = SilverMuzzle()
load
.map(silverMuzzle)//安装转换器
.subscribe(vampire)//瞄准吸血鬼
首先要介绍一下枪口转换器Function
package io.reactivex.functions;
import io.reactivex.annotations.NonNull;
/**
* A functional interface that takes a value and returns another value, possibly with a
* different type and allows throwing a checked exception.
*
* @param <T> the input value type
* @param <R> the output value type
*/
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
它是一个将类型T转换成类型R的接口,我们的枪口转换器实现的就是将接受一个子弹,再将子弹转换成银弹的apply方法。
而Map的作用就是接收这样一个Function
,并调用这个接口实现转化,达到转换类型的目的。
我们顺便来看看Map的内部实现,讲解会在中文注释中请阅读注释。
/**
* Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and
* emits the results of these function applications.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the output type
* @param mapper
* a function to apply to each item emitted by the ObservableSource
* @return an Observable that emits the items from the source ObservableSource, transformed by the specified
* function
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//这只是一个判空抛异常的工具类,可以忽略
ObjectHelper.requireNonNull(mapper, "mapper is null");
//RxJavaPlugins 不在我们的讨论范围内,你没有设置过什么的话,在这里也是可以忽略的
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
//就本文的例子,你可理解这一段为 return new ObservableMap<T, R>(this, mapper) 下同
}
/*
* 本文为了讲解具体的实现,需要贴一写必要的代码,但是RxJava的套路类似。
* 本文中只贴一下比较简单的Map的源码,便于讲解套路,对于后面比较复杂的操作符则以讲解思路为主,套路是一样的。
*/
现在的矛头就指向ObservableMap了。
package io.reactivex.internal.operators.observable;
import io.reactivex.*;
import io.reactivex.annotations.Nullable;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.observers.BasicFuseableObserver;
//AbstractObservableWithUpstream 是 Observable的一个子类,除了接受两个类型外,内部还有一个ObservableSource<T>的实例,也就是我们“枪”的准星,它可以直接找到我们瞄准的"怪"。
//同时Observable 也需要实现ObservableSource<T>这个接口,所有上一段代码中的this就是这里的ObservableSource<T>
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//这是枪口转换器的实例
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
//枪口转换器赋值
this.function = function;
}
//Observable 是一个抽象类,而subscribeActual是Observable的抽象方法,
//所以作为子类,ObservableMap实现了subscribeActual
//而对于这个场景,subscribeActual才是实质上的瞄准。
@Override
public void subscribeActual(Observer<? super U> t) {
//我们枪口瞄准的其实不是吸血鬼,而是一个叫MapObserver的NPC
//这个NPC接收了吸血鬼t,和枪口转换器
source.subscribe(new MapObserver<T, U>(t, function));
}
//BasicFuseableObserver是实现了Observer的一个抽象类,这里就吧MapObserver当作Observer的一个实现即可。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//这个NPC把吸血鬼actual传递给了父类,但是它是protected的,所以我们仍然可以使用到
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//这个工具类我们前面提过,可以忽略,mapper.apply即调用了枪口转换器,这个v实质上是我们的银弹
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//这个actual 我们之前提过就是吸血鬼,onNext不必多说,是Observer中的一个方法,这里的效果等同于开枪。
//这里的调用其实可以看到,这个NPC拿到了我们瞄准的吸血鬼,并把子弹转成银弹,开了枪。
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
看完上面的注释,这里我们结合场景总结一下RxJava2的套路。
我们虽然瞄准的是吸血鬼,但是子弹出去必然要经过枪口转换器,而 枪口转换器里面有一个NPC,直接堵住了我们的枪口,所以,实质上,我们开枪打的是NPC,而NPC它知道我们要打吸血鬼,还要把子弹转成银弹。它调用了我们实现的方法把子弹转成银弹,并且瞄准了吸血鬼开了枪。
这里的Map
讲解完成,下面的几个操作符,不再贴完整代码,因为太长了。如果可以,你现在打开源码对照着读也许更容易理解哟(只是也许)。
FlatMap
我们再扩展一下游戏,这会儿子我们考虑一下中单后的事情。
// 定义子弹,增加子弹类型, 原来的银弹就不用了
class Bullet(val type: Type){
enum class Type{
NORMAL,
SILVER
}
}
//随机怪
class RandomEnemy: Observer<Bullet> {
private val r = Random()
private val type: Bullet.Type
init {
//初始化能打怪的子弹类型
type = getType()
}
private fun getType(): Bullet.Type{
return if(r.nextBoolean())
Bullet.Type.SILVER
else
Bullet.Type.NORMAL
}
...
override fun onNext(t: Bullet) {
Log.i("isOK",if(t.type == type) "OK" else "NO")
}
}
//刷一个随机怪
val r = RandomEnemy()
这下子完了,设定我们只能用那把枪,而那把枪只能打一枪啊,刷了一个随机怪,我不知道该用什么子弹打。
这个时候 ,就需要NPC的帮助了。
//定义一把npc的枪,但是这不是枪,它不发射子弹,只发射子弹类型的信号,是信号枪
class NPCGun : ObservableOnSubscribe<Bullet.Type>{
override fun subscribe(e: ObservableEmitter<Bullet.Type>) {
e.onNext(Bullet.Type.NORMAL)
e.onNext(Bullet.Type.SILVER)
}
}
//定义一个枪口转换器,把子弹转换成信号枪
class DoubleBullet: Function< Bullet, ObservableSource<Bullet.Type>>{
override fun apply(t: Bullet): ObservableSource<Bullet.Type> {
return Observable.create(NPCGun())
}
}
//定义一个把信号转成子弹的转换器
class IntMuzzle: Function<Bullet.Type,Bullet> {
override fun apply(t: Bullet.Type): Bullet {
return Bullet(t)
}
}
load.flatMap(DoubleBullet())//装好信号转换器
.map(IntMuzzle())//装好信号转子弹的
.subscribe(r)//瞄准随机怪
我们先装一个flatMap
的转换器,这个转换器里的NPC没有枪,我们给了它一把,这个NPC也有转化能力,把子弹转成信号,然后开了2枪。这个时候,我们再在flatMap
上装一个map
转换器,让信号变成子弹,然后打怪。这样虽然我只开了一枪,但是在NPC的帮助下,随机怪是被打了两枪,而且是被2种子弹打了2枪,这样必然能打死随机怪。
看到这里,不少小伙伴应该有2个疑问:
a) 既然flatMap
可以发射信号弹,为什么不直接发射子弹?
b) map
和flatMap
都是接收一个Function
接口,flatMap
是不是多余了,我也可以用map取实现。
这里我来解答一下,flatMap
当然可以直接发子弹,这里a)是为了解释b)即是剧情需要。
flatMap
里的NPC没有枪,而map
里的NPC有枪。如果你给map的Function
定义一把枪Function< Bullet, ObservableSource<Bullet>>
,那么它发射的不是子弹,而是枪啊。map里的NPC的枪就会变成ObservableSource<ObservableSource<Bullet>>
。
而flatMap接收的Function
相对较严格必须是Function<T, ObservableSource <R>>
。也就是你必须给到NPC一把枪,因为它没有枪。所以map
里的NPC会跟着你开枪,是听从你的意志,而flatMap
里的NPC会听从枪的意志(虽然本质上也是你的意志,毕竟枪是你给的)。所以,对于开枪map是1对1 的关系,而flatMap是1对n的关系,这取决于你给他的枪是什么样的。
而flatMap
里的实现很鸡贼,这里我们来看一下关键代码,请阅读注释。
//这里我们删去了太多废代码和与本场景无关的代码,主要看思路
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
...
@Override
public void subscribeActual(Observer<? super U> t) {
...
//这里的NPC叫 MergeObserver,至于它的参数,你只要知道t是我们的目标“随机怪”,mapper是我们的Function转换函数,把子弹转成信号枪的。
//其他参数和本场景无关
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
...
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
//准备拿枪了
ObservableSource<? extends U> p;
try {
//这里的t是我们发射出去的子弹,被NPC拿到了
//而NPC利用我们实现的接口,也就是mapper,把这个子弹转换成了枪
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
...
//这个函数就是鸡贼的地方,请认真阅读注释
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
//这个无限循环在本场景中无意义
for (;;) {
//这个if判断就是段废代码,这个p从出生到现在和callable没有半毛钱关系,请直接看else
if (p instanceof Callable) {
tryEmitScalar(((Callable<? extends U>)p));
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
break;
}
}
} else {
break;
}
} else {
//NPC自己造了一个怪
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
//这个addInner也和本场景无关,它只是把这个怪加到一个线性表中。
if (addInner(inner)) {
//关键在于这个订阅也就是"瞄准"
//我们给NPC的枪它并不瞄准我们指定的怪,而是瞄准了它自己造的怪
//而inner再中弹后调用了tryEmit,在tryEmit中再开枪打我们指定的怪
//这里可以停下来想想为什么,然后继续阅读下面的注释
p.subscribe(inner);
}
break;
}
}
}
...
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
//真正的射击我们指定的怪
actual.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
...
}
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {
...
@Override
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
//调用tryEmit,这里的parent再初始化的时候已经指向了我们的NPC
parent.tryEmit(t, this);
} else {
parent.drain();
}
}
...
}
}
在flatMap
中的NPC,它拿到枪是打自己造的怪,再自己的怪中枪后会告诉NPC,我中枪了,这个时候NPC才会打我们指定的怪。
在前的map中提到过,onNext虽然可以等效成开枪射击,但是,它并不是真正意思上的开枪,因为我们的怪Observer已经订阅了我们的枪,这里NPC其实也是个Observer,但是它持有怪的Observer接口也就是actual,这里只是调用actual.onNext来模拟打枪,但实际上订阅的行为actual已经交给我们一开始定义的枪了,只是枪把接口给了NPC,让它可以把子弹投递到怪的脑袋里。这里你也许可以回头看看map的源码。会更好的理解。
而flatMap里的NPC它转化的不是子弹,而是枪,把枪投递到怪的脑子里没有用,只有子弹才有用,所以这把枪里的子弹要打出去。这才有了NPC自己造怪的事情,而NPC自己造的怪中弹后调用了parent.tryEmit,同时把自己中的弹吐出来给NPC,这个时候NPC就有子弹了,就可以调用actual.onNext来模拟打枪。
由于突然有了比较忙的工作安排,所以剩下2个变换操作符先留着下回再讲吧。也不知道我讲的大家看不看得懂。