Created
January 28, 2023 15:30
-
-
Save marcpalmer/6bb596d3cf7ccc3cce7dd751935ec264 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Combine | |
extension Publishers {
    /// A publisher that passes through, without a scheduler hop, every value upstream delivers up to and
    /// including the first demand request, and delivers everything received after that on `scheduler`.
    ///
    /// This allows you to perform immediate state updates at the point of subscribing without having
    /// `.receive(on:)` force every single result to require a scheduler hop.
    ///
    /// Use a serial scheduler (for example `DispatchQueue.main`): the order of asynchronously delivered
    /// values, and of the completion that follows them, is only as strict as the scheduler's own ordering.
    public struct ReceiveAsyncResultsOn<Upstream, SchedulerType>: Publisher where Upstream: Publisher, SchedulerType: Scheduler {
        public typealias Output = Upstream.Output
        public typealias Failure = Upstream.Failure

        /// The publisher whose values are forwarded.
        public let upstream: Upstream
        /// The scheduler used for everything delivered after the first demand request has returned.
        public let scheduler: SchedulerType

        public init(upstream: Upstream,
                    scheduler: SchedulerType) {
            self.upstream = upstream
            self.scheduler = scheduler
        }

        public func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input {
            // The proxy sits between upstream and the subscriber: it is upstream's subscriber...
            let proxy = ProxySubscription(asyncScheduler: scheduler, target: subscriber)
            upstream.subscribe(proxy)
            // ...and the subscriber's subscription. Any demand requested here before upstream has
            // handed over its own subscription is buffered by the proxy.
            subscriber.receive(subscription: proxy)
        }
    }

    /// Acts as the subscriber to `upstream` and as the subscription handed to the real subscriber (`target`).
    ///
    /// Values and completions that arrive before the first demand request has returned are passed to
    /// `target` directly on the calling thread. Everything after that is delivered via `asyncScheduler`,
    /// including the completion, so that a completion can never overtake values that are still queued.
    ///
    /// Mutable state is guarded by a lock because it is touched from the subscriber's thread, the
    /// upstream's thread and the async scheduler. The lock is never held while calling into `target`
    /// or the upstream subscription, so re-entrant calls cannot deadlock.
    public class ProxySubscription<Target: Subscriber, SchedulerType>: Subscription, Subscriber where SchedulerType: Scheduler {
        public typealias Input = Target.Input
        public typealias Failure = Target.Failure

        private let lock = NSLock()
        private var upstreamSubscription: Subscription?
        /// `true` until the first demand request to upstream has returned (the synchronous phase).
        private var isFirstDemandRequest = true
        /// Demand requested by `target` before the upstream subscription was received.
        private var pendingDemand = Subscribers.Demand.none
        private var isCancelled = false

        private let asyncScheduler: SchedulerType
        private let target: Target

        init(asyncScheduler: SchedulerType, target: Target) {
            self.asyncScheduler = asyncScheduler
            self.target = target
        }

        // MARK: - Subscription

        /// Forwards `demand` to upstream, or buffers it when upstream has not delivered its subscription yet
        /// (for example when upstream subscribes asynchronously).
        public func request(_ demand: Subscribers.Demand) {
            let upstream = synchronized { () -> Subscription? in
                guard !isCancelled else {
                    return nil
                }
                if upstreamSubscription == nil {
                    pendingDemand += demand
                }
                return upstreamSubscription
            }
            if let upstream = upstream {
                forward(demand, to: upstream)
            }
        }

        /// Cancels upstream and suppresses any deliveries that are still queued on the async scheduler.
        public func cancel() {
            let upstream = synchronized { () -> Subscription? in
                isCancelled = true
                pendingDemand = .none
                let current = upstreamSubscription
                upstreamSubscription = nil
                return current
            }
            upstream?.cancel()
        }

        // MARK: - Subscriber

        public func receive(subscription: Subscription) {
            // We are passed the upstream subscription here; demand requests are forwarded to it.
            let accepted = synchronized { () -> Subscribers.Demand? in
                guard !isCancelled else {
                    return nil
                }
                upstreamSubscription = subscription
                let buffered = pendingDemand
                pendingDemand = .none
                return buffered
            }
            guard let buffered = accepted else {
                // Cancelled before upstream got around to subscribing us: release it straight away.
                subscription.cancel()
                return
            }
            if buffered != .none {
                forward(buffered, to: subscription)
            }
        }

        public func receive(_ input: Input) -> Subscribers.Demand {
            let state = synchronized { () -> (cancelled: Bool, synchronous: Bool) in
                return (isCancelled, isFirstDemandRequest)
            }
            guard !state.cancelled else {
                return .none
            }
            if state.synchronous {
                return target.receive(input)
            }
            asyncScheduler.schedule { [weak self] in
                guard let self = self else {
                    return
                }
                self.deliverAsynchronously(input)
            }
            // Additional demand, if any, is requested from upstream once the value has been delivered.
            return .none
        }

        public func receive(completion: Subscribers.Completion<Target.Failure>) {
            let state = synchronized { () -> (cancelled: Bool, synchronous: Bool) in
                return (isCancelled, isFirstDemandRequest)
            }
            guard !state.cancelled else {
                return
            }
            if state.synchronous {
                target.receive(completion: completion)
                return
            }
            // Hop to the async scheduler so the completion is queued behind pending values. `self` is
            // captured strongly on purpose: upstream may release us right after completing, and the
            // queued values (which capture us weakly) and this completion must still reach `target`.
            asyncScheduler.schedule {
                guard !self.isCancelledSnapshot() else {
                    return
                }
                self.target.receive(completion: completion)
            }
        }

        // MARK: - Helpers

        /// Requests `demand` from `upstream` and then ends the synchronous phase.
        private func forward(_ demand: Subscribers.Demand, to upstream: Subscription) {
            upstream.request(demand) // This may start spraying results at us before it returns
            synchronized { isFirstDemandRequest = false }
        }

        /// Runs on the async scheduler: delivers `input` to `target` and forwards any new demand upstream.
        private func deliverAsynchronously(_ input: Input) {
            guard !isCancelledSnapshot() else {
                return
            }
            let newDemand = target.receive(input)
            guard newDemand != .none else {
                return
            }
            let upstream = synchronized { () -> Subscription? in
                return upstreamSubscription
            }
            upstream?.request(newDemand)
        }

        /// Reads the cancellation flag under the lock.
        private func isCancelledSnapshot() -> Bool {
            return synchronized { () -> Bool in
                return isCancelled
            }
        }

        /// Executes `body` while holding the state lock. `body` must not call out to other objects.
        private func synchronized<T>(_ body: () -> T) -> T {
            lock.lock()
            defer { lock.unlock() }
            return body()
        }
    }
}
extension Publisher {
    /// Delivers results that are already available immediately, and later results asynchronously on `scheduler`.
    ///
    /// Usage: `publisher.receiveAsyncResults(on: DispatchQueue.main)`
    ///
    /// Results the publisher can deliver while you are subscribing reach you straight away on the current
    /// queue, whatever that is, so the first delivery needs no async thread hop. Later results are received
    /// asynchronously on `scheduler` (the main queue in the usage example above).
    ///
    /// This is most useful when the publisher emits values from a background queue and you need later
    /// values on a specific queue, but you still want immediate results at the point of subscribing.
    /// Care must be taken to make your subscription handling thread-safe, because it may be called on the
    /// publisher's queue or on your chosen async queue.
    ///
    /// - Parameter scheduler: The scheduler on which results arriving after subscribing are delivered.
    /// - Returns: A publisher wrapping `self` that applies the delivery rules described above.
    public func receiveAsyncResults<S>(on scheduler: S) -> Publishers.ReceiveAsyncResultsOn<Self, S> where S: Scheduler {
        return .init(upstream: self, scheduler: scheduler)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment