本教程均是基于java的项目:
Buffer — 周期性收集Obserable产生结果到集合中,并一次性发送它。
private static void buffer() {
ArrayList<Integer> list = new ArrayList<>();
for (int i = 1; i < 11; i++) {
list.add(i);
}
Observable
.fromIterable(list)
.buffer(2, 3) //一次收集2个,下次跳过3个收集
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
for (Integer integer : integers) {
System.out.println("accept: " + integer);
}
}
});
}
输出结果:
accept: 1
accept: 2
accept: 4
accept: 5
accept: 7
accept: 8
accept: 10
FlatMap — 可以应用一个函数把Observable事件转换到Observables,然后再通过一个Obserable发射出去,需要注意flatMap 并不能保证事件的顺序。
private static void flatMap() {
Observable
.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
System.out.println(Thread.currentThread() + " apply " + integer);
return Observable.just("this is " + integer).delay(3, TimeUnit.SECONDS);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(Thread.currentThread() + " accept: " + s);
}
});
while (true) ;
}
输出结果:
Thread[RxCachedThreadScheduler-2,5,main] apply 1
Thread[RxCachedThreadScheduler-2,5,main] apply 2
Thread[RxCachedThreadScheduler-2,5,main] apply 3
Thread[RxCachedThreadScheduler-2,5,main] apply 4
Thread[RxCachedThreadScheduler-2,5,main] apply 5
Thread[RxComputationThreadPool-2,5,main] accept: this is 2
Thread[RxComputationThreadPool-2,5,main] accept: this is 3
Thread[RxComputationThreadPool-4,5,main] accept: this is 4
Thread[RxComputationThreadPool-1,5,main] accept: this is 1
Thread[RxComputationThreadPool-1,5,main] accept: this is 5
可以看到我们apply的时候是1 2 3 4 5,订阅收到的确是2 3 4 1 5,可能下一次运行又不是这个顺序了,需要保证顺序则可以使用 concatMap替换flatMap。
Map — 通过一个函数(apply)转换通过Observable发射的项目。
private static void map() {
Observable
.just(1, 2, 3, 4, 5)
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "result " + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept " + s);
}
});
}
输出结果:
accept result 1
accept result 2
accept result 3
accept result 4
accept result 5
GroupBy — 对源Observable结果分组,转换成GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值。
由于GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。所以你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。
private static void groupBy() {
Observable
.range(0, 9)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer value) throws Exception {
return value % 3; //返回值决定组名 这里分了0 1 2三组
}
})
.subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(final GroupedObservable<Integer, Integer> res) throws Exception {
res.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer aLong) throws Exception {
System.out.println("group: " + res.getKey() + " - " + aLong);
}
});
}
});
}
输出结果:
group: 0 - 0
group: 1 - 1
group: 2 - 2
group: 0 - 3
group: 1 - 4
group: 2 - 5
group: 0 - 6
group: 1 - 7
group: 2 - 8
Scan — scan对迭代源Observable产生的结果应用一个函数,将结果发射出去并作为下次迭代的一个参数。
private static void scan() {
Observable
.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer sum, @NonNull Integer item) throws Exception {
System.out.println("sum " + sum);
return sum + item;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Next: " + integer);
}
});
}
输出结果:
Next: 1
sum 1
Next: 3
sum 3
Next: 6
sum 6
Next: 10
sum 10
Next: 15
Window — 类似于buffer,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理
private static void window() {
ArrayList<Integer> list = new ArrayList<>();
for (int i = 1; i < 11; i++) {
list.add(i);
}
Observable
.fromIterable(list)
.window(2, 3)
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> ob) throws Exception {
ob.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("accept: " + integer);
}
});
}
});
}
输出结果:
accept: 1
accept: 2
accept: 4
accept: 5
accept: 7
accept: 8
accept: 10