RxJava2学习笔记

本文参考RxJava2 只看这一篇文章就够了,强烈推荐大家去看一下。

RxJava的组成

  • 被观察者-------Observable
  • 观察者-----------Observer
  • 订阅---------------subscribe
  //被观察者
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("a");
                emitter.onNext("b");
                emitter.onNext("c");
                emitter.onComplete();
            }
        });
        //观察者
        Observer observer = new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                LogUtils.e(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        //订阅
        observable.subscribe(observer);

常见操作符

创建操作符

  • create()
  //创建一个被观察者
      Observable<String>observable = Observable.create(new ObservableOnSubscribe<String>() {
          @Override
          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
              emitter.onNext("Hello Java");
              emitter.onComplete();
          }
      });
  • just() --------发送事件不可以超过10个
Observable.just(1,2,3,4,5,6,7,8,9,0).subscribe(new Consumer<Integer>() {
          @Override
          public void accept(Integer integer) throws Exception {
              LogUtils.e(integer);
          }
      });
  • From 操作符
  1. fromArray() ----- 可以发送数组(数量可以大于10个)
Integer integers[] = {0,1,2,3,4,5};
        Observable.fromArray(integers).subscribe(new Consumer<Integer>() {
          @Override
          public void accept(Integer integer) throws Exception {
              LogUtils.e(integer);
          }
      });
  1. fromCallable() -----被观察者返回一个结果值给观察者
   Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "hello";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.e(s);
            }
        });
  1. fromIterable() ---------可以发送一个List集合给观察者
     List<String>list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Observable.fromIterable(list).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.e(s);
            }
        });
  1. fromFuture() ------- 可以发送一个Future
       FutureTask<String>futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello";
            }
        });
        //doOnSubscribe()----- 开始订阅时才会执行
        Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                futureTask.run();//开始执行
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.e(s);
            }
        });
  • defer() -------------- 只有观察者订阅时,才会创建新的被观察者
String str_name = "张三";
    public void rxjava() {
        Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just(str_name);
            }
        });
        str_name = "李四";
        Observer observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                LogUtils.e("str_name = " + s);//王五,赵六
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        str_name = "王五";
        observable.subscribe(observer);
        str_name = "赵六";
        observable.subscribe(observer);
    }
  • timer() -------------当到了指定时间就发送一个0L的值给观察者
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                LogUtils.e("along : "+aLong);//along : 0
            }
        });
  • interval() ------------ 每隔一段时间就会发送一个事件(从0开始不断增加1的数字)
  Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                //每隔2s 执行一次 along : 从0开始每次回增加1
                LogUtils.e("along : "+aLong);//along : 0, along : 1
            }
        });
        
        //延迟5s后开始执行
        Observable.interval(5,2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                //每隔2s 执行一次 along : 从0开始每次回增加1
                LogUtils.e("along : "+aLong);//along : 0, along : 1
            }
        });
  • intervalRange() ------------ 可以指定发送事件的开始值,数量,其他的和interval()一样
        //start:起始数值 -------- 10
        //count:发射数量 -------- 3
        //initialDelay:延迟执行时间-------- 5s
        //period:发射周期时间------2s
        //unit:时间单位
        //数字从10开始,传递3次,第一次执行延迟5s,每隔2s执行一次
        Observable.intervalRange(10,3,5,2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                //每隔2s 执行一次 along : 从0开始每次回增加1
                LogUtils.e("along : "+aLong);//along : 10, along : 11, along : 12
            }
        });
        
        //start:起始数值 -------- 10
        //count:发射数量 -------- 3
        //initialDelay:延迟执行时间-------- 5s
        //period:发射周期时间------2s
        //unit:时间单位
        //Scheduler:线程调度
        //数字从10开始,传递3次,第一次执行延迟5s,每隔2s执行一次,在新线程中执行
        Observable.intervalRange(10,3,5,2, TimeUnit.SECONDS, Schedulers.newThread()).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                LogUtils.e("当前线程 :"+Thread.currentThread().getName());//RxNewThreadScheduler
                //每隔2s 执行一次 along : 从0开始每次回增加1
                LogUtils.e("along : "+aLong);//along : 10, along : 11, along : 12
            }
        });
        
  • range() ----------- 发送一定范围内的事件
   //从10开始,执行3次
       Observable.range(10,3).subscribe(new Consumer<Integer>() {
           @Override
           public void accept(Integer integer) throws Exception {
               LogUtils.e("integer : "+integer);//10,11,12
           }
       });
  • rangeLong() ----和range()方法类似,只是数据类型为Long
  //从10开始,执行3次
       Observable.rangeLong(10,3).subscribe(new Consumer<Long>() {
           @Override
           public void accept(Long aLong) throws Exception {
               LogUtils.e("along : "+aLong);//10,11,12
           }
       });
  • empty() & never() & error()
  1. empty() --------------- 直接发送 onComplete() 事件
   //只会进入 onSubscribe(),onComplete()方法
       Observable.empty().subscribe(new Observer<Object>() {
           @Override
           public void onSubscribe(Disposable d) {
               LogUtils.e("==================onSubscribe");
           }
           @Override
           public void onNext(Object o) {
               LogUtils.e("==================onNext");
           }
           @Override
           public void onError(Throwable e) {
               LogUtils.e("==================onError " + e);
           }
           @Override
           public void onComplete() {
               LogUtils.e("==================onComplete");
           }
       });
  1. never() ---------------- 不发生任何时间
 //只会进入 onSubscribe()方法
       Observable.never().subscribe(new Observer<Object>() {
           @Override
           public void onSubscribe(Disposable d) {
               LogUtils.e("==================onSubscribe");
           }
           @Override
           public void onNext(Object o) {
               LogUtils.e("==================onNext");
           }
           @Override
           public void onError(Throwable e) {
               LogUtils.e("==================onError " + e);
           }
           @Override
           public void onComplete() {
               LogUtils.e("==================onComplete");
           }
       });
  1. error() -------------------发送onError()事件
//只会进入 onSubscribe(),onError()方法
       Observable.error(new NullPointerException()).subscribe(new Observer<Object>() {
           @Override
           public void onSubscribe(Disposable d) {
               LogUtils.e("==================onSubscribe");
           }
           @Override
           public void onNext(Object o) {
               LogUtils.e("==================onNext");
           }
           @Override
           public void onError(Throwable e) {
               LogUtils.e("==================onError " + e);
           }
           @Override
           public void onComplete() {
               LogUtils.e("==================onComplete");
           }
       });

转换操作符

  • map() -------------- 将被观察者发送的数据类型转换成其他的类型
Observable.just(1,2,3,4,5).map(new Function<Integer, String>() {
         @Override
         public String apply(Integer integer) throws Exception {
             return "当前值为:"+integer;
         }
     }).subscribe(new Consumer<String>() {
         @Override
         public void accept(String s) throws Exception {
             LogUtils.e(s);
         }
     });
  • flatMap() ----------------- 作用于map() 方法类似,返回一个 新的Observerable (无序: flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序 )
    Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                    list.add("当前值为:"+integer);
                if (integer == 3) {
                    return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//延迟10毫秒
                } else {
                    return Observable.fromIterable(list);
                }
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                LogUtils.e(o.toString());//当前值为:1,2,4,5,3 ------无序的
            }
        });
  • concatMap() ------------ 作用和flatMap() 方法一样(有序:concatMap()转发事件的顺序是有序的)
Observable.just(1, 2, 3, 4, 5).concatMap(new Function<Integer, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                    list.add("当前值为:"+integer);
                if (integer == 3) {
                    return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//延迟10毫秒
                } else {
                    return Observable.fromIterable(list);
                }
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                LogUtils.e(o.toString());//当前值为:1,2,3,4,5, ------有序的
            }
        });
  • buffer() ------------- 从需要发送的事件中获取一定数量的事件,将这些事件存放到缓冲区中一并发出
        //count: 缓冲区元素的数量
        //skip: 就代表缓冲区满了之后,发送下一次事件的时候要跳过的元素数量
        Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        LogUtils.e("缓冲区大小: " + integers.size());
                        String str = "";
                        for (int i = 0; i < integers.size(); i++) {
                            str = str + "," + integers.get(i);
                        }
                        LogUtils.e("当前元素: " + str);
                    }
                });
  • groupBy() ---------------- 将发送的数据进行分组,每个分组都会返回一个被观察者
Observable.just(1, 2, 3, 4, 5, 6, 7)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        return integer % 2;//分为2组
                    }
                })
                .subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
                    @Override
                    public void accept(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                        LogUtils.e(" 第" + integerIntegerGroupedObservable.getKey()+"组");
                        integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                LogUtils.e("第" + integerIntegerGroupedObservable.getKey() + "组,当前元素: " + integer);
                            }
                        });
                    }
                });
  • scan() ---------------- 将数据按照一定的逻辑合并数据
Observable.just(1, 2, 3, 4, 5, 6, 7)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer1, Integer integer2) throws Exception {
                        LogUtils.e("integer1 = "+integer1);//上一次的结果
                        LogUtils.e("integer2 = "+integer2);
                        return integer1 +integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.e("integer1 + integer2 ="+integer);
            }
        });
  • window() --------------将发送数据 按指定数量进行分组
 Observable.just(1, 2, 3, 4, 5, 6, 7)
              .window(3)
                .subscribe(new Consumer<Observable<Integer>>() {
                    @Override
                    public void accept(Observable<Integer> integerObservable) throws Exception {
                        integerObservable.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                LogUtils.e("当前元素: " + integer);
                            }
                        });
                    }
                });

组合操作符

  • zip() -------------- 将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
        Observable.zip(
                Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        String s1 = "A" + aLong;
                        LogUtils.d("A 发送的事件: " + s1);
                        return s1;
                    }
                }), Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        String s2 = "B" + aLong;
                        LogUtils.d("B 发送的事件: " + s2);
                        return s2;
                    }
                }), new BiFunction<String, String, String>() {
                    @Override
                    public String apply(String s, String s2) throws Exception {
                        String res = s + s2;
                        LogUtils.d("A & B 发送的事件: " + res);
                        return res;
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.d( "onNext: " + s);
            }
        });
  • concat() -------------- 将多个观察者组合在一起,然后按照之前的发送顺序发送事件,最多只能合并4个被观察者
 Observable.concat(
                Observable.just(1,2),
                Observable.just(3,4),
                Observable.just(5,6),
                Observable.just(7,8)
        ).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.e("当前数字:"+integer);//1,2,3,4,5,6,7,8
            }
        });
  • concatArray() ------------ 作用和concat()方法一样,可以发送多于4个的被观察者
Observable.concatArray(
                Observable.just(1,2),
                Observable.just(3,4),
                Observable.just(5,6),
                Observable.just(7,8),
                Observable.just(9,10),
                Observable.just(11,12)
        ).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.e("当前数字:"+integer);//1,2,3,4,5,6,7,8,9,10,11,12
            }
        });
  • merge() -------------- 作用和concat() 方法一样,只不过concat()是串行发送,而merge() 是并行发送事件
 Observable.merge(
                Observable.interval(1, TimeUnit.SECONDS)
                        .map(new Function<Long,String>() {
                            @Override
                            public String apply(Long aLong) throws Exception {
                                return "A"+aLong;
                            }
                        }),
                Observable.interval(1, TimeUnit.SECONDS)
                        .map(new Function<Long,String>() {
                            @Override
                            public String apply(Long aLong) throws Exception {
                                return "B"+aLong;
                            }
                        })
        ).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.e(s);//A,B 交替出现---------- A0,B0,A1,B1,A2,B2
            }
        });
  • combineLatest() ------------ 作用和zip()类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送。
        Observable.combineLatest(
                Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        String s1 = "A" + aLong;
                        LogUtils.d("A 发送的事件: " + s1);
                        return s1;
                    }
                }), Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        String s2 = "B" + aLong;
                        LogUtils.d("B 发送的事件: " + s2);
                        return s2;
                    }
                }), new BiFunction<String, String, String>() {
                    @Override
                    public String apply(String s, String s2) throws Exception {
                        String res = s + s2;
                        LogUtils.d("A & B 发送的事件: " + res);
                        return res;
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.d( "onNext: " + s);
            }
        });
  • concatArrayDelayError() & mergeArrayDelayError()& combineLatestDelayError() -------------- 如果有一个被观察者发送了一个Error事件,那么就结束发送,如果你想将Error() 事件延迟到所有被观察者都发送完事件后再执行。
       Observable.concatArrayDelayError(
                Observable.just(1,2),
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        e.onNext(3);
                        e.onError(new NullPointerException());
                        e.onNext(4);
                    }
                }),
                Observable.just(5,6)
        ).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                LogUtils.e("onNext : 当前数字:"+integer);//1,2,3,5,6
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.e("onError : "+e.toString());//java.lang.NullPointerException
            }

            @Override
            public void onComplete() {

            }
        });
Observable.mergeArrayDelayError(
                Observable.interval(1, TimeUnit.SECONDS)
                        .map(new Function<Long, String>() {
                            @Override
                            public String apply(Long aLong) throws Exception {
                                return "A" + aLong;
                            }
                        }),
                Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("C1");
                        e.onNext("C2");
                        e.onNext("C3");
                        e.onError(new NullPointerException());
                        e.onNext("C4");
                        e.onNext("C5");
                    }
                }),
                Observable.interval(1, TimeUnit.SECONDS)
                        .map(new Function<Long, String>() {
                            @Override
                            public String apply(Long aLong) throws Exception {
                                return "B" + aLong;
                            }
                        })
        ).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) { }
            @Override
            public void onNext(String s) {
                LogUtils.e(s);
            }
            @Override
            public void onError(Throwable e) {
                LogUtils.e(e.toString());
            }
            @Override
            public void onComplete() { }
        });
Observable<String> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
            @Override
            public String apply(Long aLong) throws Exception {
                String s1 = "A" + aLong;
                LogUtils.d("A 发送的事件: " + s1);
                return s1;
            }
        });
        Observable<String> observable2 = Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
            @Override
            public String apply(Long aLong) throws Exception {
                String s2 = "B" + aLong;
                LogUtils.d("B 发送的事件: " + s2);
                return s2;
            }
        });
        Observable<String> observable3 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("C1");
                e.onNext("C2");
                e.onNext("C3");
                e.onError(new NullPointerException());
                e.onNext("C4");
                e.onNext("C5");
            }
        });
        Observable.combineLatestDelayError(new ObservableSource[]{observable1, observable2, observable3}, new Function() {
            @Override
            public Object apply(Object o) throws Exception {
                Object[] objects = (Object[]) o;
                String res = "";
                for (int i = 0; i < objects.length; i++) {
                    res = res + String.valueOf(objects[i]);
                }
                LogUtils.d("A & B & C 发送的事件: " + res);
                return res;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
                LogUtils.d(s);
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.e(e.toString());
            }

            @Override
            public void onComplete() {
            }
        });
  • reduce() ------------ 将所有数据聚合在一起才会发送事件给观察者
Observable.just(1,2,3,4,5,6,7,8)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        int res = integer + integer2;
                        LogUtils.d("integer : " + integer);
                        LogUtils.d("integer2 : " + integer2);
                        LogUtils.d("res : " + res);
                        return res;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.d("accept : "+ integer);
            }
        });
  • collect() ------------ 将数据收集到数据结构当中。
Observable.just(1,2,3,4,5,6,7,8)
                .collect(new Callable<ArrayList<Integer>>() {
                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<ArrayList<Integer>, Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
                        integers.add(integer);
                    }
                }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(ArrayList<Integer> integers) throws Exception {
                LogUtils.d(integers);
            }
        });
  • startWith() & startWithArray() ------------ 在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出
Observable.just(6, 7, 8)
                .startWithArray(3, 4, 5)
                .startWith(2)
                .startWithArray(0, 1)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.d(String.valueOf(integer));
                    }
                });
  • count() ------------ 返回被观察者发送事件的数量
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        LogUtils.d("发送数量:" + aLong);//8
                    }
                });

功能操作符

  • delay() -------------- 延迟一段事件发送事件
Observable.just(1, 2, 3, 4)
                .delay(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doOnEach() ---------------- Observable 每发送一个之前都会先回调这个方法
  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        LogUtils.d("doOnEach 方法 "+ integerNotification.getValue());
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doOnNext() ----------------- Observable 每发送 onNext() 之前都会先回调这个方法
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
                .doOnNext(new Consumer<Integer>(){
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.d("doOnNext 方法 "+ integer);
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doAfterNext() -------------- Observable 每发送 onNext() 之后都会回调这个方法
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
                .doAfterNext(new Consumer<Integer>(){
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.d("doAfterNext 方法 "+ integer);
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doOnComplete ------------------ Observable 每发送 onComplete() 之前都会回调这个方法
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
              .doOnComplete(new Action() {
                  @Override
                  public void run() throws Exception {
                      LogUtils.d("doOnComplete 方法");
                  }
              })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doOnError() ---------------- Observable 每发送 onError() 之前都会回调这个方法
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onError(new NullPointerException());
            }
        })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        LogUtils.e("doOnError() :"+throwable.toString());
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doOnSubscribe() ----------------- Observable 每发送 onSubscribe() 之前都会回调这个方法
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                       LogUtils.d("doOnSubscribe()方法");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doOnDispose() -------------- 当调用 Disposable 的取消订阅dispose()方法之后回调该方法
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.d("doOnDispose()方法");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    private Disposable disposable;

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                        this.disposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    if(integer==2){
                        disposable.dispose();//取消订阅
                    }
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doOnLifecycle() ------------- 在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        LogUtils.d("doOnLifecycle accept");
                        //disposable.dispose();//取消订阅
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.d("doOnLifecycle Action ");
                    }
                })
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.d("doOnDispose()方法");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    private Disposable disposable;

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                        this.disposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                        if (integer == 2) {
                            disposable.dispose();//取消订阅
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doOnTerminate() & doAfterTerminate() --------------- doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。如果取消订阅之后 doAfterTerminate() 就不会被回调
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.d("doOnTerminate() 方法");
                    }
                })
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.d("doAfterTerminate() 方法");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • doFinally() ------------- 在所有事件发送完毕之后回调该方法,doFinally() 在取消订阅后也都会被回调,且都会在事件序列的最后。
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.d("doFinally() 方法");
                    }
                })
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.d("doAfterTerminate() 方法");
                    }
                })
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        LogUtils.d("doOnDispose()方法");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    private Disposable disposable;

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                        this.disposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                        if (integer == 2) {
                            disposable.dispose();//取消订阅
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • onErrorReturn() ------------------- 当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onError(new NullPointerException());
            }
        })
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(Throwable throwable) throws Exception {
                        LogUtils.e("onErrorReturn() 方法"+throwable.toString());
                        return 404;
                    }
                })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • onErrorResumeNext() ----------- 当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onError(new NullPointerException());
            }
        })
                .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                    @Override
                    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                        LogUtils.e("onErrorResumeNext()方法"+throwable.toString());
                        return Observable.just(5,6,7,8,9);
                    }
                })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • onExceptionResumeNext() -------------- 与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onError(new Exception("111"));
            }
        })
               .onExceptionResumeNext(new Observable<Integer>() {
                   @Override
                   protected void subscribeActual(Observer<? super Integer> observer) {
                       observer.onNext(404);
                       observer.onNext(405);
                       observer.onComplete();
                   }
               })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • retry() ----------------- 如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onError(new NullPointerException());
            }
        })
           .retry(2)
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });

  • retryUntil() --------------- 出现错误事件之后,可以通过此方法判断是否继续发送事件。
 final int[] i = {0};
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onNext(5);
                e.onError(new NullPointerException());
            }
        })
                .retryUntil(new BooleanSupplier() {
                    @Override
                    public boolean getAsBoolean() throws Exception {
                        if (i[0] >= 6) {//停止继续发送
                            return true;
                        } else {
                            return false;
                        }
                    }
                })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        i[0] += integer;
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • retryWhen() ---------------- 当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onNext(5);
                e.onError(new NullPointerException());
            }
        })
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Throwable throwable) throws Exception {
                                if (throwable instanceof NullPointerException){
                                    return Observable.error(new Throwable("终止啦"));
                                }else{
                                    return Observable.just(6,7,8,9);
                                }
                            }
                        });
                    }
                })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • repeat() -------------- 重复发送被观察者的事件,times 为发送次数
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
//                e.onError(new NullPointerException());
                e.onComplete();
            }
        })
                .repeat(2)
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • repeatWhen() ------------------ 这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
//                e.onError(new NullPointerException());
                e.onComplete();
            }
        })
                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                        return Observable.empty();//直接发送 onComplete() 事件
//                        return Observable.just(1);//不发送任何事件
//                        return Observable.error(new Exception("404"));//发送 onError() 事件
                    }
                })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.e("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • subscribeOn() ------------ 指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                LogUtils.d("当前线程:"+Thread.currentThread().getName());
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
//                e.onError(new NullPointerException());
                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                        LogUtils.d("onSubscribe()方法—当前线程:"+Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                        LogUtils.d("onNext()方法—当前线程:"+Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                        LogUtils.d("onError()方法—当前线程:"+Thread.currentThread().getName());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                        LogUtils.d("onComplete()方法—当前线程:"+Thread.currentThread().getName());
                    }
                });
  • observeOn() ----------------- 指定观察者的线程,每指定一次就会生效一次
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                LogUtils.d("当前线程:"+Thread.currentThread().getName());
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
//                e.onError(new NullPointerException());
                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                        LogUtils.d("onSubscribe()方法—当前线程:"+Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                        LogUtils.d("onNext()方法—当前线程:"+Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                        LogUtils.d("onError()方法—当前线程:"+Thread.currentThread().getName());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                        LogUtils.d("onComplete()方法—当前线程:"+Thread.currentThread().getName());
                    }
                });

过滤操作符

  • filter() -------------- 通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送
Observable.just(1, 2, 3, 4, 5, 6)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        if (integer % 2 == 0) {
                            return true;
                        } else {
                            return false;
                        }
                    }
                })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • ofType() ----------------- 可以过滤不符合该类型事件
 Observable.just("one","two","three",1,2,3)
                .ofType(Integer.class)
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • skip() & skipLast() --------------- 跳过正序某些事件,count 代表跳过事件的数量
Observable.just(1,2,3,4,5,6)
                .skip(2)//跳过前面2个事件
                .skipLast(2)//跳过后面2个事件
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • distinct() ------------ 过滤事件序列中的重复事件
Observable.just(1,2,3,4,4,3,2,1)
                .distinct()
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);//1,2,3,4
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • distinctUntilChanged() ---------------- 过滤掉连续重复的事件
Observable.just(1,2,3,4,4,3,2,1)
                .distinctUntilChanged()
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);//1,2,3,4,3,2,1
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • take() & takeLast() --------------------- 控制观察者接收的事件的数量
Observable.just(1,2,3,4,5,6,7,8,9)
                .take(3)//只接收前面3个
//                .takeLast(2)//只接收后面2个
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • debounce() ----------------- 如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                Thread.sleep(900);
                e.onNext(2);
            }
        })
                .debounce(1,TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.d("onNext()方法 : " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • firstElement() && lastElement() ----------------- firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素
Observable.just(1, 2, 3, 4)
                .firstElement()
                .subscribe(new Consumer < Integer > () {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.d("firstElement() 方法 " + integer);
                    }
                });
        Observable.just(1, 2, 3, 4)
                .lastElement()
                .subscribe(new Consumer < Integer > () {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.d("lastElement() 方法 " + integer);
                    }
                });
  • elementAt() & elementAtOrError() -------------- elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError()
Observable.just(1, 2, 3, 4)
                .elementAt(4)
                .subscribe(new Consumer < Integer > () {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.d("elementAt() 方法 " + integer);
                    }
                });
                
                
Observable.just(1, 2, 3, 4)
                .elementAtOrError(4)//报错
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.d("elementAtOrError() 方法 " + integer);
                    }
                });

条件操作符

  • all() ---------------- 判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false
Observable.just(1, 2, 3, 4)
                .all(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer < 5;
                    }
                }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                LogUtils.d("aBoolean : " + aBoolean);
            }
        });
  • takeWhile() ------------- 可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送
Observable.just(1, 2, 3, 4, 3, 2)
                .takeWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer < 4;//如果第一条数据没有满足条件,后面的都不会进行
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.d("integer : " + integer);
            }
        });
  • skipWhile() ---------- 可以设置条件,当某个数据满足条件时不发送该数据,反之则发送
        Observable.just(1, 2, 3, 4, 3, 2)
                .skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer < 4;//当满足条件时,后面的都运行
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.d("integer : " + integer);
            }
        });
  • takeUntil() -------------- 可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了
    Observable.just(1, 2, 3, 4, 3, 2)
                .takeUntil(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer >= 3;//当满足条件后,从下一次的事件开始都不会发送了
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.d("integer : " + integer);
            }
        });
  • skipUntil() ------------ 当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
                .skipUntil(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Long along) {
                        LogUtils.d("onNext()方法 : " + along);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });
  • sequenceEqual() --------------- 判断两个 Observable 发送的事件是否相同
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        LogUtils.d("aBoolean : " + aBoolean);
                    }
                });
  • contains() ------------- 判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false
Observable.just(1,2,3,4,5)
                .contains(3)
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        LogUtils.d("aBoolean : " + aBoolean);
                    }
                });
  • isEmpty() --------------- 判断事件序列是否为空 ( true :空 )
  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onComplete();
            }
        })
                .isEmpty()
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        LogUtils.d("aBoolean : " + aBoolean);
                    }
                });
  • amb() --------------- amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃
        List<Observable<Long>> list = new ArrayList<>();
        list.add(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS));
        list.add(Observable.intervalRange(11, 5, 0, 1, TimeUnit.SECONDS));
        list.add(Observable.intervalRange(21, 5, 2, 1, TimeUnit.SECONDS));
        Observable.amb(list)
                .subscribe(new Observer<Long>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        LogUtils.d("onSubscribe()方法");
                    }

                    @Override
                    public void onNext(Long along) {
                        LogUtils.d("onNext()方法 : " + along);
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.d("onError()方法 :" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.d("onComplete()方法");
                    }
                });

  • defaultIfEmpty() -------------- 如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onComplete();
            }
        })
                .defaultIfEmpty(1001)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.d("integer : " + integer);
                    }
                });
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容