-
-
Save rnapier/a5007ff58a3d49cda4ef0039d6d59c21 to your computer and use it in GitHub Desktop.
import Testing | |
import Combine | |
// From https://stackoverflow.com/questions/78892734/getting-task-isolated-value-of-type-async-passed-as-a-strongly-trans/78899940#78899940 | |
public final class AsyncFuture<Output, Failure: Error>: Publisher, Sendable { | |
public typealias Promise = @Sendable (Result<Output, Failure>) -> Void | |
private let work: @Sendable (@escaping Promise) async -> Void | |
public init(_ work: @Sendable @escaping (@escaping Promise) async -> Void) { | |
self.work = work | |
} | |
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input, S: Sendable { | |
let subscription = AsyncSubscription(subscriber: subscriber, work: work) | |
subscriber.receive(subscription: subscription) | |
} | |
} | |
private extension AsyncFuture { | |
final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable { | |
private var subscriber: S? | |
private let task: Task<Void, Error> | |
init(subscriber: S, work: sending @escaping (@escaping Promise) async -> Void) { | |
self.subscriber = subscriber | |
task = Task { | |
await work { result in | |
switch result { | |
case .success(let output): | |
_ = subscriber.receive(output) | |
subscriber.receive(completion: .finished) | |
case .failure(let failure): | |
subscriber.receive(completion: .failure(failure)) | |
} | |
} | |
} | |
} | |
func request(_ demand: Subscribers.Demand) { } | |
func cancel() { | |
subscriber = nil | |
task.cancel() | |
} | |
} | |
} | |
struct AsyncFutureTests { | |
// Run this test 100 times | |
@Test func immediatelySuccessfulFutureSucceedsImmediately() async throws { | |
try await confirmation() { receiveExactlyOneValue in | |
try await confirmation { loopTerminates in | |
// Create an immediate Future and succeed | |
let future = AsyncFuture<Int, Error> { promise in | |
promise(.success(42)) | |
} | |
// Read its first value | |
for try await value in future.values { | |
receiveExactlyOneValue() // This will fail if it's already been called | |
#expect(value == 42) | |
} | |
loopTerminates() | |
} | |
} | |
} | |
struct TestError: Error {} | |
@Test func immediatelyFailingFutureFailsImmediately() async throws { | |
// Create an immediate Future and fail | |
let future = AsyncFuture<Int, Error> { promise in | |
promise(.failure(TestError())) | |
} | |
await #expect(throws: TestError.self) { | |
// Read its first value; it should throw | |
for try await _ in future.values { | |
Issue.record("Should not receive any values") | |
} | |
} | |
} | |
} |
// Attempt to correctly handle demand. Not sure if this could be done more simply. | |
private extension AsyncFuture { | |
final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable { | |
private var subscriber: S? | |
private let workTask: Task<Void, Error> | |
private var readTask: Task<Void, Error>? | |
private let (stream, continuation) = AsyncStream.makeStream(of: Result<Output, Failure>.self, bufferingPolicy: .bufferingOldest(1)) | |
init(subscriber: S, work: sending @escaping (@escaping Promise) async -> Void) { | |
self.subscriber = subscriber | |
workTask = Task { [continuation] in | |
await work { result in | |
switch result { | |
case .success(let output): | |
continuation.yield(.success(output)) | |
case .failure(let failure): | |
continuation.yield(.failure(failure)) | |
} | |
continuation.finish() | |
} | |
} | |
} | |
func request(_ demand: Subscribers.Demand) { | |
if let subscriber, demand > 0 { | |
readTask = Task { | |
for await result in stream { | |
switch result { | |
case .success(let value): | |
_ = subscriber.receive(value) | |
subscriber.receive(completion: .finished) | |
case .failure(let failure): | |
subscriber.receive(completion: .failure(failure)) | |
} | |
} | |
} | |
} | |
} | |
func cancel() { | |
subscriber = nil | |
workTask.cancel() | |
readTask?.cancel() | |
} | |
} | |
} |
private extension AsyncFuture { | |
final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable { | |
private var subscriber: S? | |
private var task: Task<Void, Error>? | |
private var work: (@Sendable (@escaping Promise) async -> Void)? | |
init(subscriber: S, work: @Sendable @escaping (@escaping Promise) async -> Void) { | |
self.subscriber = subscriber | |
self.work = work | |
} | |
func request(_ demand: Subscribers.Demand) { | |
if task == nil, let subscriber, let work, demand > 0 { | |
task = Task { | |
await work { result in | |
switch result { | |
case .success(let output): | |
_ = subscriber.receive(output) | |
subscriber.receive(completion: .finished) | |
case .failure(let failure): | |
subscriber.receive(completion: .failure(failure)) | |
} | |
} | |
} | |
self.subscriber = nil | |
self.work = nil | |
} | |
} | |
func cancel() { | |
subscriber = nil | |
task?.cancel() | |
task = nil | |
work = nil | |
} | |
} | |
} |
Having stewed on this a bit (sitting in the dark in my house in Los Angeles that has no power; we’re at no immediate threat of the wildfires, but have been without power for the last 24 hours and will likely not get power until tomorrow; I’m stumbling along on solar power right now; lol), I didn’t like my synchronization in the above. I also wanted to abstract away my subscriber’s business logic into an extension. So perhaps:
/// AsyncAwaitFuture
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public final class AsyncFuture<Output, Failure: Error>: Publisher, Sendable {
public typealias Promise = @Sendable (Result<Output, Failure>) -> Void
public typealias Work = @Sendable (@escaping Promise) async -> Void
private let work: Work
public init(_ work: @escaping Work) {
self.work = work
}
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input, S: Sendable {
let subscription = AsyncFutureSubscription(subscriber: subscriber, work: work)
subscriber.receive(subscription: subscription)
}
}
private extension AsyncFuture {
final class AsyncFutureSubscription<S: Subscriber>: Subscription, @unchecked Sendable where S.Input == Output, S.Failure == Failure, S: Sendable {
private let lock = NSLock()
private var subscriber: S?
private var task: Task<Void, Error>?
private var result: Result<Output, Failure>? // keep buffer of published values; in this case, there is only one value, so a simple variable is sufficient
private var demand: Subscribers.Demand = .none // keep track of how much pending demand by the subscriber
init(subscriber: S, work: @escaping Work) {
self.subscriber = subscriber
task = Task {
await work { [self] result in
lock.withLock {
publisherProvided(result)
}
}
}
}
func cancel() {
lock.withLock {
task?.cancel()
task = nil
subscriber = nil
}
}
func request(_ demand: Subscribers.Demand) {
lock.withLock {
subscriberRequested(demand)
}
}
}
}
private extension AsyncFuture.AsyncFutureSubscription {
/// Publisher has provided a result
///
/// If subscriber has already requested a result, then just send it.
/// If subscriber has not yet requested result, then just save this result for future reference.
func publisherProvided(_ result: Result<Output, Failure>) {
if demand > .none {
sendOutputToSubscriber(result: result)
} else {
self.result = result
}
}
/// Subscriber has requested value
///
/// If publisher has already provided a result and subscriber demand > .none, then send it.
/// If publisher has not, just update the local demand count.
func subscriberRequested(_ additionaDemand: Subscribers.Demand) {
demand += additionaDemand
if let result, demand > .none {
sendOutputToSubscriber(result: result)
}
}
/// Send output to subscriber
///
/// Called only when both of the following are satisfied:
/// * publisher has provided a result to be sent; and
/// * subscriber has requested demand.
func sendOutputToSubscriber(result: Result<Output, Failure>) {
demand -= 1
switch result {
case .success(let output):
_ = subscriber?.receive(output)
subscriber?.receive(completion: .finished)
case .failure(let failure):
subscriber?.receive(completion: .failure(failure))
}
}
}
@rnapier – Frankly, the whole “future/promise” pattern is wrong with Swift concurrency (notably the “promise” part). With Swift concurrency, we don’t return
until we are done awaiting the async routines. We only need promises in legacy patterns (delegates, completion handler closures, etc.). But in Swift concurrency, we don’t return
until the async work is done, anyway. I’ve updated my S.O. answer with that pattern.
@rnapier
You are absolutely right that the subscription should not send data to the subscriber before it requests it. Most “implement custom publisher” solutions gloss over this issue entirely (and only work because the publisher generally publishes more slowly than they are consumed).
IMHO, the right general pattern (as
AsyncStream
illustrates) is to buffer the published values and only send them whenrequest
indicates sufficientDemand
. In the case of aFuture
(the publishing of a single element), this can be quite simple:request
with aDemand
, save the value for later, so we can send it to the subscriber when its readyrequest
with aDemand
:So, this is what I would be inclined to do:
The only thing that makes this a little messy is that in order to satisfy strict concurrency checking and/or Swift 6, I made this
Sendable
(and added the necessary synchronization).But, anyway, this is how I'd be inclined to tackle it: Buffer the value if it has not yet been requested by subscriber, and send it as soon as the subscriber requests it.