Created
February 17, 2021 14:41
-
-
Save rnapier/4a397197a4c698d872301597668d964d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
public class Disposable { | |
private var isDisposed = false | |
private let _dispose: () -> Void | |
public func dispose() { | |
if !isDisposed { | |
_dispose() | |
isDisposed = true | |
} | |
} | |
public init(dispose: @escaping () -> Void) { self._dispose = dispose } | |
deinit { | |
dispose() | |
} | |
} | |
public class DisposeBag: Disposable { | |
private var disposables: [Disposable] | |
public init(_ disposables: [Disposable] = []) { | |
self.disposables = disposables | |
super.init(dispose: {}) | |
} | |
public override func dispose() { | |
disposables.removeAll() | |
} | |
func insert(_ disposable: Disposable) { | |
disposables.append(disposable) | |
} | |
} | |
extension Disposable { | |
public func disposed(by bag: DisposeBag) { | |
bag.insert(self) | |
} | |
} | |
public enum TimeoutResult<T> { | |
case success(T) | |
case timeout | |
} | |
/* | |
An observable stream of values. | |
Note that a base Stream has no way to generate values, so unless you override addObserver, it's not actually useful. | |
*/ | |
public class ValueStream<T> { | |
fileprivate init() {} | |
public func addObserver(didChange: @escaping (T) -> Void) -> Disposable { | |
preconditionFailure("This stream cannot produce values.") | |
} | |
public func addOneShot(didChange: @escaping (T) -> Void) { | |
var remover: Disposable? | |
remover = addObserver { value in | |
didChange(value) | |
remover?.dispose() | |
remover = nil | |
} | |
} | |
public func addOneShot(timeout: TimeInterval, didChange: @escaping (TimeoutResult<T>) -> Void) { | |
var remover: Disposable? | |
remover = addObserver { value in | |
guard let localRemover = remover else { return } | |
remover = nil | |
didChange(.success(value)) | |
localRemover.dispose() | |
} | |
DispatchQueue.main.asyncAfter(deadline: .now() + timeout) { | |
guard let localRemover = remover else { return } | |
remover = nil | |
didChange(.timeout) | |
localRemover.dispose() | |
} | |
} | |
public func map<U>(_ transform: @escaping (T) -> U) -> ValueStream<U> { | |
return ComposedStream(base: self, composer: { didChange in | |
return { value in didChange(transform(value)) } | |
}) | |
} | |
public func compactMap<U>(_ transform: @escaping (T) -> U?) -> ValueStream<U> { | |
return ComposedStream(base: self, composer: { didChange in | |
return { value in | |
if let value = transform(value) { | |
didChange(value) | |
} | |
} | |
}) | |
} | |
public func filter(_ predicate: @escaping (T) -> Bool) -> ValueStream<T> { | |
return ComposedStream(base: self, composer: { didChange in | |
return { value in | |
if predicate(value) { didChange(value) } | |
} | |
}) | |
} | |
public func dropFirst(_ count: Int = 1) -> ValueStream<T> { | |
return ComposedStream(base: self, composer: { didChange in | |
var remaining = count | |
return { value in | |
if remaining <= 0 { | |
didChange(value) | |
} else { | |
remaining -= 1 | |
} | |
} | |
}) | |
} | |
public func first(_ count: Int = 1) -> ValueStream<T> { | |
return ComposedStream(base: self, composer: { didChange in | |
var remaining = count | |
return { value in | |
if remaining > 0 { | |
didChange(value) | |
remaining -= 1 | |
} | |
} | |
}) | |
} | |
public func on(queue: DispatchQueue) -> ValueStream<T> { | |
return ComposedStream(base: self, composer: { didChange in | |
return { value in | |
queue.async { didChange(value) } | |
} | |
}) | |
} | |
public func onMainQueue() -> ValueStream<T> { | |
return on(queue: .main) | |
} | |
public func withPrevious() -> ValueStream<(newValue: T, oldValue: T?)> { | |
return ComposedStream(base: self, composer: { didChange in | |
var oldValue: T? | |
return { value in | |
didChange((newValue: value, oldValue: oldValue)) | |
oldValue = value | |
} | |
}) | |
} | |
public func asEventStream() -> ValueStream<Void> { | |
return map { _ in return () } | |
} | |
public func trigger(onRisingEdge predicate: @escaping (T) -> Bool) -> EventStream { | |
return withPrevious().filter { (current, previous) in | |
guard let previous = previous else { return false } // Can't have edge without previous | |
return !predicate(previous) && predicate(current) | |
}.asEventStream() | |
} | |
// Throttle to once per event loop | |
public func throttleTillBlockEnd() -> ValueStream<T> { | |
return ComposedStream(base: self, composer: { didChange in | |
var active = true | |
return { value in | |
guard active else { return } | |
didChange(value) | |
active = false | |
DispatchQueue.main.async { active = true } | |
} | |
}) | |
} | |
} | |
extension ValueStream where T: Equatable { | |
public func distinct() -> ValueStream<T> { | |
return ComposedStream(base: self, composer: { didChange in | |
var previous: [T] = [] | |
return { value in | |
if !previous.contains(value) { | |
didChange(value) | |
} | |
previous.append(value) | |
} | |
}) | |
} | |
public func distinctUntilChanged() -> ValueStream<T> { | |
return ComposedStream(base: self, composer: { didChange in | |
var previous: T? | |
return { value in | |
if previous != value { | |
didChange(value) | |
} | |
previous = value | |
} | |
}) | |
} | |
} | |
extension ValueStream { | |
public func filterNil<U>() -> ValueStream<U> where T == U? { | |
return compactMap { $0 } | |
} | |
} | |
/* | |
A subject can emit arbitrary values to its observers. | |
(Breaking it out like this prevents calling `emit` on the result of map() which doesn't do anything.) | |
*/ | |
public class Subject<T>: ValueStream<T> { | |
public override init() { super.init() } | |
fileprivate var observers: [UUID: (T) -> Void] = [:] | |
public override func addObserver(didChange: @escaping (T) -> Void) -> Disposable { | |
let identifier = UUID() | |
observers[identifier] = didChange | |
return Disposable { [weak self] in | |
if let strongSelf = self { | |
strongSelf.observers.removeValue(forKey: identifier) | |
} | |
} | |
} | |
} | |
public class PublishSubject<T>: Subject<T> { | |
public func emit(_ value: T) { | |
for observer in observers.values { | |
observer(value) | |
} | |
} | |
} | |
public class PublishEventStream: PublishSubject<Void> { | |
public func emit() { | |
super.emit(()) | |
} | |
} | |
private class ComposedStream<Input, Output>: ValueStream<Output> { | |
let base: ValueStream<Input> | |
let composer: (@escaping (Output) -> Void) -> (Input) -> Void | |
init(base: ValueStream<Input>, composer: @escaping (@escaping (Output) -> Void) -> (Input) -> Void) { | |
self.base = base | |
self.composer = composer | |
super.init() | |
} | |
override func addObserver(didChange: @escaping (Output) -> Void) -> Disposable { | |
return base.addObserver(didChange: composer(didChange)) | |
} | |
} | |
/* | |
A Variable is a Stream that makes its value available directly. On subscription it immediately | |
emits its most recent value. (Rx: BehaviorSubject) | |
*/ | |
public class Variable<T>: Subject<T> { | |
public var value: T { | |
didSet { | |
for observer in observers.values { | |
observer(value) | |
} | |
} | |
} | |
public init(_ value: T) { self.value = value } | |
public override func addObserver(didChange: @escaping (T) -> Void) -> Disposable { | |
didChange(value) | |
return super.addObserver(didChange: didChange) | |
} | |
} | |
extension Variable: CustomStringConvertible where T: CustomStringConvertible { | |
public var description: String { return value.description } | |
} | |
public class UserDefaultsVariable<T>: Subject<T> { | |
let key: String | |
let defaultValue: T | |
let userDefaults: UserDefaults | |
public var value: T { | |
get { | |
return userDefaults.object(forKey: key) as? T ?? defaultValue | |
} | |
set { | |
userDefaults.set(newValue, forKey: key) | |
for observer in observers.values { | |
observer(value) | |
} | |
} | |
} | |
public init(key: String, defaultValue: T, userDefaults: UserDefaults = .standard) { | |
self.key = key | |
self.defaultValue = defaultValue | |
self.userDefaults = userDefaults | |
} | |
public override func addObserver(didChange: @escaping (T) -> Void) -> Disposable { | |
didChange(value) | |
return super.addObserver(didChange: didChange) | |
} | |
} | |
/* | |
Stream of NSNotifications | |
*/ | |
public class NotificationStream: ValueStream<Notification> { | |
let name: NSNotification.Name? | |
let object: Any? | |
let notificationCenter: NotificationCenter | |
public init(forName name: Notification.Name?, object: Any?, notificationCenter: NotificationCenter = .default) { | |
self.name = name | |
self.object = object | |
self.notificationCenter = notificationCenter | |
super.init() | |
} | |
public override func addObserver(didChange: @escaping (Notification) -> Void) -> Disposable { | |
let center = notificationCenter | |
let observer = center.addObserver(forName: name, object: object, queue: nil) { note in | |
didChange(note) | |
} | |
return Disposable { | |
center.removeObserver(observer) | |
} | |
} | |
} | |
/* | |
Stream of KVO changes. Retains observed object. | |
*/ | |
public class KVOStream<Observed: NSObject, Value>: ValueStream<(Observed, NSKeyValueObservedChange<Value>)> { | |
let object: Observed | |
let keyPath: KeyPath<Observed, Value> | |
let options: NSKeyValueObservingOptions | |
public init(object: Observed, keyPath: KeyPath<Observed, Value>, options: NSKeyValueObservingOptions = []) { | |
self.object = object | |
self.keyPath = keyPath | |
self.options = options.union(.new) // Always require the new value for asValues() | |
} | |
public override func addObserver(didChange: @escaping ((Observed, NSKeyValueObservedChange<Value>)) -> Void) -> Disposable { | |
let observation = object.observe(keyPath, options: options, changeHandler: { didChange(($0, $1)) }) | |
return Disposable { observation.invalidate() } | |
} | |
public func asValues() -> ValueStream<Value> { | |
return compactMap { _, change in return change.newValue } | |
} | |
} | |
/* | |
An EventStream is a convenient way to manage Stream<Void>. It is generally used when the observer | |
doesn't care about the value. This avoids the need for "_ in", and makes it easier to combine streams | |
of different types. | |
*/ | |
public typealias EventStream = ValueStream<Void> | |
// Type-erasing protocol in order to ignore "T" | |
public protocol EventStreamConvertible { | |
func asEventStream() -> EventStream | |
} | |
extension ValueStream: EventStreamConvertible {} | |
public class CompositeStream<T>: ValueStream<T> { | |
private let streams: [ValueStream<T>] | |
public init<Streams>(_ streams: Streams) | |
where Streams: Collection, Streams.Element == ValueStream<T> { | |
self.streams = Array(streams) | |
} | |
public override func addObserver(didChange: @escaping (T) -> Void) -> Disposable { | |
return DisposeBag(streams.map { $0.addObserver(didChange: didChange) }) | |
} | |
} | |
public func makeTrigger(forAnyOf streams: [EventStreamConvertible]) -> EventStream { | |
return CompositeStream(streams.map { $0.asEventStream() }) | |
} | |
class CombineLatest2Stream<T, U>: ValueStream<(T, U)> { | |
private let streams: (ValueStream<T>, ValueStream<U>) | |
init(_ streams: (ValueStream<T>, ValueStream<U>)) { | |
self.streams = streams | |
} | |
override func addObserver(didChange: @escaping ((T, U)) -> Void) -> Disposable { | |
let disposeBag = DisposeBag() | |
var values: (T?, U?) = (nil, nil) | |
streams.0.addObserver { (t) in | |
values.0 = t | |
if let u = values.1 { didChange((t, u)) } | |
}.disposed(by: disposeBag) | |
streams.1.addObserver { (u) in | |
values.1 = u | |
if let t = values.0 { didChange((t, u)) } | |
}.disposed(by: disposeBag) | |
return disposeBag | |
} | |
} | |
// swiftlint:disable large_tuple | |
class CombineLatest5Stream<T, U, V, W, X>: ValueStream<(T, U, V, W, X)> { | |
private let streams: (ValueStream<T>, ValueStream<U>, ValueStream<V>, ValueStream<W>, ValueStream<X>) | |
init(_ streams: (ValueStream<T>, ValueStream<U>, ValueStream<V>, ValueStream<W>, ValueStream<X>)) { | |
self.streams = streams | |
} | |
override func addObserver(didChange: @escaping ((T, U, V, W, X)) -> Void) -> Disposable { | |
let disposeBag = DisposeBag() | |
var values: (T?, U?, V?, W?, X?) = (nil, nil, nil, nil, nil) | |
streams.0.addObserver { (t) in | |
values.0 = t | |
if let u = values.1, | |
let v = values.2, | |
let w = values.3, | |
let x = values.4 { didChange((t, u, v, w, x)) } | |
}.disposed(by: disposeBag) | |
streams.1.addObserver { (u) in | |
values.1 = u | |
if let t = values.0, | |
let v = values.2, | |
let w = values.3, | |
let x = values.4 { didChange((t, u, v, w, x)) } | |
}.disposed(by: disposeBag) | |
streams.2.addObserver { (v) in | |
values.2 = v | |
if let t = values.0, | |
let u = values.1, | |
let w = values.3, | |
let x = values.4 { didChange((t, u, v, w, x)) } | |
}.disposed(by: disposeBag) | |
streams.3.addObserver { (w) in | |
values.3 = w | |
if let t = values.0, | |
let u = values.1, | |
let v = values.2, | |
let x = values.4 { didChange((t, u, v, w, x)) } | |
}.disposed(by: disposeBag) | |
streams.4.addObserver { (x) in | |
values.4 = x | |
if let t = values.0, | |
let u = values.1, | |
let v = values.2, | |
let w = values.3 { didChange((t, u, v, w, x)) } | |
}.disposed(by: disposeBag) | |
return disposeBag | |
} | |
} | |
public func combineLatest<T, U>(_ stream1: ValueStream<T>, _ stream2: ValueStream<U>) -> ValueStream<(T, U)> { | |
return CombineLatest2Stream((stream1, stream2)) | |
} | |
public func combineLatest<T, U, V, W, X>(_ stream1: ValueStream<T>, | |
_ stream2: ValueStream<U>, | |
_ stream3: ValueStream<V>, | |
_ stream4: ValueStream<W>, | |
_ stream5: ValueStream<X>) -> ValueStream<(T, U, V, W, X)> { | |
return CombineLatest5Stream((stream1, stream2, stream3, stream4, stream5)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment