Skip to content

Instantly share code, notes, and snippets.

@daltonclaybrook
Created December 27, 2021 21:52
Show Gist options
  • Select an option

  • Save daltonclaybrook/63abef0fc830d704a23a08cd06198cb2 to your computer and use it in GitHub Desktop.

Select an option

Save daltonclaybrook/63abef0fc830d704a23a08cd06198cb2 to your computer and use it in GitHub Desktop.
import Combine
struct AwaitPublisher<Value>: Publisher {
typealias Output = Value
typealias Failure = Never
private let block: () async -> Value
init(block: @escaping () async -> Value) {
self.block = block
}
func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Value == S.Input {
subscriber.receive(subscription: Subscription(downstream: subscriber, block: block))
}
}
extension AwaitPublisher {
final class Subscription<Downstream: Subscriber>: Combine.Subscription
where Output == Downstream.Input, Failure == Downstream.Failure {
private let downstream: Downstream
private let block: () async -> Value
private var currentTask: Task<Void, Never>?
init(downstream: Downstream, block: @escaping () async -> Value) {
self.downstream = downstream
self.block = block
}
func request(_ demand: Subscribers.Demand) {
guard demand > .none, currentTask == nil else { return }
currentTask = Task { [weak self] in
let value = await block()
_ = self?.downstream.receive(value)
self?.downstream.receive(completion: .finished)
}
}
func cancel() {
guard let task = currentTask, !task.isCancelled else { return }
currentTask = nil
task.cancel()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment