Last active
September 10, 2020 23:48
-
-
Save mattadatta/b3dc053ec0fdcefeef96ff2d754bae0f to your computer and use it in GitHub Desktop.
Rx's Observable.create -> Swift Combine "Anonymous"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Combine | |
/// A closure-backed publisher, modelled after Rx's `Observable.create`.
///
/// The `onSubscribe` closure is handed an `Observer` through which it emits values and a
/// completion, and returns the `Cancellable` used to tear the work down again. It is invoked
/// by the subscription once the downstream subscriber requests values.
struct Anonymous<Output, Failure: Error>: Publisher {
    /// Work to perform for a subscriber; returns the handle that cancels that work.
    var onSubscribe: (Observer) -> Cancellable

    /// Attaches `subscriber` by giving it a fresh subscription bound to this publisher.
    func receive<S: Combine.Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
        let subscription = Sink(parent: self, downstream: subscriber)
        subscriber.receive(subscription: subscription)
    }
}
extension Anonymous {
    /// Type-erased handle passed to `onSubscribe` for emitting events.
    ///
    /// It hides the generic subscriber type of the underlying sink behind two closures,
    /// both of which keep a strong reference to that sink.
    struct Observer {
        /// Forwards a value to the sink.
        private let forwardValue: (Output) -> Void
        /// Forwards a completion event to the sink.
        private let forwardCompletion: (Subscribers.Completion<Failure>) -> Void

        /// Wraps `sink` so callers can emit events without knowing its subscriber type.
        fileprivate init<S>(sink: Sink<S>) where S: Combine.Subscriber {
            forwardValue = { value in sink.receive(value) }
            forwardCompletion = { event in sink.receive(completion: event) }
        }

        /// Emits `input` to the subscription.
        func send(_ input: Output) {
            forwardValue(input)
        }

        /// Emits a completion event (finished or failure) to the subscription.
        func send(completion: Subscribers.Completion<Failure>) {
            forwardCompletion(completion)
        }
    }
}
private extension Anonymous {
    /// Subscription that connects one downstream subscriber to the parent's `onSubscribe` closure.
    ///
    /// It tracks outstanding demand, starts the parent's work on the first `request(_:)`, and
    /// cancels the `Cancellable` returned by `onSubscribe` on `cancel()` or deallocation.
    /// Mutable state is only touched while `lock` is held; the lock is always released before
    /// calling out (into `onSubscribe`, the downstream subscriber, or the stored `Cancellable`).
    final class Sink<Subscriber>: Subscription where Subscriber: Combine.Subscriber, Output == Subscriber.Input, Failure == Subscriber.Failure {
        let parent: Anonymous
        let downstream: Subscriber
        /// Teardown handle returned by `parent.onSubscribe`; `nil` before start and after disposal.
        var onDispose: Cancellable?
        /// Demand requested by the downstream that has not been satisfied yet.
        var demand: Subscribers.Demand = .none
        let lock = NSRecursiveLock()
        /// Set on the first `request(_:)` so that `onSubscribe` runs at most once per subscription.
        private var hasStarted = false
        /// Set by `dispose()`; once true no work is started and no events are forwarded.
        private var isCancelled = false

        init(parent: Anonymous, downstream: Subscriber) {
            self.parent = parent
            self.downstream = downstream
        }

        deinit {
            dispose()
        }

        /// Marks the subscription as cancelled and cancels the stored teardown handle, if any.
        ///
        /// The handle is taken out of `onDispose` under the lock, so it is cancelled at most once
        /// even when `dispose()` is called repeatedly, and every `lock()` is paired with an `unlock()`.
        private func dispose() {
            lock.lock()
            isCancelled = true
            let cancellable = onDispose
            onDispose = nil
            lock.unlock()

            // Cancel outside the lock so user teardown code never runs while it is held.
            cancellable?.cancel()
        }

        /// Adds `demand` to the outstanding demand and, on the first call, runs `onSubscribe`.
        ///
        /// - Parameter demand: Additional number of values the downstream is willing to receive.
        func request(_ demand: Subscribers.Demand) {
            lock.lock()
            guard !isCancelled else {
                lock.unlock()
                return
            }
            self.demand += demand
            guard !hasStarted else {
                lock.unlock()
                return
            }
            // Flag the start before calling out: `onSubscribe` may emit synchronously and the
            // downstream may re-enter `request(_:)` from inside its `receive(_:)`.
            hasStarted = true
            lock.unlock()

            let cancellable = parent.onSubscribe(Observer(sink: self))

            lock.lock()
            if isCancelled {
                // Cancelled while `onSubscribe` was still running: tear down immediately
                // instead of keeping the work alive until deinit.
                lock.unlock()
                cancellable.cancel()
            } else {
                onDispose = cancellable
                lock.unlock()
            }
        }

        /// Cancels the subscription.
        func cancel() {
            dispose()
        }

        /// Delivers `input` downstream if there is outstanding demand; otherwise the value is dropped.
        ///
        /// - Parameter input: Value emitted through the `Observer`.
        func receive(_ input: Output) {
            lock.lock()
            guard !isCancelled, demand > 0 else {
                lock.unlock()
                return
            }
            demand -= 1
            lock.unlock()

            let additional = downstream.receive(input)

            lock.lock()
            demand += additional
            lock.unlock()
        }

        /// Forwards a completion event downstream unless the subscription has been cancelled.
        ///
        /// - Parameter completion: Terminal event emitted through the `Observer`.
        func receive(completion: Subscribers.Completion<Failure>) {
            lock.lock()
            let cancelled = isCancelled
            lock.unlock()

            guard !cancelled else { return }
            downstream.receive(completion: completion)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You could also add this one: