Observable 这个类是 Rx 框架的基础,可以称它为可观察序列。
它的作用,可以异步地产生一系列的 Event(事件),即一个 Observable 对象会随着时间推移不定期地发出 event(element : T) 这样一个东西。
而且这些 Event 还可以携带数据,它的泛型就是用来指定这个 Event 携带的数据的类型。
有了可观察序列,还需要有一个 Observer(订阅者)来订阅它,这样订阅者才能收到 Observable 发出的 Event
Event
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
可以看出 Event 就是个枚举,也就是说一个 Observable 是可以发出 3 种不同类型的事件:
- next事件:就是那个可以携带数据的事件。
- error事件:表示一个错误,可以携带具体的错误内容,一旦 Observable 发出了 error,则这个 Observable 就等于终止了,以后它再也不会发出 event 事件了。
- completed事件:表示 Observable 发出的事件正常结束了,跟 error 一样,一旦发出了 completed,则这个 Observable 就等于终止了,以后它再也不会发出 event 事件了。
Observable的创建
- just() 方法
该方法通过传入一个默认值来初始化。指定了这个 Observable 所发出的事件携带的数据类型必须是 Int 类型的。
let observable = Observable<Int>.just(5)
- of() 方法
该方法可以接受可变数量的参数(必需要是同类型的)
let observable = Observable.of("A", "B", "C")
- from() 方法
该方法需要一个数组参数。下面样例中数据的元素会被当做这个 Observable 所发出 event 携带的数据内容,最终效果同上面饿 of() 样例是一样的。
let observable = Observable.from(["A", "B", "C"])
- empty() 方法
该方法创建一个空内容的 Observable 序列 - never() 方法
该方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列
let observable = Observable<Int>.never()
- error() 方法
该方法创建一个不做任何操作,而是直接发送一个错误的 Observable 序列
enum MyError: Error {
case A
case B
}
let observable = Observable<Int>.error(MyError.A)
- create() 方法
该方法接受一个 block 形式的参数,任务是对每一个过来的订阅进行处理
//这个block有一个回调参数observer就是订阅这个Observable对象的订阅者
//当一个订阅者订阅这个Observable对象的时候,就会将订阅者作为参数传入这个block来执行一些内容
let observable = Observable<String>.create{observer in
//对订阅者发出了.next事件,且携带了一个数据"hangge.com"
observer.onNext("hangge.com")
//对订阅者发出了.completed事件
observer.onCompleted()
//在结尾 returen 一个Disposable 来取消订阅
return Disposables.create()
}
//订阅测试
observable.subscribe {
print($0)
}
8. deferred() 方法
该方法相当于创建一个 Observable 工厂,通过传入一个 block 来执行延迟 Observable 序列创建的行为,而这个 block 里就是真正的实例化序列对象的地方
- interval() 方法
这个方法创建的 Observable 序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去。
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
- timer() 方法
这个方法有两种用法,一种是创建的 Observable 序列在经过设定的一段时间后,产生唯一的一个元素
//5秒种后发出唯一的一个元素0
let observable = Observable<Int>.timer(5, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
另一种是创建的 Observable 序列在经过设定的一段时间后,每隔一段时间产生一个元素
//延时5秒种后,每隔1秒钟发出一个元素
let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
Observable的订阅、监听
有了 Observable,我们还要订阅它,接收它发出的 Event
- 订阅方式一
用 subscribe订阅Observable 对象,该方法的 block 的回调参数就是被发出的 event 事件
let observable = Observable.of("A", "B", "C")
observable.subscribe { event in
print(event)
}
- 订阅方式二
RxSwift 还提供了另一个 subscribe 方法,它可以把 event 进行分类:
通过不同的 block 回调处理不同类型的 event。(其中 onDisposed 表示订阅行为被 dispose 后的回调)
同时会把 event 携带的数据直接解包出来作为参数,方便我们使用。
let observable = Observable.of("A", "B", "C")
observable.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
subscribe() 方法的 onNext、onError、onCompleted 和 onDisposed 这四个回调 block 参数都是有默认值的,即它们都是可选的。所以我们也可以只处理 onNext 而不管其他的情况。
let observable = Observable.of("A", "B", "C")
observable.subscribe(onNext: { element in
print(element)
})
事件监听
- doOn
用 doOn 方法来监听事件的生命周期,它会在每一次事件发送前被调用。
它和 subscribe 一样,可以通过不同的 block 回调处理不同类型的 event。
如:do(onNext:) 方法就是在 subscribe(onNext:) 前调用
而 do(onCompleted:) 方法则会在 subscribe(onCompleted:) 前面调用
let observable = Observable.of("A", "B", "C")
observable
.do(onNext: { element in
print("Intercepted Next:", element)
}, onError: { error in
print("Intercepted Error:", error)
}, onCompleted: {
print("Intercepted Completed")
}, onDispose: {
print("Intercepted Disposed")
})
.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
})
订阅销毁
Observable 从创建到终结流程
Observable 序列被创建出来后不会马上就被激活发出 Event,而是要等到被订阅了才会激活
而 Observable 序列激活后,要一直等到它发出了 .error 或 .completed 的 event 后,才被终结。dispose()
该方法可以手动取消一个订阅行为。
如果订阅结束了不再需要了,可以调用 dispose() 方法,把订阅销毁掉,防止内存泄漏。
当一个订阅被 dispose 了,之后 observable 如果再发出 event,这个已经 dispose 的订阅就收不到消息了
let observable = Observable.of("A", "B", "C")
//使用subscription常量存储这个订阅方法
let subscription = observable.subscribe { event in
print(event)
}
//调用这个订阅的dispose()方法
subscription.dispose()
- DisposeBag
除dispose() 方法外,我们更经常用到的是一个叫 DisposeBag 的对象来管理多个订阅行为的销毁
我们可以把DisposeBag 对象看成一个垃圾袋,把用过的订阅行为都放进去。
而这个 DisposeBag 会在自己快要 dealloc 时,对它里面的所有订阅行为调用 dispose() 方法
let disposeBag = DisposeBag()
//第1个Observable,及其订阅
let observable1 = Observable.of("A", "B", "C")
observable1.subscribe { event in
print(event)
}.disposed(by: disposeBag)
//第2个Observable,及其订阅
let observable2 = Observable.of(1, 2, 3)
observable2.subscribe { event in
print(event)
}.disposed(by: disposeBag)
观察者(Observer)
观察者的作用就是监听事件,然后对事件做出响应。或者说任何响应事件的行为都是观察者
如:
- 当我们点击按钮,弹出一个提示框。那么这个“弹出一个提示框”就是观察者
- 当我们请求一个远程的 json 数据后,将其打印出来。那么这个“打印 json 数据”就是观察者
1. 创建观察者
- 创建观察者最直接的方法是在 Observable 的 subscribe 、bind方法后面描述当事件发生时,如何做出响应
let observable = Observable.of("A", "B", "C")
observable.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
})
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind { [weak self](text) in
//收到发出的索引数后显示到label上
self?.label.text = text
}
.disposed(by: disposeBag)
- 使用 AnyObserver 创建观察者
- AnyObserver 可以用来描叙任意一种观察者,配合 subscribe 方法使用
//观察者
let observer: AnyObserver<String> = AnyObserver { (event) in
switch event {
case .next(let data):
print(data)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}
let observable = Observable.of("A", "B", "C")
observable.subscribe(observer)
- 配合 bindTo 方法使用
//观察者
let observer: AnyObserver<String> = AnyObserver { [weak self] (event) in
switch event {
case .next(let text):
//收到发出的索引数后显示到label上
self?.label.text = text
default:
break
}
}
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind(to: observer)
.disposed(by: disposeBag)
}
- 使用 Binder 创建观察者
相较于 AnyObserver 的大而全,Binder 更专注于特定的场景。
Binder 主要有以下两个特征:
- 不会处理错误事件,确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)
- 一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。
Subjects
既是订阅者,也是 Observable
说它是订阅者,因为能够动态地接收新的值
说它是Observable,是因为当 Subjects 有了新的值后,就会通过 Event 将新值发出给所有的订阅者
一共有四种 Subjects,分别为:PublishSubject、BehaviorSubject、ReplaySubject、Variable
- PublishSubject
PublishSubject 是最普通的 Subject,它不需要初始值就能创建。
PublishSubject 的订阅者从他们开始订阅的时间点起,可以收到订阅后 Subject 发出的新 Event,而不会收到他们在订阅前已发出的 Event
let disposeBag = DisposeBag()
//创建一个PublishSubject
let subject = PublishSubject<String>()
//第1次订阅subject
subject.subscribe(onNext: { string in
print("第1次订阅:", string)
}, onCompleted:{
print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)
//由于当前没有任何订阅者,所以这条信息不会输出到控制台
subject.onNext("111")
- BehaviorSubject
BehaviorSubject 需要通过一个默认初始值来创建。
当一个订阅者来订阅它的时候,这个订阅者会立即收到 BehaviorSubjects 上一个发出的 event。之后就跟正常的情况一样,它也会接收到 BehaviorSubject 之后发出的新的 event
let disposeBag = DisposeBag()
//创建一个BehaviorSubject
let subject = BehaviorSubject(value: "111")
//第1次订阅subject
subject.subscribe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
变换操作符
- map
该操作符通过传入一个函数闭包把原来的 Observable 序列转变为一个新的 Observable 序列
变换操作指的是对原始的 Observable 序列进行一些转换
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * 10}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
- flatMap
map 在做转换的时候容易出现“升维”的情况。即转变之后,从一个序列变成了一个序列的序列。
而 flatMap 操作符会对源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。即又将其 “拍扁”(降维)成一个 Observable 序列。
这个操作符是非常有用的。比如当 Observable 的元素本生拥有其他的 Observable 时,我们可以将所有子 Observables 的元素发送出来