RxSwift(四)高阶函数

@TOC

我们知道Swift中有很多高阶函数,非常好用,而且效率都很高,如我们经常使用的map,fliter,flatmap等等。详情可以参考我之前的一篇博客:Swift的高阶函数

本篇文章主要讲解Rxswift中的高阶函数. 主要讲解高阶函数的使用,然后展开来探索具体源码实现。

Rwswift高阶函数简介

1. 高阶函数种类

  • Rxswift有哪些高阶函数呢?通过下面这张图你可以大致了解一下:


    Rxswift高阶函数-kyl

2. Rxswift高阶函数简介

2.1 组合操作符

  • startWith: 在开始从可观测源发出元素之前,发出知道的元素序列
  • merge:将员可观测序列中的元素组合成一个新的可观测序列,并将像每个源可观测序列发出元素一样发出每个元素。
  • zip:将多达8个源可观测序列组合成一个新的可观测序列,并将从组合的可观测序列中发射出对应索引处每个源可 观测序列的元素。
  • combineLatest:将8个源可观测序列组合成一个新的可观测序列,并将开始发出联合可观测序列的每个源的最新元素可观测序列一旦所有排放源序列至少有一个 元素,并且当源可观测序列发出的任何一个新元素。
  • switchLatest:将可观测序列发出的元素转换为可观察序列,并返回转换后的新可观察序列。

2.2 映射操作符

  • map:转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。
  • flatMap:将可观测序列 发送的元素转换为可观测序列,并将两个可观测的序列 发送合并为一个可观测的序列。
    例如:当你有一个可观测的序列,他本身可以发出可观察序列,你想能够对任何一个可观察序列的新发射做出反应(序列中的序列:比如网络序列中还有模型序列)
  • flatMapLatest:同flatMap,区别就是flatMapLatest只会从最近的内部可观察序列发射元素。
  • scan:从初始就带有一个默认值开始,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果。

2.3 过滤条件操作符

  • fliter:仅从满足指定条件的可 观察序列中发出那些元素。
  • distincUntilChanged:抑制可观察序列发出的顺序重复元素。
  • elementAt:仅在可观察序列发出的所有元素的指定索引处发出元素。
  • single:只发出可 观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
  • take:只从一个可观察序列的开始发出指定数量的元素。上面single只有一个序列,在实际开发中受到局限,这里引出take想几个就几个。
  • takeLast:仅从可观察序列的末尾发出指定数量的元素。
  • takeWhile:只要指定的条件只为true,就可以从可观察序列的开始发出元素。
  • takeUntil:从源可观察序列发出元素,直到参考可观察序列发出元素(这个很重要,应用非常频繁,比如我页面销毁了,就不能获取值了,如:cell重用)
  • skip:从源可观察序列发出元素,直到参考可观察序列发出元素。(这个很重要,应用频繁)
  • skipUntil:抑制从源可观察序列发出元素,直到参考可观察序列产生。

2.5 集合控制操作符

  • toArray:将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止。
  • reduce:从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累计器闭包,并以单个元素可观察序列的形式返回聚合结果。(类似于scan)
  • concat:以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功 终止。(用来控制顺序)

2.6 从序列错误中恢复的操作符

  • catchErrorJustReturn:从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止。
  • catchError:通过切换到提供的恢复可观察序列,从错误事件中恢复。
  • retry:通过 无限地重新订阅可观察序列来恢复重复的错误事件。
  • retry(_ : ):通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数。

2.7 debug Rx流程操作符

  • debug:打印所有订阅,事件和处理。
  • RxSwift.Resources.total:操作符 提供所有Rx资源分配的计数,这对于在开发期间检测内存泄漏非常有用。

2.8 链接操作符

  • multicast:将源可观察序列转换为可连接序列,并通过指定的主题广播它的发射。
  • replay:将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放一起排放的缓存大小。(首先拥有和publish一样的能力,共享Observable 和sequence,其次使用replay还需要我们传入一个参数:buffer size 来缓存已经发送的事件,当有了新的订阅者订阅了,会把缓存的事件发送给新的订阅者)
  • push:将源可观察序列转换为可连接序列。(共享一个Observable的实际序列,避免创建多个Observable 和sequence.注意:需要调用conect之后才会开始发送事件)

Rwswift高阶函数使用

2 Rx高阶函数实例讲解

本篇博客涉及的实例都放在一个项目源码里面:点击此处下载高阶函数使用源码

2.1 组合操作符

startWith

  • 在开始从可观测源发出元素之前,发出知道的元素序列

实例 1
代码:

// *** startWith : 在开始从可观察源发出元素之前,发出指定的元素序列
func test_startWith()  {
        print("*****startWith*****")
        Observable.of("1", "2", "3", "4")
            .startWith("A")
            .startWith("B")
            .startWith("C", "a", "b")
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        //效果: CabBA1234
 }

运行结果:


image

merge

将员可观测序列中的元素组合成一个新的可观测序列,并将像每个源可观测序列发出元素一样发出每个元素。

实例 5
代码:

// **** merge : 将源可观察序列中的元素组合成一个新的可观察序列,并将像每个源可观察序列发出元素一样发出每个元素
func test_merge()  {
        print("*****merge*****")
        let subject1 = PublishSubject<String>()
        let subject2 = PublishSubject<String>()
        // merge
        Observable.of(subject1,subject2)
            .merge()
            .subscribe(onNext: { print($0)})
            .disposed(by: disposeBag)
        
        //- 下面任何一个响应都会勾起新序列响应
        subject1.onNext("K")
        subject1.onNext("o")
        subject2.onNext("n")
        subject2.onNext("g")
        subject1.onNext("Y")
        subject2.onNext("u")
        subject1.onNext("L")
        subject2.onNext("u")
    }

运行结果:


image

zip

  • 将多达8个源可观测序列组合成一个新的可观测序列,并将从组合的可观测序列中发射出对应索引处每个源可 观测序列的元素。

实例 10
代码:

//  *** zip: 将多达8个源可观测序列组合成一个新的可观测序列,
//  并将从组合的可观测序列中发射出对应索引处每个源可观测序列的元素
func test_zip()  {
        print("*****zip*****")
        let stringSubject = PublishSubject<String>()
        let intSubject = PublishSubject<Int>()
        
        Observable.zip(stringSubject, intSubject) { stringElement , intElement in
            "\(stringElement) \(intElement)"
        }
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        stringSubject.onNext("K")
        stringSubject.onNext("o") //到这里存储了K o 但是不好响应,除非有另一个响应
        
        intSubject.onNext(1) //勾出一个
        intSubject.onNext(2) //勾出另一个
        stringSubject.onNext("o") //再存一个
        intSubject.onNext(3) //勾出一个
        
        //总结: 只有两个序列同时有值的 时候才会响应,否则只会存值。
        
    }

运行结果:


image

combineLatest

  • 将8个源可观测序列组合成一个新的可观测序列,并将开始发出联合可观测序列的每个源的最新元素可观测序列一旦所有排放源序列至少有一个 元素,并且当源可观测序列发出的任何一个新元素。

实例 15
代码:

/// combineLatest:将8源可观测序列组合成一个新的观测序列,
    ///并将开始发出联合观测序列的每个源的最新元素可观测序列一旦所有排放源序列至少有一个元素,
    ///并且当源可观测序列发出的任何一个新元素
    func test_combineLatest()  {
        print("*****combineLatest*****")
        let stringSub =  PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        //合并序列
        Observable.combineLatest(stringSub, intSub) { strE, intE in
            "\(strE) \(intE)"
        }
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
        
        stringSub.onNext("K") //存一个K
        stringSub.onNext("Y") //存了一个覆盖 - 和zip不一样
        intSub.onNext(1) //发现strOB 观察者存在值 Y(上面的Y覆盖了K) 则 响应 Y 1
        intSub.onNext(2) //覆盖1 -> 2, 发现strOB有值YK 响应 Y 2
        stringSub.onNext("Kongyulu") // 覆盖Y -> Kongyulu 发现intOB有值 2 响应:Kongyulu 2
        
        //总结:1. combineLatest 比较zip 会覆盖
        // 2. 应用非常频繁: 比如账户和密码同时满足->才能登陆. 不关系账户密码怎么变化的只要查看最后有值就可以 loginEnable
    }

运行结果:


image

switchLatest

  • 将可观测序列发出的元素转换为可观察序列,并返回转换后的新可观察序列。

