3章 RxJava操作符

本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处

CSDN学院课程地址

3. RxJava操作符

RxJava操作符也是其精髓之一,可以通过一个简单的操作符,实现复杂的业务逻辑,甚至还可以将操作符组合起来(即RxJava的组合过程),完成更为复杂的业务需求。比如我们前面用到的.create().subscribeOn().observeOn().subscribe()都是RxJava的操作符之一,下面我们将对RxJava的操作符进行分析

掌握RxJava操作符前,首先要学会看得懂RxJava的图片,图片是RxJava主导的精髓,下面我们通过例子说明

image

这张图片我们先要分清楚概念上的东西,上下两行横向的直线区域代表着事件流,上面一行(上游)是我们的被观察者Observable,下面一行(下游)是我们的观察者Observer,事件流就是从上游的被观察者发送给下游的观察者的。而中间一行的flatMap区域则是我们的操作符部分,它可以对我们的数据进行变换操作。最后,数据流则是图片上的圆形、方形、菱形等区域,也是从上游流向下游的,不同的形状代表着不同的数据类型

image

这张图片并不是表示没有被观察者Observable,而是Create方法本身就是创建了被观察者,所以可以将被观察者的上游省略。在进行事件的onNext()分发后,执行onComplete()事件,这样就表示事件流已经结束,后续如果上游继续发事件,则下游表示不接收。当事件流的onCompleted()或者onError()正好被调用过一次后,此后就不能再调用观察者的任何其它回调方法

在理解RxJava操作符之前,需要将这几个概念弄明白,整个操作符的章节都是围绕这几个概念进行的

  • 事件流:通过发射器发射的事件,从发射事件到结束事件的过程,这一过程称为事件流
  • 数据流:通过发射器发射的数据,从数据输入到数据输出的过程,这一过程称为数据流
  • 被观察者:事件流的上游,即Observable,事件流开始的地方和数据流发射的地方
  • 观察者:事件流的下游,即Observer,事件流结束的地方和数据流接收的地方

3.1 Creating Observables (创建操作符)

1、create

image

Observable最原始的创建方式,创建出一个最简单的事件流,可以使用发射器发射特定的数据类型

public static void main(String[] args) {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    for (int i = 1; i < 5; i++) {
                        e.onNext(i);
                    }
                    e.onComplete();
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {

                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onComplete

2、from

[图片上传失败...(image-81e090-1557713572966)]

创建一个事件流并发出特定类型的数据流,其发射的数据流类型有如下几个操作符

public static void main(String[] args) {
    Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

3、just

image

just操作符和from操作符很像,只是方法的参数有所差别,它可以接受多个参数

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

4、defer

image

defer与just的区别是,just是直接将发射当前的数据流,而defer会等到订阅的时候,才会去执行它的call()回调,再去发射当前的数据流。复杂点的理解就是:defer操作符是将一组数据流在原有的事件流基础上缓存一个新的事件流,直到有人订阅的时候,才会创建它缓存的事件流

public static void main(String[] args) {

    i = 10;

    Observable<Integer> just = Observable.just(i, i);
    Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() {
        @Override
        public ObservableSource<?> call() throws Exception {
            //缓存新的事件流
            return Observable.just(i, i);
        }
    });

    i = 15;

    just.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("onNext=" + integer);
        }
    });

    defer.subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println("onNext=" + (int) o);
        }
    });

    i = 20;

    defer.subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println("onNext=" + (int) o);
        }
    });
}

输出

onNext=10
onNext=10
onNext=15
onNext=15
onNext=20
onNext=20

5、interval

image

interval操作符是按固定的时间间隔发射一个无限递增的整数数据流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行,interval默认在computation调度器上执行

public void interval() {
    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
......

6、range

image

range操作符发射一个范围内的有序整数数据流,你可以指定范围的起始和长度

public static void main(String[] args) {
    Observable.range(1, 5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

7、repeat

image

repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行

public static void main(String[] args) {
    Observable.just(1).repeat(5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1
onNext=1
onNext=1
onNext=1
onNext=1

8、timer

image

timer操作符可以创建一个延时的事件流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行,默认在computation调度器上执行

public void timer() {
    Observable.timer(5, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}

输出

onNext=0

9、小结

  1. create():创建最简单的事件流
  2. from():创建事件流,可发送不同类型的数据流
  3. just():创建事件流,可发送多个参数的数据流
  4. defer():创建事件流,可缓存可激活事件流
  5. interval():创建延时重复的事件流
  6. range():创建事件流,可发送范围内的数据流
  7. repeat():创建可重复次数的事件流
  8. timer():创建一次延时的事件流

补充:interval()、timer()、delay()的区别

  1. interval():用于创建事件流,周期性重复发送
  2. timer():用于创建事件流,延时发送一次
  3. delay():用于事件流中,可以延时某次事件流的发送

3.2 Transforming Observables (转换操作符)

1、map

image

map操作符可以将数据流进行类型转换

public static void main(String[] args) {
    Observable.just(1).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "发送过来的数据会被变成字符串" + integer;
        }
    })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("onNext=" + s);
                }
            });
}

输出

onNext=发送过来的数据会被变成字符串1

2、flatMap

image

flatMap操作符将数据流进行类型转换,然后将新的数据流传递给新的事件流进行分发,这里通过模拟请求登录的延时操作进行说明,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void flatMap() {
    Observable.just(new UserParams("hensen", "123456")).flatMap(new Function<UserParams, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(UserParams userParams) throws Exception {
            return Observable.just(userParams.username + "登录成功").delay(2, TimeUnit.SECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });
}

public static class UserParams {

    public UserParams(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String username;
    public String password;
}

输出

hensen登录成功

补充:

  • concatMap与flatMap功能一样,唯一的区别就是concatMap是有序的,flatMap是乱序的

3、groupBy

[图片上传失败...(image-d621c4-1557713572966)]

groupBy操作符可以将发射出来的数据项进行分组,并将分组后的数据项保存在具有key-value映射的事件流中。groupBy具体的分组规则由groupBy操作符传递进来的函数参数Function所决定的,它可以将key和value按照Function的返回值进行分组,返回一个具有分组规则的事件流GroupedObservable,注意这里分组出来的事件流是按照原始事件流的顺序输出的,我们可以通过sorted()对数据项进行排序,然后输出有序的数据流。

public static void main(String[] args) {
    Observable.just("java", "c++", "c", "c#", "javaScript", "Android")
            .groupBy(new Function<String, Character>() {
                @Override
                public Character apply(String s) throws Exception {
                    return s.charAt(0);//按首字母分组
                }
            })
            .subscribe(new Consumer<GroupedObservable<Character, String>>() {
                @Override
                public void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {
                    //排序后,直接订阅输出key和value
                    characterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);
                        }
                    });
                }
            });
}

输出

onNext= key:A value:Android
onNext= key:c value:c
onNext= key:c value:c#
onNext= key:c value:c++
onNext= key:j value:java
onNext= key:j value:javaScript

4、scan

image

scan操作符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断所有数据的最小值等

public static void main(String[] args) {
    Observable.just(8, 2, 13, 1, 15).scan(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            return integer < integer2 ? integer : integer2;
        }
    })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer item) throws Exception {
                    System.out.println("onNext=" + item);
                }
            });
}

输出

onNext=8
onNext=2
onNext=2
onNext=1
onNext=1

5、buffer

image

buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。如果发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。如果对buffer操作符设置了skip参数,则buffer每次缓存池溢满时,会跳过指定的skip数据项,然后再进行缓存和输出。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .buffer(5).subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        System.out.println("onNext=" + integers.toString());
    }
});

输出

onNext=[1, 2, 3, 4, 5]
onNext=[6, 7, 8, 9]

6、window

image

