Rx 从 SinkDisposer 一窥线程安全

// SinkDisposer
fileprivate final class SinkDisposer: Cancelable {
    fileprivate enum DisposeState: UInt32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }

    // Jeej, swift API consistency rules
    fileprivate enum DisposeStateInt32: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }
    
    private var _state: AtomicInt = 0
    private var _sink: Disposable? = nil
    private var _subscription: Disposable? = nil

    var isDisposed: Bool {
        return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
    }

    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        _sink = sink
        _subscription = subscription

        let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }
    }
    
    func dispose() {
        let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            return
        }

        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            guard let sink = _sink else {
                rxFatalError("Sink not set")
            }
            guard let subscription = _subscription else {
                rxFatalError("Subscription not set")
            }

            sink.dispose()
            subscription.dispose()

            _sink = nil
            _subscription = nil
        }
    }
}



首先我们要分析下需求,哪些操作是需要保证线程安全的。 显然_state的设置是要保证线程安全的, 那么与之相关的读写操作都是要加锁的,因此 isDisposed,setSinkAndSubscription,dispose都是需要加锁的。RX 通过Atomic Operation 保证操作的原子性。这里值的注意的是通过or设置具体标志位,通过&操作检测具体相应标志位。

setSinkAndSubscription 为例子:

      let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }


现假设执行setSinkAndSubscription_state = 0bxy

那么执行 let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)

previousState = 0bxy

_state = 0bxy | 0b10 = 0b1y

也就是说这个操作最终导致_state 的第二位置1
previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue

0bxy & 0b10 = 0bx0

x = 1 则最终结果为0b10,否则则为0b00, 再通过if语句即可检测第二位是否为0.

这里还有一个小坑:

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
             print("triger  disposed in setSinkAndSubscription function \(self) \n \(Thread.current)")
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }

在检测完setSinkAndSubscription flag之后,立马又检测 disposed flag 如果为真则立即执行dispose操作,也就是说一旦set disposed flag,则再设置setSinkAndSubscription 则是无效操作,这里我不确定具体是什么情况会触发这个操作,不过我跑了下单元测试,确实某些case是会触发这种情况,以下便是一个触发该case的单元测试:

    func test1323() {
        func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
            _ = share(Observable<Int>.create({ observer in
                    observer.on(.next(1))
                    Thread.sleep(forTimeInterval: 0.1)
                    observer.on(.completed)
                    return Disposables.create()
                })
                .flatMap { (int) -> Observable<Int> in
                    return Observable.create { (observer) -> Disposable in
                        DispatchQueue.global().async {
                            observer.onNext(int)
                            observer.onCompleted()
                        }
                        return Disposables.create()
                    }
                })
                .subscribe { (e) in
                }
        }

        for op in [
            { $0.share(replay: 0, scope: .whileConnected) },
            { $0.share(replay: 0, scope: .forever) },
            { $0.share(replay: 1, scope: .whileConnected) },
            { $0.share(replay: 1, scope: .forever) },
            { $0.share(replay: 2, scope: .whileConnected) },
            { $0.share(replay: 2, scope: .forever) },
            ] as [(Observable<Int>) -> Observable<Int>] {
            performSharingOperatorsTest(share: op)
        }
    }

但是具体怎么原理还有待细究。

相关资料:

property属性的atomic和nonatomic区别

理解Memory Barrier(内存屏障)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355

推荐阅读更多精彩内容

  • 开篇 一直觉得自己似乎越来越浮躁了,可能当代的大多数年轻人都活在恐慌里,问题已经从小时候的不思进取变成了“太思进取...
    Maru阅读 3,806评论 13 26
  • 当程序员原来越浮躁了,项目做多了大都是雷同的, 对技术没啥帮助,读一些牛逼的第三方框架,有助于提升,关于RxSwi...
    水落斜阳阅读 763评论 0 1
  • 转载自:https://halfrost.com/go_map_chapter_one/ https://half...
    HuJay阅读 6,154评论 1 5
  • 接着上节 mutex,本节主要介绍atomic的内容,练习代码地址。本文参考http://www.cplusplu...
    jorion阅读 73,667评论 1 14
  • 上周日早晨,睡的迷迷糊糊,手机叮咚一声,银行通知短信,常用的卡上收到笔转账。 还在琢磨到底是什么钱。我妈的...
    一粒葵瓜子阅读 331评论 0 0