实例 20
代码:

    // 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
    func test_switchLatest()  {
        // switchLatest : 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
        print("*****switchLatest*****")
        let switchLatestSub1 = BehaviorSubject(value: "K")
        let switchLatestSub2 = BehaviorSubject(value: "1")
        //注意下面这句代码:这里选择了switchLatestSub1就不会再监听switchLatestSub2
        let switchLatestSub = BehaviorSubject(value: switchLatestSub1)
        
        switchLatestSub.asObservable()
        .switchLatest()
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
        
        switchLatestSub1.onNext("Y")
        switchLatestSub1.onNext("_")
        switchLatestSub2.onNext("2")
        switchLatestSub2.onNext("3") // 2,3都不会监听,但是默认保存有2 覆盖1  3覆盖2
        
    }

运行结果:


image

代码:

    // 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
    func test_switchLatest()  {
        // switchLatest : 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
        print("*****switchLatest*****")
        let switchLatestSub1 = BehaviorSubject(value: "K")
        let switchLatestSub2 = BehaviorSubject(value: "1")
        //注意下面这句代码:这里选择了switchLatestSub1就不会再监听switchLatestSub2
        let switchLatestSub = BehaviorSubject(value: switchLatestSub1)
        
        switchLatestSub.asObservable()
        .switchLatest()
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
        
        switchLatestSub1.onNext("Y")
        switchLatestSub1.onNext("_")
        switchLatestSub2.onNext("2")
        switchLatestSub2.onNext("3") // 2,3都不会监听,但是默认保存有2 覆盖1  3覆盖2
        switchLatestSub.onNext(switchLatestSub2) //切换到 switchLatestSub2
        switchLatestSub1.onNext("*") //由于上面切换到了switchLatestSub2,所以switchLatestSub1不会响应,不会输出*
        switchLatestSub1.onNext("Kongyulu")//这里不会响应,不会输出Kongyulu
        switchLatestSub2.onNext("4")
        /*
         到这里会输出:
         *****switchLatest*****
         K
         Y
         _
         3
         4
         */
        switchLatestSub.onNext(switchLatestSub1)// 如果再次切换到 switchLatestSub1会打印出 Kongyulu
        switchLatestSub2.onNext("5")
        /*
         到这里会输出:
         *****switchLatest*****
         K
         Y
         _
         3
         4
         Kongyulu
         */
        
    }

运行结果:

image

2.2 映射操作符

map

  • 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。

实例 25
代码:

/// 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。
    func test_map() {
        // ***** map: 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。
        print("*****map*****")
        let ob = Observable.of(1,2,3,4)
        ob.map { (number) -> Int in
            return number + 2
        }
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
    }

运行结果:


在这里插入图片描述

flatMap

  • 将可观测序列 发送的元素转换为可观测序列,并将两个可观测的序列 发送合并为一个可观测的序列。
    例如:当你有一个可观测的序列,他本身可以发出可观察序列,你想能够对任何一个可观察序列的新发射做出反应(序列中的序列:比如网络序列中还有模型序列)

实例 30
代码:

///将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列。
    ///这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,
    ///你想能够对任何一个可观察序列的新发射做出反应(序列中序列:比如网络序列中还有模型序列)
    func test_flatmap() {
        print("*****flatMap*****")
        let boy = LGPlayer(score: 100)
        let girl = LGPlayer(score: 90)
        let player = BehaviorSubject(value: boy)
        
        player.asObservable()
            .flatMap { $0.score.asObservable() } // 本身score就是序列 模型就是序列中的序列
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        boy.score.onNext(60)
        player.onNext(girl)
    }

运行结果:

image

代码:

struct LGPlayer {
    init(score: Int) {
        self.score = BehaviorSubject(value: score)
    }
    let score: BehaviorSubject<Int>
}

func test_flatmap() {
        print("*****flatMap*****")
        let boy = LGPlayer(score: 100)
        let girl = LGPlayer(score: 90)
        let player = BehaviorSubject(value: boy)
        
        player.asObservable()
            .flatMap { $0.score.asObservable() } // 本身score就是序列 模型就是序列中的序列
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        boy.score.onNext(60)
        player.onNext(girl)
        boy.score.onNext(50)
        boy.score.onNext(40)//  如果切换到 flatMapLatest 就不会打印
        girl.score.onNext(10)
        girl.score.onNext(0)
    }

运行结果:

image

flatMapLatest

  • 同flatMap,区别就是flatMapLatest只会从最近的内部可观察序列发射元素。

实例 35
代码:

struct LGPlayer {
    init(score: Int) {
        self.score = BehaviorSubject(value: score)
    }
    let score: BehaviorSubject<Int>
}
 /// flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观测序列发射元素
    /// flatMapLatest实际上是map和switchLatest操作符的组合。
    func test_flatMapLatest() {
        print("*****flatMapLatest*****")
        let boy = LGPlayer(score: 100)
        let girl = LGPlayer(score: 90)
        let player = BehaviorSubject(value: boy)
        
        player.asObservable()
            .flatMapLatest { $0.score.asObservable() } // 本身score就是序列 模型就是序列中的序列
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        boy.score.onNext(60)
        player.onNext(girl)
        boy.score.onNext(50)
        boy.score.onNext(40)//  如果切换到 flatMapLatest 就不会打印
        girl.score.onNext(10)
        girl.score.onNext(0)
    }

运行结果:


image

scan

  • 从初始就带有一个默认值开始,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果。

实例 40
代码:

///从初始就带有一个默认值开始,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果
    func test_scan() {
        print("*****scan*****")
        Observable.of(10,100,1000)
            .scan(2) { aggregateValue, newValue in
                aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 2
        }
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
    }

运行结果:


image

2.3 过滤条件操作符

fliter

  • 仅从满足指定条件的可 观察序列中发出那些元素。

实例 45
代码:

///仅从满足指定条件的可观察序列中发出那些元素
    func test_fliter() {
        // **** filter : 仅从满足指定条件的可观察序列中发出那些元素
        print("*****filter*****")
        Observable.of(1,2,3,4,5,6,7,8,9,0)
            .filter{$0 % 2 == 0}
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
    }

运行结果:


image

distincUntilChanged

  • 抑制可观察序列发出的顺序重复元素。

实例 50
代码:

///抑制可观察序列发出的顺序重复元素
    func test_distinctUntilChanged() {
        // ***** distinctUntilChanged: 抑制可观察序列发出的顺序重复元素
        print("*****distinctUntilChanged*****")
        Observable.of("1", "2", "2", "2", "3", "3", "4")
            .distinctUntilChanged()
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
    }

运行结果:


image

elementAt

  • 仅在可观察序列发出的所有元素的指定索引处发出元素。

实例 55
代码:

///仅在可观察序列发出的所有元素的指定索引处发出元素
    func test_elementAt() {
        // **** elementAt: 仅在可观察序列发出的所有元素的指定索引处发出元素
        print("*****elementAt*****")
        Observable.of("C", "o", "o", "c", "I")
            .elementAt(3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

运行结果:


image

single

  • 只发出可 观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。

实例 60
代码:

///只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
    func test_single() {
        // *** single: 只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
        print("*****single*****")
        Observable.of("kongyulu", "yuhairong")
            .single()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

运行结果:


image

代码:

func test_single2() {
        // *** single: 只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
        print("*****single*****")
        Observable.of("kongyulu", "yuhairong")
            .single{ $0 == "kongyulu"}
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

运行结果:


image

take

  • 只从一个可观察序列的开始发出指定数量的元素。上面single只有一个序列,在实际开发中受到局限,这里引出take想几个就几个。

实例 65
代码:

///只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个
    func test_take() {
        // **** take: 只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个
        print("*****take*****")
        Observable.of("kongyulu", "yuhairong","yifeng", "yisheng")
            .take(2)//这里取前面两个
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

运行结果:


image

代码:

func test_take2() {
        // **** take: 只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个
        print("*****take*****")
        Observable.of("kongyulu", "yuhairong","yifeng", "yisheng")
            .take(3) //这里取前面三个
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

运行结果:


image

takeLast

  • 仅从可观察序列的末尾发出指定数量的元素。

实例 70
代码:

//仅从可观察序列的末尾发出指定数量的元素
    func test_takeLast() {
        // *** takeLast: 仅从可观察序列的末尾发出指定数量的元素
        print("*****takeLast*****")
        Observable.of("kongyulu", "yuhairong","yifeng", "yisheng")
            .takeLast(3)//取从末尾开始算起的3个元素
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

运行结果:


image

takeWhile

  • 只要指定的条件只为true,就可以从可观察序列的开始发出元素。

实例 75
代码:

///只要指定条件的值为true,就从可观察序列的开始发出元素
    func test_takeWhile() {
        // **** takeWhile: 只要指定条件的值为true,就从可观察序列的开始发出元素
        print("*****takeWhile*****")
        Observable.of(1, 2, 3, 4, 5, 6)
            .takeWhile { $0 < 3 } //取出满足条件的元素 (1,2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

运行结果:


image

takeUntil

  • 从源可观察序列发出元素,直到参考可观察序列发出元素(这个很重要,应用非常频繁,比如我页面销毁了,就不能获取值了,如:cell重用)

实例 80
代码:

/// 从源可观察序列发出元素,直到参考可观察序列发出元素
    /// 这个要重点,应用非常频繁 比如我页面销毁了,就不能获取值了(cell重用运用)
    func test_takeUntil() {
        // ***** takeUntil: 从源可观察序列发出元素,直到参考可观察序列发出元素
        print("*****takeUntil*****")
        let sourceSequence = PublishSubject<String>()
        let referenceSequence = PublishSubject<String>()
        sourceSequence
            .takeUntil(referenceSequence)
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
        
        sourceSequence.onNext("kongyulu")
        sourceSequence.onNext("yifeng")
        sourceSequence.onNext("yisheng")
        //referenceSequence.onNext("yuhairong") // 条件一出来,下面就走不了
        sourceSequence.onNext("test1")
        sourceSequence.onNext("test2")
        sourceSequence.onNext("test3")
    }

运行结果:


image
  • 将上面的实例 80的注释行:referenceSequence.onNext("yuhairong") 放开后,控制台打印的结果为:
image

skip

  • 从源可观察序列发出元素,直到参考可观察序列发出元素。(这个很重要,应用频繁)

实例 85
代码:

///从源可观察序列发出元素,直到参考可观察序列发出元素
    /// 这个要重点,应用非常频繁  textfiled 都会有默认序列产生
    func test_skip() {
        // ***** skip: 从源可观察序列发出元素,直到参考可观察序列发出元素
        print("*****skip*****")
        Observable.of(1,2,3,4,5,6)
        .skip(2) //直接跳过前面两个元素,即从3开始
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
    }

运行结果:

image

skipWhile

  • 抑制满足指定条件的元素,直到参考可观察序列产生。

实例 90
代码:

/// 直接跳过满足条件的元素,相当于过滤作用
    func test_skipWhile() {
        print("*****skipWhile*****")
        //skipWhile刚刚和takeWhile的作用相反
        Observable.of(1, 2, 3, 4, 5, 6)
            .skipWhile { $0 < 4 } //直接跳过满足条件的元素,相当于过滤作用(满足小于4的都跳过,即只有4,5,6)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

运行结果:

image

skipUntil

  • 抑制从源可观察序列发出元素,直到参考可观察序列产生。

代码:

/// 抑制从源可观察序列发出元素,直到参考可观察序列发出元素
    func test_skipUntil() {
        // *** skipUntil: 抑制从源可观察序列发出元素,直到参考可观察序列发出元素
        // skipUntil 作用刚刚和 takeUntil 相反
        print("*****skipUntil*****")
        let sourceSeq = PublishSubject<String>()
        let referenceSeq = PublishSubject<String>()
        
        sourceSeq
            .skipUntil(referenceSeq)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        // 没有条件命令 下面走不了
        sourceSeq.onNext("kongyulu")
        sourceSeq.onNext("yifeng")
        sourceSeq.onNext("yisheng")
        
        //referenceSeq.onNext("yuhairong") // 条件一出来,下面就可以走了
        
        sourceSeq.onNext("test1")
        sourceSeq.onNext("test2")
        sourceSeq.onNext("test3")
    }

运行结果:

image

这里由于注释了条件代码,什么都没有输出。下面我们把注释的那行代码放开referenceSeq.onNext("yuhairong") 再来看运行结果。

运行结果:


image

2.5 集合控制操作符

toArray

  • 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止。

实例 95
代码:

    /// 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
    func test_toArray() {
        // *** toArray: 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
        print("*****toArray*****")
        Observable.range(start: 1, count: 10)
            .toArray() //这里生成一个从1到10的数组
            .subscribe { print($0) }
            .disposed(by: disposeBag)
    }

运行结果:

image

reduce

  • 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累计器闭包,并以单个元素可观察序列的形式返回聚合结果。(类似于scan)

实例 100
代码:

/// 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似scan
    func test_reduce() {
        // *** reduce: 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似scan
        print("*****reduce*****")
        Observable.of(10, 100, 1000)
            .reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

结果:


image

concat

  • 以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功 终止。(用来控制顺序)

实例 105
代码:

/// 以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止
    /// 用来控制顺序
    func test_concat() {
        // *** concat: 以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止
        // 用来控制顺序
        print("*****concat*****")
        let subject1 = BehaviorSubject(value: "kongyulu")
        let subject2 = BehaviorSubject(value: "1")
        
        let subjectsSubject = BehaviorSubject(value: subject1)
        
        subjectsSubject.asObservable()
            .concat()
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        subject1.onNext("yifeng")
        subject1.onNext("yisheng")
        
        subjectsSubject.onNext(subject2)
        
        subject2.onNext("打印不出来")
        subject2.onNext("2")
        
        //subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
        subject2.onNext("3")
    }

结果1:

image

由于我们注释代码subject1.onCompleted(), 而使用了concat后subject2的订阅需要等待subject1完成之后才能执行。所以才有了我们看到的上面打印结果,subject2的订阅信息都没有打印出来。

  • 我们放开注释的那行代码,再重新运行代码,可以得到下面的结果:


    image

2.6 从序列错误中恢复的操作符

catchErrorJustReturn

  • 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止。

实例 110
代码:

/// 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
    func test_catchErrorJustReturn() {
        // **** catchErrorJustReturn
        // 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
        print("*****catchErrorJustReturn*****")
        let sequenceThatFails = PublishSubject<String>()
        
        sequenceThatFails
            .catchErrorJustReturn("kongyulu")
            .subscribe{print($0)}
            .disposed(by: disposeBag)

        sequenceThatFails.onNext("yifeng")
        sequenceThatFails.onNext("yisheng")// 正常序列发送成功的
        sequenceThatFails.onError(self.kylError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
    }
    

输出结果:


image

catchError

  • 通过切换到提供的恢复可观察序列,从错误事件中恢复。

实例 115
代码:

 /// 通过切换到提供的恢复可观察序列,从错误事件中恢复
    func test_catchError() {
        // **** catchErrorJustReturn
        // 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
        print("*****catchErrorJustReturn*****")
        let sequenceThatFails = PublishSubject<String>()
        
        sequenceThatFails
            .catchErrorJustReturn("kongyulu")
            .subscribe{print($0)}
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("yifeng")
        sequenceThatFails.onNext("yisheng")// 正常序列发送成功的
        sequenceThatFails.onError(self.kylError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
        
        // **** catchError
        // 通过切换到提供的恢复可观察序列,从错误事件中恢复
        print("*****catchError*****")
        
        let recoverySequence = PublishSubject<String>()
        
        recoverySequence
            .catchError {
            print("Error:",$0)
            return recoverySequence // 获取到了错误序列-我们在中间的闭包操作处理完毕,返回给用户需要的序列(showAlert)
            }
            .subscribe{print($0)}
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("test1")
        sequenceThatFails.onNext("test2")  // 正常序列发送成功的
        sequenceThatFails.onError(kylError) // 发送失败的序列
        
        recoverySequence.onNext("yuhairong")
        
    }

输出结果:


image

retry

  • 通过 无限地重新订阅可观察序列来恢复重复的错误事件。

实例 120
代码:

/// 通过无限地重新订阅可观察序列来恢复重复的错误事件
    func test_retry() {
        // *** retry: 通过无限地重新订阅可观察序列来恢复重复的错误事件
        print("*****retry*****")
        var count = 1 // 外界变量控制流程
        let sequenceRetryErrors = Observable<String>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            observer.onNext("yifeng")
            observer.onNext("yisheng")
            
            if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
                observer.onError(self.kylError)
                print("错误序列来了")
                count += 1
            }
            
            observer.onNext("test1")
            observer.onNext("test2")
            observer.onNext("test3")
            observer.onCompleted()
            return Disposables.create()
        }
        
        sequenceRetryErrors
            //.retry() //调用这个retry后,上面的observer闭包会重新执行一次
            .subscribe(onNext: {print($0)})
            .disposed(by: disposeBag)
    }

上面的代码我们注释掉了//.retry() ,这样我们可以更好的对比结果
输出结果:

image

从结果中我们看到了错误提示信息打印多来了,现在我们把注释的那行代码打开,再重新运行,查看结果,如下:

image

我们可以看到,observer的闭包重新执行了一次,多打印了信息。

retry(_ : )

  • 通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数。

实例 125
代码:

/// retry(_:): 通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数
    func test_retry2() {
        // **** retry(_:): 通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数
        print("*****retry(_:)*****")
        var count = 1 // 外界变量控制流程
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("kongyulu")
            observer.onNext("yifeng")
            observer.onNext("yisheng")
            
            if count < 5 { // 这里设置的错误出口是没有太多意义的额,因为我们设置重试次数
                observer.onError(self.kylError) //发送错误消息
                print("错误序列来了")
                count += 1
            }
            //发送错误后,下面的sender都不会打印了
            observer.onNext("sender 1")
            observer.onNext("sender 2")
            observer.onNext("sender 3")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3) //重复地从错误事件中恢复3次
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

结果:


image

2.7 debug Rx流程操作符

debug

  • 打印所有订阅,事件和处理。
    实例 130
    代码:
func test_debug() {
        // **** debug
        // 打印所有订阅、事件和处理。
        print("*****debug*****")
        var count = 1
        
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("Kongyulu")
            observer.onNext("yifeng")
            observer.onNext("yisheng")
            
            if count < 5 {
                observer.onError(self.kylError)
                print("错误序列来了")
                count += 1
            }
            
            observer.onNext("yuhairong")
            observer.onNext("zhangsiyuan")
            observer.onNext("kongliyuan")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3)
            .debug()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    }

结果:

RxSwift.Resources.total

  • 操作符 提供所有Rx资源分配的计数,这对于在开发期间检测内存泄漏非常有用。

实例 135
代码:

    /// RxSwift.Resources.total 操作符
    func testResourcesTotal() {
        
        // ** RxSwift.Resources.total: 提供所有Rx资源分配的计数,这对于在开发期间检测泄漏非常有用。
        print("*****RxSwift.Resources.total*****")

        print(RxSwift.Resources.total)

        let subject = BehaviorSubject(value: "Cooci")

        let subscription1 = subject.subscribe(onNext: { print($0) })

        print(RxSwift.Resources.total)

        let subscription2 = subject.subscribe(onNext: { print($0) })

        print(RxSwift.Resources.total)

        subscription1.dispose()

        print(RxSwift.Resources.total)

        subscription2.dispose()

        print(RxSwift.Resources.total)
    }

结果:

2.8 链接操作符

multicast

  • 将源可观察序列转换为可连接序列,并通过指定的主题广播它的发射。

实例 140
代码:

/// multicast
    func testMulticastConnectOperators(){
        
        // *** multicast : 将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。
        print("*****multicast*****")
        let subject = PublishSubject<Any>()
        subject.subscribe{print("00:\($0)")}
            .disposed(by: disposeBag)
        
        let netOB = Observable<Any>.create { (observer) -> Disposable in
            sleep(2)// 模拟网络延迟
            print("我开始请求网络了")
            observer.onNext("请求到的网络数据")
            observer.onNext("请求到的本地")
            observer.onCompleted()
            return Disposables.create {
                print("销毁回调了")
            }
            }.publish()
        
        netOB.subscribe(onNext: { (anything) in
            print("订阅1:",anything)
        })
            .disposed(by: disposeBag)
        
        // 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
        // 所以在订阅一次 会出现什么问题?
        netOB.subscribe(onNext: { (anything) in
            print("订阅2:",anything)
        })
            .disposed(by: disposeBag)
        
        _ = netOB.connect()
        
    }

结果:

image

replay

  • 将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放一起排放的缓存大小。(首先拥有和publish一样的能力,共享Observable 和sequence,其次使用replay还需要我们传入一个参数:buffer size 来缓存已经发送的事件,当有了新的订阅者订阅了,会把缓存的事件发送给新的订阅者)

实例 145
代码:

/// replay
    func testReplayConnectOperators(){
        
        // **** replay: 将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放以前排放的缓冲大小
        // 首先拥有和publish一样的能力,共享 Observable sequence, 其次使用replay还需要我们传入一个参数(buffer size)来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者
        print("*****replay*****")
        
        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
        
        interval.subscribe(onNext: { print(Date.time,"订阅: 1, 事件: \($0)") })
            .disposed(by: self.disposeBag)
        
        delay(2) { _ = interval.connect() }
        
        delay(4) {
            interval.subscribe(onNext: { print(Date.time,"订阅: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        
        delay(8) {
            interval.subscribe(onNext: { print(Date.time,"订阅: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(20, closure: {
            self.disposeBag = DisposeBag()
        })
        
        /**
         订阅: 1, 事件: 4
         订阅: 1, 事件: 0
         2019-05-28 21-32-42 订阅: 2, 事件: 0
         2019-05-28 21-32-42 订阅: 1, 事件: 1
         2019-05-28 21-32-42 订阅: 2, 事件: 1
         2019-05-28 21-32-45 订阅: 2, 事件: 4
         2019-05-28 21-32-46 订阅: 3, 事件: 0
         2019-05-28 21-32-46 订阅: 3, 事件: 1
         2019-05-28 21-32-46 订阅: 3, 事件: 2
         2019-05-28 21-32-46 订阅: 3, 事件: 3
         2019-05-28 21-32-46 订阅: 3, 事件: 4
         
         // 序列从 0开始
         // 定时器也没有断层  sub2 sub3 和 sub1 是同步的
         */
    }

结果:

image

push

  • 将源可观察序列转换为可连接序列。(共享一个Observable的实际序列,避免创建多个Observable 和sequence.注意:需要调用conect之后才会开始发送事件)

实例 150
代码:

/// push - connect 将源可观察序列转换为可连接序列
    func testPushConnectOperators(){
        
        // **** push:将源可观察序列转换为可连接序列
        // 共享一个Observable的事件序列,避免创建多个Observable sequence。
        // 注意:需要调用connect之后才会开始发送事件
        print("*****testPushConnect*****")
        
        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
        
        interval.subscribe(onNext: { print("订阅: 1, 事件: \($0)") })
            .disposed(by: disposeBag)
        
        delay(2) {
            _ = interval.connect()
        }
        delay(4) {
            interval.subscribe(onNext: { print("订阅: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(6) {
            interval.subscribe(onNext: { print("订阅: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        delay(10, closure: {
            self.disposeBag = DisposeBag()
        })
        /**
         订阅: 1, 事件: 1
         订阅: 2, 事件: 1
         订阅: 1, 事件: 2
         订阅: 2, 事件: 2
         订阅: 1, 事件: 3
         订阅: 2, 事件: 3
         订阅: 3, 事件: 3
         
         订阅: 2 从1开始
         订阅: 3 从3开始
         */
        // 但是后面来的订阅者,却无法得到之前已发生的事件
    }

结果:


image

Rwswift高阶函数源码分析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,265评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,078评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,852评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,408评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,445评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,772评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,921评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,688评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,130评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,467评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,617评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,276评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,882评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,740评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,967评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,315评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,486评论 2 348

推荐阅读更多精彩内容

  • 组合操作符 startWith 在开始从可观察源发出元素之前,发出指定的元素序列。可以理解为“+”号,并且后加的先...
    简_爱SimpleLove阅读 499评论 0 2
  • Rxswift的高阶函数在我们程序员实际开发中可以极大的提高我们的开发效率,他几乎可以满足我们日常开发的大部分需求...
    Ldies阅读 279评论 0 0
  • 1.RxSwift初识 RxSwift是 ReactiveX 的swift版本,是一种函数式响应编程的框架。RxS...
    MrMessy阅读 3,168评论 0 11
  • 最近在学习RxSwift相关的内容,在这里记录一些基本的知识点,以便今后查阅。 Observable 在RxSwi...
    L_Zephyr阅读 1,746评论 1 4
  • 今天被一篇小学生写的作文彻底震撼到了。真的,就算我这个参加工作20年有着一定人生阅历的成年人,也断然写不...
    赤色风铃66阅读 470评论 0 1