许多使用过 RxSwift 小伙伴,都比较好奇这个框架里面到底有些什么黑魔法。这次我们尝试揭开 RxSwift 的神秘面纱,接下来就用少量代码来实现其核心功能:
- Observable - 可监听序列
- Observer - 观察者
- Disposable - 可清除资源
- filter 操作符 - 过滤
- map 操作符 - 转换
- scan 操作符 - 扫描
以下代码可以在空白的 Swift Playground 上面运行,不需要依赖任何框架。
我们进入正题吧:
Observable & Observer & Disposable
enum Event<Element> {
case next(Element)
case error(Swift.Error)
case completed
}
typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable
全剧终 🎉🎉🎉!
其实没这么简单,还有一些细节需要处理。
这里我们把 Observable ,Observer 和 Disposable 都看作是不同类型的函数。
-
Event<Element>
- 还是我们熟悉的那个Event<Element>
-
typealias Observer<Element> = (Event<Element>) -> Void
- 观察者是一个用于消费Event
的函数 -
typealias Disposable = () -> Void
- 可清除资源是一个无入参,无返回值的函数,作用是清除资源 -
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable
- 可监听序列是一个以Observer
为入参,返回Disposable
的函数,它有时候也被称做 subscribe 函数
我们来创建一个 Observable
吧:
let observable: Observable<Int> = { observer in
observer(Event.next(0))
observer(Event.next(1))
observer(Event.next(2))
observer(Event.next(3))
observer(Event.next(4))
observer(Event.completed)
observer(Event.next(5))
return { print("dispose") }
}
这与我们平时使用 Observable.create 方法相似。
然后再订阅它:
let observer: Observer<Int> = { event in
switch event {
case .next(let element):
print("next: \(element)")
case .error(let error):
print("error: \(error)")
case .completed:
print("completed")
}
}
let disposable = observable(observer) // subscribe
结果:
next: 0
next: 1
next: 2
next: 3
next: 4
completed
next: 5
哇~ 成功了耶、
等等,好像有点问题,为什么在 completed
事件发生后还会打印 next: 5
。序列在产生 completed
这样的终止事件以后,应该就结束了。不会再产生新的元素才对。
是的,接下来我们就修复这个问题:
func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
return { observer in
// states {
var isDisposed = false
var disposable: Disposable?
// }
disposable = subscribe { event in // subscribe
if isDisposed { return }
switch event {
case .next:
observer(event)
case .error, .completed:
isDisposed = true
observer(event)
disposable?()
}
}
if isDisposed {
disposable?()
}
return {
if isDisposed { return }
isDisposed = true
disposable?()
}
}
}
我们引入一个 createObservable
的全局函数,这个函数的实现在一定程度上还原了 Observable.create。
这个函数接受一个 Observable
并返回一个 Observable
,
而返回的新 Observable
以源 Observable
为基础并加入了一些逻辑,例如:
- 用状态
isDisposed
记录源序列知否已经终止,如果终止了就不会给观察者推新的事件 - 在必要时执行清除逻辑 :
disposable?()
我们用新方法创建 Observable
试试:
let observable1: Observable<Int> = createObservable { observer in
observer(Event.next(0))
observer(Event.next(1))
observer(Event.next(2))
observer(Event.next(3))
observer(Event.next(4))
observer(Event.completed)
observer(Event.next(5))
return { print("dispose") }
}
let observer1: Observer<Int> = { event in
switch event {
case .next(let element):
print("origin next: \(element)")
case .error(let error):
print("origin error: \(error)")
case .completed:
print("origin completed")
}
}
let disposable1 = observable1(observer1) // subscribe
结果:
origin next: 0
origin next: 1
origin next: 2
origin next: 3
origin next: 4
origin completed
dispose
嗯 ,这次结果很满意!
下面我们尝试加入一些操作符。
filter 操作符 - 过滤
我们先加入一个常用的过滤操作符,它的作用就是通过判定来筛选元素:
func filter<Element>(
_ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {
...
}
它也是一个全局函数:
- 入参是判定条件 -
predicate: @escaping (Element) -> Bool
- 返回值是一个“奇怪的东西” -
(@escaping Observable<Element>) -> Observable<Element>
天啊,这是啥?
要解释这个“奇怪的东西”,我们先回顾一下之前的 createObservable
函数:
func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> { ... }
如果我们把 createObservable
的函数名和参数名都去掉,就会发现它其实就是 (@escaping Observable<Element>) -> Observable<Element>
(小样,穿个马甲我就不认识你了,哼!)
而我们知道 createObservable
的作用是生成一个新的 Observable
,它以源 Observable
为基础并加入一些自定义逻辑。
这里 filter
操作符也是要生成一个新的 Observable
,并在源 Observable
中加入过滤逻辑。所以它也是 (@escaping Observable<Element>) -> Observable<Element>
func filter<Element>(
_ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {
return { source -> Observable<Element> in
...
}
}
我们慢慢地将代码展开,因为消化这些内容需要一个过程:
.
.
.
.
.
.
func filter<Element>(
_ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {
return { source -> Observable<Element> in
return createObservable { observer in
...
}
}
}
.
.
.
.
.
.
func filter<Element>(
_ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {
return { source -> Observable<Element> in
return createObservable { observer in
return source { event in
....
}
}
}
}
.
.
.
.
.
.
func filter<Element>(
_ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {
return { source -> Observable<Element> in
return createObservable { observer in
return source { event in
switch event {
case .next(let element):
let shouldEmitElement = predicate(element)
if shouldEmitElement {
observer(.next(element))
}
case .error(let error):
observer(.error(error))
case .completed:
observer(.completed)
}
}
}
}
}
这就是 filter 操作符。
跑起来看看:
let observable1: Observable<Int> = createObservable { observer in
observer(Event.next(0))
observer(Event.next(1))
observer(Event.next(2))
observer(Event.next(3))
observer(Event.next(4))
observer(Event.completed)
observer(Event.next(5))
return { print("dispose") }
}
...
let filteredObservable1 = filter({ $0 > 1 })(observable1)
let filteredDisposable1 = filteredObservable1({ event in // subscribe
switch event {
case .next(let element):
print("filter next: \(element)")
case .error(let error):
print("filter error: \(error)")
case .completed:
print("filter completed")
}
})
结果:
filter next: 2
filter next: 3
filter next: 4
filter completed
dispose
呃,结果是美好的,但 filter({ $0 > 1 })(observable1)
写起来很别扭。
这里 filter({ $0 > 1 })
返回 (@escaping Observable<Int>) -> Observable<Int>
,而它的作用是给传入的 observable1
加入过滤逻辑。
而我们希望可以这样写:
// 以下代码只是一种假设
observable1
.filter({ $0 > 1 })
为了解决这个问题,我们可以引入 swift 自定义操作符(swift 爱你哦!):
infix operator |>: AdditionPrecedence
func |><A, B>(
left: A,
right: (A) -> B
) -> B {
return right(left)
}
这样一来:
let filteredObservable1 = filter({ $0 > 1 })(observable1)
就能改写成:
let filteredObservable2 = observable1
|> filter({ $0 > 1 })
可以了,总算把它给掰正了。
下面,我们来引入另一个常用的操作符 map
。
map 操作符 - 转换
map
会将元素转换一遍:
func map<Element, Result>(
_ transform: @escaping (Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
...
}
依然是一个全局函数:
- 入参是转换函数 -
transform: @escaping (Element) -> Result
- 返回值是一个熟悉的函数 -
(@escaping Observable<Element>) -> Observable<Result>
入参是一个函数,返回值也是一个函数。天啊,谁来扶我一把!
func map<Element, Result>(
_ transform: @escaping (Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
return { source -> Observable<Result> in
return createObservable { observer in
return source { event in
switch event {
case .next(let element):
let result = transform(element)
observer(.next(result))
case .error(let error):
observer(.error(error))
case .completed:
observer(.completed)
}
}
}
}
}
它创建一个新的 Observable
并加入了转换逻辑。
跑一跑:
let observable1: Observable<Int> = createObservable { observer in
observer(Event.next(0))
observer(Event.next(1))
observer(Event.next(2))
observer(Event.next(3))
observer(Event.next(4))
observer(Event.completed)
observer(Event.next(5))
return { print("dispose") }
}
...
let mappedObservable1 = observable1
|> filter { $0 > 1 }
|> map { "\($0) mapped" }
let mappedDisposable1 = mappedObservable1({ event in // subscribe
switch event {
case .next(let element):
print("map next: \(element)")
case .error(let error):
print("map error: \(error)")
case .completed:
print("map completed")
}
})
结果:
map next: 2 mapped
map next: 3 mapped
map next: 4 mapped
map completed
dispose
太棒了!和我们想的一样。
最后我们要实现 scan
操作符:
scan 操作符 - 扫描
scan
的作用是将元素“累加”,然后发送出来:
func scan<Element, Result>(
_ seed: Result,
accumulator: @escaping (Result, Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
return { source -> Observable<Result> in
return createObservable { observer in
// states {
var total = seed
// }
return source { event in
switch event {
case .next(let element):
total = accumulator(total, element)
observer(.next(total))
case .error(let error):
observer(.error(error))
case .completed:
observer(.completed)
}
}
}
}
}
.
.
.
.
.
.
let observable1: Observable<Int> = createObservable { observer in
observer(Event.next(0))
observer(Event.next(1))
observer(Event.next(2))
observer(Event.next(3))
observer(Event.next(4))
observer(Event.completed)
observer(Event.next(5))
return { print("dispose") }
}
...
let scanedObservable1 = observable1
|> filter { $0 > 1 }
|> scan(0) { $0 + $1 }
let scannedDisposable2 = scanedObservable1({ event in // subscribe
switch event {
case .next(let element):
print("scan next: \(element)")
case .error(let error):
print("scan error: \(error)")
case .completed:
print("scan completed")
}
})
结果:
scan next: 2
scan next: 5
scan next: 9
scan completed
dispose
这里的每一个元素分别是:
2 = 0 + 2
5 = 0 + 2 + 3
9 = 0 + 2 + 3 + 4
大功告成!
如果你觉得文章还不错,欢迎点赞分享。或者在下面留言。
源码:
另外,如果你对这些代码比较感兴趣,可以在 Playground 里面手动输入一遍。也可以试着加入一些其他的操作符。
enum Event<Element> {
case next(Element)
case error(Swift.Error)
case completed
}
typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable
let observable: Observable<Int> = { observer in
observer(Event.next(0))
observer(Event.next(1))
observer(Event.next(2))
observer(Event.next(3))
observer(Event.next(4))
observer(Event.completed)
observer(Event.next(5))
return { print("dispose") }
}
let observer: Observer<Int> = { event in
switch event {
case .next(let element):
print("next: \(element)")
case .error(let error):
print("error: \(error)")
case .completed:
print("completed")
}
}
let disposable = observable(observer) // subscribe
func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
return { observer in
// states {
var isDisposed = false
var disposable: Disposable?
// }
disposable = subscribe { event in // subscribe
if isDisposed { return }
switch event {
case .next:
observer(event)
case .error, .completed:
isDisposed = true
observer(event)
disposable?()
}
}
if isDisposed {
disposable?()
}
return {
if isDisposed { return }
isDisposed = true
disposable?()
}
}
}
let observable1: Observable<Int> = createObservable { observer in
observer(Event.next(0))
observer(Event.next(1))
observer(Event.next(2))
observer(Event.next(3))
observer(Event.next(4))
observer(Event.completed)
observer(Event.next(5))
return { print("dispose") }
}
let observer1: Observer<Int> = { event in
switch event {
case .next(let element):
print("origin next: \(element)")
case .error(let error):
print("origin error: \(error)")
case .completed:
print("origin completed")
}
}
let disposable1 = observable1(observer1) // subscribe
func filter<Element>(
_ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {
return { source -> Observable<Element> in
return createObservable { observer in
return source { event in
switch event {
case .next(let element):
let shouldEmitElement = predicate(element)
if shouldEmitElement {
observer(.next(element))
}
case .error(let error):
observer(.error(error))
case .completed:
observer(.completed)
}
}
}
}
}
let filteredObservable1 = filter({ $0 > 1 })(observable1)
let filteredDisposable1 = filteredObservable1({ event in // subscribe
switch event {
case .next(let element):
print("filter next: \(element)")
case .error(let error):
print("filter error: \(error)")
case .completed:
print("filter completed")
}
})
infix operator |>: AdditionPrecedence
func |><A, B>(
left: A,
right: (A) -> B
) -> B {
return right(left)
}
let filteredObservable2 = observable1
|> filter({ $0 > 1 })
func map<Element, Result>(
_ transform: @escaping (Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
return { source -> Observable<Result> in
return createObservable { observer in
return source { event in
switch event {
case .next(let element):
let result = transform(element)
observer(.next(result))
case .error(let error):
observer(.error(error))
case .completed:
observer(.completed)
}
}
}
}
}
let mappedObservable1 = observable1
|> filter { $0 > 1 }
|> map { "\($0) mapped" }
let mappedDisposable1 = mappedObservable1({ event in // subscribe
switch event {
case .next(let element):
print("map next: \(element)")
case .error(let error):
print("map error: \(error)")
case .completed:
print("map completed")
}
})
func scan<Element, Result>(
_ seed: Result,
accumulator: @escaping (Result, Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
return { source -> Observable<Result> in
return createObservable { observer in
// states {
var total = seed
// }
return source { event in
switch event {
case .next(let element):
total = accumulator(total, element)
observer(.next(total))
case .error(let error):
observer(.error(error))
case .completed:
observer(.completed)
}
}
}
}
}
let scanedObservable1 = observable1
|> filter { $0 > 1 }
|> scan(0) { $0 + $1 }
let scannedDisposable2 = scanedObservable1({ event in // subscribe
switch event {
case .next(let element):
print("scan next: \(element)")
case .error(let error):
print("scan error: \(error)")
case .completed:
print("scan completed")
}
})