此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著
3.2 转化Observable的操作符
如果说创建Observable是运用RxJava的基础的话, 那么转化Observable则是主干。创建一个Observable只是完成了工作的第一步,要想完成一些复杂的工作场景,还需要将创建的Observable按照一定规则进行转化。
3.2.1 buffer
buffer操作符所做的事情就是将数据按照规定的大小做一下缓存,当缓存的数据量达到设置的上限后就将缓存的数据作为一个集合发送出去。
buffer操作符还可以设置跳过的数目。加入一个skip参数,用来指定每次发送一个集合需要跳过几个数据。指定count为2,skip为3,这样每隔3个数据发送一个包含2个数据的集合。如果count大于或者等于skip的话,skip就失去了效果
buffer操作符不仅可以通过数量规则来缓存, 还可以通过时间等规则来缓存,如规定每3秒缓存发送一次。
private void buffer(){
Observable.just(1,2,3,4,5,6,7,8,9)
.buffer(2,3)
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
System.out.println("buffer: "+ integers);
}
});
Observable.interval(1, TimeUnit.SECONDS)
.buffer(3, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<Long>>() {
@Override
public void call(List<Long> longs) {
System.out.println("bufferTime: "+ longs);
}
});
}
结果可以看到第一个Observable会每隔3个数字就发送出前2个数字;第二个Observable会每隔3秒输出2~4个数字
buffer: [1, 2]
buffer: [4, 5]
buffer: [7, 8]
bufferTime: [0, 1, 2]
bufferTime: [3, 4]
bufferTime: [5, 6, 7]
bufferTime: [8, 9, 10]
3.2.2 flatMap
flatMap是一个用处非常多的操作符。可以将数据根据我们想要的规则进行转发后再发送出去。其原理就是将这个Observable转化为多个以源Observable发送的数据作为源数据的Observable,然后将这多个Observable发送的数据整合并发送出来。需要注意的是,数据租后的顺序可能会有交错,如果对顺序有严格要求的话,可以使用concatmap操作符。flatMap还有一个扩展操作符flatMapIterable,基本相同,不同之处为flatMapIterable转化的多个Observable是使用Iterable作为源数据的。
private void flatmap() {
Observable.just(1, 2, 3)
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
return Observable.just("flat map: " + integer);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String o) {
System.out.println(o);
}
});
Observable.just(1, 2, 3)
.flatMapIterable(new Func1<Integer, Iterable<String>>() {
@Override
public Iterable<String> call(Integer integer) {
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
arrayList.add("flatmapIterable: " + integer);
}
return arrayList;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
}
flat map: 1
flat map: 2
flat map: 3
flatmapIterable: 1
flatmapIterable: 1
flatmapIterable: 1
flatmapIterable: 2
flatmapIterable: 2
flatmapIterable: 2
flatmapIterable: 3
flatmapIterable: 3
flatmapIterable: 3
3.2.3 groupBy
groupBy操作符会将源Observable发送的数据按照key来拆分成一些小的Observable,然后这些小的Observable分别发送其所包含的数据,类似SQL里面的groupBy。在使用时我们需要提供一个生成key的规则,所有key相同的数据会包含在同一个小的Observable中。另外我们还可以提供一个额外的函数来对这些数据进行转化,有点像集成了flatMap。
创建两个经过groupBy转化的Observable对象,第一个按照奇数偶数分组,第二个添加额外的函数,以实现分组后为数字加上一个字符串前缀。
private void groupBy() {
Observable.just(1,2,3,4,5,6,7,8,9)
.groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer%2;
}
}).subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
@Override
public void call(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {
integerIntegerGroupedObservable.count().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("key"+ integerIntegerGroupedObservable.getKey()
+" contains:" + integer + " numbers");
}
});
}
});
Observable.just(1,2,3,4,5,6,7,8,9)
.groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer % 2;
}
}, new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "groupByKeyValue: " + integer;
}
}).subscribe(new Action1<GroupedObservable<Integer, String>>() {
@Override
public void call(GroupedObservable<Integer, String> integerStringGroupedObservable) {
if(integerStringGroupedObservable.getKey() == 0) {
integerStringGroupedObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
}
}
});
}
需要注意的是,源Observable经过groupBy转化后发送出来的每个数据是一种特殊的Observable:GroupedObservable。它是Observable的一个子类,有一个自己的方法getKey(),可以返回当前GroupedObservable的key。在前一个订阅中,我们将每个GroupedObservable通过count操作符计算出其大小,然后将key和大小一起输出;在后一个订阅中,我们为所有的数据都加上了字符串前缀,并将key为0的GroupedObservable的所有数据都输出。结果如下
key0 contains:4 numbers
key1 contains:5 numbers
groupByKeyValue: 2
groupByKeyValue: 4
groupByKeyValue: 6
groupByKeyValue: 8
有点不太好理解,多看几遍吧
3.2.4 map
map操作符将源Observable发送的每个数据都按照给定的函数进行转化,并将转化后的数据发送出来。map操作符的功能类似于flatMap,不同之处在于map对数据直接进行函数转化,输入和输出是一对一的关系;而flatMap则将每个数据作为输入来创建一个新的Observable,并将所有的Observable再组合起来发送数据,输入和输出是一对多关系。所以我们在需要进行数据转化的时候应优先考虑使用map,这样可以得到更好的性能。
创建一个Observable,它能发送1~3的整数,然后只用map操作符将所有的数据都转化成乘以10之后的结果。
private void map(){
Observable.just(1,2,3).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer*10;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("map: "+ integer);
}
});
}
map: 10
map: 20
map: 30
3.2.5 cast
cast操作符将Observable发送的数据强制转化为另外一种类型,和Java Class类的cast方法很相似,属于map的一种具体实现。
要实验一下cast操作符的用处,我们先创建一个Animal类,然后创建一个继承自Animal类的Dog,最后通过一个返回类型为Animal的方法getAnimal创建一个Dog对象。但是因为getAnimal的返回类型为Animal,所以我们直接得到的只是一个Animal对象,这时可以使用cast操作符将其转化为Dog对象。
class Animal{
protected String name = "Animal";
Animal(){
System.out.println("create: " + name);
}
String getName(){
return name;
}
}
class Dog extends Animal{
Dog(){
name = getClass().getSimpleName();
System.out.println("create: " + name);
}
}
Animal getAnimal(){
return new Dog();
}
private void cast(){
Observable.just(getAnimal())
.cast(Dog.class)
.subscribe(new Action1<Dog>() {
@Override
public void call(Dog dog) {
System.out.println("cast: " + dog.name);
}
});
}
create: Animal
create: Dog
cast: Dog
但是经过实验,没有使用这个操作符的时候,输出结果同上呢
Observable.just(getAnimal())
.subscribe(new Action1<Animal>() {
@Override
public void call(Animal animal) {
System.out.println("no cast: " + animal.getName());
}
});
结果
create: Animal
create: Dog
no cast: Dog
总之呢,书上说说明cast操作符将Animal类型的对象强制转化成Dog类型的对象(有待考证)。还可以验证一个小知识点,有继承的情况下创建对象会先调用父类的构造方法,然后调用子类的。
3.2.6 scan
scan操作符对一个序列的数据应用同一个函数进行计算,并将这个函数的结果发送出去,作为下一个数据应用这个函数时的第一个参数使用。也就是说任何一个要发送出来的数据都是以上次发送出来的数据和这次源Observable发送的数据作为输入,并应用同一个函数计算后的结果。所以scan操作符非常类似于递归操作符。由于scan所使用的这个函数需要输入两个数据作为参数,而第一个发送出来的数据只有一个,所以不会对第一个数据做任何计算,会直接将它发送出来,并作为一个参数和第二个数据一同传入函数中进行计算。
/**
* 6. scan
*/
private ArrayList<Integer> arrayList = new ArrayList<>();
private void scan(){
for(int i= 1; i<=5; i++){
arrayList.add(i);
}
Observable.from(arrayList)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer * integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("scan: "+ integer);
}
});
}
结果如下
scan: 1
scan: 2
scan: 6
scan: 24
scan: 120
3.2.7 window
window操作符类似于buffer,不同之处在于buffer是将收集的数据打包作为整体发送出去,如发送一个List等,而window发送的是一些小的Observable对象,每个Observable对象包含window操作符规定的窗口里所收集到的数据,然后由这些小的Observable对象将其内部包含的数据一个个发送出来。如同buffer一样, window不仅可以通过数目来分组,还可以通过时间等规则来分组。
下面我们创建两个Observable对象,分别使用window的数目和时间规则来进行分组并进行订阅。因为window操作符发送出来的数据时一些小的Observable,所以我们还会对这些小的Observable进行订阅。
private void window(){
Observable.just(1,2,3,4,5,6,7,8,9)
.window(3)
.subscribe(new Action1<Observable<Integer>>() {
@Override
public void call(Observable<Integer> integerObservable) {
System.out.println(integerObservable.getClass().getName());
integerObservable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("window1: "+ integer);
}
});
}
});
Observable.interval(1000, TimeUnit.MILLISECONDS)
.window(3000, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Observable<Long>>() {
@Override
public void call(Observable<Long> longObservable) {
System.out.println(System.currentTimeMillis()/1000);
longObservable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("windowTime: "+ aLong);
}
});
}
});
}
结果如下, 好像每次使用interval都不正确,有待研究
rx.subjects.UnicastSubject
window1: 1
window1: 2
window1: 3
rx.subjects.UnicastSubject
window1: 4
window1: 5
window1: 6
rx.subjects.UnicastSubject
window1: 7
window1: 8
window1: 9
1554604413
到此, 转化操作符已经学完了。掌握好转化操作符的使用技巧基本就可以处理好大多数应用场景了。另外需要注意的是,记清楚map和flatMap,以及buffer和window等操作符之间的区别,以便根据实际的需要选择最合算的操作符。