Created
December 1, 2019 19:29
-
-
Save ipavlidakis/8c412bd788f5c93bcb72f00ce0398c1a to your computer and use it in GitHub Desktop.
Universal Combine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
// | |
// UniversalCombine.swift | |
// UniversalCombine | |
// | |
// Created by Ilias Pavlidakis on 30/11/2019. | |
// | |
import Foundation | |
/// Empty class sharing the file's name; it declares no stored state or methods.
class UniversalCombine {
}
/// A source of `Output` values (and `Failure` errors) that subscribers can attach to.
protocol Publisher {
    /// The type of value delivered to subscribers.
    associatedtype Output

    /// The type of error a completion may carry.
    associatedtype Failure: Error

    /// Attaches `subscriber` to this publisher without returning a handle.
    ///
    /// - Parameter subscriber: A subscriber whose `Input`/`Failure` match this publisher.
    func subscribe<S>(_ subscriber: S) where S: Subscriber, S.Failure == Failure, S.Input == Output

    /// Attaches `subscriber` to this publisher.
    ///
    /// - Parameter subscriber: A subscriber whose `Input`/`Failure` match this publisher.
    /// - Returns: An optional `Cancellable` handle for the attachment.
    func subscribe<S>(subscriber: S) -> Cancellable? where S: Subscriber, S.Failure == Failure, S.Input == Output

    /// Wraps this publisher in a type-erased `AnyPublisher`.
    func eraseToAnyPublisher() -> AnyPublisher<Output, Failure>
}
extension Publisher {
    /// Default implementation: builds an `AnyPublisher` that wraps `self`.
    func eraseToAnyPublisher() -> AnyPublisher<Output, Failure> {
        let erased = AnyPublisher<Output, Failure>(publisher: self)
        return erased
    }
}
/// A class-bound receiver of values and completion events.
///
/// `AnyObject` is the current spelling of the class-only constraint (`: class`).
protocol Subscriber: AnyObject {
    /// The type of value this subscriber accepts.
    associatedtype Input

    /// The type of error a received completion may carry.
    associatedtype Failure: Error

    /// Hands the subscriber the subscription created for it.
    func receive(subscription: Subscription)

    /// Delivers a single value.
    func receive(_ input: Input)

    /// Delivers a completion event (either `.completed` or `.failed`).
    func receive(completion: Subscribers.Completion<Failure>)

    /// Wraps this subscriber in a type-erased `AnySubscriber`.
    func eraseToAnySubscriber() -> AnySubscriber<Input, Failure>
}
extension Subscriber {
    /// Default implementation: builds an `AnySubscriber` that wraps `self`.
    func eraseToAnySubscriber() -> AnySubscriber<Input, Failure> {
        let erased = AnySubscriber<Input, Failure>(subscriber: self)
        return erased
    }
}
/// Something that can be cancelled.
protocol Cancellable {
    /// Performs the cancellation.
    func cancel()
}
/// The link handed to a subscriber via `receive(subscription:)`; it can be cancelled.
protocol Subscription {
    /// Ends the subscription.
    func cancel()
}
/// A type that is simultaneously a `Publisher` and a `Subscriber`, and into which
/// values can be pushed manually with `send(_:)`.
protocol Subject: Publisher, Subscriber {
    /// The publisher type whose `Output` this subject accepts.
    associatedtype Upstream: Publisher

    /// The subscriber type associated with this subject's output side.
    associatedtype Downstream: Subscriber

    /// Pushes a value of the upstream's output type into the subject.
    func send(_ input: Upstream.Output)
}
// CONCRETE | |
/// Namespace for subscriber-related helper types.
enum Subscribers {
    /// The terminal event of a stream.
    enum Completion<Failure: Error> {
        /// The stream ended with `error`.
        case failed(_ error: Failure)
        /// The stream ended without an error.
        case completed
    }
}
// Operators: each one erases `self`, feeds it into a `Publishers.*` stage, and
// returns that stage type-erased.
extension Publisher {
    /// Emits `transform(value)` for every upstream value.
    func map<T>(_ transform: @escaping (Self.Output) -> T) -> AnyPublisher<T, Self.Failure> {
        let source = eraseToAnyPublisher()
        let stage = Publishers.Map<Self, _Subscriber<T, Self.Failure>>(upstream: source) { value in
            transform(value)
        }
        return stage.eraseToAnyPublisher()
    }

    /// Emits `transform(value)` for every upstream value, as an optional.
    ///
    /// NOTE(review): this is built on `Publishers.Map` and its output type is
    /// `T?`, so `nil` results are forwarded rather than dropped. Filtering them
    /// would change the return type — confirm whether that is intended.
    func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> AnyPublisher<T?, Self.Failure> {
        let source = eraseToAnyPublisher()
        let stage = Publishers.Map<Self, _Subscriber<T?, Self.Failure>>(upstream: source) { value in
            transform(value)
        }
        return stage.eraseToAnyPublisher()
    }

    /// Inserts a `Publishers.Print` stage (default action: `Swift.print`).
    func print() -> AnyPublisher<Self.Output, Self.Failure> {
        let source = eraseToAnyPublisher()
        let stage = Publishers.Print<Self, _Subscriber<Self.Output, Self.Failure>>(upstream: source)
        return stage.eraseToAnyPublisher()
    }

    /// Inserts a `Publishers.DebugPrint` stage (default action: `Swift.debugPrint`).
    func debugPrint() -> AnyPublisher<Self.Output, Self.Failure> {
        let source = eraseToAnyPublisher()
        let stage = Publishers.DebugPrint<Self, _Subscriber<Self.Output, Self.Failure>>(upstream: source)
        return stage.eraseToAnyPublisher()
    }

    /// Inserts a `Publishers.DispatchOn` stage that re-delivers values asynchronously on `dispatchQueue`.
    func dispatch(on dispatchQueue: DispatchQueue) -> AnyPublisher<Self.Output, Self.Failure> {
        let source = eraseToAnyPublisher()
        let stage = Publishers.DispatchOn<Self, _Subscriber<Self.Output, Self.Failure>>(
            upstream: source,
            dispatchQueue: dispatchQueue)
        return stage.eraseToAnyPublisher()
    }

    /// Inserts a `Publishers.ThreadReport` stage that logs whether each value arrives on the main thread.
    func threadInfo() -> AnyPublisher<Self.Output, Self.Failure> {
        let source = eraseToAnyPublisher()
        let stage = Publishers.ThreadReport<Self, _Subscriber<Self.Output, Self.Failure>>(upstream: source)
        return stage.eraseToAnyPublisher()
    }
}
/// Namespace for the concrete operator stages. Every stage is a `_Subject`
/// subclass: it subscribes to an upstream and republishes to its own subscribers.
enum Publishers {
    /// Pure transforming stage; all behaviour comes from `_Subject`.
    final class Map<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure {
    }

    /// Declared with the same shape as `Map`; it adds no behaviour of its own.
    final class CompactMap<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure {
    }

    /// Pass-through stage that runs a side-effect closure on each value before forwarding it.
    final class Print<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input {

        /// Side effect invoked with every value.
        private let transform: (Downstream.Input) -> Void

        /// - Parameters:
        ///   - upstream: The publisher to subscribe to.
        ///   - transform: Side effect per value; defaults to `Swift.print`.
        init(
            upstream: AnyPublisher<Upstream.Output, Upstream.Failure>,
            _ transform: @escaping (Downstream.Input) -> Void = { Swift.print($0) }
        ) {
            self.transform = transform
            // Identity transform: values are forwarded unchanged.
            super.init(upstream: upstream) { $0 }
        }

        override func receive(_ input: Input) {
            transform(input)
            super.receive(input)
        }
    }

    /// Pass-through stage like `Print`, defaulting to `Swift.debugPrint`.
    final class DebugPrint<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input {

        /// Side effect invoked with every value.
        private let transform: (Downstream.Input) -> Void

        /// - Parameters:
        ///   - upstream: The publisher to subscribe to.
        ///   - transform: Side effect per value; defaults to `Swift.debugPrint`.
        init(
            upstream: AnyPublisher<Upstream.Output, Upstream.Failure>,
            _ transform: @escaping (Downstream.Input) -> Void = { Swift.debugPrint($0) }
        ) {
            self.transform = transform
            // Identity transform: values are forwarded unchanged.
            super.init(upstream: upstream) { $0 }
        }

        override func receive(_ input: Input) {
            transform(input)
            super.receive(input)
        }
    }

    /// Pass-through stage that forwards each value asynchronously on a given queue.
    final class DispatchOn<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input {

        /// Queue on which values are re-delivered.
        private let dispatchQueue: DispatchQueue

        init(upstream: AnyPublisher<Upstream.Output, Upstream.Failure>, dispatchQueue: DispatchQueue) {
            self.dispatchQueue = dispatchQueue
            super.init(upstream: upstream) { $0 }
        }

        override func receive(_ input: Input) {
            // Only value delivery is overridden here; it hops to `dispatchQueue`.
            dispatchQueue.async {
                super.receive(input)
            }
        }
    }

    /// Pass-through stage that prints whether each value arrives on the main thread.
    final class ThreadReport<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>
        where Upstream.Failure == Downstream.Failure, Upstream.Output == Downstream.Input {

        init(upstream: AnyPublisher<Upstream.Output, Upstream.Failure>) {
            super.init(upstream: upstream) { $0 }
        }

        override func receive(_ input: Input) {
            Swift.print("Is this thread, the main thread: \(Thread.current.isMainThread)")
            super.receive(input)
        }
    }
}
/// Concrete subscription that ties a publisher/subscriber pair to a cancellation action.
///
/// While active it holds references to both the erased publisher and the erased
/// subscriber; `cancel()` runs the cancellation action and then drops both.
final class AnySubscription<Input, Output, Failure: Error>: Subscription, Cancellable {
    /// Action executed on `cancel()`.
    private let cancellable: Cancellable
    /// Erased publisher, released on `cancel()`.
    private var publisher: AnyPublisher<Output, Failure>?
    /// Erased subscriber, released on `cancel()`.
    private var subscriber: AnySubscriber<Input, Failure>?

    init(
        cancellable: Cancellable,
        publisher: AnyPublisher<Output, Failure>,
        subscriber: AnySubscriber<Input, Failure>
    ) {
        self.cancellable = cancellable
        self.publisher = publisher
        self.subscriber = subscriber
    }

    /// Runs the cancellation action, then releases the publisher and subscriber.
    func cancel() {
        cancellable.cancel()
        publisher = nil
        subscriber = nil
    }
}
/// A `Cancellable` backed by a closure.
final class AnyCancellable: Cancellable {
    /// Invoked each time `cancel()` is called.
    private let cancellationClosure: () -> Void

    /// - Parameter cancellationClosure: The work to perform on cancellation.
    init(cancellationClosure: @escaping () -> Void) {
        self.cancellationClosure = cancellationClosure
    }

    /// Executes the stored closure.
    func cancel() {
        cancellationClosure()
    }
}
/// Type-erased wrapper around any `Publisher` with matching `Output` and `Failure`.
///
/// Equality and hashing use only `identifier`, which defaults to a fresh `UUID`.
final class AnyPublisher<Output, Failure: Error>: Publisher, Equatable, Hashable {
    /// Identity token used by `==` and `hash(into:)`.
    private let identifier: AnyHashable
    /// Stores the wrapped publisher; not read elsewhere in this class.
    private let _publisher: Any
    /// Forwards to the wrapped publisher's `subscribe(_:)`.
    private let _subscribeWithoutCancellable: (AnySubscriber<Output, Failure>) -> Void
    /// Forwards to the wrapped publisher's `subscribe(subscriber:)`.
    private let _subscribeWithCancellable: (AnySubscriber<Output, Failure>) -> Cancellable?

    /// - Parameters:
    ///   - identifier: Identity for equality/hashing; a new `UUID` by default.
    ///   - publisher: The publisher to wrap.
    init<P: Publisher>(identifier: AnyHashable = UUID(), publisher: P) where P.Output == Output, P.Failure == Failure {
        self.identifier = identifier
        _publisher = publisher
        _subscribeWithoutCancellable = { erased in publisher.subscribe(erased) }
        _subscribeWithCancellable = { erased in publisher.subscribe(subscriber: erased) }
    }

    /// Creates an `AnyPublisher` wrapping a fresh `_Publisher`.
    class func create() -> AnyPublisher<Output, Failure> {
        let backing = _Publisher<Output, Failure>()
        return AnyPublisher<Output, Failure>(publisher: backing)
    }

    static func == (lhs: AnyPublisher<Output, Failure>, rhs: AnyPublisher<Output, Failure>) -> Bool {
        return lhs.identifier == rhs.identifier
    }

    func hash(into hasher: inout Hasher) {
        hasher.combine(identifier)
    }

    /// Erases `subscriber` and passes it to the wrapped publisher's `subscribe(_:)`.
    func subscribe<S>(_ subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        _subscribeWithoutCancellable(erased)
    }

    /// Erases `subscriber` and passes it to the wrapped publisher's `subscribe(subscriber:)`.
    func subscribe<S>(subscriber: S) -> Cancellable? where S: Subscriber, Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        return _subscribeWithCancellable(erased)
    }
}
/// Type-erased wrapper around any `Subscriber` with matching `Input` and `Failure`.
///
/// Equality and hashing use only `identifier`, which defaults to a fresh `UUID`.
/// The forwarding closures capture the wrapped subscriber weakly, while
/// `_subscriber` stores it for the lifetime of this wrapper.
final class AnySubscriber<Input, Failure: Error>: Subscriber, Equatable, Hashable {
    /// Identity token used by `==` and `hash(into:)`.
    private let identifier: AnyHashable
    /// Stores the wrapped subscriber; not read elsewhere in this class.
    private let _subscriber: Any
    private let _receiveSubscription: (Subscription) -> ()
    private let _receiveInput: (Input) -> ()
    private let _receiveCompletion: (Subscribers.Completion<Failure>) -> Void

    /// - Parameters:
    ///   - identifier: Identity for equality/hashing; a new `UUID` by default.
    ///   - subscriber: The subscriber to wrap.
    init<S: Subscriber>(identifier: AnyHashable = UUID(), subscriber: S) where S.Input == Input, S.Failure == Failure {
        self.identifier = identifier
        _subscriber = subscriber
        _receiveSubscription = { [weak subscriber] subscription in
            guard let target = subscriber else { return }
            target.receive(subscription: subscription)
        }
        _receiveInput = { [weak subscriber] value in
            guard let target = subscriber else { return }
            target.receive(value)
        }
        _receiveCompletion = { [weak subscriber] completion in
            guard let target = subscriber else { return }
            target.receive(completion: completion)
        }
    }

    /// Creates an `AnySubscriber` wrapping a fresh `_Subscriber`.
    ///
    /// Note: this method declares its own generic parameters `Input` and `Failure`,
    /// which shadow the class's parameters of the same names.
    static func create<Input, Failure: Error>() -> AnySubscriber<Input, Failure> {
        let backing = _Subscriber<Input, Failure>()
        return AnySubscriber<Input, Failure>(subscriber: backing)
    }

    static func == (lhs: AnySubscriber<Input, Failure>, rhs: AnySubscriber<Input, Failure>) -> Bool {
        return lhs.identifier == rhs.identifier
    }

    func hash(into hasher: inout Hasher) {
        hasher.combine(identifier)
    }

    func receive(subscription: Subscription) {
        _receiveSubscription(subscription)
    }

    func receive(_ input: Input) {
        _receiveInput(input)
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        _receiveCompletion(completion)
    }
}
/// Type-erased wrapper around any `Subject`.
///
/// It inherits from `_Subject` but overrides every publisher and subscriber entry
/// point to forward to the wrapped subject, so the inherited transform is never
/// meant to run (it is a `fatalError`).
final class AnySubject<Upstream: Publisher, Downstream: Subscriber>: _Subject<Upstream, Downstream>, Equatable, Hashable where Downstream.Failure == Upstream.Failure {
    /// Identity token used by `==` and `hash(into:)`.
    private let identifier: AnyHashable
    /// Stores the wrapped subject; not read elsewhere in this class.
    private let _subject: Any
    private let _subscribeWithoutCancellable: (AnySubscriber<Downstream.Input, Downstream.Failure>) -> Void
    private let _subscribeWithCancellable: (AnySubscriber<Downstream.Input, Downstream.Failure>) -> Cancellable?
    private let _receiveSubscription: (Subscription) -> ()
    private let _receiveInput: (Upstream.Output) -> ()
    private let _receiveCompletion: (Subscribers.Completion<Failure>) -> Void

    /// - Parameters:
    ///   - identifier: Identity for equality/hashing; a new `UUID` by default.
    ///   - subject: The subject to wrap. The subscriber-side closures capture it weakly.
    init<S: Subject>(
        identifier: AnyHashable = UUID(),
        subject: S
    ) where S.Failure == Upstream.Failure, S.Output == Downstream.Input, S.Input == Upstream.Output {
        self.identifier = identifier
        _subject = subject
        _subscribeWithoutCancellable = { erased in subject.subscribe(erased) }
        _subscribeWithCancellable = { erased in subject.subscribe(subscriber: erased) }
        _receiveSubscription = { [weak subject] subscription in
            guard let target = subject else { return }
            target.receive(subscription: subscription)
        }
        _receiveInput = { [weak subject] value in
            guard let target = subject else { return }
            target.receive(value)
        }
        _receiveCompletion = { [weak subject] completion in
            guard let target = subject else { return }
            target.receive(completion: completion)
        }
        // No upstream is passed, and `receive(_:)` is overridden below, so this
        // transform should never be reached.
        super.init { _ in fatalError("Transform call shouldn't be forwarded") }
    }

    /// Creates an `AnySubject` wrapping a fresh `_Subject` built from `upstream` and `transform`.
    class func create(
        upstream: AnyPublisher<Upstream.Output, Upstream.Failure>? = nil,
        transform: @escaping (Upstream.Output) -> Downstream.Input
    ) -> AnySubject<Upstream, Downstream> {
        let backing = _Subject<Upstream, Downstream>(upstream: upstream, transform)
        return AnySubject<Upstream, Downstream>(subject: backing)
    }

    static func == (lhs: AnySubject<Upstream, Downstream>, rhs: AnySubject<Upstream, Downstream>) -> Bool {
        return lhs.identifier == rhs.identifier
    }

    func hash(into hasher: inout Hasher) {
        hasher.combine(identifier)
    }

    // Publisher side: forwarded to the wrapped subject.

    override func subscribe<S: Subscriber>(_ subscriber: S) where Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        _subscribeWithoutCancellable(erased)
    }

    override func subscribe<S>(subscriber: S) -> Cancellable? where S: Subscriber, Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        return _subscribeWithCancellable(erased)
    }

    // Subscriber side: forwarded to the wrapped subject.

    override func receive(subscription: Subscription) {
        _receiveSubscription(subscription)
    }

    override func receive(_ input: Input) {
        _receiveInput(input)
    }

    override func receive(completion: Subscribers.Completion<Failure>) {
        _receiveCompletion(completion)
    }
}
/// Minimal publisher that only keeps track of its registered subscribers.
class _Publisher<Output, Failure: Error>: Publisher {
    /// Currently registered (type-erased) subscribers.
    private var subscribers: [AnySubscriber<Output, Failure>] = []

    /// Builds a cancellable that, when cancelled, removes `anySubscriber` from `subscribers`.
    /// The closure captures `self` strongly.
    private func cancellable(for anySubscriber: AnySubscriber<Output, Failure>) -> Cancellable {
        return AnyCancellable(cancellationClosure: {
            self.subscribers.removeAll { $0 == anySubscriber }
        })
    }

    /// Registers `subscriber` and hands it an `AnySubscription` whose cancellation
    /// unregisters it again.
    func subscribe<S: Subscriber>(_ subscriber: S) where Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        subscribers.append(erased)
        let subscription = AnySubscription(
            cancellable: cancellable(for: erased),
            publisher: eraseToAnyPublisher(),
            subscriber: erased)
        subscriber.receive(subscription: subscription)
    }

    /// Registers `subscriber` and returns a cancellable that unregisters it.
    /// No subscription is delivered to the subscriber on this path.
    func subscribe<S: Subscriber>(subscriber: S) -> Cancellable? where Failure == S.Failure, Output == S.Input {
        let erased = subscriber.eraseToAnySubscriber()
        subscribers.append(erased)
        return cancellable(for: erased)
    }
}
/// Minimal subscriber: remembers its subscription, ignores values, and cancels
/// the subscription when a completion arrives.
class _Subscriber<Input, Failure: Error>: Subscriber {
    /// The most recently received subscription, if any.
    private var subscription: Subscription?

    /// Stores `subscription` so it can be cancelled on completion.
    func receive(subscription: Subscription) {
        self.subscription = subscription
    }

    /// Does nothing with the value; subclasses may override.
    func receive(_ input: Input) {
    }

    /// Cancels the stored subscription (if one was received). The kind of completion is not inspected.
    func receive(completion: Subscribers.Completion<Failure>) {
        guard let active = subscription else { return }
        active.cancel()
    }
}
/// Base class that subscribes to an upstream publisher and republishes to its own subscribers.
///
/// Each value received is passed through the stored transform and delivered to
/// every currently registered subscriber. Completion events are delivered to
/// those subscribers as well, before this subject cancels its own upstream
/// subscription.
class _Subject<Upstream: Publisher, Downstream: Subscriber>: _Subscriber<Upstream.Output, Upstream.Failure>, Subject where Downstream.Failure == Upstream.Failure {
    typealias Output = Downstream.Input
    typealias Failure = Upstream.Failure
    typealias Input = Upstream.Output

    /// Maps an upstream value to the value republished downstream.
    private let _transform: (Upstream.Output) -> Downstream.Input
    /// Currently registered (type-erased) downstream subscribers.
    private var subscribers: [AnySubscriber<Output, Failure>] = []

    /// Creates a pass-through subject (identity transform) attached to `up`.
    ///
    /// - Parameters:
    ///   - up: The upstream publisher to subscribe to.
    ///   - down: Only used to constrain the generic types; its value is not read.
    convenience init<Down: Subscriber>(_ up: AnyPublisher<Upstream.Output, Upstream.Failure>, _ down: Down) where Down.Input == Downstream.Input, Upstream.Output == Down.Input {
        self.init(upstream: up) { $0 }
    }

    /// Creates a subject and, if `upstream` is given, immediately subscribes to it.
    ///
    /// - Parameters:
    ///   - upstream: Optional publisher to subscribe to; `nil` leaves the subject
    ///     driven only by direct `receive(_:)` / `send(_:)` calls.
    ///   - transform: Applied to every received value before it is republished.
    init(upstream: AnyPublisher<Upstream.Output, Upstream.Failure>? = nil, _ transform: @escaping (Upstream.Output) -> Downstream.Input) {
        self._transform = transform
        super.init()
        upstream?.subscribe(self)
    }

    /// Builds a cancellable that, when cancelled, removes `anySubscriber` from `subscribers`.
    /// The closure captures `self` strongly.
    private func cancellable(for anySubscriber: AnySubscriber<Output, Failure>) -> Cancellable {
        return AnyCancellable { self.subscribers = self.subscribers.filter { $0 != anySubscriber } }
    }

    // MARK: - Publisher

    /// Registers `subscriber` and hands it an `AnySubscription` whose cancellation
    /// unregisters it again.
    func subscribe<S: Subscriber>(_ subscriber: S) where Failure == S.Failure, Output == S.Input {
        let anySubscriber = subscriber.eraseToAnySubscriber()
        subscribers.append(anySubscriber)
        subscriber.receive(
            subscription: AnySubscription(
                cancellable: cancellable(for: anySubscriber),
                publisher: self.eraseToAnyPublisher(),
                subscriber: anySubscriber))
    }

    /// Registers `subscriber` and returns a cancellable that unregisters it.
    /// No subscription is delivered to the subscriber on this path.
    func subscribe<S: Subscriber>(subscriber: S) -> Cancellable? where Failure == S.Failure, Output == S.Input {
        let anySubscriber = subscriber.eraseToAnySubscriber()
        subscribers.append(anySubscriber)
        // Use the shared helper instead of duplicating the removal closure.
        return cancellable(for: anySubscriber)
    }

    // MARK: - Subscriber

    /// Transforms `input` once and delivers the result to every registered subscriber.
    override func receive(_ input: Input) {
        let transformedValue = _transform(input)
        subscribers.forEach { $0.receive(transformedValue) }
    }

    /// Propagates `completion` to every registered subscriber, then lets the base
    /// class cancel this subject's own upstream subscription.
    ///
    /// Without this override a completion reaching the subject would stop here and
    /// downstream subscribers would never learn that the stream finished or failed.
    override func receive(completion: Subscribers.Completion<Failure>) {
        // `forEach` iterates over a snapshot of the array, so subscribers that
        // cancel (and thereby unregister) while being notified are safe.
        subscribers.forEach { $0.receive(completion: completion) }
        super.receive(completion: completion)
    }

    // MARK: - Subject

    /// Pushes `input` into the subject as if it had arrived from upstream.
    func send(_ input: Upstream.Output) {
        receive(input)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment