4. Rxjava2 : 变换操作符

1. RxJava2 : 什么是观察者模式
2. RxJava2 : 创建操作符(无关时间)
3. Rxjava2 : 创建操作符(有关时间)
4. Rxjava2 : 变换操作符
5. Rxjava2 : 判断操作符
6. Rxjava2 : 筛选操作符
7. Rxjava2 : 合并操作符
8. Rxjava2 : do操作符
9. Rxjava2 : error处理
10. Rxjava2 : 重试
11. Rxjava2 : 线程切换

api use
map {{map}}
flatmap / concatMap {{flatmap}}
buffer {{buffer}}

map

  1. 操作的是每一个元素,所有元素均会发生改变
  2. 但元素的个数不会改变
  3. 是否会发送onComplete,取决于创建方式是否会发送
Observable.just(1, 2, 3)
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        return integer + 10;
                    }
                }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "integer:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

log

02-12 15:39:20.253 31399-31399/... D/SplashActivity: integer:11
02-12 15:39:20.253 31399-31399/... D/SplashActivity: integer:12
02-12 15:39:20.253 31399-31399/... D/SplashActivity: integer:13
02-12 15:39:20.253 31399-31399/... D/SplashActivity: onComplete

flatmap / concatMap

  • flatmap
    1.与map类似,操作的同样是每一个元素
    2.区别是,可以将元素转换成Oberservable,然后再继续发送,也就是说数量可变
    3.元素是无序的
    4.是否会发送onComplete,取决于创建方式是否会发送

  • concatMap
    均于flatmap相同,只是元素是有序的

流程

Observable.just(1,2,3)
->
元素 1,元素 2, 元素 3
他们全部来自于一个Observable
->
元素 1 -> Observable (元素:1 子元素0, 元素:1 子元素1, 元素:1 子元素2)
...
->
现在一共有3*3,9个元素
->
subscribe
->
Observer
这9个元素可能会是无序的
Disposable disposable = Observable
                .just(1, 2, 3)
                .flatMap((Function<Integer, ObservableSource<String>>) integer -> {
                    List<String> strings = new ArrayList<>();
                    strings.add("event: " + integer + " list: 0");
                    strings.add("event: " + integer + " list: 1");
                    strings.add("event: " + integer + " list: 2");
                    return Observable.fromIterable(strings);
                }).subscribe(
                        s -> {
                            Log.d(TAG, s);
                        });

log

01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 1 list: 0
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 1 list: 1
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 1 list: 2
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 2 list: 0
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 3 list: 1
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 2 list: 2
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 2 list: 1
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 3 list: 0
01-30 15:03:19.086 15381-15381/... D/ShellAppApplication: event: 3 list: 2

buffer

  • 基于数量
    1.参数1: 每回取的数量,参数2: 跳过的数量
    2.如果未指定跳过的数量,则取了几个就跳过几个
    3.如果指定了跳过的数量,则按照指定跳过,并且list.size() = 发送数量/skip+1
    4.是否会发送onComplete,取决于创建方式是否会发送
//不采用跳过
Observable.just(1,2,3,4,5)
                .buffer(3)
.subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<Integer> integers) {
                Log.d(TAG, "integers:" + integers);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

log

02-12 16:08:07.723 32515-32515/... D/SplashActivity: integers:[1, 2, 3]
02-12 16:08:07.723 32515-32515/... D/SplashActivity: integers:[4, 5]
02-12 16:08:07.723 32515-32515/... D/SplashActivity: onComplete
//采用跳过,即可看到,最后一个集合只包含一个[5]
Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 2)
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List<Integer> integers) {
                        Log.d(TAG, "integers:" + integers);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-12 16:12:56.273 531-531/... D/SplashActivity: integers:[1, 2, 3]
02-12 16:12:56.273 531-531/... D/SplashActivity: integers:[3, 4, 5]
02-12 16:12:56.273 531-531/... D/SplashActivity: integers:[5]
02-12 16:12:56.273 531-531/... D/SplashActivity: onComplete
  • 基于时间
    1.发送的元素,并不会直接用于Observer,而是会暂存在一个缓存区当中,以指定的时间去取
    2.可能会存在取不到值的情况,缓存区当中没有值
    3.最后一次取值时间会于之前不同
    4.是否会发送onComplete,取决于创建方式是否会发送

流程

元素 时间 所取值
in 0 没取
0 1 没取
1 2 第一次取值,1还没有进入缓存区,只取到0
2 3 没取
3 4 第二次取值,现在缓存区中存在1,2
4 5 因为这是最后两个值了,所以会提前取,全部取出
Log.d(TAG, "in");
        Observable.interval(1, TimeUnit.SECONDS)
                .take(5)
                .buffer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<List<Long>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List<Long> longs) {
                        Log.d(TAG, "longs:" + longs);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-12 16:19:12.293 1345-1345/... D/SplashActivity: in
02-12 16:19:14.303 1345-1401/... D/SplashActivity: longs:[0]
02-12 16:19:16.303 1345-1401/... D/SplashActivity: longs:[1, 2]
02-12 16:19:17.303 1345-1402/... D/SplashActivity: longs:[3, 4]
02-12 16:19:17.303 1345-1402/... D/SplashActivity: onComplete
  • 数量和时间同时作用

流程

元素 时间 所取值
in 0 没取
0 1 没取
1 2 第一次取值,满足的是数量,取0,1
2 3 第二次取值,满足的是时间,这个时候,缓存区中的元素已经通过数量取走了,因此什么也没有取到
3 4 没取
4 5 第三次取值,满足的是数量,取2,3
5 6 第四次取值,满足的是时间,这个时候,缓存区中的元素只剩下一个4
6 7 第五次取值,满足的是数量,取5,6 同时进行第六次取值,因为结束,触发时间取值,但什么也没有取到
Log.d(TAG, "in");
        Observable.interval(1, TimeUnit.SECONDS)
                .take(7)
                .buffer(3, TimeUnit.SECONDS,2)
                .subscribe(new Observer<List<Long>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List<Long> longs) {
                        Log.d(TAG, "longs:" + longs);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

log

02-12 16:31:47.043 2152-2152/... D/SplashActivity: in
02-12 16:31:49.063 2152-2225/... D/SplashActivity: longs:[0, 1]
02-12 16:31:50.053 2152-2224/... D/SplashActivity: longs:[]
02-12 16:31:51.053 2152-2225/... D/SplashActivity: longs:[2, 3]
02-12 16:31:53.053 2152-2224/... D/SplashActivity: longs:[4]
02-12 16:31:54.053 2152-2225/... D/SplashActivity: longs:[5, 6]
02-12 16:31:54.053 2152-2225/... D/SplashActivity: longs:[]
02-12 16:31:54.053 2152-2225/... D/SplashActivity: onComplete
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容