RxSwift特征序列之Driver

RxSwift特征序列之Driver

Driver 是个比较特殊的序列,它主要是对需要在 UI 上做出响应的序列进行了封装。这层封装做了三件事情:

  1. 在主线程监听
  2. 不会产生 error 事件
  3. 共享附加作用

没有对比就没有伤害,先看看一搬的序列在驱动 UI 时会怎么做,再回首就能体会到 Driver 的便捷之处了。

demo

准备一个模拟网络请求的函数,然后把输入框的编辑事件和网络请求的结果合并之后,订阅到的结果在 UI(lab 和 btn)上展示出来。

func dealWithData(inputText:String)-> Observable<Any>{
    print("准备网络请求---\(Thread.current)") // data
    return Observable<Any>.create({ (ob) -> Disposable in
        if inputText == "1234" {
            ob.onError(NSError.init(domain: "❌", code: 120, userInfo: nil))
        }
        
        DispatchQueue.global().async {
            print("发送前的线程: \(Thread.current)")
            ob.onNext("\(inputText)")
            ob.onCompleted()
        }
        return Disposables.create()
    })
}

然后开始序列的创建、订阅:

let result  = self.tf.rx.text.skip(1)
    .flatMap { [weak self](input) -> Observable<Any> in
        return (self?.dealWithData(inputText:input ?? ""))!
}

result.map {
        print("map映射lab---\($0)---\(Thread.current)")
        return "长度: \(($0 as! String).count)"
    }
    .bind(to:self.lab.rx.text)
    .disposed(by: disposeBag)

result.map {
        print("map映射btn---\($0)---\(Thread.current)")
        return "\($0 as! String)"
    }
    .bind(to:self.btn.rx.title())
    .disposed(by: disposeBag)

当输入框输入 2 的时候会打印:

准备网络请求---<NSThread: 0x600003798dc0>{number = 1, name = main}
发送前的线程: <NSThread: 0x6000037fd900>{number = 4, name = (null)}
map映射lab---2---<NSThread: 0x6000037fd900>{number = 4, name = (null)}
准备网络请求---<NSThread: 0x600003798dc0>{number = 1, name = main}
发送前的线程: <NSThread: 0x6000037307c0>{number = 6, name = (null)}
map映射btn---2---<NSThread: 0x6000037307c0>{number = 6, name = (null)}

这样写会有些问题:

  1. 输入框每次的编辑事件都会触发两次请求,因为订阅(bind)了两次,并没有共享。
  2. 在子线程请求后,响应也是在子线程。
  3. 如果网络请求序列发出 error 事件,就会取消所有的绑定,无法发出新的请求,并抛出异常错误。

为了避免这三个问题,就得多调用几个方法:

let result  = self.tf.rx.text.skip(1)
    .flatMap { [weak self](input) -> Observable<Any> in
        return (self?.dealWithData(inputText:input ?? ""))!
            //保证了在主线程监听
            .observeOn(MainScheduler())
            //避免程序抛出错误异常
            .catchErrorJustReturn("检测到了错误事件")
    }
    //共享附加作用
    .share(replay: 1, scope: .whileConnected)

result.map {
        print("map映射lab---\($0)---\(Thread.current)")
        return "长度: \(($0 as! String).count)"
    }
    .bind(to:self.lab.rx.text)
    .disposed(by: disposeBag)

result.map {
        print("map映射btn---\($0)---\(Thread.current)")
        return "\($0 as! String)"
    }
    .bind(to:self.btn.rx.title())
    .disposed(by: disposeBag)
    
error 事件的打印:
准备网络请求---<NSThread: 0x600001f6e880>{number = 1, name = main}
map映射lab---检测到了错误事件---<NSThread: 0x600001f6e880>{number = 1, name = main}
map映射btn---检测到了错误事件---<NSThread: 0x600001f6e880>{number = 1, name = main}
发送前的线程: <NSThread: 0x600001f1d100>{number = 4, name = (null)}

使用 Driver

如果使用 Driver 的话,就是这个样子的:

let result = self.tf.rx.text.orEmpty
    .asDriver()
    .flatMap {
        return self.dealWithData(inputText: $0)
            .asDriver(onErrorJustReturn: "检测到了错误事件")
}

result.map {
        return "长度: \(($0 as! String).count)"
    }
    .drive(self.lab.rx.text)
    .disposed(by: disposeBag)

result.map {
        return "\($0 as! String)"
    }
    .drive(self.btn.rx.title())
    .disposed(by: disposeBag)

要使用 Driver,就要先用asDriver把序列转换为 Driver,然后才能拥有 drive 的绑定能力。把输入框编辑事件和网络请求都转为 Driver 序列再合并后,用 drive 把订阅到的数据绑定到 UI 上。同样可以避免那三种情况,写起来更简洁。

解析

先点进去asDriver(由于 demo 中 error 事件是在网络请求中发出的,这里主要看flatMap中的asDriver):

extension ObservableConvertibleType {
    public func asDriver(onErrorJustReturn: Element) -> Driver<Element> {
        let source = self
            .asObservable()
            .observeOn(DriverSharingStrategy.scheduler)
            .catchErrorJustReturn(onErrorJustReturn)
        return Driver(source)
    }
}

到了ObservableConvertibleTypeDriver分类文件中。里面以调用者作为源序列构建了Driver序列,源序列在这里也是做了两个处理:1.observeOn:在主线程监听;2.catchErrorJustReturn:error 事件不会抛出异常。

1.主线程监听

public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
}

结构体DriverSharingStrategyscheduler返回的是SharingScheduler枚举值。默认值就是主线程。

public enum SharingScheduler {
    public private(set) static var make: () -> SchedulerType = { MainScheduler() }
}

2.回避 error 事件

public func catchErrorJustReturn(_ element: Element) -> Observable<Element> {
    return Catch(source: self.asObservable(), handler: { _ in Observable.just(element) })
}

传入 source 和 handler 闭包,构建了一个Catch序列。

final private class Catch<Element>: Producer<Element> {
    typealias Handler = (Swift.Error) throws -> Observable<Element>
    
    init(source: Observable<Element>, handler: @escaping Handler) {
        self._source = source
        self._handler = handler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = CatchSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

老套路了,CatchSinkrun中,self._parent._source.subscribe(self)订阅后的响应在CatchSinkon里面。

final private class CatchSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    func run() -> Disposable {
        let d1 = SingleAssignmentDisposable()
        self._subscription.disposable = d1
        d1.setDisposable(self._parent._source.subscribe(self))

        return self._subscription
    }
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            self.forwardOn(event)
        case .completed:
            self.forwardOn(event)
            self.dispose()
        case .error(let error):
            do {
                let catchSequence = try self._parent._handler(error)

                let observer = CatchSinkProxy(parent: self)
                
                self._subscription.disposable = catchSequence.subscribe(observer)
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
    }
}

next 和 completed 事件都是常规操作,主要是 error 的处理,首先回调了_handler(error),这个_handler就是初始化Catch时的{ _ in Observable.just(element) }。虽然回调时把error传过去了,但是闭包中直接忽略了(error 被无视了),还是把外界传入的element用来构造一个 just 序列作为返回值(just 只发出唯一的元素,就是这里的element了)。所以,这里的catchSequence就是个 just 序列

然后构造了一个中间层CatchSinkProxy,作为 just 序列的观察者,订阅后自然还是在中间层CatchSinkProxyon中响应:

final private class CatchSinkProxy<Observer: ObserverType>: ObserverType {
    func on(_ event: Event<Element>) {
        self._parent.forwardOn(event)
        
        switch event {
        case .next:
            break
        case .error, .completed:
            self._parent.dispose()
        }
    }
}

二话不说,直接让CatchSink通过forwardOn响应外界。回避 error 流程结束。

3. 共享附加作用

好像还少了一点:共享附加作用,继续跟着Driver的构造函数走。

public typealias Driver<Element> = SharedSequence<DriverSharingStrategy, Element>

Driver只是SharedSequence的别名。看到这里也大致能猜到这个SharedSequence就是处理共享附加作用的。

public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
    let _source: Observable<Element>

    init(_ source: Observable<Element>) {
        self._source = SharingStrategy.share(source)
    }
}

SharedSequence构造函数中的SharingStrategy像是突然蹦出来的,点击也跳不到定义的位置。从SharedSequence的定义中看出它是个遵守SharingStrategyProtocol协议的泛型。之前给Driver起别名的时候,SharedSequence中指定的是DriverSharingStrategy。那我们点击share是就可以选择DriverSharingStrategy.share的位置。

public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        return source.share(replay: 1, scope: .whileConnected)
    }
}

结构体DriverSharingStrategyshare里,又让source调用了一个share。然后,Driver(也就是SharedSequence)的_source还是调用asDriver的那个序列么?

看到这里,感觉很绕,外面一句代码的调用,里面缺穿行了好多层,还看不到头。但是也会发现,每一层都只做很简单的事,它们之间灵活搭配,不同的组合可以完成各种不同的功能。

继续跳进share

public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected) -> Observable<Element> {
    switch scope {
    case .forever:
        switch replay {
        case 0: return self.multicast(PublishSubject()).refCount()
        default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
        }
    case .whileConnected:
        switch replay {
        case 0: return ShareWhileConnected(source: self.asObservable())
        case 1: return ShareReplay1WhileConnected(source: self.asObservable())
        default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
        }
    }
}

我们只看满足条件(replay == 1,scope == whileConnected)的ShareReplay1WhileConnected(source: self.asObservable())。其他分支跳进去又是一大坨。这里果然是构建了另一个序列ShareReplay1WhileConnected,但source还是那个调用asDriver的源序列。

final private class ShareReplay1WhileConnected<Element>
    : Observable<Element> {
    fileprivate typealias Connection = ShareReplay1WhileConnectedConnection<Element>
    fileprivate var _connection: Connection?

    init(source: Observable<Element>) {
        self._source = source
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()

        let connection = self._synchronized_subscribe(observer)
        let count = connection._observers.count

        let disposable = connection._synchronized_subscribe(observer)

        self._lock.unlock()
        
        if count == 0 {
            connection.connect()
        }

        return disposable
    }

    @inline(__always)
    private func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
        let connection: Connection

        if let existingConnection = self._connection {
            connection = existingConnection
        }
        else {
            connection = ShareReplay1WhileConnectedConnection<Element>(
                parent: self,
                lock: self._lock)
            self._connection = connection
        }

        return connection
    }
}

ShareReplay1WhileConnected的订阅中,调用_synchronized_subscribe,引用了一个ShareReplay1WhileConnectedConnection。然后获取一下观察者总数,就把ShareReplay1WhileConnected作为ShareReplay1WhileConnectedConnection的观察者开始同步订阅了。

fileprivate final class ShareReplay1WhileConnectedConnection<Element>
    : ObserverType
    , SynchronizedUnsubscribeType {
    final func on(_ event: Event<Element>) {
        self._lock.lock()
        let observers = self._synchronized_on(event)
        self._lock.unlock()
        dispatch(observers, event)
    }

    final private func _synchronized_on(_ event: Event<Element>) -> Observers {
        if self._disposed {
            return Observers()
        }

        switch event {
        case .next(let element):
            self._element = element
            return self._observers
        case .error, .completed:
            let observers = self._observers
            self._synchronized_dispose()
            return observers
        }
    }

    final func connect() {
        self._subscription.setDisposable(self._parent._source.subscribe(self))
    }

    final func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock(); defer { self._lock.unlock() }
        if let element = self._element {
            observer.on(.next(element))
        }

        let disposeKey = self._observers.insert(observer.on)

        return SubscriptionDisposable(owner: self, key: disposeKey)
    }
}

ShareReplay1WhileConnectedConnection_synchronized_subscribe中,如果_element有值,观察者就发出 next 事件出去,然后就是observer.on装袋了,很熟悉的模式,跟 RxSwift之Subject 中的 ReplaySubject、PublishSubject里的处理非常类似。

到此为止还没有真正的去订阅,我们回到ShareReplay1WhileConnectedsubscribe函数里,继续下一步。初次进来,袋子里的观察者 count 必定为 0 。也会调用connection.connect()ShareReplay1WhileConnectedConnectionconnect里才走了源序列的订阅subscribe。之后的响应也和PublishSubject中的一样多点发送,只是 Replay 的只有一个元素罢了。这样也完成了共享附加作用。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容