Subject 是 Rx 中的基础性元素之一(Scheduler, Observable, Operator).
虽然 Observable 可以很好地满足 发射-观察 这样的模式需要, 但在实际编程中, 很多时候都需要手动在 Observable 上添加新的值并�让值发射出去.
实际上需要的就是一个既可以作为 Observable 又可以作为 Observer 的元素, 这样的元素就是 Subject.
概述
首先来看看 PublishSubject 类型, 它是 Subject 的一种.
从它的名字就可以看得出来, 它是一种发布为主的 Subject. 即它�能够像报纸出版商那样, 接收消息, 然后发布(publish)消息.
来看看它的使用:
example(of: "PublishSubject") {
let subject = PublishSubject<String>()
subject.onNext("Is anyone listening?")
let subscriptionOne = subject
.subscribe(onNext: { string in
print(string)
})
}
上面的代码中, 首先新建了一个 PublishSubject, 然后让他发布了一条消息, 然后再在下面观察�它即可. 但是代码运行后, 却什么也没有打印. 这就涉及到 Subject 的一些特性需要进行学习了.
不打印的原因是: PublishSubject 只会将消息发布给当前的所有观察者, 如果发布消息的时候�这个观察者还没有开始观察, 当然就什么也不会打印了.
可以在观察者添加观察后, 再手动发布一条消息:
subject.on(.next("1"))
这里代码中的 on(.next(_:)) 操作符表示在该 subject 上面添加一个新的 next 事件.
�
实际上下面的代码功能也是一样, 只是更好看:
subject.onNext("1")
这时就会出现打印信息:
--- Example of: PublishSubject ---
1
什么是 Subject
Subject 实际上就是既可以作为 Observable, 也可以作为 Observer 的一种对象.
首先它作为 Observer, 可以接收到 next 事件, 但它接收到事件后并不进行处理, 而是转变为 Observable 的角色, 将该事件发布出来, 从而观察它的观察者可以收到这个消息.
在 RxSwift 中�共有 4 种 Subject 类型:
-
PublishSubject:
被创建出来的时候是空的, 只会发布最新的消息给�订阅者(观察者).
-
BehaviorSubject:
创建的时候会赋一个初始化值给它, 并且如果没有新值的时候, 就会发布这个初始值给观察者, 否则会发布最新的值给观察者.
-
ReplaySubject:
创建的时候会指定一个缓冲区, 并且�维护一个包含消息的缓冲区, 当有观察者开始观察�时, 就会发布这个缓冲区中的所有值给观察者.
-
Variable:
它里面包装了一个 BehaviorSubject, 并将当前的值作为状态来�维护, 同样会发布初始化值或是最新的值给观察者.(即开始观察的时候会发布一次值给观察者, 开始观察时候的这个值可以是初始化的值, 也可以是被新值替换后的值, 有新值的时候就发布新值给观察者.)
PublishSubject 的使用
�如果只想让所有的观察者在有新的消息的时候被通知到, PublishSubject 就是最好的选择.
只有当观察者解除观察(dispose), 或者是产生了 complete 事件或 error 事件, 或是 subject 被销毁, 才会停止这个过程.
还有一个特性就是: 当 PublishSubject 发射了结束事件, 比如 complete 或 error 后, 它就不会再发射任何� next 事件或新的结束事件了. 但它会给所有的新观察者发布之前的那个结束事件.
实际上, 所有的 subject 都具有该特性, 即它作为 Observable 发射过 complete 或 error 后, 就不会再发射 next 事件了, 且以后所有新的观察者开始观察后都只会接收到一个 complete 或 error 结束事件.
所以实践中最好的做法是每次都对结束事件有一个处理, 以防止在观察前就已经结束了.
有时候还希望最新的观察者能够获知之前发射过的最后一个数据的值, 那么就可以使用下面的 subject.
BehaviorSubject
BehaviorSubject 和 publish subject 类似. 不同点在于它们会重新发射上一次的最后一个 next 事件给新的观察者.
比如下面的代码:
let disposebag = DisposeBag()
let subject = BehaviorSubject<String>(value: "hello")
subject.subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed.")
}).addDisposableTo(disposebag)
subject.onNext("world")
输出是:
hello
world
在创建 BehaviorSubject 的时候, 首先给它赋初值, 第一个观察者到达的时候会观察到初始值, 当然之后就是观察到再次发射的那个值. 而又有新的观察者�开始观察的时候, 首先是观察到 world, 也就是最后一个� next 事件.
当然如果是 complete 或者是 error 的话, 也是和一般的 subject 一样, 向新观察者只发送一次.
�但是如果想要展示更多的历史数据, 比如在搜索界面, 可能想要展示最近的 5 条查询条目, 则需要使用下面的 ReplaySubject.
ReplaySubject
Replay subject 带有缓冲区, 能够自动缓存最多缓冲区大小个发送的历史事件(数据). 当新观察者到达时, 就把所有的历史数据都发送给新的观察者.
由于缓冲区是在内存中建立的, 所以使用 replay subject 的时候需要额外小心, 比如缓存了大量的大型数据, 就可能造成内存不足. 另外还要注意尽量避免让 replay subject 缓存数组类型的元素, 数组是值类型的, 所以...
下面的代码�演示了它的使用:
let disposebag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 2)
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
subject.subscribe(onNext: {
print($0)
}).addDisposableTo(disposebag)
subject.onNext("4")
subject.subscribe(onNext: {
print($0)
}).addDisposableTo(disposebag)
输出为:
2
3
3
4
一般情况下, 都是使用 create 方法来创建 �ReplaySubject.
而当遇到 error 或者 complete 之后, 新的观察者同样会先接收到缓冲区的东西, 然后才是 error 或 complete, 只是此后的 next 都不会再发射了.
每次有新的观察者的时候, 都是先把缓冲区中的所有历史事件都发送给新的观察者, 然后再继续发送其他的新的事件.
通过使用 publish, behavior 或者 replay subject, 基本上可以满足平时的大部分需求了. 但是另外有许多情况下, 你只是想要去询问一个 observable : hi, 你当前的值是什么? 这个时候就需要用到 Variable.
既可以发射, 还可以直接询问它的值. 这个东西好用, 所以下面就来看看 Variable.
Variable
�前面也提到过, Variable 中封装了一个 BehaviorSubject, 然后将当前的值保持为它的状态. 可以通过 Variable 的 value 属性来访问它当前的值. 并且和其他 Subject 不同的是, 要让� Variable 接收一个 next 事件, 直接给它的 value 属性赋值即可. 而没有 onNext 方法.
由于它里面封装了 BehaviorSubject, 故需要通过一个初始化值来初始化它. 因此, 当有新的观察者到达时, 会接收到要么初始化的值, 要么是最新的那个值.
要观察的话, 需要通过 asObservable() 来获取它里面的 Observable.
另外 Variable 还有一个与众不同的特点: 它可以保证不会发送 error. 因为无法在它上面放置 error 事件. 且当它被释放的时候, 会自动发送 complete 事件. 所以对于 Variable 而言, 一般情况下不会也不需要手动去添加 error 或 complete 事件.(因为无论如何尝试添加 error 或 complete 最后的结果都是编译错误.)