RxJava 操作符第一波

上一篇文章RxJava造轮子初步的了解了rxjava的简单原理,更深入的可能还是要去查看源码了,后续会再研究。这个阶段的目标是了解使用常用的操作符。

创建操作符

create

含义:使用一个函数从头开始创建observable
实现:

  • create()函数中传入一个接受observer观察者的函数Observable.Onsubscrib()

  • Observable.Onsubscriber()函数中只有一个call(Subscriber)方法

  • 在call()方法中调用subscriber.onNext()、onError()、onComplete()

      /**
               * 创建一个observable
               * 注意: 在create中函数调用发送消息时候,检查,是否有观察者,没有不发送消息,减少资源消耗
               * observer.isUnSubscribed()
               */
              fun create() {
                  Observable.create(Observable.OnSubscribe<String> { t ->
                      if (!(t?.isUnsubscribed ?: true)) {
                          try {
                              for (i in 1..10) {
                                  t?.onNext("" + i)
                              }
                              t?.onCompleted()
                          } catch (e: Exception) {
                              e.printStackTrace()
                              t?.onError(e)
                          }
    
                      }
                  })
                          .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
              }
    
from

含义: 将数组或者iterable、future转化为一个observable
实现: 产生的observable将iterable、数组中的每个item数据发送出去

    /**
             * from 将数组或者对象,生成新的observable发送出去
             */
            fun from(){
                Observable.from(arrayOf(1,2,3,4,5))
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())

            }
interval

含义:间接或者无限期的发送数据

    /**
             * Interval:
             *      固定时间发送数据
             *      初始值为0
             *      无线递增发送数据
             */
            fun interval(){
                Observable.interval(1,TimeUnit.SECONDS)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
just

含义:发送单个或者多个对象(对多发送10个对象)

    /**
             * just
             *      发送单个对象
             *      参数可选,1-10
             *      按照参数列表发送数据
             */
            fun just(){
                Observable.just(1,2,3,4,5,6,7,8,9,10)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
range

含义: 发送指定范围内数据的observable
如果找不到需要发送的数据,如,起始位负数,发送数据个数不足,抛出异常

    /**
             * range: 发送整数范围内的有序序列
             *      第一个参数:整数的起始数
             *      第二个参数:一共要发送几个数据
             */
            fun range(){
                Observable.range(3,3)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
repeat

含义:重复发送observable中数据

    /**
             * repeat
             *      重复发送源数据源,重复次数可设置
             *      repeat() 表示无限循环
             *      repeat(5):表示循环5次
             *
             * 其他循环操作符
             *      满足条件循环
             *      repeatWhen()
             *      doWhile()
             *      whileDo()
             */
            fun repeat(){
                Observable.just(1)
                        .repeat(5)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
timer

含义:在一段时间之后发送数据

    /**
             * timer
             *      在一定延时后发送一条数据
             */
            fun timer(){
                Observable.timer(1,TimeUnit.SECONDS)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }

变换操作符

buffer

含义: 可以将一段时间或者将count个数据打包发送
: 可以实现backpress 背压操作,将快速产生很多数据,缓存打包发送

    /**
             * buffer
             *      buffer(3)  三个为一体,打包发送
             *      buffer(3,4)
             *              第一个参数为count:几个数据作为一个打包
             *              第二个参数为跳跃:跳跃第一个值
             */
            fun buffer() {
                Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
    //                    .buffer(3)
                        .buffer(3, 4)
                        .subscribe(Action1<List<Int>> {
                            integers ->
                            integers.forEach {
                                integer ->
                                Log.e(TAG, "" + integer)
                            }

                            Log.e(TAG, "------------------------------------")

                        }, RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
flatMap

含义:
将observable发送的每一项数据进行变换操作,转化为多个observables,再使用merge()将所有observables合并
因为是merge()合并发送,所以发送的顺序不是有序的

    /**
             * flatMap
             *      * 合并所有产生的observables,产生的自己数据列不能保证顺序
             *      1.通过一个指定的函数将源数据源变化为其他数据
             *      2.新建一个observable发送变化后的数据源
             *      3.merge合并所有产生的observable->放入新的observable一起发送出去
             *      4.发送的顺序是无序的
             * flatMap(function1,maxCount)
             *      1.第二个参数:从源数据最大同时订阅个数,当达到最大限制,会等待其中一个终止在订阅
             */
            fun flatMap() {
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
    //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                        .flatMap({
                            t ->
                            var list = arrayListOf(1, 2, 3)
                            Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                        }, 3)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
concatMap

含义:和flatMap类似,都是将原始observable发送的每一项数据进行转化,不同的是,concatMap是按照顺序连接每一个发送数据

  • 就是当前一个数据源结束之后接着下一个事件的发送

      /**
               * concatMap
               *      * 按照次序连接生成的observables,然后产生自己的数据列
               *      1.和flatMap操作符类似
               *      2.不同的是,严格按照源数据的顺序发送数据源
               */
              fun concatMap() {
                  Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
      //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                          .concatMap({
                              t ->
                              var list = arrayListOf(t, t, t)
                              Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                          })
                          .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
              }
    
switchMap

含义: 只按最后发送过来的事件为准,永远只监听最后一个事件

     /**
             * switchMap
             *      只监听当前的数据
             */
            fun switchMap() {
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                        .switchMap({
                            t ->
                            var list = arrayListOf(t, t, t)
                            Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                        })
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
map

含义: 将observable中发送的源数据,转化为另一种数据

    /**
             * map
             *      1.根据你指定的函数将源数据源转化为另一种类型
             */
            fun map() {
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                        .map {
                            number ->
                            "" + number
                        }
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }

辅助操作符

delay

含义: 1.第一种是对整个observable的延时,在一段时间之后再发送数据
2.第二章是对observable中的每一项数据发送之前延时

    /**
             * delay
             *      延时一段指定的时间,再发送observable数据
             *      * 整体发射时间延长
             * delay(observable)
             *      *: 发射的每一项都会延时
             *      *: 每一项数据都默认使用这个bservable的定时器
             *
             * delaySubscription(long,timeunit)
             *      *: 延时订阅原始的observable
             *      *: 整体的延时订阅
             *
             */
            fun delay() {
                Observable.just(1, 2, 3)
    //                    .delay(1,TimeUnit.SECONDS)
    //                    .delay { t ->
    //                        Observable.create<Int> {
    //                            subscriber->
    //                            Thread.sleep(1000)
    //                            subscriber.onNext(t)
    //                            subscriber.onCompleted()
    //                        }
    //                    }
    //                    .delaySubscription(1,TimeUnit.SECONDS)
                        .delaySubscription(object : Func0<Observable<Int>> {
                            override fun call(): Observable<Int> {
                                return Observable.create<Int> {
                                    subscriber ->
                                    Thread.sleep(1000)
                                    subscriber.onNext(1)
                                    subscriber.onCompleted()
                                }
                            }
                        })
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
timestamp

含义:为observable中每一个数据源包装一个时间戳,返回Timestamped<T>类型
其中t.timestampMillis获取发送这条数据的时间戳
t.value获取原始发送数据

    /**
             * timestamp
             */
            fun timestamp(){
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                        .timestamp()
                        .subscribe(object :Action1<Timestamped<Int>>{
                            override fun call(t: Timestamped<Int>?) {
                                Log.e(RxUtil.TAG,""+t?.timestampMillis+"value-"+t?.value)
                            }

                        })
            }
doEtch生命周期

含义: observable的整个生命周期,在发送之前调用一下事件

    /**
             * doEatch
             *      *: 在observable的对于生命周期之前的时候调用对应代码
             *  doOnNext:在subscriber->onNext之前调用
             *  doOnError:在onError->之前调用
             *  doOnCompleted: 在onComplete->之前调用
             *  doOnTerminate: observable终止的时候调用(无论是否正常终止)
             */
            fun doEatch() {
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
    //                    .doOnNext { Log.e(RxUtil.TAG,"doOnNext-onNext") }
    //                    .doOnTerminate{Log.e(RxUtil.TAG,"doOnTerminate-doOnTerminate")}
    //                    .finallyDo{Log.e(RxUtil.TAG,"finallyDo-finallyDo")}
                        .doOnEach(object : Observer<Int> {
                            override fun onNext(t: Int?) {
                                Log.e(RxUtil.TAG, "doEatch-onNext")
                            }

                            override fun onError(e: Throwable?) {
                                Log.e(RxUtil.TAG, "doEatch-onError")
                            }

                            override fun onCompleted() {
                                Log.e(RxUtil.TAG, "doEatch-onCompleted")
                            }
                        })
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
timeout

含义: 在一段时间之后没有发送数据,有两种处理
1. 直接抛出异常
2. 运行默认的observable数据
可以结合onErrorReturn进行错误处理

    /**
             * timeout
             *      *: 超过一段时间没有发送数据,抛出异常
             *  timeout(long,timeunit,observable)
             *      *: 超过一段时间,执行默认的observable
             */
            fun timeout() {
                Observable.create<Int> {
                    subscriber ->
                    Thread.sleep(2000)
                    subscriber.onNext(1)
                    subscriber.onCompleted()
                }
    //                    .timeout(1, TimeUnit.SECONDS)
                        .timeout(1, TimeUnit.SECONDS, Observable.create {
                            subscriber ->
                            subscriber.onNext(-1)
                            subscriber.onCompleted()
                        })
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
toList

含义:将observable发送的所有数据结果包装一个list集合中,一起发出去
toSortedList()可以对生成的数据进行排序
toSortedList(func2())这个自定义实现排序还有问题

    /**
             * toList
             *      *: 让observable将多项数据组合成一个list数据返回
             *  toSortedList
             *      *: 可以排序,默认自然顺序
             */
            fun toList(){
                Observable.from(arrayOf(1, 3, 2, 5, 4, 8, 7, 6))
    //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                        .concatMap({
                            t ->
                            var list = arrayListOf(3, 1, 2)
                            Observable.from(list)
                        })
    //                    .toList()
                        .toSortedList()
    //                    .toSortedList(object :Func2<Int,Int,Int>{
    //                        override fun call(t1: Int?, t2: Int?): Int {
    //                            return t2?.toInt()?:0
    //                        }
    //                    })
                        .subscribe(object :Action1<List<Int>>{
                            override fun call(t: List<Int>?) {
                                t?.forEach {
                                    Log.e(RxUtil.TAG,"toList-"+t)
                                }
                            }

                        }, Action1<Throwable> {
                            error->
                            Log.e(RxUtil.TAG,"toList-"+error.toString())
                        })
            }
toMap

含义: 将observable中所有数据结果合并到map中,一起发送出去

    /**
             * toMap
             *      *: 将原始所有数据合并到map中,发送这个map
             *
             */
            fun toMap(){
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                        .toMap(object :Func1<Int,Int>{
                            override fun call(t: Int): Int {
                                return 10*t
                            }
                        },object :Func1<Int,String>{
                            override fun call(t: Int?): String {
                                return ""+t
                            }

                        })
                        .subscribe(object :Action1<Map<Int,String>>{
                            override fun call(map: Map<Int,String>?) {
                                map?.forEach {
                                    (key,value)->
                                    Log.e(RxUtil.TAG, "toMap-key-$key-value-$value")
                                }
                            }

                        }, Action1<Throwable> {
                            error->
                            Log.e(RxUtil.TAG,"toMap-"+error.toString())
                        })
            }

本期的操作符暂时这么多啊,后面还有第二波哦

附上github地址

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容