Rxjava2 操作符 - Transforming Observable

本教程均是基于java的项目:

Buffer — 周期性收集Obserable产生结果到集合中,并一次性发送它。

    private static void buffer() {
        ArrayList<Integer> list = new ArrayList<>();
        for (int i = 1; i < 11; i++) {
            list.add(i);
        }
        Observable
                .fromIterable(list)
                .buffer(2, 3) //一次收集2个,下次跳过3个收集
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        for (Integer integer : integers) {
                            System.out.println("accept: " + integer);
                        }
                    }
                });
    }

输出结果:

accept: 1
accept: 2
accept: 4
accept: 5
accept: 7
accept: 8
accept: 10

FlatMap — 可以应用一个函数把Observable事件转换到Observables,然后再通过一个Obserable发射出去,需要注意flatMap 并不能保证事件的顺序。

    private static void flatMap() {
        Observable
                .just(1, 2, 3, 4, 5)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                        System.out.println(Thread.currentThread() + " apply " + integer);
                        return Observable.just("this is " + integer).delay(3, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(Thread.currentThread() + " accept: " + s);
                    }
                });
        while (true) ;
    }

输出结果:

Thread[RxCachedThreadScheduler-2,5,main] apply 1
Thread[RxCachedThreadScheduler-2,5,main] apply 2
Thread[RxCachedThreadScheduler-2,5,main] apply 3
Thread[RxCachedThreadScheduler-2,5,main] apply 4
Thread[RxCachedThreadScheduler-2,5,main] apply 5
Thread[RxComputationThreadPool-2,5,main] accept: this is 2
Thread[RxComputationThreadPool-2,5,main] accept: this is 3
Thread[RxComputationThreadPool-4,5,main] accept: this is 4
Thread[RxComputationThreadPool-1,5,main] accept: this is 1
Thread[RxComputationThreadPool-1,5,main] accept: this is 5

可以看到我们apply的时候是1 2 3 4 5,订阅收到的确是2 3 4 1 5,可能下一次运行又不是这个顺序了,需要保证顺序则可以使用 concatMap替换flatMap。

Map — 通过一个函数(apply)转换通过Observable发射的项目。

    private static void map() {
        Observable
                .just(1, 2, 3, 4, 5)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "result " + integer;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("accept " + s);
                    }
                });
    }

输出结果:

accept result 1
accept result 2
accept result 3
accept result 4
accept result 5

GroupBy — 对源Observable结果分组,转换成GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值。
由于GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。所以你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。

    private static void groupBy() {
        Observable
                .range(0, 9)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer value) throws Exception {
                        return value % 3;   //返回值决定组名 这里分了0 1 2三组
                    }
                })
                .subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
                    @Override
                    public void accept(final GroupedObservable<Integer, Integer> res) throws Exception {
                        res.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer aLong) throws Exception {
                                System.out.println("group: " + res.getKey() + " - " + aLong);
                            }
                        });
                    }
                });
    }

输出结果:

group: 0 - 0
group: 1 - 1
group: 2 - 2
group: 0 - 3
group: 1 - 4
group: 2 - 5
group: 0 - 6
group: 1 - 7
group: 2 - 8

Scan — scan对迭代源Observable产生的结果应用一个函数,将结果发射出去并作为下次迭代的一个参数。

    private static void scan() {
        Observable
                .just(1, 2, 3, 4, 5)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer sum, @NonNull Integer item) throws Exception {
                        System.out.println("sum " + sum);
                        return sum + item;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("Next: " + integer);
                    }
                });
    }

输出结果:

Next: 1
sum 1
Next: 3
sum 3
Next: 6
sum 6
Next: 10
sum 10
Next: 15

Window — 类似于buffer,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理

    private static void window() {
        ArrayList<Integer> list = new ArrayList<>();
        for (int i = 1; i < 11; i++) {
            list.add(i);
        }
        Observable
                .fromIterable(list)
                .window(2, 3)
                .subscribe(new Consumer<Observable<Integer>>() {
                    @Override
                    public void accept(Observable<Integer> ob) throws Exception {
                        ob.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                System.out.println("accept: " + integer);
                            }
                        });
                    }
                });

    }

输出结果:

accept: 1
accept: 2
accept: 4
accept: 5
accept: 7
accept: 8
accept: 10

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容