window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流,也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理。

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .window(2, 1).subscribe(new Consumer<Observable<Integer>>() {
        @Override
        public void accept(Observable<Integer> integerObservable) throws Exception {
            integerObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
        }
    });
}

输出

onNext=1
onNext=2
onNext=2
onNext=3
onNext=3
onNext=4
onNext=4
onNext=5
onNext=5
onNext=6
onNext=6
onNext=7
onNext=7
onNext=8
onNext=8
onNext=9
onNext=9

7、小结

  1. map():对数据流的类型进行转换
  2. flatMap():对数据流的类型进行包装成另一个数据流
  3. groupby():对所有的数据流进行分组
  4. scan():对上一轮处理过后的数据流进行函数处理
  5. buffer():缓存发射的数据流到一定数量,随后发射出数据流集合
  6. window():缓存发射的数据流到一定数量,随后发射出新的事件流

3.3 Filtering Observables (过滤操作符)

1、debounce

image

debounce操作符会去过滤掉发射速率过快的数据项,下面的例子onNext事件可以想象成按钮的点击事件,如果在2秒种内频繁的点击,则其点击事件会被忽略,当i为3的除数的时候,发射的事件的时间会超过规定忽略事件的时间,那么则允许触发点击事件。这就有点像我们频繁点击按钮,但始终只会触发一次点击事件,这样就不会导致重复去响应点击事件

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 100; i++) {
                if (i % 3 == 0) {
                    Thread.sleep(3000);
                } else {
                    Thread.sleep(1000);
                }
                emitter.onNext(i);
            }
        }
    }).debounce(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=2
onNext=5
onNext=8
onNext=11
onNext=14
......

2、distinct

image

distinct操作符会过滤重复发送的数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3).distinct()
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4

3、elementAt

image

elementAt操作符只取指定的角标的事件

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1

4、filter

image

filter操作符可以过滤指定函数的数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer > 2;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=3
onNext=4
onNext=3

5、first

image

first操作符只发射第一项数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .first(7)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1

6、ignoreElements

image

