什么是ReactiveSwift?
ReactiveSwift
offers composable, declarative and flexible primitives that are built around the grand concept ofstreams of values over time.
These primitives can be used to uniformly represent common Cocoa and generic programming patterns that are fundamentally an act of observation, e.g. delegate pattern, callback closures, notifications, control actions, responder chain events,futures/promises and key-value observing(KVO).
Because all of these different mechanisms can be represented in the same way, it’s easy to declaratively compose them together, with less spaghetti code and state to bridge the gap.
ReactiveSwift提供了随着时间的推移构建在值流的宏观概念周围的可组合,声明和灵活的基元。
这些原语可以用来统一表示常见的Cocoa和泛型编程模式,这些模式基本上是观察行为,例如, 委托模式,回调关闭,通知,控制行为,响应者连锁事件,期货/承诺和键值观察(KVO)。
ReactiveCocoa的核心概念
这里只是简单的介绍一下几个概念的基本意思和用法,后续会依次详细介绍几种比较重要的概念。Signal
SignalProducer
Property
这三个是使用RAC必须了解和掌握的。
Signal:一个单向的事件流。
Signal拥有者可以单方面控制事件流。观察者可以随时在未来的事件中登记他们的兴趣,但观察对于这个流或它的主人是没有副作用的。没有副作用 没有副作用 没有副作用 (重要的事情说三遍!!!)
这就像电视直播一样 - 您可以观看并对内容做出反应,但您不能对直播或电视台产生副作用。
let channel: Signal<Program, NoError> = tvStation.channelOne
channel.observeValues { program in ... }
Event:事件流的基本传输单元。
Signal可以有任意数量的事件携带一个值,然后是一个特定原因的最终终端事件。
用简单的话来说,或者说用OC里的理解来说,event就像是Signal这个管道传输的水流,或者说是上面的电视机中播放的音频和视频数据。
SignalProducer: 推迟创建一个stream的工作
从字面上的意思来说就是我先创建一个SignalProducer
,等我需要Signal工作时,我就启动SignalProducer
。对于每一个调用来启动SignalProducer
,一个新Signal
的创建和延期的工作随后被调用。
这就像一个点播流媒体服务 - 即使这个插曲是像电视直播一样流动的,你可以选择你观看的内容,什么时候开始观看,什么时候打断。
let frames: SignalProducer<VideoFrame, ConnectionError> = vidStreamer.streamAsset(id: tvShowId)
let interrupter = frames.start { frame in ... }
interrupter.dispose()
Property:一个始终保持有一个值的可订阅框。
an observable box that always holds a value.
observable 这边翻译为 可订阅的 应该更加贴切。
Property
是一个可以观察到其变化的变量。换句话说,这是一个有更强保证的Signal
- 最新的价值永远是可用的,并且流永远不会失败。
这就像视频播放的不断更新的当前时间偏移一样 - 播放始终处于特定的时间偏移,并随着播放的继续而由播放逻辑更新。
let currentTime: Property<TimeInterval> = video.currentTime
print("Current time offset: \(currentTime.value)")
currentTime.signal.observeValues { timeBar.timeLabel.text = "\($0)" }
Property一般是作为VM的观察属性存在,可以理解为Property是一个不用发送Error 和 Completed的 Signal
。
Action:具有预设操作的有序的的工作者。
通过输入调用时,Action将输入和最新状态应用于预设操作,并将输出推送给任何相关方。
这就像一个自动售货机 - 在选择一个插入硬币的选项后,机器将处理订单并最终输出您想要的零食。请注意,整个过程是相互排斥的 - 您不能让机器同时为两个客户服务。
// The vending machine.
class VendingMachine {
let purchase: Action<Int, Snack, VendingMachineError>
let coins: MutableProperty<Int>
// The vending machine is connected with a sales recorder.
init(_ salesRecorder: SalesRecorder) {
coins = MutableProperty(0)
purchase = Action(state: coins, enabledIf: { $0 > 0 }) { coins, snackId in
return SignalProducer { observer, _ in
// The sales magic happens here.
// Fetch a snack based on its id
}
}
// The sales recorders are notified for any successful sales.
purchase.values.observeValues(salesRecorder.record)
}
}
// Purchase from the vending machine with a specific option.
vendingMachine.purchase
.apply(snackId)
.startWithResult { result
switch result {
case let .success(snack):
print("Snack: \(snack)")
case let .failure(error):
// Out of stock? Insufficient fund?
print("Transaction aborted: \(error)")
}
}
Lifetime:限制订阅者(观察者)的生命周期
当观察一个Signal
或者SignalProducer
,如果不再有人观察它,那么继续发出价值是没有意义的。考虑视频流:一旦你停止观看视频,流可以自动关闭,提供Lifetime
:
class VideoPlayer {
private let (lifetime, token) = Lifetime.make()
func play() {
let frames: SignalProducer<VideoFrame, ConnectionError> = ...
frames.take(during: lifetime).start { frame in ... }
}
}
Basic Operators: 基本操作符
Performing side effects with event streams
Performing side effects with event streams
-
Observation:订阅
Signals
可以用observe函数来观察。
signal.observe { event in
switch event {
case let .value(value):
print("Value: \(value)")
case let .failed(error):
print("Failed: \(error)")
case .completed:
print("Completed")
case .interrupted:
print("Interrupted")
}
}
value,failed,completed和interrupted可提供的事件时对应的事件发生时将被调用。
signal.observeValues { value in
print("Value: \(value)")
}
signal.observeFailed { error in
print("Failed: \(error)")
}
signal.observeCompleted {
print("Completed")
}
signal.observeInterrupted {
print("Interrupted")
}
-
Injecting effects
副作用可以与on操作员一起注入事件流,而不需要实际订阅。
let producer = signalProducer
.on(starting: {
print("Starting")
}, started: {
print("Started")
}, event: { event in
print("Event: \(event)")
}, value: { value in
print("Value: \(value)")
}, failed: { error in
print("Failed: \(error)")
}, completed: {
print("Completed")
}, interrupted: {
print("Interrupted")
}, terminated: {
print("Terminated")
}, disposed: {
print("Disposed")
})
请注意,没有必要提供所有参数 - 所有参数都是可选的,您只需要为您关心的事件提供回调。
请注意,直到producer开始(可能在其他地方),没有任何东西会被打印。
Transforming event streams: 转换事件流
-
Mapping
creating a new stream with the results.
Signal
中转换成新的 stream。
let (signal, observer) = Signal<String, NoError>.pipe()
signal
.map { string in string.uppercased() }
.observeValues { value in print(value) }
observer.send(value: "a") // Prints A
observer.send(value: "b") // Prints B
observer.send(value: "c") // Prints C
-
Filtering
过滤事件流
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.filter { number in number % 2 == 0 }
.observeValues { value in print(value) }
observer.send(value: 1) // Not printed
observer.send(value: 2) // Prints 2
observer.send(value: 3) // Not printed
observer.send(value: 4) // prints 4
-
Aggregating
reduce运算符用于聚集一个事件流的值转换成一个单一的组合值。请注意,最终值仅在输入流完成后才会发送。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.reduce(1) { $0 * $1 }
.observeValues { value in print(value) }
observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
observer.send(value: 3) // nothing printed
observer.sendCompleted() // prints 6
collect运算符用于聚集一个事件流的值成一个单一的阵列值。请注意,最终值仅在输入流完成后才会发送。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.collect()
.observeValues { value in print(value) }
observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
observer.send(value: 3) // nothing printed
observer.sendCompleted() // prints [1, 2, 3]
这边需要的注意的是reduce
和 collect
可以用于合并 Signal
,但是必须是所有的Signal
都有变化之后才会有输出。
Combining event streams: 合并事件流
-
Combining latest values
combineLatest
函数结合了两个(或更多)事件流的最新值。
结果流将只在每个输入发送至少一个值之后发送其第一个值。之后,任何输入上的新值都将导致输出上的新值。
let (numbersSignal, numbersObserver) = Signal<Int, NoError>.pipe()
let (lettersSignal, lettersObserver) = Signal<String, NoError>.pipe()
let signal = Signal.combineLatest(numbersSignal, lettersSignal)
signal.observeValues { next in print("Next: \(next)") }
signal.observeCompleted { print("Completed") }
numbersObserver.send(value: 0) // nothing printed
numbersObserver.send(value: 1) // nothing printed
lettersObserver.send(value: "A") // prints (1, A)
numbersObserver.send(value: 2) // prints (2, A)
numbersObserver.sendCompleted() // nothing printed
lettersObserver.send(value: "B") // prints (2, B)
lettersObserver.send(value: "C") // prints (2, C)
lettersObserver.sendCompleted() // prints "Completed"
-
Zipping
zip
函数成对连接两个(或更多)事件流的值。任何第N个元组的元素对应于输入流的第N个元素。
这意味着输出流的第N个值不能发送,直到每个输入发送至少N个值。
let (numbersSignal, numbersObserver) = Signal<Int, NoError>.pipe()
let (lettersSignal, lettersObserver) = Signal<String, NoError>.pipe()
let signal = Signal.zip(numbersSignal, lettersSignal)
signal.observeValues { next in print("Next: \(next)") }
signal.observeCompleted { print("Completed") }
numbersObserver.send(value: 0) // nothing printed
numbersObserver.send(value: 1) // nothing printed
lettersObserver.send(value: "A") // prints (0, A)
numbersObserver.send(value: 2) // nothing printed
numbersObserver.sendCompleted() // nothing printed
lettersObserver.send(value: "B") // prints (1, B)
lettersObserver.send(value: "C") // prints (2, C) & "Completed"
Flattening event streams: 展开事件流
为了理解为什么有不同的策略以及它们如何相互比较,请看一下这个例子,想象列偏移量是时间的:
let values = [
// imagine column offset as time
[ 1, 2, 3 ],
[ 4, 5, 6 ],
[ 7, 8 ],
]
let merge =
[ 1, 4, 2, 7,5, 3,8,6 ]
let concat =
[ 1, 2, 3,4, 5, 6,7, 8]
let latest =
[ 1, 4, 7, 8 ]
-
Merging
.merge
策略立即转发内事件流至外事件流中的每一个数值。在外部事件流或任何内部事件流上发送的任何故障立即在展开事件流上发送并终止。
let (lettersSignal, lettersObserver) = Signal<String, NoError>.pipe()
let (numbersSignal, numbersObserver) = Signal<String, NoError>.pipe()
let (signal, observer) = Signal<Signal<String, NoError>, NoError>.pipe()
signal.flatten(.merge).observeValues { print($0) }
observer.send(value: lettersSignal)
observer.send(value: numbersSignal)
observer.sendCompleted()
lettersObserver.send(value: "a") // prints "a"
numbersObserver.send(value: "1") // prints "1"
lettersObserver.send(value: "b") // prints "b"
numbersObserver.send(value: "2") // prints "2"
lettersObserver.send(value: "c") // prints "c"
numbersObserver.send(value: "3") // prints "3"
-
Concatenating
.concat
策略用于序列化内部事件流的事件。外部事件流开始观察。直到前一个事件流完成,才会观察到每个后续事件流。立即转移到展开的事件流。
let (lettersSignal, lettersObserver) = Signal<String, NoError>.pipe()
let (numbersSignal, numbersObserver) = Signal<String, NoError>.pipe()
let (signal, observer) = Signal<Signal<String, NoError>, NoError>.pipe()
signal.flatten(.concat).observeValues { print($0) }
observer.send(value: lettersSignal)
observer.send(value: numbersSignal)
observer.sendCompleted()
numbersObserver.send(value: "1") // nothing printed
lettersObserver.send(value: "a") // prints "a"
lettersObserver.send(value: "b") // prints "b"
numbersObserver.send(value: "2") // nothing printed
lettersObserver.send(value: "c") // prints "c"
lettersObserver.sendCompleted() // nothing printed
numbersObserver.send(value: "3") // prints "3"
numbersObserver.sendCompleted() // nothing printed
-
Switching to the latest
.latest
仅转发最新输入事件流中的值或失败。
let (lettersSignal, lettersObserver) = Signal<String, NoError>.pipe()
let (numbersSignal, numbersObserver) = Signal<String, NoError>.pipe()
let (signal, observer) = Signal<Signal<String, NoError>, NoError>.pipe()
signal.flatten(.latest).observeValues { print($0) }
observer.send(value: lettersSignal) // nothing printed
numbersObserver.send(value: "1") // nothing printed
lettersObserver.send(value: "a") // prints "a"
lettersObserver.send(value: "b") // prints "b"
numbersObserver.send(value: "2") // nothing printed
observer.send(value: numbersSignal) // nothing printed
lettersObserver.send(value: "c") // nothing printed
numbersObserver.send(value: "3") // prints "3"
Working with errors: 处理错误
这些运算符用于处理事件流上可能发生的故障,或执行可能在事件流上失败的操作。
-
Catching failures
flatMapError
捕捉输入事件流中可能出现的任何故障,然后开始一个新的SignalProducer
在其位置。
let (signal, observer) = Signal<String, NSError>.pipe()
let producer = SignalProducer(signal)
let error = NSError(domain: "domain", code: 0, userInfo: nil)
producer
.flatMapError { _ in SignalProducer<String, NoError>(value: "Default") }
.startWithValues { print($0) }
observer.send(value: "First") // prints "First"
observer.send(value: "Second") // prints "Second"
observer.send(error: error) // prints "Default"
-
Failable transformations
SignalProducer.attempt(_:)
允许您将失败的操作转换为事件流。在attempt(_:)
与attemptMap(_:)
允许您在一个事件流进行failable
操作或变换。
let dictionaryPath = URL(fileURLWithPath: "/usr/share/dict/words")
// Create a `SignalProducer` that lazily attempts the closure
// whenever it is started
let data = SignalProducer.attempt { try Data(contentsOf: dictionaryPath) }
// Lazily apply a failable transformation
let json = data.attemptMap { try JSONSerialization.jsonObject(with: $0) }
json.startWithResult { result in
switch result {
case let .success(words):
print("Dictionary as JSON:")
print(words)
case let .failure(error):
print("Couldn't parse dictionary as JSON: \(error)")
}
}
-
Retrying
retry
将重新启动原来的SignalProducer
失败 次数count
。
var tries = 0
let limit = 2
let error = NSError(domain: "domain", code: 0, userInfo: nil)
let producer = SignalProducer<String, NSError> { (observer, _) in
tries += 1
if tries <= limit {
observer.send(error: error)
} else {
observer.send(value: "Success")
observer.sendCompleted()
}
}
producer
.on(failed: {e in print("Failure")}) // prints "Failure" twice
.retry(upTo: 2)
.start { event in
switch event {
case let .value(next):
print(next) // prints "Success"
case let .failed(error):
print("Failed: \(error)")
case .completed:
print("Completed")
case .interrupted:
print("Interrupted")
}
}
-
Mapping errors
mapError
在转变的事件流的任何失败的错误到一个新的错误。
enum CustomError: String, Error {
case foo = "Foo Error"
case bar = "Bar Error"
case other = "Other Error"
}
let (signal, observer) = Signal<String, NSError>.pipe()
signal
.mapError { (error: NSError) -> CustomError in
switch error.domain {
case "com.example.foo":
return .foo
case "com.example.bar":
return .bar
default:
return .other
}
}
.observeFailed { error in
print(error.rawValue)
}
observer.send(error: NSError(domain: "com.example.foo", code: 42, userInfo: nil)) // prints "Foo Error"
-
Promote
promoteError
使其能够在事件流不会产生故障。
let (numbersSignal, numbersObserver) = Signal<Int, NoError>.pipe()
let (lettersSignal, lettersObserver) = Signal<String, NSError>.pipe()
numbersSignal
.promoteError(NSError.self)
.combineLatest(with: lettersSignal)
结束语
上述是ReactiveCocoa swift版本的一些基础知识。现在的基本上没有介绍ReactiveCocoa 5.0以上的文章,这边只是顺便记录一下。
参考资料:
ReactiveSwift Docs