func disposableDYZ() {
let observable = Observable<Int>.create { (observer) -> Disposable in
observer.onCompleted()
return Disposables.create {
print("subscription 销毁啦")
}
}
let disposable = observable.subscribe(onNext: { (a) in
print("订阅到",a)
}, onError: { (error) in
print("error:",error)
}, onCompleted: {
print("完成了")
}) {
print("销毁啦")
}
disposable.disposed(by: disposeBag)
}
完成了
销毁啦
subscription 销毁啦
- sink 销毁响应关系
- subscription create创建闭包回调里面创建的销毁者
- 1、subscribe(onNext..)中的onDisposed闭包,的销毁是在AnonymousObserver初始化闭包执行的时候,当中执行error和completed时
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
- 2、SinkDisposer,的销毁是在 AnonymousObservableSink中on方法,执行.error, .completed -> Sink中dispose
- self._cancel是 Producer中的subscribe中的SinkDisposer
- 3、
self.forwardOn(event) self.dispose()
决定了,先执行了1(subscribe(onNext..)中的onDisposed闭包)的闭包,再执行SinkDisposer的销毁(包括,关系的销毁,和create创建闭包回调里面创建的销毁者的销毁),所以会先打印“销毁啦”,然后打印“subscription 销毁啦”
AnonymousObservableSink类
func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
Sink类
func dispose() {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}
- 4、返回disposable,以便可以放到DisposeBag,然后在VC释放的时候在deinit中销毁。必须添加到DisposeBag,不然会导致释放不干净,可以使用RxSwift-内存管理中的RxSwift.Resources.total检测,虽然此时的VC的deinit被调用了。nice
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
*****show:RxSwift的引用计数: 0
deinit被调用了 走了
*****show:RxSwift的引用计数: 7
deinit被调用了 走了
*****show:RxSwift的引用计数: 14
deinit被调用了 走了
下面可以不看了
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
extension ObservableType {
public func subscribe(_ on: @escaping (Event<Element>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: Observer, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
fileprivate final class SinkDisposer: Cancelable {
fileprivate enum DisposeState: Int32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
private let _state = AtomicInt(0)
private var _sink: Disposable?
private var _subscription: Disposable?
var isDisposed: Bool {
return isFlagSet(self._state, DisposeState.disposed.rawValue)
}
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self._sink = sink
self._subscription = subscription
let previousState = fetchOr(self._state, DisposeState.sinkAndSubscriptionSet.rawValue)
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeState.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}
func dispose() {
let previousState = fetchOr(self._state, DisposeState.disposed.rawValue)
if (previousState & DisposeState.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = self._sink else {
rxFatalError("Sink not set")
}
guard let subscription = self._subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}
}
1、subscribe(onNext...) 中disposable = Disposables.create(with: disposed)
extension Disposables {
/// Constructs a new disposable with the given action used for disposal.
///
/// - parameter dispose: Disposal action which will be run upon calling `dispose`.
public static func create(with dispose: @escaping () -> Void) -> Cancelable {
return AnonymousDisposable(disposeAction: dispose)
}
}
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
public typealias DisposeAction = () -> Void
private let _isDisposed = AtomicInt(0)
private var _disposeAction: DisposeAction?
/// - returns: Was resource disposed.
public var isDisposed: Bool {
return isFlagSet(self._isDisposed, 1)
}
/// Constructs a new disposable with the given action used for disposal.
///
/// - parameter disposeAction: Disposal action which will be run upon calling `dispose`.
fileprivate init(_ disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
// Non-deprecated version of the constructor, used by `Disposables.create(with:)`
fileprivate init(disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
/// Calls the disposal action if and only if the current instance hasn't been disposed yet.
///
/// After invoking disposal action, disposal action will be dereferenced.
fileprivate func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
if let action = self._disposeAction {
self._disposeAction = nil
action()
}
}
}
}
2、subscribe(onNext...) 中return Disposables.create(self.asObservable().subscribe(observer),disposable)
extension Disposables {
/// Creates a disposable with the given disposables.
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
return BinaryDisposable(disposable1, disposable2)
}
}
private final class BinaryDisposable : DisposeBase, Cancelable {
private let _isDisposed = AtomicInt(0)
// state
private var _disposable1: Disposable?
private var _disposable2: Disposable?
/// - returns: Was resource disposed.
var isDisposed: Bool {
return isFlagSet(self._isDisposed, 1)
}
/// Constructs new binary disposable from two disposables.
///
/// - parameter disposable1: First disposable
/// - parameter disposable2: Second disposable
init(_ disposable1: Disposable, _ disposable2: Disposable) {
self._disposable1 = disposable1
self._disposable2 = disposable2
super.init()
}
/// Calls the disposal action if and only if the current instance hasn't been disposed yet.
///
/// After invoking disposal action, disposal action will be dereferenced.
func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
self._disposable1?.dispose()
self._disposable2?.dispose()
self._disposable1 = nil
self._disposable2 = nil
}
}
}