RxSwift (三)Observable的创建,订阅,销毁

@TOC

可观察的序列Observable

通过前面博客对Rxswift的源码分析,我们知道在Rxswift中一条主线思想就是万物皆序列,这里的序列就是我们的可观察序列,也可以称之为观察者。所以使用Rxswift总会跟观察者Observer打交道,这里重温一下观察者的定义。

本篇博客主要讲解Observer的创建,订阅,销毁的用法,不深入讲解它的底层实现,不讲解它的源码,如要了解源码实现过程分析请参考我之前的博客:序列的核心逻辑

Observable定义

观察者(Observable)的作用就是监听事件,然后对这个事件做出响应。或者说任何响应事件的行为都是观察者。比如:

  • 当我们点击按钮,弹出一个提示框。那么这个“弹出一个提示框”就是观察者Observer<Void>
  • 当我们请求一个远程的json 数据后,将其打印出来。那么这个“打印 json 数据”就是观察者 Observer<JSON>
image

Observable的创建,订阅,销毁

Observable创建

在 subscribe 方法中创建Observable

  1. 创建观察者最直接的方法就是在 Observablesubscribe 方法后面描述当事件发生时,需要如何做出响应。
  2. 比如下面的样例,观察者就是由后面的 onNextonErroronCompleted 这些闭包构建出来的。
  3. 实例1

代码:

let observable = Observable.of("A", "B", "C")
          
observable.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("completed")
})

运行结果:

A
B
C
completed

在 bind 方法中创建Observable

  1. 实例2
    下面代码我们创建一个定时生成索引数的 Observable 序列,并将索引数不断显示在 label 标签上

代码:

import UIKit
import RxSwift
import RxCocoa
 
class ViewController: UIViewController {
     
    @IBOutlet weak var label: UILabel!
     
    let disposeBag = DisposeBag()
     
    override func viewDidLoad() {
         
        //Observable序列(每隔1秒钟发出一个索引数)
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
 
        observable
            .map { "当前索引数:\($0 )"}
            .bind { [weak self](text) in
                //收到发出的索引数后显示到label上
                self?.label.text = text
            }
            .disposed(by: disposeBag)
    }
}

使用 AnyObserver 创建Observable

AnyObserver 可以用来描叙任意一种观察者。

  1. AnyObserver配合 subscribe 方法使用
    比如上面实例1我们可以改成如下代码:
//观察者
let observer: AnyObserver<String> = AnyObserver { (event) in
    switch event {
    case .next(let data):
        print(data)
    case .error(let error):
        print(error)
    case .completed:
        print("completed")
    }
}
 
let observable = Observable.of("A", "B", "C")
observable.subscribe(observer)
  1. AnyObserver配合bindTo 方法使用
    也可配合 Observable 的数据绑定方法(bindTo)使用。比如上面的实例2可以改成如下代码:
import UIKit
import RxSwift
import RxCocoa
 
class ViewController: UIViewController {
     
    @IBOutlet weak var label: UILabel!
    let disposeBag = DisposeBag()
    override func viewDidLoad() {
         
        //观察者
        let observer: AnyObserver<String> = AnyObserver { [weak self] (event) in
            switch event {
            case .next(let text):
                //收到发出的索引数后显示到label上
                self?.label.text = text
            default:
                break
            }
        }
         
        //Observable序列(每隔1秒钟发出一个索引数)
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        observable
            .map { "当前索引数:\($0 )"}
            .bind(to: observer)
            .disposed(by: disposeBag)
    }
}

使用 Binder 创建Observable

  1. 相较于AnyObserver 的大而全,Binder 更专注于特定的场景。Binder 主要有以下两个特征:
  • 不会处理错误事件
  • 确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)

一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。

针对上面的实例2 中的label 标签的文字显示就是一个典型的 UI 观察者。它在响应事件时,只会处理 next 事件,而且更新 UI 的操作需要在主线程上执行。那么这种情况下更好的方案就是使用 Binder改用 Binder 会简单许多。

使用Binder改造后的代码如下:

import UIKit
import RxSwift
import RxCocoa
 
class ViewController: UIViewController {
     
    @IBOutlet weak var label: UILabel!
    let disposeBag = DisposeBag()
  
    override func viewDidLoad() {      
        //观察者
        let observer: Binder<String> = Binder(label) { (view, text) in
            //收到发出的索引数后显示到label上
            view.text = text
        }
        //Observable序列(每隔1秒钟发出一个索引数)
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        observable
            .map { "当前索引数:\($0 )"}
            .bind(to: observer)
            .disposed(by: disposeBag)
    }
}

使用工厂方法创建Observable

在Observable的扩展类里提供了一系列创建被观察对象的工厂化方法。主要有下面这些:

  • never() :创建一个Never序列,该序列不会发出任何事件,也不会终止。
  • empty(): 创建一个Empty序列,该序列只发出completed事件。
  • error(): 创建一个错误的序列,该序列以’error’事件终止。即创建一个不会发送任何条目并且立即终止错误的Observerable序列
  • just(): 创建一个Just序列,该序列只包含一个元素。
  • of(): 创建一个新的被观察序列的对象,它包含可变数量的元素。
  • from(): 通过数组来创建一个被观察序列。从一个序列(如Array/Dictionary/Set)中创建一个Observer
  • range():在指定范围内生成一个被观察的整数序列,发出事件n次。即创建一个Observable序列,它会发出一系列连续的整数,然后终止。
  • repeatElement(): 生成一个被观察的序列,重复发出指定元素n次。
  • generate(): 创建一个被观察的序列,只要提供的条件为真,就发出状态值。
  • deferred(): 为每个订阅事件的观察者都创建一个新的被观察的序列。(一对一的关系)
  • doOn(): 在订阅的被观察者的事件执行之前,先执行do后面和要执行的订阅事件对应的方法。
  • create(): 通过指定的方法实现来自定义一个被观察的序列。
  • timer(): 获取计时器Observable序列
  • interval(): 底层就是封装timer

针对上面的工厂方法,下面来举例说明

never创建序列

never()方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列。
实例4
代码:

let disposeBag = DisposeBag()
let neverSequence = Observable<String>.never()
_ = neverSequence.subscribe {_ in
    print("This will never be printed")
}.addDisposableTo(disposeBag)

结果:

不会输出任何结果。
empty创建序列

empty()方法创建一个空内容的 Observable 序列。
实例5
代码:

let disposeBag = DisposeBag()
    Observable<Int>.empty().subscribe {
        event in
        print(event)
    }.addDisposableTo(disposeBag)

结果:

completed
error创建序列

error()方法创建一个不做任何操作,而是直接发送一个错误的Observable 序列。
实例6
代码:

let disposeBag = DisposeBag()
    Observable<Int>.error(TestError.test)
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)

结果:

error(test)
just创建序列

just()方法通过传入一个默认值来初始化。

实例7
代码:

let disposeBag = DisposeBag()
Observable.just("1").subscribe { event in
            print(event)
        }.addDisposableTo(disposeBag)

结果:

next(1)
completed
of创建序列

of()方法可以接受可变数量的参数(必需要是同类型的)
实例8
代码:

let disposeBag = DisposeBag()
Observable.of("1", "2", "3", "4").subscribe(onNext: { element in
     print(element)
}).addDisposableTo(disposeBag)

结果:

1
2
3
4
from创建序列

from()方法需要一个数组参数。

实例9
代码:

let disposeBag = DisposeBag()
Observable.from(["1", "2", "3", "4"]).subscribe(onNext: { print($0) }).addDisposableTo(disposeBag)

结果:

1
2
3
4
range创建序列

range()方法通过指定起始和结束数值,创建一个以这个范围内所有值作为初始值的Observable序列。
实例10
代码:

let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10).subscribe { print($0) }.addDisposableTo(disposeBag)

结果:

next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
next(7)
next(8)
next(9)
next(10)
completed
repeatElement创建序列

repeatElement()方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)。
实例11
代码:

let disposeBag = DisposeBag()
Observable.repeatElement("1")
.take(3)
.subscribe(onNext: { print($0) })
.addDisposableTo(disposeBag)

结果:

1
1
1
generate创建序列

generate()方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
实例12
代码:

let disposeBag = DisposeBag()
Observable.generate(
           initialState: 0,
           condition: { $0 < 3 },
           iterate: { $0 + 1 }
        )
        .subscribe(onNext: { print($0) })
        .addDisposableTo(disposeBag)

结果:

0
1
2
deferred创建序列

实例13
代码:

let disposeBag = DisposeBag()
    var count = 1
    let deferredSequence = Observable<String>.deferred {
        print("Creating \(count)")
        count += 1

        return Observable.create { observer in
            print("Emitting...")
            observer.onNext("1")
            observer.onNext("2")
            observer.onNext("3")
            return Disposables.create()
        }
    }

    deferredSequence
        .subscribe(onNext: { print($0) })
        .addDisposableTo(disposeBag)

    deferredSequence
        .subscribe(onNext: { print($0) })
        .addDisposableTo(disposeBag)

结果:

Creating 1
Emitting...
1
2
3
Creating 2
Emitting...
1
2
3
deferred创建序列

deferred()方法相当于是创建一个 Observable 工厂,通过传入一个 block 来执行延迟 Observable序列创建的行为,而这个 block 里就是真正的实例化序列对象的地方。

实例14
代码:

let disposeBag = DisposeBag()
    var count = 1
    let deferredSequence = Observable<String>.deferred {
        print("Creating \(count)")
        count += 1

        return Observable.create { observer in
            print("Emitting...")
            observer.onNext("1")
            observer.onNext("2")
            observer.onNext("3")
            return Disposables.create()
        }
    }

    deferredSequence
        .subscribe(onNext: { print($0) })
        .addDisposableTo(disposeBag)

    deferredSequence
        .subscribe(onNext: { print($0) })
        .addDisposableTo(disposeBag)

结果:

Creating 1
Emitting...
1
2
3
Creating 2
Emitting...
1
2
3
doOn创建序列

实例15
代码:

let disposeBag = DisposeBag()
Observable.of("1", "2", "3", "4")
          .do(onNext: { print("Intercepted:", $0) }, 
              onError { print("Intercepted error:", $0) },
              onCompleted: { print("Completed")  })
          .subscribe(onNext { print($0) },
                     onCompleted: { print("结束") })
          .addDisposableTo(disposeBag)

结果:

Intercepted: 1
1
Intercepted: 2
2
Intercepted: 3
3
Intercepted: 4
4
Completed
结束
create创建序列

create()方法接受一个 block 形式的参数,任务是对每一个过来的订阅进行处理。
代码:
实例16

let disposeBag = DisposeBag()
let myJust = { (element: String) -> Observable<String> in
    return Observable.create { observer in  
    observer.on(.next(element))
    observer.on(.completed)
    return Disposables.create()
    }
}
myJust("1").subscribe { print($0) }.addDisposableTo(disposeBag)

结果:

next(1)
completed
timer创建序列

timer()方法有两种用法,一种是创建的 Observable序列在经过设定的一段时间后,产生唯一的一个元素。另一种是创建的 Observable 序列在经过设定的一段时间后,每隔一段时间产生一个元素。

实例17

方式一:
//5秒种后发出唯一的一个元素0
let observable = Observable<Int>.timer(5, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

方式二:
//延时5秒种后,每隔1秒钟发出一个元素
let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}
interval创建序列

interval()方法创建的 Observable 序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去。

实例18

let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

Observer订阅

有了 Observable,我们还要使用 subscribe() 方法来订阅它,接收它发出的 Event。

订阅Observable有两种方式:

  1. 我们使用 subscribe() 订阅了一个Observable 对象,该方法的 block 的回调参数就是被发出的 event 事件,如果想要获取到这个事件里的数据,可以通过 event.element 得到。
    例如:
let observable = Observable.of("A", "B", "C")
observable.subscribe { event in
    print(event)
    //print(event.element)
}

结果:
next(A)
next(B)
next(C)
completed

  1. RxSwift 还提供了另一个 subscribe方法,它可以把 event 进行分类,通过不同的 block 回调处理不同类型的 event.同时会把 event 携带的数据直接解包出来作为参数,方便我们使用。subscribe() 方法的 onNextonErroronCompletedonDisposed 这四个回调 block 参数都是有默认值的,即它们都是可选的。所以我们也可以只处理 onNext而不管其他的情况。

实例19
代码如下:

let observable = Observable.of("A", "B", "C")
         
observable.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("completed")
}, onDisposed: {
    print("disposed")
})

Observable销毁

一个 Observable 序列被创建出来后它不会马上就开始被激活从而发出 Event,而是要等到它被某个人订阅了才会激活它。而Observable 序列激活之后要一直等到它发出了.error或者 .completed的 event 后,它才被终结。dispose()就是用来销毁Observable。

  • dispose()
    (1)使用该方法我们可以手动取消一个订阅行为。
    (2)如果我们觉得这个订阅结束了不再需要了,就可以调用 dispose()方法把这个订阅给销毁掉,防止内存泄漏。
    (3)当一个订阅行为被dispose 了,那么之后 observable 如果再发出 event,这个已经 dispose 的订阅就收不到消息了。

实例20
代码:

let observable = Observable.of("A", "B", "C")
         
//使用subscription常量存储这个订阅方法
let subscription = observable.subscribe { event in
    print(event)
}
         
//调用这个订阅的dispose()方法
subscription.dispose()
  • DisposeBag
    除了 dispose()方法之外,我们更经常用到的是一个叫 DisposeBag 的对象来管理多个订阅行为的销毁:
  1. 我们可以把一个 DisposeBag对象看成一个垃圾袋,把用过的订阅行为都放进去。
  2. 而这个DisposeBag 就会在自己快要dealloc 的时候,对它里面的所有订阅行为都调用 dispose()方法。

实例21

代码:

let disposeBag = DisposeBag()
         
//第1个Observable,及其订阅
let observable1 = Observable.of("A", "B", "C")
observable1.subscribe { event in
    print(event)
}.disposed(by: disposeBag)
 
//第2个Observable,及其订阅
let observable2 = Observable.of(1, 2, 3)
observable2.subscribe { event in
    print(event)
}.disposed(by: disposeBag)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容