一直以来,响应式编程都是业界讨论的热门话题之一。为了推广响应式编程,ReactiveX 社区几乎为每一种编程语言设计实现了一种对应的响应式编程框架。RxSwift 就是针对 Swift 所开发的响应式框架。
关于 RxSwift,网上有不少相关的学习资料,但绝大多数都是 RxSwift 的使用说明,鲜有文章介绍 RxSwift 背后的设计原理。通过阅读源码,查阅资料,正向设计,我逐步理解了 RxSwift 的设计思想。因此,趁热打铁,记录并总结一下我的理解。
本文所实现的 RxSwift 代码已在 Github 开源——传送门。参照源码阅读本文效果更佳。
基础
RxSwift 主要蕴含了以下几种设计思想:
- 发布-订阅模式
- 流编程
- 函数式编程
下面,我们依次来进行介绍。
发布-订阅模式
发布-订阅模式 是 RxSwift 所呈现的一种最直观思想。发布-订阅模式可以分为两个角色:发布者、订阅者。
订阅者的主要职责是:
- 订阅:监听并处理某个事件。其本质就是向发布者注册一个处理某个事件的闭包。
- 取消(订阅)
发布者的主要职责是:
- 发布:分发某个事件
发布-订阅模式的基本原理是:
- 订阅者调用发布者提供的订阅方法进行订阅,从而在发布者内部注册订阅者。
- 发布者内部会维护一个订阅者列表。
- 当发布者发布事件时,会遍历订阅者列表,执行其中的处理方法(将事件作为参数传递给闭包,并执行)。
举例
在日常开发中,发布-订阅模式是一种广泛被应用的设计模式。比如:iOS 中的 NotificationCenter、Flutter 中的事件总线都是基于这种模式实现的。如下所示为 Flutter 中事件总线的一种实现方式,其中的代码逻辑基本遵循了上述所描述的发布-订阅模式的基本原理。
// 订阅者订阅内容签名
typedef void EventCallback(arg);
class EventBus {
// 私有构造函数
EventBus._internal();
// 保存单例
static EventBus _singleton = new EventBus._internal();
// 工厂构造函数
factory EventBus()=> _singleton;
// 保存事件订阅者队列,key:事件名(id),value: 对应事件的订阅者队列
var _emap = new Map<Object, List<EventCallback>>();
// 添加订阅者,即订阅
void on(eventName, EventCallback f) {
if (eventName == null || f == null) return;
_emap[eventName] ??= new List<EventCallback>();
_emap[eventName].add(f);
}
// 移除订阅者,即取消
void off(eventName, [EventCallback f]) {
var list = _emap[eventName];
if (eventName == null || list == null) return;
if (f == null) {
_emap[eventName] = null;
} else {
list.remove(f);
}
}
// 触发事件,事件触发后该事件所有订阅者会被调用,即发布
void emit(eventName, [arg]) {
var list = _emap[eventName];
if (list == null) return;
int len = list.length - 1;
// 反向遍历,防止订阅者在回调中移除自身带来的下标错位
for (var i = len; i > -1; --i) {
// 执行注册的闭包
list[i](arg);
}
}
}
// 定义一个 top-level(全局)变量,页面引入该文件后可以直接使用 bus
var bus = new EventBus();
思考:为什么 iOS 中一个监听了某个通知的类必须要在
dealloc
时要执行removeObserver:
方法?
流编程
流(stream) 是 RxSwift 另一个重要的设计思想。Observable<T>
是 Rx 框架的基础,也被称为 可观察序列。它的作用是可以异步地产生一系列数据,即一个 Observable<T>
对象会随着时间的推移不定期地发出 event(element: T)
。数据就像水流一样持续不断地在流动,顾名思义,这也被称为 流编程。
关于流编程,《计算机程序的构造与解释》一书中认为 流编程是一种调用驱动的编程思想。流编程的基本思想是:一般情况下,只是部分地构造出流的结构,并将这样的部分结构传给使用流的程序。如果使用者需要访问这个流中未构造出的那个部分,那么这个流就会自动地继续构造下去,但是只做出满足当时需要的那一部分。
如下所示是 RxSwift 中常见使用形式,observable
是数据源,不断地发出数据,如果水流一样,最终流向 subscribe
中的闭包。期间会流经 map
,filter
等操作符,经过转换或过滤。这就是流编程的思想。
let observable: Observable<Int> = Observable.create { (observer) -> Disposable in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}
observable.map { $0 + 2}.filter { $0 > 3 }.subscribe { (event) in
...
}
流编程底层实现的本质则是 闭包的延迟执行和强制执行。具体来说是基于一种称为 delay
的特殊形式,对于 delay <exp>
的求值不会对 <exp>
求值,而是返回一个称为 延时对象 的对象。它可以看做是对未来的某个时间求值 <exp>
的允诺。在各类编程语言中,返回特定类型的闭包常被用于描述一个延迟对象。与 delay
配对的是 force
的过程。它以一个延时对象为参数,执行相应的求值工作,即迫使 delay
完成其所允诺的求值。在各类编程语言中,执行返回特定类型的闭包常被用于描述 force
的过程。
函数式编程
RxSwift 提供了大量无副作用的操作符,无副作用也是函数式编程的一种重要特性。RxSwift 能够实现操作符的链式调用,一个重要的前提是:提供操作符的类型和操作符的返回类型必须保持一致。这里就涉及到了函数式编程中的一些高阶概念:函子(Functor)、适用函子(Applicative)、单子(Monad)。详细内容可参见《函数式编程——Functor、Applicative、Monad》一文。
不过,RxSwift 操作符中运用最多的是 函子(Functor),什么是函子?简而言之,函子能够将普通函数应用到一个包装类型。如下图及代码所示,一个包装类型包含了一个原始值,当函子作用在其上后,使用普通函数对原始值进行转换,最终将结果值放入包装类型中返回。
extension Result {
// 满足 Functor 的条件:map 方法能够将 普通函数 应用到包装类
func map<U>(_ f: (T) -> U) -> Result<U> {
switch self {
case .success(let x): return .success(f(x))
case .failure: return .failure
}
}
}
以 RxSwift 中最常用的 map
操作符为例,如下所示。map
方法扩展自 ObservableType
,其能够将普通函数 (Element) -> Result
应用到 ObservableType
包装类型。这其实就是一种典型的 函子 应用。
extension ObservableType {
public func map<Result>(_ transform: @escaping (Element) throws -> Result)
-> Observable<Result> {
return Map(source: self.asObservable(), transform: transform)
}
}
核心实现
下面我们将以正向设计的方式,结合 RxSwift 中的设计思想,手动实现一个 RxSwift 的核心部分。
基本功能(RxDemo-01)
首先,我们来定义事件,RxSwift 中有三种类型的事件,如下所示:
// MARK: - Event
enum Event<Element> {
case next(Element)
case error(Error)
case completed
}
其次,我们来定义订阅者,在 RxDemo-01
中,我们先忽略 取消订阅 的功能,如下所示。订阅者必须遵循订阅者协议,需要实现 监听事件 的方法 on(event: Event<Element>)
。订阅者内部维护一个处理事件的闭包 _handler
。当监听事件方法触发时,会立即执行处理事件闭包。
// MARK: - Observer
protocol ObserverType {
associatedtype Element
// 监听事件
func on(event: Event<Element>)
}
class Observer<Element>: ObserverType {
// 处理事件的闭包
private let _handler: (Event<Element>) -> Void
init(handler: @escaping (Event<Element>) -> Void) {
_handler = handler
}
// 实现 监听事件 的协议,内部处理事件
func on(event: Event<Element>) {
// 处理事件
_handler(event)
}
}
最后,我们来定义发布者,如下所示。发布者必须遵循发布者协议,需要实现 订阅操作 的方法 subscribe<O: ObserverType>(observer: O) where O.Element == Element
。发布者内部维护一个发布事件的闭包 _eventGenerator
。当订阅发生时(订阅操作方法被执行时),会立即执行发布事件的闭包。
// MARK: - Observable
protocol ObservableType {
associatedtype Element
// 订阅操作
func subscribe<O: ObserverType>(observer: O) where O.Element == Element
}
class Observable<Element>: ObservableType {
// 定义 发布事件 的闭包
private let _eventGenerator: (Observer<Element>) -> Void
init(eventGenerator: @escaping (Observer<Element>) -> Void) {
_eventGenerator = eventGenerator
}
// 实现 订阅操作 的协议,内部发布事件
func subscribe<O: ObserverType>(observer: O) where O.Element == Element {
// 生成事件
_eventGenerator(observer as! Observer<Element>)
}
}
接下来,我们来试用一下 RxDemo-01 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式基本吻合。
// MARK: - Test
let observable = Observable<Int> { (observer) in
print("send 0")
observer.on(event: .next(0))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
print("send completed")
observer.on(event: .completed)
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
observable.subscribe(observer: observer)
// send 0
// recive 0
// send 1
// recive 1
// send 2
// recive 2
// send 3
// recive 3
// send completed
// recive completed
下图所示为 RxDemo-01 实现的 RxSwift 的内部调用关系。通过闭包实现将 observer
传递给 observable
,在发布事件时能够将事件传递给 observer
,从而形成一条看似自左向右流动的数据流。
取消订阅(RxDemo-02)
RxDemo-01 有一个明显的缺陷——无法取消订阅。我们来参考 RxSwift 的实现,它 并不是直接让订阅者支持取消订阅,而是通过一个第三方类型 Disposable
对订阅进行管理。Disposable
的核心作用是 提供一个状态位标识订阅是否已经取消。
关于由第三方类来管理订阅,而不是让订阅者自己管理的原因,我猜测有两个:一是出于职责单一的原则;二是为了支持函数式编程,抽取一个第三方类型作为返回值,从而在链式调用时保持类型一致。
Disposable
协议要求所有的 Disposable
类型都实现 dispose
方法,表示取消订阅。
protocol Disposable {
// 取消订阅
func dispose()
}
这里我们定义两个遵循 Dispoable
协议的类型:AnonymousDisposable
和 CompositeDisposable
。
AnonymousDisposable
作为一个匿名 Disposable
,在本例中作为最底层的 Disposable
并没有什么作用,只是为了实现模式统一而已。
class AnonymousDisposable: Disposable {
// AnonymousDisposable 封装了 取消订阅 的闭包
private let _disposeHandler: () -> Void
init(_ disposeClosure: @escaping () -> Void) {
_disposeHandler = disposeClosure
}
func dispose() {
_disposeHandler()
}
}
CompositeDisposable
作为一个可管理多个 Disposable
的容器,它内部维持一个标志位表示订阅是否被取消。CompositeDisposable
所实现的 dispose
方法真正改变了标志位,并对其所维护的所有 Disposable
执行各自的 dispose
方法,从而完成它们定义的在取消订阅时需要执行的附带操作。
class CompositeDisposable: Disposable {
// 可用于管理一组 Disposable 的 CompositeDisposable
// 判断是否已销毁(取消订阅)的标志位
private(set) var isDisposed: Bool = false
// 管理一组 Disposable
private var disposables: [Disposable] = []
init() {}
func add(disposable: Disposable) {
if isDisposed {
disposable.dispose()
return
}
disposables.append(disposable)
}
func dispose() {
guard !isDisposed else { return }
// 销毁所有 disposable,并设置标志位
disposables.forEach {
$0.dispose()
}
isDisposed = true
}
}
很显然,这里执行 dispose
操作只是修改了状态,并没有释放订阅资源。只有当 CompositeDisposable
对象被释放后才算真正释放资源。在原版 RxSwift 中,DisposeBag
差不多就是 CompositeDisposable
。这样也是为什么我们要把订阅交给 DisposeBag
来进行管理,DisposeBag
作为某个对象的属性,会随着对象的释放,从而自动释放真正的订阅资源。
为了支持 Disposable
,我们需要在 RxDemo-01 的基础上稍作修改即可,主要是修改发布者 Observable
。
// MARK: - Observable
protocol ObservableType {
associatedtype Element
// 订阅操作
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element
}
class Observable<Element>: ObservableType {
// 定义 发布事件 的闭包
private let _eventGenerator: (Observer<Element>) -> Disposable
init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
_eventGenerator = eventGenerator
}
// 实现 订阅操作 的协议,内部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
let composite = CompositeDisposable()
// 通过一个中间 Observer 对原始 Observer 进行封装,用于过滤事件的传递。
let disposable = _eventGenerator(Observer { (event) in
guard !composite.isDisposed else { return }
// 事件传递给原始 observer
observer.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .error(_), .completed:
composite.dispose()
default:
break
}
})
// 将 _eventGenerator 返回的 AnonymousDisposable 加入至 CompositeDisposable 中进行管理
composite.add(disposable: disposable)
return composite
}
}
接下来,我们来试用一下 RxDemo-02 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式进一步吻合。这里,我们实现了取消订阅的功能。
// MARK: - Test
let observable = Observable<Int> { (observer) -> Disposable in
print("send 0")
observer.on(event: .next(0)) // observer.on(event: .next(0).map({ $0 * 2 }))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
let disposable = observable.subscribe(observer: observer)
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}
// send 0
// recive 0
// send 1
// recive 1
// send 2
// recive 2
// send 3
// recive 3
// dispose
// send completed
RxDemo-02 的核心思想是在 RxDemo-01 的基础上对事件进行拦截和过滤,如下图所示。
具体的实现方式如下所示,通过 CompositeDisposable
管理 AnonymousDisposable
(原始 subscribe
中的闭包执行后的返回类型)。同时,在执行发布事件时,使用一个中间 Observer 接收原始事件,中间 Observer 引用外部 CompositeDisposable
的状态决定是否将事件发送给原始 Observer。
使用 CompositeDisposable
的本质就是添加了一个中间层来解决管理订阅的问题。
结构优化(RxDemo-03)
在 RxDemo-02 中,AnonymousObserver
引用了外部的 CompositeDisposable
中的订阅状态,从而决定事件的传递方向。这种代码逻辑由内而外实现,并不是很直观。
为了更加清晰地描述这个事件流动方向,RxDemo-03 通过增加一个中间层,将原始 observer、中间 observer、事件转发逻辑聚合在同一层级下,让代码具有更好的可读性。这里我们实现一个遵循 Disposable
协议的 Sink
类型。Sink
是 水槽 的意思,象征着这里我们通过它来控制事件的流动方向,暗示了这个类的作用。
class Sink<O: ObserverType>: Disposable {
private var _disposed: Bool = false
private let _forward: O
private let _eventGenerator: (Observer<O.Element>) -> Disposable
private let _composite = CompositeDisposable()
init(forward: O, eventGenerator: @escaping (Observer<O.Element>) -> Disposable) {
_forward = forward
_eventGenerator = eventGenerator
}
func run() {
// 通过一个中间 Observer 接收原始事件
// 根据 CompositionDisposable 的状态决定是否传递给原始 Observer
let observer = Observer<O.Element>(forward)
// 执行事件生成器
// 将返回值 Disposable 加入到 CompositeDisposable 中进行管理
_composite.add(disposable: _eventGenerator(observer))
}
private func forward(event: Event<O.Element>) {
guard !_disposed else { return }
// 事件传递给原始 observer
_forward.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .completed, .error(_):
dispose()
default:
break
}
}
func dispose() {
_disposed = true
_composite.dispose()
}
}
在定义了 Sink
之后,我们就可以简化 Observable
中 subscribe
方法的具体实现。
class Observable<Element>: ObservableType {
// 定义 发布事件 的闭包
private let _eventGenerator: (Observer<Element>) -> Disposable
init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
_eventGenerator = eventGenerator
}
// 实现 订阅操作 的协议,内部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
let sink = Sink(forward: observer, eventGenerator: _eventGenerator)
sink.run()
return sink
}
}
SinkDisposable
引用了原始的事件发生器,并定义一个中间 Observer 转入至原始事件发生器,从而让中间 observer 接收原始事件。除此之外,SinkDisposable
还引用了原始 observer,当中间 observer 处理原始事件时,会判断订阅是否已经取消,从而决定是否将原始事件转发给原始 observer。此时,RxDemo-03 实现的 RxSwift 的内部调用关系如下所示。
操作符(RxDemo-04)
下面,我们来实现操作符,RxSwift 中包含了大量的操作符,它们基本上都是对函数式编程中 函子、单子 等进行了应用。我们以 map
为例进行介绍。
如下所示,map
方法能够将一个普通函数应用到包装类型 ObservableType
上。map
方法最终返回一个 Observable<Result>
类型(同样遵循 ObservableType
协议),因此能够完美支持链式操作。
extension ObservableType {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
return Observable<Result> { (observer) in // observer 为原始 observer
// 此闭包可看成是一个 eventGenerator
// 向原始 observable 中传入一个中间 map observer,即由中间 map observer 替换原始 observer 监听原始事件
// 中间 map observer 对原始事件进行转换后,转发给原始 observer
return self.subscribe(observer: Observer { (event) in
switch event {
case .next(let element):
do {
try observer.on(event: .next(transform(element)))
} catch {
observer.on(event: .error(error))
}
case .error(let error):
observer.on(event: .error(error))
case .completed:
observer.on(event: .completed)
}
})
}
}
}
接下来,我们来试用一下 RxDemo-04 所实现的 RxSwift。这里,我们实现了 map
操作符的功能。
// MARK: - Test
let observable = Observable<Int> { (observer) -> Disposable in
print("send 0")
observer.on(event: .next(0)) // observer.on(event: .next(0).map({ $0 * 2 }))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
let disposable = observable.map { $0 * 2 }.subscribe(observer: observer)
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}
// send 0
// recive 0
// send 1
// recive 2
// send 2
// recive 4
// send 3
// recive 6
// dispose
// send completed
RxDemo-04 中由于增加了操作符功能,其内部的调用关系也发生了变化。特别是,发布事件和过滤事件的逻辑,很显然,每增加一个操作符,就会增加一个 SinkDisposable
中间层,调用栈也会更深。
如果我们仔细分析 RxDemo-04,其实我们可以发现内部隐藏了如下所示的调用关系链:observable -> MapObserver -> MapObservable -> observer。实际执行时,调用关系如下所示:
- observable 调用 MapObserver.on 方法,将原始事件传递给 MapObserver;
- MapObserver 使用 map 方法将原始事件转换成 map 事件(即 map 后的数据),作为 MapObservable 发出的事件;
- MapObservable 调用 observer.on 方法,将 map 事件传给 observer。
分类细化(RxDemo-05)
在 RxDemo-04 中,为了增加 map
操作符,对 ObservableType
进行了扩展,其本质就是在原始的 Observable 和 Observer 之间插入了一个 MapObserver 和一个 MapObservable。本节,我们继续进行优化,对中间类也进行细分和定义。
在 RxDemo-04 中,Sink
的主要作用是 根据订阅是否取消决定是否拦截事件的传递。这里我们可能会想到:中间订阅者(如:MapObserver)本身是不是就应该具备 Sink
的这种功能呢?事实上,RxSwift 就是让 Sink
作为所有 Observer 的基类。
对于操作符,我们可以实现上述的模式;但是,对于不带操作符的情况,该如何处理呢?为了实现模式的统一,我们可以认为不带操作符的情况等同于带了 只返回原值的匿名操作符,即等同于 map { $0 }
。针对此情况,我们也需要定义两个类 AnonymousObserver
和 AnonymousObservable
。
下面,我们来进行优化改进。
首先,我们将原来的 Observable<Element>
改成抽象基类。
// 抽象类
class Observable<Element>: ObservableType {
// // 定义 发布事件 的闭包,有子类来定义
// private let _eventGenerator: (Observer<Element>) -> Disposable
//
// init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
// _eventGenerator = eventGenerator
// }
// 实现 订阅操作 的协议,内部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
rxAbstractMethod()
}
}
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
fatalError("Abstract Method", file: file, line: line)
}
其次,我们再一定一个新的类 Producer<Element>
代替 Observable<Element>
。Producer<Element>
继承自 Observable<Element>
,作为发布者的基类,该类内部没有时间生成器 eventGenerator
的闭包,由子类选择性进行定义。
class Producer<Element>: Observable<Element> {
// 实现 订阅操作 的协议,内部生成事件
override func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
return run(observer: observer)
}
func run<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
rxAbstractMethod()
}
}
然后,我们来对 Sink
进行修改。在 RxDemo-04 中,Sink
即提供了事件生成的功能和事件转发的功能。这里,我们让 Sink
的职责更加单一,仅仅是提供事件转发的功能。修改结果如下:
class Sink<O: ObserverType>: Disposable {
private var _disposed: Bool = false
private let _forward: O
private let _composite = CompositeDisposable()
init(forward: O) {
_forward = forward
}
func forward(event: Event<O.Element>) {
guard !_disposed else { return }
// 事件传递给原始 observer
_forward.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .completed, .error(_):
dispose()
default:
break
}
}
func dispose() {
_disposed = true
print("dispose execute")
_composite.dispose()
}
}
为了便于泛型类型的转换,我们给 ObservableType
协议增加一个方法,并由 Observable<Element>
予以实现。
protocol ObservableType {
// ...
func asObservable() -> Observable<Element>
}
class Observable<Element>: ObservableType {
// ...
func asObservable() -> Observable<Element> {
return self
}
}
接下来,我们来分别实现 Sink
的子类 AnonymousObserver
和 MapObserver
以及 Producer
的子类 AnonymousObservable
和 MapObservable
。
class AnonymousObserver<O: ObserverType>: Sink<O>, ObserverType {
typealias Element = O.Element
override init(forward: O) {
super.init(forward: forward) // forward 为原始订阅者
}
func on(event: Event<Element>) {
// 对原始事件进行转发
switch event {
case .next(let element):
self.forward(event: .next(element))
case .error(let error):
self.forward(event: .error(error))
self.dispose()
case .completed:
self.forward(event: .completed)
self.dispose()
}
}
func run(parent: AnonymousObservable<Element>) -> Disposable {
// 执行事件生成器
parent._eventGenerator(Observer(self))
}
}
class AnonymousObservable<Element>: Producer<Element> {
// 持有事件生成器闭包
let _eventGenerator: (Observer<Element>) -> Disposable
init(eventGenerator: @escaping (Observer<Element>) -> Disposable) {
self._eventGenerator = eventGenerator
}
override func run<O>(observer: O) -> Disposable where Element == O.Element, O : ObserverType {
// 订阅发生时,生成一个中间订阅者 AnonymousObserver 来订阅原始事件,并将事件转发给原始订阅者
let sink = AnonymousObserver(forward: observer)
sink.run(parent: self)
return sink
}
}
class MapObserver<Source, Result, O: ObserverType>: Sink<O>, ObserverType {
typealias Element = Source
typealias Result = O.Element
typealias Transform = (Source) throws -> Result
private let _transform: Transform
init(forward: O, transform: @escaping Transform) { // forward 为原始订阅者
self._transform = transform
super.init(forward: forward)
}
func on(event: Event<Element>) {
// 对原始事件进行 map 转换,对结果进行转发
switch event {
case .next(let element):
do {
let mappedElement = try _transform(element)
self.forward(event: .next(mappedElement as! O.Element))
} catch {
self.forward(event: .error(error))
}
case .error(let error):
self.forward(event: .error(error))
self.dispose()
case .completed:
self.forward(event: .completed)
self.dispose()
}
}
}
class MapObservable<Source, Result>: Producer<Result> {
typealias Transform = (Source) throws -> Result
private let _transform: Transform
private let _source: Observable<Source>
init(source: Observable<Source>, transform: @escaping Transform) {
self._source = source
self._transform = transform
}
override func run<O: ObserverType>(observer: O) -> Disposable where Result == O.Element {
// 订阅发生时,生成一个中间订阅者 MapObserver 来订阅上游事件
let sink = MapObserver(forward: observer, transform: self._transform)
self._source.subscribe(observer: sink)
return sink
}
}
现在我们再来看发布者、操作符,其实两者的本质都是一样,都是创建了一个发布者。对此,我们采用扩展的方式来提供相应的方法。如下所示:
extension ObservableType {
static func create(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(eventGenerator: eventGenerator)
}
}
extension ObservableType {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
return MapObservable(source: self.asObservable(), transform: transform)
}
}
最后,我们再来验证一下实现结果,如下所示。
// MARK: - Test
let observable = Observable<Int>.create { (observer) -> Disposable in // observer 为 MapObserver
print("send 0")
observer.on(event: .next(0))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
let disposable = observable.map { $0 * 2 }.map { $0 + 1 }.subscribe(observer: observer)
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}
// send 0
// recive 1
// send 1
// recive 3
// send 2
// recive 5
// send 3
// recive 7
// dispose execute
// send completed
从运行结果而言,RxDemo-05 基本实现了分类细化,并且达到了取消订阅的功能。此时,上述例子中实际的订阅关系如下所示。相比 RxDemo-04,多了 AnonymousObserver
和 AnonymousObservable
,但是整体的内部订阅关系链更加清晰了。
此时,我们再来对照一下 RxSwift 和 RxDemo-05 中类的定义,如下表所示。各个类的功能基本相同,整体结构也是大同小异,只是在类名上略有差异。
RxDemo-05 | Producer | Sink | AnonymousObserver | AnonymousObservable | MapObserver | MapObservable |
---|---|---|---|---|---|---|
RxSwift | Producer | Sink | AnonymousObservableSink | AnonymousObservable | Map | MapSink |
订阅管理(RxDemo-06)
如果,我们仔细对比,可以发现 RxSwift 中 Producer
的 subscribe
方法内部与 RxDemo-05 还是不太一样,前者内部还引用了一个 SinkDisposer
类。其作用是什么呢?事实上,其主要作用是管理 disposeHandler
。细心的同学可能会发现 RxDemo-05 中有个 BUG:取消订阅时没有执行 print("dispose")
闭包。
对此,我们也可以用类似的方式来解决,通过增加一个 Diposer
类来进行管理。具体代码见:RxDemo-06。
当订阅发生时(即执行 subscribe
方法时),内部会产生一个递归的控制流,如下图所示。
通过递归返回的方式构建整个订阅管理关系链,如下图所示。diposer0
是 subscribe
方法最终返回的 Disposable
对象。当我们对 disposer0
执行 dispose
方法时,内部会递归地执行 dispose
方法,最终取消订阅链中所有的订阅。
注意:这里面会有循环引用,如 MapObserver
内部又引用了 Disposer0
,RxDemo-06 以及 RxSwift 中的处理是给 Disposer
类内部添加一个状态,表示是否已经取消了订阅,从而避免循环引用导致的循环调用。这里的循环引用并不一定是坏事,它在下述的场景下起到了非常关键的作用。
当事件中出现一个 complete
或 error
事件时,由于事件会依次传递至 Observer
,最后一次传递时,即 MapObserver
进行传递时,会判断是否是 complete
或 error
事件,从而决定是否执行 dispose
方法。当 MapObserver
执行 dipose
方法时,会通过上述的循环引用,调用 Disposer0
执行 dispose
方法,从而实现整体取消订阅。
总结
本文通过逐步实现 RxSwift 核心部分中的功能,一窥其背后的设计思路。从中我们也看到了其对函数式编程的应用,以及其所呈现出来的流编程模式的底层实现原理。
后续,我们将进一步探索原版 RxSwift 中其他的一些内容。