数百行代码实现 RxSwift 核心功能

许多使用过 RxSwift 小伙伴,都比较好奇这个框架里面到底有些什么黑魔法。这次我们尝试揭开 RxSwift 的神秘面纱,接下来就用少量代码来实现其核心功能:

以下代码可以在空白的 Swift Playground 上面运行,不需要依赖任何框架。

我们进入正题吧:

Observable & Observer & Disposable

enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
    
typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable

全剧终 🎉🎉🎉!

其实没这么简单,还有一些细节需要处理。

这里我们把 Observable ,Observer 和 Disposable 都看作是不同类型的函数。

  • Event<Element> - 还是我们熟悉的那个 Event<Element>
  • typealias Observer<Element> = (Event<Element>) -> Void - 观察者是一个用于消费 Event 的函数
  • typealias Disposable = () -> Void - 可清除资源是一个无入参,无返回值的函数,作用是清除资源
  • typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable - 可监听序列是一个以 Observer 为入参,返回 Disposable 的函数,它有时候也被称做 subscribe 函数

我们来创建一个 Observable 吧:

let observable: Observable<Int> = { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

这与我们平时使用 Observable.create 方法相似。

然后再订阅它:

let observer: Observer<Int> = { event in
    switch event {
    case .next(let element):
        print("next: \(element)")
    case .error(let error):
        print("error: \(error)")
    case .completed:
        print("completed")
    }
}

let disposable = observable(observer) // subscribe

结果:

next: 0
next: 1
next: 2
next: 3
next: 4
completed
next: 5

哇~ 成功了耶、

等等,好像有点问题,为什么在 completed 事件发生后还会打印 next: 5。序列在产生 completed 这样的终止事件以后,应该就结束了。不会再产生新的元素才对。

是的,接下来我们就修复这个问题:

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    
    return { observer in
        // states {
        var isDisposed = false
        var disposable: Disposable?
        // }
        disposable = subscribe { event in  // subscribe
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}

我们引入一个 createObservable 的全局函数,这个函数的实现在一定程度上还原了 Observable.create

这个函数接受一个 Observable 并返回一个 Observable
而返回的新 Observable 以源 Observable 为基础并加入了一些逻辑,例如:

  • 用状态 isDisposed 记录源序列知否已经终止,如果终止了就不会给观察者推新的事件
  • 在必要时执行清除逻辑 : disposable?()

我们用新方法创建 Observable 试试:

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

let observer1: Observer<Int> = { event in
    switch event {
    case .next(let element):
        print("origin next: \(element)")
    case .error(let error):
        print("origin error: \(error)")
    case .completed:
        print("origin completed")
    }
}

let disposable1 = observable1(observer1) // subscribe

结果:

origin next: 0
origin next: 1
origin next: 2
origin next: 3
origin next: 4
origin completed
dispose

嗯 ,这次结果很满意!

下面我们尝试加入一些操作符。

filter 操作符 - 过滤

我们先加入一个常用的过滤操作符,它的作用就是通过判定来筛选元素:

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {
    ...
}

它也是一个全局函数:

  • 入参是判定条件 - predicate: @escaping (Element) -> Bool
  • 返回值是一个“奇怪的东西” - (@escaping Observable<Element>) -> Observable<Element>

天啊,这是啥?
要解释这个“奇怪的东西”,我们先回顾一下之前的 createObservable 函数:

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> { ... }

如果我们把 createObservable 的函数名和参数名都去掉,就会发现它其实就是 (@escaping Observable<Element>) -> Observable<Element> (小样,穿个马甲我就不认识你了,哼!)

而我们知道 createObservable 的作用是生成一个新的 Observable,它以源 Observable 为基础并加入一些自定义逻辑。

这里 filter 操作符也是要生成一个新的 Observable,并在源 Observable 中加入过滤逻辑。所以它也是 (@escaping Observable<Element>) -> Observable<Element>

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {
    
    return { source -> Observable<Element> in
        ...
    }
}

我们慢慢地将代码展开,因为消化这些内容需要一个过程:
.
.
.
.
.
.

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in

        return createObservable { observer in
            ...
        }
    }
}

.
.
.
.
.
.

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in

        return createObservable { observer in

            return source { event in
                ....
            }
        }
    }
}

.
.
.
.
.
.

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in

        return createObservable { observer in

            return source { event in
                switch event {
                case .next(let element):
                    let shouldEmitElement = predicate(element)
                    if shouldEmitElement {
                        observer(.next(element))
                    }
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

这就是 filter 操作符

跑起来看看:

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

...

let filteredObservable1 = filter({ $0 > 1 })(observable1)

let filteredDisposable1 = filteredObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("filter next: \(element)")
    case .error(let error):
        print("filter error: \(error)")
    case .completed:
        print("filter completed")
    }
})

结果:

filter next: 2
filter next: 3
filter next: 4
filter completed
dispose

呃,结果是美好的,但 filter({ $0 > 1 })(observable1) 写起来很别扭。
这里 filter({ $0 > 1 }) 返回 (@escaping Observable<Int>) -> Observable<Int>,而它的作用是给传入的 observable1 加入过滤逻辑。

而我们希望可以这样写:

// 以下代码只是一种假设
 observable1
  .filter({ $0 > 1 })

为了解决这个问题,我们可以引入 swift 自定义操作符(swift 爱你哦!):

infix operator |>: AdditionPrecedence

func |><A, B>(
    left: A,
    right: (A) -> B
    ) -> B {
    return right(left)
}

这样一来:

let filteredObservable1 = filter({ $0 > 1 })(observable1)

就能改写成:

let filteredObservable2 = observable1
    |> filter({ $0 > 1 })

可以了,总算把它给掰正了。

下面,我们来引入另一个常用的操作符 map

map 操作符 - 转换

map 会将元素转换一遍:

func map<Element, Result>(
    _ transform: @escaping (Element) -> Result
    ) -> (@escaping Observable<Element>) -> Observable<Result> {
    ...
}

依然是一个全局函数:

  • 入参是转换函数 - transform: @escaping (Element) -> Result
  • 返回值是一个熟悉的函数 - (@escaping Observable<Element>) -> Observable<Result>

入参是一个函数,返回值也是一个函数。天啊,谁来扶我一把!

func map<Element, Result>(
    _ transform: @escaping (Element) -> Result
    ) -> (@escaping Observable<Element>) -> Observable<Result> {
    
    return { source -> Observable<Result> in

        return createObservable { observer in

            return source { event in
                switch event {
                case .next(let element):
                    let result = transform(element)
                    observer(.next(result))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

它创建一个新的 Observable 并加入了转换逻辑

跑一跑:

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

...

let mappedObservable1 = observable1
    |> filter { $0 > 1 }
    |> map { "\($0) mapped" }

let mappedDisposable1 = mappedObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("map next: \(element)")
    case .error(let error):
        print("map error: \(error)")
    case .completed:
        print("map completed")
    }
})

结果:

map next: 2 mapped
map next: 3 mapped
map next: 4 mapped
map completed
dispose

太棒了!和我们想的一样。

最后我们要实现 scan 操作符:

scan 操作符 - 扫描

scan 的作用是将元素“累加”,然后发送出来:

func scan<Element, Result>(
    _ seed: Result,
    accumulator: @escaping (Result, Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
    return { source -> Observable<Result> in
        return createObservable { observer in
            // states {
            var total = seed
            // }
            return source { event in
                switch event {
                case .next(let element):
                    total = accumulator(total, element)
                    observer(.next(total))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

.
.
.
.
.
.

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

...

let scanedObservable1 = observable1
    |> filter { $0 > 1 }
    |> scan(0) { $0 + $1 }

let scannedDisposable2 = scanedObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("scan next: \(element)")
    case .error(let error):
        print("scan error: \(error)")
    case .completed:
        print("scan completed")
    }
})

结果:

scan next: 2
scan next: 5
scan next: 9
scan completed
dispose

这里的每一个元素分别是:
2 = 0 + 2
5 = 0 + 2 + 3
9 = 0 + 2 + 3 + 4

大功告成!

如果你觉得文章还不错,欢迎点赞分享。或者在下面留言。

源码:

另外,如果你对这些代码比较感兴趣,可以在 Playground 里面手动输入一遍。也可以试着加入一些其他的操作符

enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
    
typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable


let observable: Observable<Int> = { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

let observer: Observer<Int> = { event in
    switch event {
    case .next(let element):
        print("next: \(element)")
    case .error(let error):
        print("error: \(error)")
    case .completed:
        print("completed")
    }
}

let disposable = observable(observer) // subscribe


func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    
    return { observer in
        // states {
        var isDisposed = false
        var disposable: Disposable?
        // }
        disposable = subscribe { event in  // subscribe
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}


let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

let observer1: Observer<Int> = { event in
    switch event {
    case .next(let element):
        print("origin next: \(element)")
    case .error(let error):
        print("origin error: \(error)")
    case .completed:
        print("origin completed")
    }
}

let disposable1 = observable1(observer1) // subscribe


func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in

        return createObservable { observer in

            return source { event in
                switch event {
                case .next(let element):
                    let shouldEmitElement = predicate(element)
                    if shouldEmitElement {
                        observer(.next(element))
                    }
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

let filteredObservable1 = filter({ $0 > 1 })(observable1)

let filteredDisposable1 = filteredObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("filter next: \(element)")
    case .error(let error):
        print("filter error: \(error)")
    case .completed:
        print("filter completed")
    }
})


infix operator |>: AdditionPrecedence

func |><A, B>(
    left: A,
    right: (A) -> B
    ) -> B {
    return right(left)
}


let filteredObservable2 = observable1
    |> filter({ $0 > 1 })


func map<Element, Result>(
    _ transform: @escaping (Element) -> Result
    ) -> (@escaping Observable<Element>) -> Observable<Result> {
    
    return { source -> Observable<Result> in

        return createObservable { observer in

            return source { event in
                switch event {
                case .next(let element):
                    let result = transform(element)
                    observer(.next(result))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}


let mappedObservable1 = observable1
    |> filter { $0 > 1 }
    |> map { "\($0) mapped" }

let mappedDisposable1 = mappedObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("map next: \(element)")
    case .error(let error):
        print("map error: \(error)")
    case .completed:
        print("map completed")
    }
})


func scan<Element, Result>(
    _ seed: Result,
    accumulator: @escaping (Result, Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
    return { source -> Observable<Result> in
        return createObservable { observer in
            // states {
            var total = seed
            // }
            return source { event in
                switch event {
                case .next(let element):
                    total = accumulator(total, element)
                    observer(.next(total))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

let scanedObservable1 = observable1
    |> filter { $0 > 1 }
    |> scan(0) { $0 + $1 }

let scannedDisposable2 = scanedObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("scan next: \(element)")
    case .error(let error):
        print("scan error: \(error)")
    case .completed:
        print("scan completed")
    }
})
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354