ignoreElements操作符不发射任何数据,只发射事件流的终止通知

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .ignoreElements()
            .subscribe(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

输出

onComplete

7、last

image

last操作符只发射最后一项数据

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .last(7)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=3

8、sample

image

sample操作符会在指定的事件内从数据项中采集所需要的数据,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void sample() {
    Observable.interval(1, TimeUnit.SECONDS)
            .sample(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}

输出

onNext=2
onNext=4
onNext=6
onNext=8

9、skip

image

skip操作符可以忽略事件流发射的前N项数据项,只保留之后的数据

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .skip(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}

输出

onNext=4
onNext=5
onNext=6
onNext=7
onNext=8

10、skipLast

image

skipLast操作符可以抑制事件流发射的后N项数据

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .skipLast(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

11、take

image

take操作符可以在事件流中只发射前面的N项数据

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .take(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3

12、takeLast

image

takeLast操作符事件流只发射数据流的后N项数据项,忽略前面的数据项

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .takeLast(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}

输出

onNext=6
onNext=7
onNext=8

还有一个操作符叫takeLastBuffer,它和takeLast类似,,唯一的不同是它把所有的数据项收集到一个List再发射,而不是依次发射一个

13、小结

  1. debounce():事件流只发射规定范围时间内的数据项
  2. distinct():事件流只发射不重复的数据项
  3. elementAt():事件流只发射第N个数据项
  4. filter():事件流只发射符合规定函数的数据项
  5. first():事件流只发射第一个数据项
  6. ignoreElements():忽略事件流的发射,只发射事件流的终止事件
  7. last():事件流只发射最后一项数据项
  8. sample():事件流对指定的时间间隔进行数据项的采样
  9. skip():事件流忽略前N个数据项
  10. skipLast():事件流忽略后N个数据项
  11. take():事件流只发射前N个数据项
  12. takeLast():事件流只发射后N个数据项

3.4 Combining Observables (组合操作符)

1、merge/concat

image

merge操作符可以合并两个事件流,如果在merge操作符上增加延时发送的操作,那么就会导致其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的却是两次数据流。由于concat操作符和merge操作符的效果是一样的,这里只举一例

merge和concat的区别

  • merge():合并后发射的数据项是无序的
  • concat():合并后发射的数据项是有序的
public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {
        @Override
        public void accept(Serializable serializable) throws Exception {
            System.out.println("onNext=" + serializable.toString());
        }
    });
}

输出

onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

2、zip

image

zip操作符是将两个数据流进行指定的函数规则合并

public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    Observable.zip(just1, just2, new BiFunction<String, String, String>() {
        @Override
        public String apply(String s, String s2) throws Exception {
            return s + s2;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}

输出

onNext=A1
onNext=B2
onNext=C3
onNext=D4
onNext=E5

3、startWith

image

startWith操作符是将另一个数据流合并到原数据流的开头

public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    just1.startWith(just2).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E

4、join

image

join操作符是有时间期限的合并操作符,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void join() {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);

    just1.join(just2, new Function<String, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(String s) throws Exception {
            return Observable.timer(3, TimeUnit.SECONDS);
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long l) throws Exception {
            return Observable.timer(8, TimeUnit.SECONDS);
        }
    }, new BiFunction<String, Long, String>() {
        @Override
        public String apply(String s, Long l) throws Exception {
            return s + l;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}

join操作符有三个函数需要设置

  • 第一个函数:规定just2的过期期限
  • 第二个函数:规定just1的过期期限
  • 第三个函数:规定just1和just2的合并规则

由于just2的期限只有3秒的时间,而just2延时1秒发送一次,所以just2只发射了2次,其输出的结果就只能和just2输出的两次进行合并,其输出格式有点类似我们的排列组合

onNext=A0
onNext=B0
onNext=C0
onNext=D0
onNext=E0
onNext=A1
onNext=B1
onNext=C1
onNext=D1
onNext=E1

5、combineLatest

image

conbineLatest操作符会寻找其他事件流最近发射的数据流进行合并,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public static String[] str = {"A", "B", "C", "D", "E"};

public void combineLatest() {
    Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
        @Override
        public String apply(Long aLong) throws Exception {
            return str[(int) (aLong % 5)];
        }
    });
    Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);

    Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {
        @Override
        public String apply(String s, Long l) throws Exception {
            return s + l;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}

输出

onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5

6、小结

  1. merge()/concat():无序/有序的合并两个数据流
  2. zip():两个数据流的数据项合并成一个数据流一同发出
  3. startWith():将待合并的数据流放在自身前面一同发出
  4. join():将数据流进行排列组合发出,不过数据流都是有时间期限的
  5. combineLatest():合并最近发射出的数据项成数据流一同发出

3.5 Error Handling Operators(错误处理操作符)

1、onErrorReturn

image

onErrorReturn操作符表示当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

2、onErrorResumeNext

image

onErrorResumeNext操作符表示当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                @Override
                public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                    return Observable.just(-1);
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

3、onExceptionResumeNext

image

onExceptionResumeNext操作符表示当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onException crash"));
                    //e.onError(new Error("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onExceptionResumeNext(new ObservableSource<Integer>() {
                @Override
                public void subscribe(Observer<? super Integer> observer) {
                    //备用事件流
                    observer.onNext(8);
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=8

4、retry

image

retry操作符表示当错误发生时,发射器会重新发射

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if (i == 4) {
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .retry(1)
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
  • retry():表示重试无限次
  • retry(long times):表示重试指定次数
  • retry(Func predicate):可以根据函数参数中的Throwable类型和重试次数决定本次需不需要重试

5、retryWhen

image

retryWhen操作符和retry操作符相似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

private static int retryCount = 0;
private static int maxRetries = 2;

public void retryWhen(){
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if (i == 4) {
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {
                            if (++retryCount <= maxRetries) {
                                // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
                                System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);
                                return Observable.timer(1, TimeUnit.SECONDS);
                            }
                            return Observable.error(throwable);
                        }
                    });
                }
            })
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

6、小结

  • onErrorReturn():当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()
  • onErrorResumeNext():当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()
  • onExceptionResumeNext():当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射
  • retry():当错误发生时,发射器会重新发射
  • retryWhen():当错误发生时,根据Tharowble类型决定发射器是否重新发射

3.6 Observable Utility Operators(辅助性操作符)

1、delay

image

delay操作符可以延时某次事件发送的数据流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void deley() {
    Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

delay和delaySubscription的效果是一样的,只不过delay是对数据流的延时,而delaySubscription是对事件流的延时

2、do

image

do操作符可以监听整个事件流的生命周期,do操作符分为多个类型,而且每个类型的作用都不同

  1. doOnNext():接收每次发送的数据项
  2. doOnEach():接收每次发送的数据项
  3. doOnSubscribe():当事件流被订阅时被调用
  4. doOnDispose():当事件流被释放时被调用
  5. doOnComplete():当事件流被正常终止时被调用
  6. doOnError():当事件流被异常终止时被调用
  7. doOnTerminate():当事件流被终止之前被调用,无论正常终止还是异常终止都会调用
  8. doFinally():当事件流被终止之后被调用,无论正常终止还是异常终止都会调用
public static void main(String[] args) {
    Observable.just(1, 2, 3)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("doOnNext");
                }
            })
            .doOnEach(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    System.out.println("doOnEach");
                }
            })
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("doOnSubscribe");
                }
            })
            .doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnDispose");
                }
            })
            .doOnTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnTerminate");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("doOnError");
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnComplete");
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doFinally");
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

doOnSubscribe
doOnNext
doOnEach
onNext=1
doOnNext
doOnEach
onNext=2
doOnNext
doOnEach
onNext=3
doOnEach
doOnTerminate
doOnComplete
doFinally

3、materialize/dematerialize

image
image

materialize操作符将发射出的数据项转换成为一个Notification对象,而dematerialize操作符则是跟materialize操作符相反,这两个操作符有点类似我们Java对象的装箱和拆箱功能

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5).materialize()
            .subscribe(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    System.out.println("onNext=" + integerNotification.getValue());
                }
            });
    
    Observable.just(1, 2, 3, 4, 5).materialize().dematerialize()
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object object) throws Exception {
                    System.out.println("onNext=" + object.toString());
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=null
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

输出的时候,materialize会输出多个null,是因为null的事件为onCompleted事件,而dematerialize把onCompleted事件给去掉了,这个原因也可以从图片中看出来

4、serialize

image

serialize操作符可以将异步执行的事件流进行同步操作,直到事件流结束

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5).serialize()
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

5、timeInterval

image

timeInterval操作符可以将发射的数据项转换为带有时间间隔的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void timeInterval(){
    Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS)
            .subscribe(new Consumer<Timed<Long>>() {
                @Override
                public void accept(Timed<Long> longTimed) throws Exception {
                    System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());
                }
            });
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • 我习以为常地走过去后,听到不远处传来一句轻声的话语。 “跟我回家吧。” 回过头,男生已经把猫咪抱了起来,湿漉漉的猫...
    那么幸福吧阅读 137评论 0 0
  • I like Spring Festval. Their first day lunar year is Spri...
    浅言_b299阅读 308评论 0 1
  • 我将思念编织成风筝儿 放飞去远方 我还没来的及牵住她的手 就任由随风的翱翔 在繁花不知落叶的地方 在光芒愤恨而去的...
    一字亦文阅读 243评论 5 3
  • 今天砍柴书院讲以后的日更挪到简书。其实我简单的日更都没有做到,要把简书用起来了。 刚才书院提倡大家一起得简书钻,我...
    风飘啊飘阅读 232评论 2 1