作者:IT魔幻师
博客:www.huyingzi.top
转载请注明出处://www.greatytc.com/p/afeba5aea533
一、创建型操作符
主要用于创建被观察者
-
just
create的快捷创建操作,create操作符必须手动调用onNext才能触发事件,just会自动触发@Test public void testjust() { //just是create的快捷创建操作 Observable.just("我是你爸爸","我是你爸爸2").subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { //此处会依次收到just参数传递过来的值 } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
-
fromArray
相比于just,fromArray适用于多参数的情况.@Test public void testFromArray() { Observable.fromArray(new String[]{"我是你爸爸", "我是你爸爸2", "我是你爸爸3", "我是你爸爸4"}).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println("onNext "+s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
-
range
创建在一定范围内的事件@Test public void testRange() { //从5开始执行11次事件 Observable.range(5,11).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
-
empty
主要适用于调用后不需要返回参数只需要关心结果,如:发起网络请求后在onComplete()中处理结果即可他不会回调onNext函数.@Test public void testempty() { Observable.empty().subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("执行结束"); } }); }
-
interval
定时器操作符,需要依赖Android的api不能在纯java环境下使用//每隔1单位秒的时间执行一次 Observable.interval(1, TimeUnit.SECONDS);
-
intervalRange
定时器操作符,需要依赖Android的api不能在纯java环境下使用//从0开始每隔1000毫秒发送50个事件 初始延时0 Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println(aLong); } });
timer
跟interval一样.
二、转换操作符
将事件类型转换成我们想要的结果
- map
@Test public void testMap() { //场景:根据图片地址最终转换成bitmap Observable.just("icon01.png","icon02.png").map(new Function<String, Bitmap>() { @Override public Bitmap apply(String url) throws Exception { //在此次模拟执行网络请求等操作 // ... 此处省略 Bitmap mBitmap = Bitmap.createBitmap(200,200, Bitmap.Config.ARGB_8888); return mBitmap; } }).subscribe(new Observer<Bitmap>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Bitmap bitmap) { //在此次就可以 以此得到请求到的图片 System.out.println("得到结果:"+bitmap); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
- flatMap
在上一个事件完成后才能开始下一个事件的情况@Test public void testFlatMap() { //比如:token过期了 必须先请求一个token 再进行登录请求 Observable.just("getToken","login").flatMap(new Function<String, ObservableSource<?>>() { @Override public ObservableSource<?> apply(String s) throws Exception { System.out.println("执行事件:"+s); return createRespone(s); } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { //依次回调处理结果 System.out.println(o); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private ObservableSource<?> createRespone(final String s) { //根据请求再创建一个被观察者,观察上一个请求是否成功了 return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { System.out.println("上一个事件已经执行完成开始执行此事件:"+s); //此处是基于getToken完成之后才会执行 emitter.onNext(s); } }); }
- groupBy
对传入的事件进行分组,分组的条件可以自己指定@Test public void testGroupBy() { Observable.just(1,2,3,4).groupBy(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer>2?"A组":"B组"; } }).subscribe(new Consumer<GroupedObservable<String, Integer>>() { @Override public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception { //stringIntegerGroupedObservable 是一个分组后的被观察者 stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { String key = stringIntegerGroupedObservable.getKey(); System.out.println("key="+key+" "+integer); } }); } }); }
- buffer
大批量数据需要处理的时候,对其进行分批次处理@Test public void testBuffer() { //将6条数据每2条分一个组执行 Observable.just(1,2,3,4,5,6).buffer(2).subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<Integer> integers) { //以此回调每个组的数据 System.out.println(integers); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
- range
上一个结果作为下一个参数,所有的结果累加得到最终结果,文件合并或者字符串拼接等场景.@Test public void testScan() { Observable.range(1,5).scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { //第一个参数integer为之前所有结果的和 就是累加的形式 //相当于 第一个文件跟第二个文件合并,合并后的结果跟第三个文件合并...最终合并成一个大文件 return integer+integer2; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
三、过滤操作符
- filter
对事件进行过滤或者不过滤的处理@Test public void testFilter() { Observable.just(1,2,3,4,5,6).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { //此次决定是否过滤 //true 不过滤 //false 过滤掉不计入结果中 return integer>2; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { //接受过滤后的结果 System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
- take
限制产生事件的数量@Test public void testTake() { //每隔1单位秒的时间执行一次 take限制只执行5次 Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { System.out.println(aLong+""); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
- distinct
过滤重复事件@Test public void testDistinct() { Observable.just(1,2,2,2,3,3,6,6,7).distinct().subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); }
- elementAt
过滤指定的事件//指定过虑出第5个事件 Observable.just(1,2,2,2,3,3,6,6,7).elementAt(5).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer); } });
四、条件操作符
- all
判断所有事件是否满足一个条件,如果全部满足则为trueObservable.just(1,2,3,4,5,6).all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { //所有的事件都大于2吗 return integer>2; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { //此次返回时间结果 System.out.println(aBoolean); } });
- contains
判断所有事件中是否包含某项事件Observable.just(1,2,3,4,5).contains(3).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { //此处返回是否包含3的结果 System.out.println(aBoolean); } });
- any
所有事件中只要有有一个符合条件即为trueObservable.just(1,2,3,4,5).any(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer==3; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { System.out.println(aBoolean); } });
- isEmpty
判断一个观察者是否有事件Observable.just(1).isEmpty().subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { //有事件返回true 空事件返回false System.out.println(aBoolean); } });
- defaultIfEmpty
如果被观察者不发送任何事件,则会发送默认事件.defaultIfEmpty(0)
- skipWhile
跳过满足条件的事件//从0开始每隔1000毫秒发送50个事件 初始延时0 Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).skipWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { //跳过<10的事件 return aLong<10; } }).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println(aLong); } });
五、合并操作符
将被观察者进行合并
- startWith
把需要的事件合并成一个事件进行处理,会先处理startWith添加的事件//把需要的事件合并成一个事件进行处理,会先处理2,4,6,8的事件 Observable.just(1,3,5,7).startWith(Observable.just(2,4,6,8)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer); } });
- concat
合并最多4个事件 以先来后到的顺序进行处理,跟startWith相反。//合并两个事件 123 会优先处理 Observable.concat( Observable.just(1,2,3), Observable.just(4,5,6)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer); } });
- merge
merge合并多个被观察者,合并之后按照时间顺序并行执行Flowable observable1 = Flowable.intervalRange(0,4,1,500,TimeUnit.MILLISECONDS); Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS); Flowable observable3 = Flowable.intervalRange(20,4,1,500,TimeUnit.MILLISECONDS); Flowable.merge(observable2,observable3,observable1).subscribe(new Consumer() { @Override public void accept(Object o) throws Exception { System.out.println(o); } });
- mergeDelayError
延迟抛出异常事件,当合并的其它事件都执行完成之后再抛出异常//延迟抛出异常事件 Flowable observable1 = Flowable.create(new FlowableOnSubscribe<Publisher<?>>() { @Override public void subscribe(FlowableEmitter<Publisher<?>> emitter) throws Exception { //假设此处发生了异常 emitter.onError(new NullPointerException()); } }, BackpressureStrategy.BUFFER); Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS); Flowable.mergeDelayError(observable1,observable2).subscribe(new Consumer() { @Override public void accept(Object o) throws Exception { System.out.println(o); } });
- zip
将多个被观察者压缩成单个,输出事件最少的被观察者结果