RxJava中的flatMap源码分析

本文的分析基于RxJava1.1.5版本,flatMap是为了一对多的转换而设计的,具体的实现运用了merge和map的操作,而最终也还是基于了lift()方法,是转换的思想,下面是具体的分析

1、首先创建一个简单的例子,代码如下

        final List<Student> students = new ArrayList<>();

        List<Course> jayList = new ArrayList<>();
        jayList.add(new Course("语文", "何炅"));
        jayList.add(new Course("英语", "谢娜"));
        jayList.add(new Course("物理", "何时风"));
        students.add(new Student(1, "周杰伦", jayList));

        List<Course> jjList = new ArrayList<>();
        jjList.add(new Course("数学", "邓军权"));
        jjList.add(new Course("生物", "摇风"));
        jjList.add(new Course("物理", "何时风"));
        jjList.add(new Course("语文", "何炅"));
        students.add(new Student(2, "林俊杰", jjList));

        List<Course> luhanList = new ArrayList<>();
        luhanList.add(new Course("英语", "谢娜"));
        luhanList.add(new Course("生物", "摇风"));
        luhanList.add(new Course("语文", "何炅"));
        students.add(new Student(3, "鹿晗", luhanList));

        Observable.create(new Observable.OnSubscribe<Student>() {
            @Override
            public void call(Subscriber<? super Student> subscriber) {
                for (Student s : students) {
                    subscriber.onNext(s);
                }
            }
        }).flatMap(new Func1<Student, Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                Log.e("TAG", "学生名称为:" + student.getName());
                return Observable.from(student.getmList());
            }
        }).subscribe(new Subscriber<Course>() {
            @Override
            public void onCompleted() {
                Log.e("TAG", "---onComplete()------");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG", "---onError()------");
            }

            @Override
            public void onNext(Course course) {
                Log.e("TAG", "课程名称为:" + course.getCourseName() + ", 任课老师为:" + course.getTechName());
            }
        });

以上用到的Student类还有Course类如下

class Student {
        private int id;
        private String name;
        private List<Course> mList;

        public Student(int id, String name, List<Course> mList) {
            this.id = id;
            this.name = name;
            this.mList = mList;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List<Course> getmList() {
            return mList;
        }

        public void setmList(List<Course> mList) {
            this.mList = mList;
        }
    }

    class Course {
        private String courseName;
        private String techName;

        public Course(String courseName, String techName) {
            this.courseName = courseName;
            this.techName = techName;
        }

        public String getCourseName() {
            return courseName;
        }

        public void setCourseName(String courseName) {
            this.courseName = courseName;
        }

        public String getTechName() {
            return techName;
        }

        public void setTechName(String techName) {
            this.techName = techName;
        }
    }

2、下面是具体的分析

首先进入到flatMap()方法中,flatMap的代码如下

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

判断直接跳过,主要看返回值,返回值调用了merge()方法,并且以map()方法的返回值作为参数,那么我们首先进入到map()方法中看看

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

这个方法中将会调用以func1对象为参数,创建OperatorMap对象,然后将OperatorMap对象作为参数调用lift()方法,那么进入到lift()方法看看

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

这个方法中将以初始被观察者对象中的onSubscribe(本文中我们将初始被观察者对象称为ob_init,将ob_init中的onSubscribe称为onSub_init)和OperatorMap对象为参数创建第一个OnSubscribeLift对象(称为onSublift_one),同时以onSublift_one为参数创建新的被观察者对象(称为ob_one),那么到此map完毕,它将将ob_one返回作为merge()方法的参数,那么下面进入到merge()方法中

public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        return source.lift(OperatorMerge.<T>instance(false));
    }

在merge()方法中,前面判断忽略,直接看返回值,发现它将用ob_one去调用lift()方法,并且会创建OperatorMerge对象作为lift()方法的参数,那么通过看前面lift()方法的作用,我们可以知道,它将会以OperatorMerge对象和ob_one中的onSubscribe作为参数再次创建新的OnSubscribeLift对象(称为onSublift_merge),同时会以onSublift_merge作为参数,再次创建新的被观察者对象(称为ob_merge),那么现在我们就可以知道,flatMap()方法的最终返回值为ob_merge对象,那么下面ob_merge将会调用订阅方法subscribe(),并且会传入初始观察者对象(称为sub_init),那么下面进入到subscribe()中看看

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
   
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }
        
    // new Subscriber so onStart it
    subscriber.onStart();
       
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
           异常忽略...
    }
}

subscribe()最终会调用静态的subscribe()方法,传入的参数为sub_init对象和ob_merge对象,忽略掉前面的判断直接到hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber)这一句,在这里onSubscribeStart方法将原路返回传入的observable.onSubscribe,那么传入的传入的observable.onSubscribe其实就是ob_merge中的onSubscribe,那么它调用的call()方法应该就是onSublift_merge对象中的call()方法,也就是OnSubscribeLift类中的call()方法,传入的参数为sub_init,下面进入到该call()方法看看

 public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }

在这个方法中,onLift()方法将会将传入的参数原路返回,也就是返回值就是传入的operator,这个operator就是在创建onSublift_merge对象时保存的operator,也就是OperatorMerge对象,那么也就是会调用OperatorMerge对象中的call()方法,传入的参数是sub_init,下面进入到OperatorMerge对象中的call()方法

@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
    MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
    MergeProducer<T> producer = new MergeProducer<T>(subscriber);
    subscriber.producer = producer;
        
    child.add(subscriber);
    child.setProducer(producer);
        
    return subscriber;
}

在这个方法中的主要作用就是将sub_init对象进行包装,重新创建一个观察者对象(称为sub_merge),并且返回该对象,那么在OnSubscribeLift类中的call()方法中的Subscriber<? super T> st = hook.onLift(operator).call(o)这个操作所创建的观察者对象就为sub_merge,接着call()方法会执行,parent.call(st),这里传入的参数就是sub_merge,但是这里需要特别注意,parent的值为ob_one对象中的onSubscribe,也就是在利用map()方法创建的被观察者对象中的onSubscribe,那么它调用的call()方法就是OnSubscribeLift类中的call()方法,所以程序将再次执行OnSubscribeLift类中的call()方法,这次传入的参数是sub_merge,那么这次的operator就是OperatorMap对象,那么它以sub_merge为参数调用call()方法,调用的就是OperatorMap类中的方法,下面进入到该方法

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    return parent;
}

在这个方法中,将会以sub_merge和func1对象(transformer保存的就是func1对象)为参数创建新的观察者对象(称为sub_one),并且返回,那么在OnSubscribeLift类中的call()方法中返回的对象将是sub_one,那么继续往下执行,将再次来到parent.call(st),那么这次的st就是sub_one,parent就是ob_init对象中的onSubscribe,也就是初始被观察者对象中的onSubscribe,那么它调用的call()方法,将会回到一下代码

public void call(Subscriber<? super Student> subscriber) {
                for (Student s : students) {
                    subscriber.onNext(s);
                }
            }

现在的观察者对象已经是sub_one,那么它调用的onNext()方法就是OperatorMap类中的静态内部类MapSubscriber中的onNext()方法,那么进入到该方法

@Override
public void onNext(T t) {
    R result;
            
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }
            
    actual.onNext(result);
}

在这个方法中,主要就是result = mapper.call(t)这个操作,这里的mapper就是func1对象,那么func1对象调用的call()方法有回到了我们开始flatMap中的回调call()方法,它将会返回一个Observable对象,那么接着会调用actual.onNext(result),这里的actual就是sub_merge对象,也就是OperatorMerge类中创建的MergeSubscriber对象,那么调用它的onNext()方法,我们进入到它的onNext()方法看看,传入的参数是func1对象返回的Observable对象

@Override
public void onNext(Observable<? extends T> t) {
    if (t == null) {
        return;
    }
    if (t == Observable.empty()) {
        emitEmpty();
    } else
    if (t instanceof ScalarSynchronousObservable) {
        tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
    } else {
        InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
        addInner(inner);
        t.unsafeSubscribe(inner);
        emit();
    }
}

在这个方法中,忽略掉前面的判断,直接进入else分析,这里InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++)将会以sub_merge和uniqueId为参数再次创建一个观察者对象(称为inner_sub),然后t.unsafeSubscribe(inner)这个操作,因为t为func1对象所返回的Observable对象,所以将会将inner_sub为参数,调用unsafeSubscribe()方法,那么进入到unsafeSubscribe()方法

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

在这个方法中主要看hook.onSubscribeStart(this, onSubscribe).call(subscriber)这里,这里的onSubscribe就是func1对象返回的Observable对象中的onSubscribe,所以调用它的call()方法,那么在这个call()方法中肯定会调用subscribe.onNext()方法,那么这个subscriber就是传进来的参数,也就是inner_sub,那么将会调用inner_sub中的onNext()方法,也就是InnerSubscriber类中的onNext()方法,下面进入到该方法

public void onNext(T t) {   
      parent.tryEmit(this, t);
}

在这个方法中的parent就是创建inner_sub时的传入的父级观察者对象,也就是MergeSubscriber对象,也就是sub_merge,那么调用该对象的tryEmit()方法,下面进入该方法,传入的参数是inner_sub,t ( t为最终输出的数据)

void tryEmit(T value) {
    boolean success = false;
    long r = producer.get();
    if (r != 0L) {
        synchronized (this) {
            // if nobody is emitting and child has available requests
            r = producer.get();
            if (!emitting && r != 0L) {
                emitting = true;
                success = true;
            }
        }
    }
    if (success) {
        emitScalar(value, r);
    } else {
        queueScalar(value);
    }
}

这里的关键句在 emitScalar(value, r)这里,它将会将最终需要输出的值和r作为参数调用emitScalar()方法,下面进入到emitScalar()方法

protected void emitScalar(T value, long r) {
            boolean skipFinal = false;
            try {
                try {
                    child.onNext(value);
                } catch (Throwable t) {
                    if (!delayErrors) {
                        Exceptions.throwIfFatal(t);
                        skipFinal = true;
                        this.unsubscribe();
                        this.onError(t);
                        return;
                    }
                    getOrCreateErrorQueue().offer(t);
                }
                if (r != Long.MAX_VALUE) {
                    producer.produced(1);
                }
                
                int produced = scalarEmissionCount + 1;
                if (produced == scalarEmissionLimit) {
                    scalarEmissionCount = 0;
                    this.requestMore(produced);
                } else {
                    scalarEmissionCount = produced;
                }
                
                // check if some state changed while emitting
                synchronized (this) {
                    skipFinal = true;
                    if (!missed) {
                        emitting = false;
                        return;
                    }
                    missed = false;
                }
            } finally {
                if (!skipFinal) {
                    synchronized (this) {
                        emitting = false;
                    }
                }
            }
            emitLoop();
        }

在这个方法中的最主要的操作就是child.onNext(value)这个了,在这里,终于看到了child,这个child就是初始的观察者,也就是我们一开始创建的观察者,那么它调用onNext()方法就是以下代码

public void onNext(Course course) {
                Log.e("TAG", "课程名称为:" + course.getCourseName() + ", 任课老师为:" + course.getTechName());
            }

这个方法就是我们自己创建观察者对象时的回调方法,就是最终的调用方法,到这里,整个流程也就打通了,因为这是正常情况下的流程,所以忽略了很多的判断和特殊的情况,最后,这个过程实在是有点复杂,所以,可能描写的有点乱,望见谅

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容