一.引言
Operators 操作符是为了实现对被观察者发射事件或数据的变换,用于在被观察者Observable/Flowable和目标观察者Observer/Subscriber之间修改Observable发出的事件。
RxJava提供了很多很有用的操作符,按照功能可以主要分为一下类型:
● 创建操作符-- Creating Observables
● 变换操作符--Transforming Observables
● 过滤操作符-- Filtering Observables
● 合并操作符-- Combining Observables
● 布尔操作符-- Boolean Operators
● 事件流操作符--Observable Utility Operators
1.创建操作符 Creating Observables
用来创建新的Observables.主要有以下方法:
●Create
●Defer
●Empty/Never/Throw
●From
● Interval
●Just
● Range
● Repeat
● Start
● Timer
创建操作符在之前讲Observable/Flowable的创建时已经讲过。
2.变换操作符Transforming Observables
将上级的数据处理变换后再发射出去,主要有以下方法:
●Buffer
●FlatMap
● GroupBy
● Map
● Scan
● Window
1.Buffer
Buffer操作符定期收集Observable的数据放进一个数据集合里,然后发射这些数据集合,而不是一次发射一个源数据的值。
Observable.fromArray(arr)
.buffer(3,1)
.subscribe(getStringListObserver("buffer"));
// 3 means, 每创建一个list的长度 (it takes max of three from its start index and create list)
// 1 means, 每创建一个list,开始下标跳过的个数(it jumps one step every time)
//输出
// a,b,c
// b,c,d
// c,d,e
// d,e
// e
Observable.fromArray(arr)
.buffer(2)//默认skip 和count相同
.subscribe(getStringListObserver("buffer"));
//输出
// a,b,
// c,d,
// e
```
**2.Map**
将上级数据修改,变换处理后,在发射到下一级,变换数据是一对一的。
```java
Observable.just(1,2,3,4,5)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer+".xxxxx";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
print(s);
}
});
//输出结果
// 1.xxxxx
// 2.xxxxx
// 3.xxxxx
// 4.xxxxx
// 5.xxxxx
3. FlatMap
将上级数据修改,变换处理后,在发射到下一级,变换数据可以一对一,也可以一对多。
Observable.just(1,2,3,4,5)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(integer+".x");
e.onNext(integer+".xx");
e.onNext(integer+".xxx");
e.onNext(integer+".xxxx");
e.onComplete();
}
});
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
print("accept:"+s);
}
});
Observable.just(1,2,3,4,5).flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
return Observable.just(integer * 2);
}
}, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
print("accept:"+integer);
}
});
//输出结果
// accept:1.x
// accept:1.xx
// accept:1.xxx
// accept:1.xxxx
// accept:2.x
// accept:2.xx
// accept:2.xxx
// accept:2.xxxx
// accept:3.x
// accept:3.xx
// accept:3.xxx
// accept:3.xxxx
// accept:4.x
// accept:4.xx
// accept:4.xxx
// accept:4.xxxx
// accept:5.x
// accept:5.xx
// accept:5.xxx
// accept:5.xxxx
// accept:3
// accept:6
// accept:9
// accept:12
// accept:15
4.GroupBy
GroupBy操作符将原始Observable发射的数据按照key来拆分成一些小的Observable,然后这些小的Observable分别发射其所包含的的数据。
String[] arr = new String[]{"aaa", "bb", "ccc", "dd", "eee"};
Observable.fromArray(arr).groupBy(new Function<String, Boolean>() {
@Override
public Boolean apply(String s) throws Exception {
return s.length()== 3;
}
}).subscribe(new Consumer<GroupedObservable<Boolean, String>>() {
@Override
public void accept(final GroupedObservable<Boolean, String> booleanStringGroupedObservable) throws Exception {
if(booleanStringGroupedObservable.getKey()) {
booleanStringGroupedObservable.toList().subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
print("key=" + booleanStringGroupedObservable.getKey() + ",val=" + strings);
}
});
}else{
booleanStringGroupedObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
print("key=" + booleanStringGroupedObservable.getKey() + ",val=" + s+s);
}
});
}
}
});
//输出结果
// key=false,val=bbbb
// key=false,val=dddd
// key=true,val=[aaa, ccc, eee]
5. Scan
Scan操作符对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用这个函数时候的第一个参数使用。
Observable.just(1,2,3,4,5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
print(integer+"");
}
});
Observable.fromArray(new String[]{"a","b","c","d","e"}).scan(new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s+s2;
}
}).subscribe(getStringObserver("Scan"));
//输出结果
//1 3 6 10 15
// Scan-->String onSubscribe
// Scan-->String onNext : value :a
// Scan-->String onNext : value :ab
// Scan-->String onNext : value :abc
// Scan-->String onNext : value :abcd
// Scan-->String onNext : value :abcde
// Scan-->String onComplete
6.Window
window操作符会在时间间隔内缓存结果,类似于buffer缓存一个list集合,区别在于window将这个结果集合封装成了observable
Observable.interval(300, TimeUnit.MILLISECONDS).take(50)
.window(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> longObservable) throws Exception {
print("Window accept---------");
longObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
print(aLong+"");
}
});
}
});
//输出结果:
//Window accept---------
//0
//1
//...
//15
//Window accept---------
//16
//...
//32
//Window accept---------
//33
//...
//48
//Window accept---------
//49