透视RxSwift核心逻辑
篇幅稍微有点长,了解程度不同,可以跳过某些部分。
- 如果对源码比较熟悉的,建议直接看图就行了,时序图更加清晰。第一次摸索有必要阅读文字内容。
- 贴出来的代码省略了不必要的部分,用省略号代替。
示例
RxSwift的基础用法就是很简单的几步:
- 创建可观察序列
- 监听序列(订阅信号)
- 销毁序列
//创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
//发送信号
observer.onNext("今日份麻酱凉皮")
observer.onCompleted()
return Disposables.create()
}
//订阅信号
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("完成")
}) {
print("销毁")
}
控制台输出:
订阅到:今日份麻酱凉皮
完成
销毁
探究
在看源码之前,应该对接触到的类和协议有些认识,方便之后的理解。下面的关系图在需要的时候回头熟悉一下就行:

到底是什么在支撑如此便捷的调用?
第一句Observable<Any>.create创建了一个可观察序列Observable对象,第二句就是这个Observable序列对象订阅了消息。
从输出可以看出,都是订阅到的消息。那么订阅时传入subscribe的闭包是什么时候调用的呢?
单从现在的几句代码,也能猜出是第一句代码的闭包中的observer.onNext的调用引起的。但是,我们也没有看到这个create函数中的闭包是在哪里执行的?
为了能够清晰的描述,暂且称第一句
create中的闭包为create闭包,第二句subscribe中的几个闭包为subscribe闭包。
外面看不出来,那我们只能进去RxSwift里面探索下create和subscribe到底做了什么?
create函数的实现:
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
原来这函数内部实际上是创建了一个AnonymousObservable匿名可观察序列对象。而之前的闭包也是给AnonymousObservable对象初始化用了。
AnonymousObservable类:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
这里在初始化的时候把create闭包赋值给了_subscribeHandler属性。
到此为止,
Observable<Any>.create函数实际上创建了一个AnonymousObservable匿名可观察序列对象,并保存了create闭包。

。。。貌似这条不是主线啊!没有找到任何一个问题的答案。
再来翻翻subscribe函数:
extension ObservableType {
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
......
let observer = AnonymousObserver<E> { event in
......
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
这也是对定义在ObservableType协议中的函数的实现,返回了一个Disposable。这个Disposable就是用来管理订阅的生命周期的,示例代码中并没有体现出来,实际是在订阅信号的内部处理的。前面都没什么,后面创建了AnonymousObserver,并且在AnonymousObserver初始化时传入了闭包,并赋值给_eventHandler属性。
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
之前,
AnonymousObservable匿名序列对象,保存了create闭包。
此时,创建了AnonymousObserver匿名观察者对象,保存了对subscribe闭包的回调执行的EventHandler闭包。
又一条支线。。。一路走来,都是在创建对象,保存闭包。两个主线疑问还是无迹可寻。难道一开始就走上了歧路?非也!继续看下去就明白了什么叫「柳暗花明又一村」。
AnonymousObservable的subscribe函数中,在创建了AnonymousObserver对象后,还返回了一个新建的Disposable对象。重点就在这里的第一个参数:self.asObservable().subscribe(observer)中。asObservable还是返回了self,后面贴上的ObserverType中可以看到。剩下的就是AnonymousObservable的父类Producer中的subscribe了:
class Producer<Element> : Observable<Element> {
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
......
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
}
由于是在当前线程中执行的,只看else那部分。disposer相关的不用关心。关键是observer参数,这个参数中有对subscribe闭包的处理的EventHandler闭包。observer传入了self.run(observer, cancel)。所以,还要回头再看看AnonymousObservable类中的run函数:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
这里又创建了一个AnonymousObservableSink对象,observer和cancel继续往初始化函数中丢:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
......
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
前面sink.run(self)把self传了进来,又对AnonymousObservable起了别名Parent。parent._subscribeHandler不就是AnonymousObservable在调用它最开始保存的那个create闭包么?AnyObserver(self)则把AnonymousObservableSink作为AnyObserver初始化的参数。
public struct AnyObserver<Element> : ObserverType {
public typealias E = Element
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
public func asObserver() -> AnyObserver<E> {
return self
}
}
其中还把AnonymousObservableSink的on函数赋值给了AnyObserver的属性observer,observer就是EventHandler。这个EventHandler在create闭包中会用到。
这不就是第二个主线疑问(create函数中的闭包何时调用)的答案么!
整理一下:
- 从
subscribe(onNext, onError, onCompleted, onDisposed) -> Disposable函数中创建Disposable开始 -
AnonymousObservable调用subscribe(observer) -
AnonymousObservable调用run(observer, cancel) - 创建
AnonymousObservableSink(observer: observer, cancel: cancel),并且sink.run(self) parent._subscribeHandler(AnyObserver(self))
这是一条从
subscribe闭包-->create闭包的线。

还没完,还有个create闭包中怎么触发subscribe闭包的?
又臭又长的写了这么多,这里就只看observer.onNext("今日份麻酱凉皮")吧。点进去看:
observer.onNext("今日份麻酱凉皮"):
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
onNext中调用了on,在前面AnyObserver结构体定义中可以看出,on函数中返回了self.observer(event),之前是把AnonymousObservableSink的on赋值给了这个self.observer,所以,此时会走到AnonymousObservableSink的on函数中。这里面又调用了self.forwardOn(event),看下AnonymousObservableSink的父类Sink中定义的forwardOn:
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
......
init(observer: O, cancel: Cancelable) {
......
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
......
self._observer.on(event)
}
}
forwardOn中走了一句self._observer.on(event)。这里的_observer属性不就是AnonymousObservableSink初始化时传入的AnonymousObserver对象么!
继续跟AnonymousObserver的on:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
......
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
这里没有on,看父类ObserverBase:
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
这里的on函数中,有个.next分支,调用了self.onCore(event)。子类AnonymousObserver实现的onCore中又调用了self._eventHandler(event)。
这个_eventHandler是什么?不就是AnonymousObserver初始化时保存的对subscribe闭包处理的闭包么!所以create闭包中的observer.onNext("今日份麻酱凉皮")就能触发subscribe闭包了。
这是一条从
AnonymousObservable调用_subscribeHandler(也就是create闭包)时的参数AnyObserver-->subscribe闭包的线。

现在我们看清楚了响应式的数据流:
- 在订阅信号时创建了observer并执行创建序列时的闭包
- 在创建序列的闭包中有回调observer,监听序列的变动而触发订阅信号的闭包
图解
看清楚了么?好像清楚的不太明显。毕竟好几个类、协议,又那么多函数调来调去的。加把劲再撸一撸。既然这么五花八门的调用流程搞清楚了,那就来弄清楚它主要都做了什么?
一图概千言,既然画了图,就少敲点键盘吧!

仔细看了流程图就会发现,出现的几个类中,干活的主要是Anonymous开头的那三个类。我们在外面调用的操作,其实是在使用RxSwift内部封装的一些类。
- 创建可观察序列
AnonymousObservable,并保存create闭包 - 订阅信号
- 首先创建了一个
AnonymousObserver,并且把对subscribe闭包的操作封装成了EventHandler - 苦力活还是
AnonymousObservable来干。- 在创建返回值
Disposable中,由subscribe(observer),把AnonymousObserver传给了AnonymousObservableSink-
AnonymousObservableSink才是信息处理的核心,因为他知道的太多了 -
AnonymousObservableSink有AnonymousObserver观察者,AnonymousObserver持有着EventHandler。 -
AnonymousObservableSink在调用run函数时也传入了AnonymousObservable序列,AnonymousObservable就是create闭包的持有者。 -
AnonymousObservableSink初始化的时候,除了观察者外,还有个管理序列生命周期的Disposable。
-
-
AnonymousObservableSink作为一个内部类,在被create闭包当做参数回调给外界时需要转换为AnyObserver,在这里AnyObserver则是以闭包属性的形式保留了AnonymousObservableSink的on函数 - 后面在信号发生改变时就可以让
AnyObserver通过这个属性值联系到AnonymousObservableSink
- 在创建返回值
- 首先创建了一个
订阅信号:
Observable-->AnonymousObservable-->AnonymousObserver-->AnonymousObservableSink-->AnyObserver-->create闭包
- 发出信号
- 这个过程基本就是和订阅信号时相反的
- 从
create闭包中调用AnyObserver的onNext开始 - 通过
AnyObserver.observer访问闭包中的AnonymousObservableSink -
AnonymousObservableSink拥有AnonymousObserver -
AnonymousObserver掌控EventHandler - 句号
发出信号:
create闭包-->AnyObserver-->AnonymousObservableSink-->AnonymousObserver-->subscribe闭包
