Publisher
- 发布源协议,可以实现该协议来实现自己的数据
源
,
-
Subject
继承自Publisher
,提供了三套默认的内置实现类
- 容器包装类,具体实现交由
Subscriber
实现类来转发数据流
- 内置提供各种各样的操作符(函数式编程的仿函数,Swift语言的
Operator
)
- allSatisfy
- tryAllSatisfy
- compactMap
- contains
- filter
- tryFilter
- throttle
- ....等等
-
Publisher
接口
public protocol Publisher {
/// 数据输出流,相当于订阅者的数据输入流
associatedtype Output
/// 数据发布,要么发布一个真实数据流,要么发布一个错误(或者可以选择丢弃错误, Never忽略错误)
associatedtype Failure: Error
/// 接收数据源输入流,并转发给订阅者
func receive<Subscriber: OpenCombine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output == Subscriber.Input
}
- 所有的操作符的流程都类似,追踪一个操作符的调用顺序和触发流程
- 定义一个数组的数据流
[1, 2, 10000].publisher
, publisher
是Sequence
的一个扩展,内部使用Publishers.Sequence
进行了包装成了一个可以被观察的数据流
extension Sequence {
public var publisher: Publishers.Sequence<Self, Never> {
return .init(sequence: self)
}
- `Publishers.Sequence` 内部实现
/// 序列流继承于`Publisher`
public struct Sequence<Elements: Swift.Sequence, Failure: Error>: Publisher {
/// 输出源
public typealias Output = Elements.Element
public let sequence: Elements
public init(sequence: Elements) {
self.sequence = sequence
}
/// 实现 `Publisher`协议方法`receive`
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Failure == Downstream.Failure,
Elements.Element == Downstream.Input
{
/// 包装类`Inner`实现了`Subscription`协议,内部持有当前收到的`subscriber`, 进行转发
let inner = Inner(downstream: subscriber, sequence: sequence)
/// 判断是否序列是否到末尾,如果序列结束发送完成事件并取消序列,数据流完成,反之,持续接收数据流
if inner.isExhausted {
subscriber.receive(subscription: Subscriptions.empty)
subscriber.receive(completion: .finished)
inner.cancel()
} else {
/// 内部会调用`Subscription`协议的 `request`方法
subscriber.receive(subscription: inner)
}
}
}
-
Inner
作为一个私有类,单独实现了序列数据流的内部数据源的流转(每一个操作符都有一套内部特有的Inner实现类),源码我进行了部分简化
private final class Inner<Downstream: Subscriber, Elements: Sequence, Failure>
: Subscription
where Downstream.Input == Elements.Element,
Downstream.Failure == Failure
{
typealias Iterator = Elements.Iterator
typealias Element = Elements.Element
private var sequence: Elements?
private var downstream: Downstream?
private var iterator: Iterator
private var next: Element?
private var pendingDemand = Subscribers.Demand.none
/// 初始化持有的`downstream`数据流,方便后续数据流转
fileprivate init(downstream: Downstream, sequence: Elements) {
self.sequence = sequence
self.downstream = downstream
self.iterator = sequence.makeIterator()
next = iterator.next()
}
func request(_ demand: Subscribers.Demand) {
guard downstream != nil else {
return
}
while let downstream = self.downstream, pendingDemand > 0 {
if let current = self.next {
/// 迭代数据流,依次进行数据的转发,交给订阅者接收
let additionalDemand = downstream.receive(current)
}
if next == nil {
self.downstream = nil
self.sequence = nil
/// 序列结束,发送完成事件
downstream.receive(completion: .finished)
return
}
}
}
}
- 上面定义了数据流源,并发出了数据,等待订阅者监听数据流,
sink
和assign
操作符可以进行订阅,后续会列出sink和assign的源码
代码演示片段
class Root: NSObject {
var name: String = ""
}
let root = Root()
let arr: [Int] = [1, 2, 100]
/// 将数组转换成一个数据流
arr.publisher
/// 过滤数据流中大于2的元素
.filter{$0 > 2}
/// 进行一次转换,转成String类型
.compactMap{"\($0)"}
/// `Sink`订阅数据源
.sink { value in
debugPrint("数据流: \(value)")
}.store(in: &cancel)
/// 使用keypath进行赋值
arr.publisher
.filter{$0 > 2}
.compactMap{"\($0)"}
/// `Assign` keypath 赋值
.assign(to: \.name, on: root).store(in: &cancel)
debugPrint("root name: \(root.name)")
/// 控制台输出
"数据流: 100"
"root name: 100"