Skip to content

Instantly share code, notes, and snippets.

@mattadatta
Last active September 10, 2020 23:48
Show Gist options
  • Save mattadatta/b3dc053ec0fdcefeef96ff2d754bae0f to your computer and use it in GitHub Desktop.
Save mattadatta/b3dc053ec0fdcefeef96ff2d754bae0f to your computer and use it in GitHub Desktop.
Rx's Observable.create -> Swift Combine "Anonymous"
import Foundation
import Combine
struct Anonymous<Output, Failure> : Publisher where Failure : Error {
var onSubscribe: (Observer) -> Cancellable
func receive<S>(subscriber: S) where Output == S.Input, Failure == S.Failure, S : Combine.Subscriber {
subscriber.receive(subscription: Sink(parent: self, downstream: subscriber))
}
}
extension Anonymous {
struct Observer {
private let _send: (Output) -> Void
private let _sendCompletion: (Subscribers.Completion<Failure>) -> Void
fileprivate init<S>(sink: Sink<S>) where S : Combine.Subscriber {
self._send = { sink.receive($0) }
self._sendCompletion = { sink.receive(completion: $0) }
}
func send(_ input: Output) {
self._send(input)
}
func send(completion: Subscribers.Completion<Failure>) {
self._sendCompletion(completion)
}
}
}
private extension Anonymous {
final class Sink<Subscriber> : Subscription where Subscriber : Combine.Subscriber, Output == Subscriber.Input, Failure == Subscriber.Failure {
let parent: Anonymous
let downstream: Subscriber
var onDispose: Cancellable?
var demand: Subscribers.Demand = .none
let lock = NSRecursiveLock()
init(parent: Anonymous, downstream: Subscriber) {
self.parent = parent
self.downstream = downstream
}
deinit {
self.dispose()
}
private func dispose() {
self.lock.lock()
guard let onDispose = self.onDispose else {
self.lock.unlock()
return
}
self.lock.unlock()
onDispose.cancel()
self.lock.lock()
self.onDispose = nil
self.lock.lock()
}
func request(_ demand: Subscribers.Demand) {
self.lock.lock()
if self.demand == .none, self.onDispose == nil {
self.demand += demand
self.lock.unlock()
let onDispose = self.parent.onSubscribe(Observer(sink: self))
self.lock.lock()
self.onDispose = onDispose
self.lock.unlock()
} else {
self.demand += demand
self.lock.unlock()
}
}
func cancel() {
self.dispose()
}
func receive(_ input: Output) {
self.lock.lock()
if self.demand > 0 {
self.demand -= 1
self.lock.unlock()
let newDemand = self.downstream.receive(input)
self.lock.lock()
self.demand += newDemand
self.lock.unlock()
} else {
self.lock.unlock()
}
}
func receive(completion: Subscribers.Completion<Failure>) {
self.downstream.receive(completion: completion)
}
}
}
@Mackarous
Copy link

You could also add this one:

extension AnyPublisher {
    init(_ subscribe: @escaping (PassthroughSubject<Output, Failure>) -> AnyCancellable) {
        self = .create(subscribe: subscribe)
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment