part05_Rxjava操作符

作者:IT魔幻师
博客:www.huyingzi.top
转载请注明出处://www.greatytc.com/p/afeba5aea533


一、创建型操作符

主要用于创建被观察者

  • just
    create的快捷创建操作,create操作符必须手动调用onNext才能触发事件,just会自动触发

    @Test
     public void testjust() {
         //just是create的快捷创建操作
         Observable.just("我是你爸爸","我是你爸爸2").subscribe(new Observer<String>() {
             @Override
             public void onSubscribe(Disposable d) {
             }
    
             @Override
             public void onNext(String s) {
                 //此处会依次收到just参数传递过来的值
             }
    
             @Override
             public void onError(Throwable e) {
    
             }
    
             @Override
             public void onComplete() {
    
             }
         });
     }
    
  • fromArray
    相比于just,fromArray适用于多参数的情况.

      @Test
      public void testFromArray() {
          Observable.fromArray(new String[]{"我是你爸爸",
                  "我是你爸爸2",
                  "我是你爸爸3",
                  "我是你爸爸4"}).subscribe(new Observer<String>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(String s) {
                  System.out.println("onNext  "+s);
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • range
    创建在一定范围内的事件

    @Test
      public void testRange() {
          //从5开始执行11次事件
          Observable.range(5,11).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
              @Override
              public void onError(Throwable e) {
              }
              @Override
              public void onComplete() {
              }
          });
      }
    
  • empty
    主要适用于调用后不需要返回参数只需要关心结果,如:发起网络请求后在onComplete()中处理结果即可他不会回调onNext函数.

    
      @Test
      public void testempty() {
          Observable.empty().subscribe(new Observer<Object>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Object o) {
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
                  System.out.println("执行结束");
              }
          });
      }
    
  • interval
    定时器操作符,需要依赖Android的api不能在纯java环境下使用

    //每隔1单位秒的时间执行一次
    Observable.interval(1, TimeUnit.SECONDS);
    
  • intervalRange
    定时器操作符,需要依赖Android的api不能在纯java环境下使用

     //从0开始每隔1000毫秒发送50个事件  初始延时0
          Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  System.out.println(aLong);
              }
          });
    
  • timer
    跟interval一样.

二、转换操作符

将事件类型转换成我们想要的结果

  • map
     @Test
      public void testMap() {
          //场景:根据图片地址最终转换成bitmap
          Observable.just("icon01.png","icon02.png").map(new Function<String, Bitmap>() {
              @Override
              public Bitmap apply(String url) throws Exception {
                  //在此次模拟执行网络请求等操作
                  // ...  此处省略
                  Bitmap mBitmap = Bitmap.createBitmap(200,200, Bitmap.Config.ARGB_8888);
                  return mBitmap;
                  
              }
          }).subscribe(new Observer<Bitmap>() {
              @Override
              public void onSubscribe(Disposable d) {
                  
              }
    
              @Override
              public void onNext(Bitmap bitmap) {
                  //在此次就可以 以此得到请求到的图片 
                  System.out.println("得到结果:"+bitmap);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
    
  • flatMap
    在上一个事件完成后才能开始下一个事件的情况
    @Test
      public void testFlatMap() {
          //比如:token过期了 必须先请求一个token 再进行登录请求
          Observable.just("getToken","login").flatMap(new Function<String, ObservableSource<?>>() {
              @Override
              public ObservableSource<?> apply(String s) throws Exception {
                  System.out.println("执行事件:"+s);
                  return createRespone(s);
              }
          }).subscribe(new Observer<Object>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Object o) {
                  //依次回调处理结果
                  System.out.println(o);
    
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
    
      }
    
      private ObservableSource<?> createRespone(final String s) {
          //根据请求再创建一个被观察者,观察上一个请求是否成功了
          return Observable.create(new ObservableOnSubscribe<String>() {
              @Override
              public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                  System.out.println("上一个事件已经执行完成开始执行此事件:"+s);
                  //此处是基于getToken完成之后才会执行
                  emitter.onNext(s);
              }
          });
      }
    
  • groupBy
    对传入的事件进行分组,分组的条件可以自己指定
    @Test
      public void testGroupBy() {
          Observable.just(1,2,3,4).groupBy(new Function<Integer, String>() {
              @Override
              public String apply(Integer integer) throws Exception {
                  return integer>2?"A组":"B组";
              }
          }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
              @Override
              public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable)
                      throws Exception {
                  //stringIntegerGroupedObservable 是一个分组后的被观察者
                  stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept(Integer integer) throws Exception {
                          String key = stringIntegerGroupedObservable.getKey();
                          System.out.println("key="+key+" "+integer);
                      }
                  });
              }
          });
      }
    
  • buffer
    大批量数据需要处理的时候,对其进行分批次处理
     @Test
      public void testBuffer() {
          //将6条数据每2条分一个组执行
          Observable.just(1,2,3,4,5,6).buffer(2).subscribe(new Observer<List<Integer>>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(List<Integer> integers) {
                  //以此回调每个组的数据
                  System.out.println(integers);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
  • range
    上一个结果作为下一个参数,所有的结果累加得到最终结果,文件合并或者字符串拼接等场景.
      @Test
      public void testScan() {
          Observable.range(1,5).scan(new BiFunction<Integer, Integer, Integer>() {
              @Override
              public Integer apply(Integer integer, Integer integer2) throws Exception {
                  //第一个参数integer为之前所有结果的和  就是累加的形式
                  //相当于 第一个文件跟第二个文件合并,合并后的结果跟第三个文件合并...最终合并成一个大文件
                  return integer+integer2;
              }
          }).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    

三、过滤操作符

  • filter
    对事件进行过滤或者不过滤的处理
    @Test
      public void testFilter() {
          Observable.just(1,2,3,4,5,6).filter(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  //此次决定是否过滤
                  //true 不过滤
                  //false 过滤掉不计入结果中
                  return integer>2;
              }
          }).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  //接受过滤后的结果 
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
  • take
    限制产生事件的数量
        @Test
      public void testTake() {
          //每隔1单位秒的时间执行一次 take限制只执行5次
          Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Observer<Long>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Long aLong) {
                  System.out.println(aLong+"");
                  
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • distinct
    过滤重复事件
        @Test
      public void testDistinct() {
          Observable.just(1,2,2,2,3,3,6,6,7).distinct().subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • elementAt
    过滤指定的事件
          //指定过虑出第5个事件
          Observable.just(1,2,2,2,3,3,6,6,7).elementAt(5).subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    

四、条件操作符

  • all
    判断所有事件是否满足一个条件,如果全部满足则为true
     Observable.just(1,2,3,4,5,6).all(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  //所有的事件都大于2吗
                  return integer>2;
              }
          }).subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  //此次返回时间结果
                  System.out.println(aBoolean);
              }
          });
    
  • contains
    判断所有事件中是否包含某项事件
     Observable.just(1,2,3,4,5).contains(3).subscribe(new Consumer<Boolean>() {
             @Override
             public void accept(Boolean aBoolean) throws Exception {
                 //此处返回是否包含3的结果
                 System.out.println(aBoolean);
             }
         });
    
  • any
    所有事件中只要有有一个符合条件即为true
    Observable.just(1,2,3,4,5).any(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  return integer==3;
              }
          }).subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  System.out.println(aBoolean);
              }
          });
    
  • isEmpty
    判断一个观察者是否有事件
     Observable.just(1).isEmpty().subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  //有事件返回true  空事件返回false
                  System.out.println(aBoolean);
              }
          });
    
  • defaultIfEmpty
    如果被观察者不发送任何事件,则会发送默认事件
    .defaultIfEmpty(0)
    
  • skipWhile
    跳过满足条件的事件
             //从0开始每隔1000毫秒发送50个事件  初始延时0  
            Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).skipWhile(new Predicate<Long>() {
              @Override
              public boolean test(Long aLong) throws Exception {
                  //跳过<10的事件
                  return aLong<10;
              }
          }).subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  System.out.println(aLong);
              }
          });
    

五、合并操作符

将被观察者进行合并

  • startWith
    把需要的事件合并成一个事件进行处理,会先处理startWith添加的事件
            //把需要的事件合并成一个事件进行处理,会先处理2,4,6,8的事件
          Observable.just(1,3,5,7).startWith(Observable.just(2,4,6,8))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • concat
    合并最多4个事件 以先来后到的顺序进行处理,跟startWith相反。
            //合并两个事件 123 会优先处理
          Observable.concat(
                  Observable.just(1,2,3),
                  Observable.just(4,5,6))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • merge
    merge合并多个被观察者,合并之后按照时间顺序并行执行
            Flowable observable1 = Flowable.intervalRange(0,4,1,500,TimeUnit.MILLISECONDS);
          Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
          Flowable observable3 = Flowable.intervalRange(20,4,1,500,TimeUnit.MILLISECONDS);
    
          Flowable.merge(observable2,observable3,observable1).subscribe(new Consumer() {
              @Override
              public void accept(Object o) throws Exception {
                  System.out.println(o);
              }
          });
    
  • mergeDelayError
    延迟抛出异常事件,当合并的其它事件都执行完成之后再抛出异常
    //延迟抛出异常事件
          Flowable observable1 = Flowable.create(new FlowableOnSubscribe<Publisher<?>>() {
              @Override
              public void subscribe(FlowableEmitter<Publisher<?>> emitter) throws Exception {
                  //假设此处发生了异常
                  emitter.onError(new NullPointerException());
              }
          }, BackpressureStrategy.BUFFER);
          Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
    
    
          Flowable.mergeDelayError(observable1,observable2).subscribe(new Consumer() {
              @Override
              public void accept(Object o) throws Exception {
                  System.out.println(o);
              }
          });
    
  • zip
    将多个被观察者压缩成单个,输出事件最少的被观察者结果
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335