Last active
January 11, 2025 19:54
-
-
Save rnapier/a5007ff58a3d49cda4ef0039d6d59c21 to your computer and use it in GitHub Desktop.
AsyncFuture
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Testing | |
import Combine | |
// From https://stackoverflow.com/questions/78892734/getting-task-isolated-value-of-type-async-passed-as-a-strongly-trans/78899940#78899940 | |
/// A Combine `Publisher` whose result is produced by an async closure.
///
/// The closure supplied at initialization is stored and handed, together with
/// the subscriber, to a new `AsyncSubscription` on every subscription.
public final class AsyncFuture<Output, Failure: Error>: Publisher, Sendable {
    /// Callback the async work invokes to report its `Result`.
    public typealias Promise = @Sendable (Result<Output, Failure>) -> Void

    /// The async work to perform; it receives the promise to call with its outcome.
    private let work: @Sendable (@escaping Promise) async -> Void

    /// Creates a future around `work`.
    /// - Parameter work: Async closure that is given a `Promise` to report its result through.
    public init(_ work: @Sendable @escaping (@escaping Promise) async -> Void) {
        self.work = work
    }

    /// Attaches `subscriber` by building an `AsyncSubscription` for it and passing
    /// that subscription to the subscriber.
    /// - Parameter subscriber: The downstream subscriber; must be `Sendable`.
    public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input, S: Sendable {
        subscriber.receive(subscription: AsyncSubscription(subscriber: subscriber, work: work))
    }
}
private extension AsyncFuture {
    /// Subscription that runs the future's async work for a single subscriber.
    ///
    /// The task is created in `init`, but it parks until the subscriber signals
    /// positive demand through `request(_:)`. Without that gate the promise could
    /// deliver a value before the subscriber had been handed its subscription
    /// (the publisher calls `receive(subscription:)` only after this initializer
    /// returns), and before any demand had been expressed.
    final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable {
        private var subscriber: S?
        private let task: Task<Void, Error>
        /// Wakes the parked task once the subscriber has asked for a value.
        private let demandSignal: AsyncStream<Void>.Continuation

        /// - Parameters:
        ///   - subscriber: Downstream subscriber that receives the value and/or completion.
        ///   - work: Async closure that reports its outcome through the supplied promise.
        init(subscriber: S, work: sending @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber
            // A single buffered slot is enough: only the first demand matters.
            let (pendingDemand, demandSignal) = AsyncStream.makeStream(of: Void.self, bufferingPolicy: .bufferingOldest(1))
            self.demandSignal = demandSignal
            task = Task {
                // Park until the first positive demand. `nil` means the signal was
                // finished (or this task cancelled) before any demand arrived.
                var demandIterator = pendingDemand.makeAsyncIterator()
                guard await demandIterator.next() != nil else { return }
                await work { result in
                    switch result {
                    case .success(let output):
                        _ = subscriber.receive(output)
                        subscriber.receive(completion: .finished)
                    case .failure(let failure):
                        subscriber.receive(completion: .failure(failure))
                    }
                }
            }
        }

        /// Releases the parked work the first time the subscriber requests a value.
        /// Later requests are harmless: the signal buffers a single element and
        /// the task reads it only once.
        func request(_ demand: Subscribers.Demand) {
            guard demand > 0 else { return }
            demandSignal.yield()
        }

        /// Drops the subscriber reference, ends the demand signal so a still-parked
        /// task exits without running the work, and cancels the task.
        func cancel() {
            subscriber = nil
            demandSignal.finish()
            task.cancel()
        }
    }
}
/// Tests that drive `AsyncFuture` through the publisher's `values` async sequence.
struct AsyncFutureTests {
    /// Error the failing-future test expects to see thrown.
    struct TestError: Error {}

    // Run this test 100 times
    @Test func immediatelySuccessfulFutureSucceedsImmediately() async throws {
        try await confirmation { valueArrived in
            try await confirmation { iterationEnded in
                // A future whose work fulfills the promise right away.
                let future = AsyncFuture<Int, Error> { fulfill in
                    fulfill(.success(42))
                }

                // Drain the sequence; the outer confirmation fails unless
                // exactly one element is seen.
                for try await received in future.values {
                    valueArrived()
                    #expect(received == 42)
                }

                // Reaching this point proves the sequence finished.
                iterationEnded()
            }
        }
    }

    @Test func immediatelyFailingFutureFailsImmediately() async throws {
        // A future whose work rejects the promise right away.
        let future = AsyncFuture<Int, Error> { fulfill in
            fulfill(.failure(TestError()))
        }

        await #expect(throws: TestError.self) {
            // Iterating must throw before any element is produced.
            for try await _ in future.values {
                Issue.record("Should not receive any values")
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Attempt to correctly handle demand. Not sure if this could be done more simply.
private extension AsyncFuture {
    /// Subscription that decouples running the work from delivering its result.
    ///
    /// The work starts in `init` and parks its outcome in a one-element
    /// `AsyncStream`. Nothing reaches the subscriber until it requests a
    /// positive demand, at which point a single reader task forwards the
    /// buffered (or future) result.
    final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable {
        private var subscriber: S?
        private let workTask: Task<Void, Error>
        private var readTask: Task<Void, Error>?
        // Holds at most one result until the subscriber asks for it.
        private let (stream, continuation) = AsyncStream.makeStream(of: Result<Output, Failure>.self, bufferingPolicy: .bufferingOldest(1))

        /// - Parameters:
        ///   - subscriber: Downstream subscriber that receives the value and/or completion.
        ///   - work: Async closure that reports its outcome through the supplied promise.
        init(subscriber: S, work: sending @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber
            workTask = Task { [continuation] in
                await work { result in
                    // Buffer the outcome as-is, then close the stream so the
                    // reader's loop terminates after forwarding it.
                    continuation.yield(result)
                    continuation.finish()
                }
            }
        }

        /// Starts forwarding the result on the first positive demand.
        ///
        /// Only one reader is ever created. Spawning a new one per request
        /// would overwrite `readTask`, leaving the earlier reader beyond the
        /// reach of `cancel()` and putting several consumers on one stream.
        func request(_ demand: Subscribers.Demand) {
            guard readTask == nil, let subscriber, demand > 0 else { return }
            readTask = Task {
                for await result in stream {
                    switch result {
                    case .success(let value):
                        _ = subscriber.receive(value)
                        subscriber.receive(completion: .finished)
                    case .failure(let failure):
                        subscriber.receive(completion: .failure(failure))
                    }
                }
            }
        }

        /// Drops the subscriber reference and cancels both the work and reader tasks.
        func cancel() {
            subscriber = nil
            workTask.cancel()
            readTask?.cancel()
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private extension AsyncFuture {
    /// Subscription that defers the async work until the subscriber first
    /// requests a positive demand.
    final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable {
        private var subscriber: S?
        private var task: Task<Void, Error>?
        private var work: (@Sendable (@escaping Promise) async -> Void)?

        /// Stores the subscriber and the work; nothing runs yet.
        /// - Parameters:
        ///   - subscriber: Downstream subscriber that receives the value and/or completion.
        ///   - work: Async closure that reports its outcome through the supplied promise.
        init(subscriber: S, work: @Sendable @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber
            self.work = work
        }

        /// Launches the work the first time demand is positive; otherwise does nothing.
        func request(_ demand: Subscribers.Demand) {
            guard task == nil, demand > 0, let downstream = subscriber, let job = work else {
                return
            }

            // Promise handed to the work: forwards the outcome to the subscriber.
            let fulfill: Promise = { outcome in
                switch outcome {
                case .failure(let error):
                    downstream.receive(completion: .failure(error))
                case .success(let value):
                    _ = downstream.receive(value)
                    downstream.receive(completion: .finished)
                }
            }

            task = Task {
                await job(fulfill)
            }

            // The task's closures now hold the only references they need.
            subscriber = nil
            work = nil
        }

        /// Cancels any running task and clears all stored state.
        func cancel() {
            task?.cancel()
            task = nil
            subscriber = nil
            work = nil
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@rnapier – Frankly, the whole “future/promise” pattern is wrong with Swift concurrency (notably the “promise” part). With Swift concurrency, we don’t `return` until we are done awaiting the async routines. We only need promises in legacy patterns (delegates, completion handler closures, etc.). But in Swift concurrency, we don’t `return` until the async work is done, anyway. I’ve updated my S.O. answer with that pattern.