RxSwift 粗略源码分析之of

开始之前,先来讲一件一群科学家的故事,最近一些科学家需要研究不同地方水的特质,由于样本太多他们创建了这样一台机器,把水装入指定的容器,放入一台机器里,插入编写好的分析芯片,就能自动分析得出水的特质,还为这台机器提供了一个调度程序,能根据不同模式进行不同的处理工作。下面这个图就是大致的情况:
快照.png

在RxSwfit也有这样一台机器😄

    let disposeBag = DisposeBag()
    
    Observable.of("waterA", "waterB")
      .subscribe(onNext: { print("Name Of Water : " + $0) })
      .addDisposableTo(disposeBag)

这里只是打印了水的名字。现在就来找找它们的对应关系。研究的水对应的是"waterA", "waterB"指定的容器ObservableSequence处理芯片onNext: { print("Name Of Water : " + $0) }协作机器在哪呢?ObservableSequenceSink (先在这里指出后面进行分析)。调度程序RecursiveImmediateScheduler,此例中使用的模式是递归立即执行模式RecursiveImmediateScheduler

现在我们来看看他们是怎样一起工作的。
根据上一篇 RxSwift 粗略源码分析 先找出Observable,源码如下:(这个就是把水放入指定的容器中,注意在这个步骤中ObservableSequence在创建时,还指定了一个调度程序CurrentThreadScheduler.instance

//文件名:Observable+Creation.swift
  public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return ObservableSequence(elements: elements, scheduler: scheduler)
    }

subscribe时,把装水的容器,调度程序,和处理芯片插入到机器中,(装水的容器,调度程序在这里绑在了一块),然后启动机器进行处理。

//文件名: Sequence.swift
class ObservableSequence<S: Sequence> : Producer<S.Iterator.Element> {
    fileprivate let _elements: S
    fileprivate let _scheduler: ImmediateSchedulerType

    init(elements: S, scheduler: ImmediateSchedulerType) {
        _elements = elements
        _scheduler = scheduler
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
        let sink = ObservableSequenceSink(parent: self, observer: observer)
        sink.disposable = sink.run()
        return sink
    }
}

在这台机器运行时,直接选择了递归模式进行运行(_parent._scheduler.scheduleRecursive),注意这里的尾闭包处理,在调度程序后面的处理中才会执行到这里的action

//文件名: Sequence.swift
class ObservableSequenceSink<S: Sequence, O: ObserverType> : Sink<O> where S.Iterator.Element == O.E {
    typealias Parent = ObservableSequence<S>

    private let _parent: Parent

    init(parent: Parent, observer: O) {
        _parent = parent
        super.init(observer: observer)
    }

    func run() -> Disposable {
        return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) 
    { (iterator, recurse) in
            var mutableIterator = iterator
            if let next = mutableIterator.0.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
            }
        }
}

下面是直接递归模式启用的地方,通过的是ImmediateSchedulerType扩展。由于是直接的,所以直接进行了此模式的执行recursiveScheduler.schedule(state)。注意这里把调度程序作为参数传递给了执行模式scheduler: self

//文件名:  ImmediateSchedulerType.swift
extension ImmediateSchedulerType {
    /**
    Schedules an action to be executed recursively.
    
    - parameter state: State passed to the action to be executed.
    - parameter action: Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in recursive invocation state.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable {
        let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
        recursiveScheduler.schedule(state)
        
        return Disposables.create(with: recursiveScheduler.dispose)
    }
}

在看执行模式前,需要先了解调度程序,离开调度程序谈执行模式,就如毛之无皮。

下面是调度程序,在schedule方法中(此方法为申请任务的执行),如果当前调度程序没有其它操作时,就直接执行action;如果有其它操作,则把操作加入到队列CurrentThreadScheduler.queue中,返回清理处理块Disposables.create(with: scheduledItem.dispose),当执行完前一个操作后,调度程序会循环队列,执行队列中的下一个任务while let latest = queue.value.dequeue() ...

//文件名:CurrentThreadScheduler.swift
/**
Represents an object that schedules units of work on the current thread.

This is the default scheduler for operators that generate elements.

This scheduler is also sometimes called `trampoline scheduler`.
*/
public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

    /**
    The singleton instance of the current thread scheduler.
    */
    public static let instance = CurrentThreadScheduler()

    static var queue : ScheduleQueue? 

    /**
    Schedules an action to be executed as soon as possible on current thread.

    If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
    automatically installed and uninstalled after all work is performed.

    - parameter state: State passed to the action to be executed.
    - parameter action: Action to be executed.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        if CurrentThreadScheduler.isScheduleRequired {
            CurrentThreadScheduler.isScheduleRequired = false

            let disposable = action(state)

            defer {
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }

            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }

            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke()
            }

            return disposable
        }

        let existingQueue = CurrentThreadScheduler.queue

        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {
            queue = existingQueue
        }
        else {
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }

        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem)
        
        // In Xcode 7.3, `return scheduledItem` causes segmentation fault 11 on release build.
        // To workaround this compiler issue, returns AnonymousDisposable that disposes scheduledItem.
        return Disposables.create(with: scheduledItem.dispose)
    }
}

现在来看看模式的执行,模式的schedule方法中(与调度程序方法名同名),向调度程序申请运行操作_scheduler.schedule ...。在运行操作中,最终执行协作机器run后面的操作self._action,传入的参数为state(整个流程保持不变),和递归执行的方法self.schedule。注意,调度程序当前有操作执行时,返回清理处理块,执行模式把它加入到了当前对象的清理模块组中,并生产了removeKey,当此操作执行时,移除清理处理模块self._group.remove(for: removeKey!)。调度程序当前无操作执行时不会加入。

//文件名:RecursiveScheduler.swift
/**
Type erased recursive scheduler.
*/
class RecursiveImmediateScheduler<State> {
    typealias Action =  (_ state: State, _ recurse: (State) -> Void) -> Void
    
    private var _lock = SpinLock()
    private let _group = CompositeDisposable()
    
    private var _action: Action?
    private let _scheduler: ImmediateSchedulerType
    
    init(action: @escaping Action, scheduler: ImmediateSchedulerType) {
        _action = action
        _scheduler = scheduler
    }
    
    // immediate scheduling
    
    /**
    Schedules an action to be executed recursively.
    
    - parameter state: State passed to the action to be executed.
    */
    func schedule(_ state: State) {
        
        var isAdded = false
        var isDone = false
        
        var removeKey: CompositeDisposable.DisposeKey? = nil
        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
            
            let action = self._lock.calculateLocked { () -> Action? in
                if isAdded {
                    self._group.remove(for: removeKey!)
                }
                else {
                    isDone = true
                }
                
                return self._action
            }
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
        
        _lock.performLocked {
            if !isDone {
                removeKey = _group.insert(d)
                isAdded = true
            }
        }
    }
    
    func dispose() {
        _lock.performLocked {
            _action = nil
        }
        _group.dispose()
    }
}

现在我们再来回看协作机器中的action,怎样进行的递归操作(代码如下:)。state为一个元组 (_parent._elements.makeIterator(), _parent._elements),在整个过程中是没有变化的,然后是操作action,在操作中,首先获取元组第一元素当前的值,并自动移动了下标(错误的表达方式),如果有值,就将值传递给处理芯片进行处理(本例中只是打印),然后调用递归处理方法进行下一步处理,参数还是为当前元组recurse(mutableIterator);如果没有值,则告诉处理芯片已完成数据处理,递归调用结束。

  func run() -> Disposable {
        return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) 
    { (iterator, recurse) in
            var mutableIterator = iterator
            if let next = mutableIterator.0.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
            }
        }

文字看起来很空洞,下面通过例子查看从任务开始执行后的每一步的状态和动作。
例子:

  let disposeBag = DisposeBag()
    
    Observable.of("waterA", "waterB")
      .subscribe(onNext: { print("Name Of Water : " + $0) })
      .addDisposableTo(disposeBag)
  • A. 在协作机器启动时,recursiveScheduler.schedule(state) ...执行模式直接被调用 ,进入执行模式块后,执行模式块到调度程序申请操作执行_scheduler.schedule(state) ...

  • B. 进入调度程序中,此时还没有其它操作,直接执行此操作let disposable = action(state)

  • C. 回到执行模式块,操作最终调用协作机器中的action

  • D. 进入协作机器中, action中的mutableIterator.0.next()返回的是waterA,协作机器把它传给了处理芯片,打印了Name Of Water : waterA,然后调用了递归处理方法,并传入的当前元组。

  • E. 再次进入执行模式块,在执行模式的schedule方法中,执行模式再到调度程序申请处理操作。

  • F. 进入调度程序,但此时还在执行当前操作,所以被加入到了调度程序的队列中,到此第一次处理结束,let disposable = action(state)执行完毕。开始处理队列中的剩余操作while let latest = queue.value.dequeue() ...,通过latest.invoke()方法,处理执行模式前一次申请的操作。

  • G. 进入到执行模式块中, 这里先从清理组中移除此操作的清理处理块self._group.remove(for: removeKey!),然后调用协作机器中的action

  • H. 回到协作机器的action中, 注意这里的元组是没有变的,但是元组的下标在上一次处理中变了,所有通过let next = mutableIterator.0.next()获取到的是waterB,将它传给处理芯片处理,再一次执行recurse进行执行操作。再次跳到 E 开始处理。当let next = mutableIterator.0.next()获取为空时,跳到 I 进行结束处理。

  • I. 协作机器告诉处理芯片处理结束,recurse不会再次执行,调度程序通过while let latest = queue.value.dequeue()获取操作为空,也终于结束了它的schedule。到此整个处理结束。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 毕飞宇《苏北少年“堂吉诃德”》是我喜欢的文字,虽说都是童年小趣,却能写出人生哲理的感觉。文风干练,不造作,也接地气...
    我说人生啊阅读 201评论 0 1
  • 作者: Elaine 职场会客厅 ,转载请联系授权。 嗨,大家早上好!我是虞莹。今天想和大家谈谈“专业”和“有效”...
    职场E姐阅读 126评论 0 0
  • “怎么去拥有一道彩虹,怎么去拥抱一夏天的风……” “那天你和我那个山丘,那样的唱着那一年的歌……” 有这样一个地方...
    前世的红豆阅读 397评论 0 1
  • 我这个人从来不会向别人讲述自己的悲痛,正如我的泪水只会在一个人独处的时候兀自盈眶。一是自己没有这样向朋友袒露心胸的...
    Duchenne_smile阅读 479评论 0 2