Last active
October 22, 2021 13:04
-
-
Save vitonzhangtt/d54f19c01aafe659240c0045e6b59c03 to your computer and use it in GitHub Desktop.
Notes on RxSwift 5.x Source
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
//  Producer.swift
//  RxSwift
//
//  Created by Krunoslav Zaher on 2/20/15.
//  Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
/// An `Observable` whose subscription work is delegated to `run(_:cancel:)`.
///
/// `subscribe(_:)` creates a `SinkDisposer`, passes it to `run` as the cancel handle,
/// stores the sink and subscription that `run` returns in that disposer, and returns
/// the disposer to the caller.
class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    /// Subscribes `observer` by invoking `run(_:cancel:)`.
    ///
    /// When `CurrentThreadScheduler.isScheduleRequired` is `false`, the wiring happens
    /// inline and the resulting disposer is returned directly. Otherwise the same wiring
    /// is handed to `CurrentThreadScheduler.instance.schedule` and whatever that call
    /// returns is returned to the caller.
    ///
    /// - Parameter observer: The observer to hand to `run(_:cancel:)`.
    /// - Returns: A disposable for the subscription.
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // Wiring shared by the inline path and the scheduled path.
        func connect() -> Disposable {
            // The returned disposable needs to release all references once it was disposed.
            let sinkDisposer = SinkDisposer()
            let (sink, subscription) = self.run(observer, cancel: sinkDisposer)
            sinkDisposer.setSinkAndSubscription(sink: sink, subscription: subscription)
            return sinkDisposer
        }

        // TODO: CurrentThreadScheduler
        guard CurrentThreadScheduler.isScheduleRequired else {
            return connect()
        }

        return CurrentThreadScheduler.instance.schedule(()) { _ in
            return connect()
        }
    }

    /// Override point that performs the actual subscription.
    ///
    /// The base implementation only calls `rxAbstractMethod()`, so subclasses are
    /// expected to provide their own implementation.
    ///
    /// - Parameters:
    ///   - observer: The observer passed to `subscribe(_:)`.
    ///   - cancel: The `SinkDisposer` created by `subscribe(_:)` for this subscription.
    /// - Returns: The sink and subscription disposables that `subscribe(_:)` stores in the disposer.
    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}
/// Disposable returned by `Producer.subscribe(_:)`.
///
/// Holds the sink and subscription produced by `Producer.run(_:cancel:)` and tracks,
/// as bit flags in `_state`, whether they have been set and whether `dispose()` has
/// been called. Whichever of `dispose()` / `setSinkAndSubscription` observes both
/// flags disposes the pair and drops the stored references.
private final class SinkDisposer: Cancelable {
    /// Bit masks combined into `_state`.
    private enum DisposeState: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }

    private let _state = AtomicInt(0)
    private var _sink: Disposable?
    private var _subscription: Disposable?

    /// `true` once the `disposed` flag has been set in `_state`.
    var isDisposed: Bool {
        let disposedMask = DisposeState.disposed.rawValue
        return isFlagSet(self._state, disposedMask)
    }

    /// Stores the sink and subscription, then marks them as set.
    ///
    /// Calls `rxFatalError` if they were already set. If the `disposed` flag was
    /// already present when the "set" flag was written, both disposables are
    /// disposed immediately and the stored references are cleared.
    ///
    /// - Parameters:
    ///   - sink: The sink returned by `Producer.run(_:cancel:)`.
    ///   - subscription: The subscription returned by `Producer.run(_:cancel:)`.
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        // Store the references before publishing the "set" flag.
        _sink = sink
        _subscription = subscription

        let setMask = DisposeState.sinkAndSubscriptionSet.rawValue
        let disposedMask = DisposeState.disposed.rawValue
        let stateBefore = fetchOr(_state, setMask)

        guard (stateBefore & setMask) == 0 else {
            rxFatalError("Sink and subscription were already set")
        }

        // Nothing more to do unless dispose() was called before the pair arrived.
        guard (stateBefore & disposedMask) != 0 else {
            return
        }

        sink.dispose()
        subscription.dispose()
        _sink = nil
        _subscription = nil
    }

    /// Marks the disposer as disposed and, if the sink and subscription were already
    /// set, disposes both and clears the stored references.
    ///
    /// Returns without further work when the `disposed` flag was already set, or when
    /// the sink and subscription have not been set yet.
    func dispose() {
        let disposedMask = DisposeState.disposed.rawValue
        let setMask = DisposeState.sinkAndSubscriptionSet.rawValue
        let stateBefore = fetchOr(_state, disposedMask)

        // A previous dispose() call already set the flag.
        guard (stateBefore & disposedMask) == 0 else {
            return
        }

        // Sink and subscription not set yet: nothing to dispose here.
        guard (stateBefore & setMask) != 0 else {
            return
        }

        guard let currentSink = _sink else {
            rxFatalError("Sink not set")
        }
        guard let currentSubscription = _subscription else {
            rxFatalError("Subscription not set")
        }

        currentSink.dispose()
        currentSubscription.dispose()
        _sink = nil
        _subscription = nil
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment