在RxSwfit也有这样一台机器😄
let disposeBag = DisposeBag()
Observable.of("waterA", "waterB")
.subscribe(onNext: { print("Name Of Water : " + $0) })
.addDisposableTo(disposeBag)
这里只是打印了水的名字。现在就来找找它们的对应关系。研究的水
对应的是"waterA", "waterB"
,指定的容器
是ObservableSequence
,处理芯片
是onNext: { print("Name Of Water : " + $0) }
,协作机器
在哪呢?ObservableSequenceSink
(先在这里指出后面进行分析)。调度程序
是RecursiveImmediateScheduler
,此例中使用的模式
是递归立即执行模式RecursiveImmediateScheduler
。
现在我们来看看他们是怎样一起工作的。
根据上一篇 RxSwift 粗略源码分析 先找出Observable,源码如下:(这个就是把水放入指定的容器中,注意在这个步骤中ObservableSequence
在创建时,还指定了一个调度程序CurrentThreadScheduler.instance
)
//文件名:Observable+Creation.swift
public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
当subscribe
时,把装水的容器,调度程序,和处理芯片插入到机器中,(装水的容器,调度程序在这里绑在了一块),然后启动机器进行处理。
//文件名: Sequence.swift
class ObservableSequence<S: Sequence> : Producer<S.Iterator.Element> {
fileprivate let _elements: S
fileprivate let _scheduler: ImmediateSchedulerType
init(elements: S, scheduler: ImmediateSchedulerType) {
_elements = elements
_scheduler = scheduler
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
let sink = ObservableSequenceSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
在这台机器运行时,直接选择了递归模式
进行运行(_parent._scheduler.scheduleRecursive
),注意这里的尾闭包处理,在调度程序后面的处理中才会执行到这里的action
。
//文件名: Sequence.swift
class ObservableSequenceSink<S: Sequence, O: ObserverType> : Sink<O> where S.Iterator.Element == O.E {
typealias Parent = ObservableSequence<S>
private let _parent: Parent
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer)
}
func run() -> Disposable {
return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements))
{ (iterator, recurse) in
var mutableIterator = iterator
if let next = mutableIterator.0.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
}
}
}
下面是直接递归模式
启用的地方,通过的是ImmediateSchedulerType
扩展。由于是直接的
,所以直接进行了此模式的执行recursiveScheduler.schedule(state)
。注意这里把调度程序作为参数传递给了执行模式scheduler: self
。
//文件名: ImmediateSchedulerType.swift
extension ImmediateSchedulerType {
/**
Schedules an action to be executed recursively.
- parameter state: State passed to the action to be executed.
- parameter action: Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in recursive invocation state.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
}
在看执行模式前,需要先了解调度程序
,离开调度程序谈执行模式,就如毛之无皮。
下面是调度程序,在schedule
方法中(此方法为申请任务的执行),如果当前调度程序没有其它操作时,就直接执行action
;如果有其它操作,则把操作加入到队列CurrentThreadScheduler.queue
中,返回清理处理块Disposables.create(with: scheduledItem.dispose)
,当执行完前一个操作后,调度程序会循环队列,执行队列中的下一个任务while let latest = queue.value.dequeue() ...
。
//文件名:CurrentThreadScheduler.swift
/**
Represents an object that schedules units of work on the current thread.
This is the default scheduler for operators that generate elements.
This scheduler is also sometimes called `trampoline scheduler`.
*/
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/**
The singleton instance of the current thread scheduler.
*/
public static let instance = CurrentThreadScheduler()
static var queue : ScheduleQueue?
/**
Schedules an action to be executed as soon as possible on current thread.
If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
automatically installed and uninstalled after all work is performed.
- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
latest.invoke()
}
return disposable
}
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
// In Xcode 7.3, `return scheduledItem` causes segmentation fault 11 on release build.
// To workaround this compiler issue, returns AnonymousDisposable that disposes scheduledItem.
return Disposables.create(with: scheduledItem.dispose)
}
}
现在来看看模式的执行,模式的schedule
方法中(与调度程序方法名同名),向调度程序申请运行操作_scheduler.schedule ...
。在运行操作中,最终执行协作机器run
后面的操作self._action
,传入的参数为state
(整个流程保持不变),和递归执行的方法self.schedule
。注意,调度程序当前有操作执行时,返回清理处理块,执行模式把它加入到了当前对象的清理模块组中,并生产了removeKey
,当此操作执行时,移除清理处理模块self._group.remove(for: removeKey!)
。调度程序当前无操作执行时不会加入。
//文件名:RecursiveScheduler.swift
/**
Type erased recursive scheduler.
*/
class RecursiveImmediateScheduler<State> {
typealias Action = (_ state: State, _ recurse: (State) -> Void) -> Void
private var _lock = SpinLock()
private let _group = CompositeDisposable()
private var _action: Action?
private let _scheduler: ImmediateSchedulerType
init(action: @escaping Action, scheduler: ImmediateSchedulerType) {
_action = action
_scheduler = scheduler
}
// immediate scheduling
/**
Schedules an action to be executed recursively.
- parameter state: State passed to the action to be executed.
*/
func schedule(_ state: State) {
var isAdded = false
var isDone = false
var removeKey: CompositeDisposable.DisposeKey? = nil
let d = _scheduler.schedule(state) { (state) -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
let action = self._lock.calculateLocked { () -> Action? in
if isAdded {
self._group.remove(for: removeKey!)
}
else {
isDone = true
}
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
_lock.performLocked {
if !isDone {
removeKey = _group.insert(d)
isAdded = true
}
}
}
func dispose() {
_lock.performLocked {
_action = nil
}
_group.dispose()
}
}
现在我们再来回看协作机器
中的action
,怎样进行的递归操作(代码如下:)。state
为一个元组 (_parent._elements.makeIterator(), _parent._elements)
,在整个过程中是没有变化的,然后是操作action
,在操作中,首先获取元组第一元素当前的值,并自动移动了下标(错误的表达方式),如果有值,就将值传递给处理芯片
进行处理(本例中只是打印),然后调用递归处理方法进行下一步处理,参数还是为当前元组recurse(mutableIterator)
;如果没有值,则告诉处理芯片
已完成数据处理,递归调用结束。
func run() -> Disposable {
return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements))
{ (iterator, recurse) in
var mutableIterator = iterator
if let next = mutableIterator.0.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
}
}
文字看起来很空洞,下面通过例子查看从任务开始执行后的每一步的状态和动作。
例子:
let disposeBag = DisposeBag()
Observable.of("waterA", "waterB")
.subscribe(onNext: { print("Name Of Water : " + $0) })
.addDisposableTo(disposeBag)
A. 在协作机器启动时,
recursiveScheduler.schedule(state) ...
执行模式直接被调用 ,进入执行模式块后,执行模式块到调度程序申请操作执行_scheduler.schedule(state) ...
。B. 进入调度程序中,此时还没有其它操作,直接执行此操作
let disposable = action(state)
。C. 回到执行模式块,操作最终调用协作机器中的
action
。D. 进入协作机器中,
action
中的mutableIterator.0.next()
返回的是waterA
,协作机器把它传给了处理芯片,打印了Name Of Water : waterA
,然后调用了递归处理方法,并传入的当前元组。E. 再次进入执行模式块,在执行模式的
schedule
方法中,执行模式再到调度程序申请处理操作。F. 进入调度程序,但此时还在执行当前操作,所以被加入到了调度程序的队列中,到此第一次处理结束,
let disposable = action(state)
执行完毕。开始处理队列中的剩余操作while let latest = queue.value.dequeue() ...
,通过latest.invoke()
方法,处理执行模式前一次申请的操作。G. 进入到执行模式块中, 这里先从清理组中移除此操作的清理处理块
self._group.remove(for: removeKey!)
,然后调用协作机器中的action
。H. 回到协作机器的action中, 注意这里的元组是没有变的,但是元组的下标在上一次处理中变了,所有通过
let next = mutableIterator.0.next()
获取到的是waterB
,将它传给处理芯片处理,再一次执行recurse
进行执行操作。再次跳到 E 开始处理。当let next = mutableIterator.0.next()
获取为空时,跳到 I 进行结束处理。I. 协作机器告诉处理芯片处理结束,
recurse
不会再次执行,调度程序通过while let latest = queue.value.dequeue()
获取操作为空,也终于结束了它的schedule
。到此整个处理结束。