Skip to content

Instantly share code, notes, and snippets.

@rnapier
Last active January 11, 2025 19:54
Show Gist options
  • Save rnapier/a5007ff58a3d49cda4ef0039d6d59c21 to your computer and use it in GitHub Desktop.
Save rnapier/a5007ff58a3d49cda4ef0039d6d59c21 to your computer and use it in GitHub Desktop.
AsyncFuture
import Testing
import Combine
// From https://stackoverflow.com/questions/78892734/getting-task-isolated-value-of-type-async-passed-as-a-strongly-trans/78899940#78899940
/// A Combine publisher built around an asynchronous work closure.
///
/// The work closure receives a `Promise` that it calls with a `Result`. Every call to
/// `receive(subscriber:)` wraps the subscriber and the stored work closure in a new
/// `AsyncSubscription` and hands that subscription to the subscriber.
public final class AsyncFuture<Output, Failure: Error>: Publisher, Sendable {
    /// Callback through which the work closure reports its outcome.
    public typealias Promise = @Sendable (Result<Output, Failure>) -> Void

    /// The asynchronous work passed on to each subscription.
    private let work: @Sendable (@escaping Promise) async -> Void

    /// Creates a future that stores `work` for later subscriptions.
    /// - Parameter work: An async closure that is given a `Promise` to call with its result.
    public init(_ work: @Sendable @escaping (@escaping Promise) async -> Void) {
        self.work = work
    }

    /// Attaches `subscriber` by giving it a fresh `AsyncSubscription`.
    /// - Parameter subscriber: A sendable subscriber whose input and failure types match this publisher.
    public func receive<S>(subscriber: S)
    where S: Subscriber, S: Sendable, S.Input == Output, S.Failure == Failure {
        subscriber.receive(
            subscription: AsyncSubscription(subscriber: subscriber, work: work)
        )
    }
}
private extension AsyncFuture {
    /// Subscription that starts `work` in a `Task` as soon as it is initialised and
    /// forwards the promised result to the subscriber.
    final class AsyncSubscription<S: Subscriber>: Subscription
    where S: Sendable, S.Input == Output, S.Failure == Failure {
        private var subscriber: S?
        private let task: Task<Void, Error>

        /// Stores the subscriber and immediately launches `work`.
        /// - Parameters:
        ///   - subscriber: Receiver of the value and/or completion.
        ///   - work: Async closure that is handed the promise to fulfil.
        init(subscriber: S, work: sending @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber

            // The promise closes over the `subscriber` parameter, not the stored property.
            // NOTE(review): because of that, clearing the property in `cancel()` does not
            // by itself stop this closure from delivering — confirm this is acceptable.
            let fulfill: Promise = { outcome in
                switch outcome {
                case let .success(value):
                    // A success is delivered as one value followed by `.finished`.
                    _ = subscriber.receive(value)
                    subscriber.receive(completion: .finished)
                case let .failure(error):
                    subscriber.receive(completion: .failure(error))
                }
            }

            task = Task { await work(fulfill) }
        }

        /// Does nothing: the work was already started in `init`, so demand is not consulted.
        func request(_ demand: Subscribers.Demand) { }

        /// Drops the stored subscriber reference and cancels the work task.
        func cancel() {
            subscriber = nil
            task.cancel()
        }
    }
}
/// Tests that drive `AsyncFuture` through its `values` async sequence.
struct AsyncFutureTests {
    /// Error type used by the failure test.
    struct TestError: Error {}

    // Run this test 100 times
    /// A future whose promise is fulfilled right away yields 42 once and then ends the loop.
    @Test func immediatelySuccessfulFutureSucceedsImmediately() async throws {
        try await confirmation { valueArrived in
            try await confirmation { loopEnded in
                // Build a future that succeeds as soon as its work runs.
                let future = AsyncFuture<Int, Error> { fulfill in
                    fulfill(.success(42))
                }

                // Drain the sequence.
                for try await value in future.values {
                    valueArrived() // This will fail if it's already been called
                    #expect(value == 42)
                }

                // Reaching this line confirms the sequence terminated.
                loopEnded()
            }
        }
    }

    /// A future whose promise is failed right away throws `TestError` without yielding a value.
    @Test func immediatelyFailingFutureFailsImmediately() async throws {
        // Build a future that fails as soon as its work runs.
        let future = AsyncFuture<Int, Error> { fulfill in
            fulfill(.failure(TestError()))
        }

        await #expect(throws: TestError.self) {
            // Iterating should throw before any element is produced.
            for try await _ in future.values {
                Issue.record("Should not receive any values")
            }
        }
    }
}
// Attempt to correctly handle demand. Not sure if this could be done more simply.
private extension AsyncFuture {
    /// Subscription that starts `work` immediately but parks its result in a one-element
    /// stream until the subscriber signals demand.
    final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable {
        private var subscriber: S?
        /// Task running the caller-supplied work.
        private let workTask: Task<Void, Error>
        /// Task forwarding the buffered result to the subscriber; created on first demand.
        private var readTask: Task<Void, Error>?
        /// Hand-off between the work task and the read task. `.bufferingOldest(1)` is the
        /// buffering policy requested for results yielded before a reader exists.
        private let (stream, continuation) = AsyncStream.makeStream(of: Result<Output, Failure>.self, bufferingPolicy: .bufferingOldest(1))

        /// Stores the subscriber and launches `work`, routing its promise into the stream.
        /// - Parameters:
        ///   - subscriber: Receiver of the value and/or completion once demand arrives.
        ///   - work: Async closure that is handed the promise to fulfil.
        init(subscriber: S, work: sending @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber
            workTask = Task { [continuation] in
                await work { result in
                    switch result {
                    case .success(let output):
                        continuation.yield(.success(output))
                    case .failure(let failure):
                        continuation.yield(.failure(failure))
                    }
                    // The stream is finished after the first promise call.
                    continuation.finish()
                }
            }
        }

        /// Starts forwarding the buffered result the first time positive demand is requested.
        ///
        /// Later calls are ignored. Previously every positive request started another task
        /// iterating the same stream and overwrote `readTask`, so `cancel()` could only
        /// reach the most recent one.
        /// - Parameter demand: The subscriber's demand; only `demand > 0` has any effect.
        func request(_ demand: Subscribers.Demand) {
            guard readTask == nil, let subscriber, demand > 0 else { return }
            readTask = Task {
                for await result in stream {
                    switch result {
                    case .success(let value):
                        // A success is delivered as one value followed by `.finished`.
                        _ = subscriber.receive(value)
                        subscriber.receive(completion: .finished)
                    case .failure(let failure):
                        subscriber.receive(completion: .failure(failure))
                    }
                }
            }
        }

        /// Drops the stored subscriber reference and cancels both tasks.
        func cancel() {
            subscriber = nil
            workTask.cancel()
            readTask?.cancel()
        }
    }
}
private extension AsyncFuture {
    /// Subscription that holds on to `work` and only starts it when the subscriber first
    /// requests a positive demand.
    final class AsyncSubscription<S: Subscriber>: Subscription
    where S: Sendable, S.Input == Output, S.Failure == Failure {
        private var subscriber: S?
        private var task: Task<Void, Error>?
        private var work: (@Sendable (@escaping Promise) async -> Void)?

        /// Stores the subscriber and the work closure; nothing runs yet.
        /// - Parameters:
        ///   - subscriber: Receiver of the value and/or completion.
        ///   - work: Async closure that is handed the promise to fulfil.
        init(subscriber: S, work: @Sendable @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber
            self.work = work
        }

        /// Launches the work on the first positive demand.
        ///
        /// Does nothing when a task already exists, when the subscriber or work has been
        /// cleared, or when `demand` is not greater than zero.
        /// - Parameter demand: The subscriber's demand.
        func request(_ demand: Subscribers.Demand) {
            guard task == nil, let subscriber, let work, demand > 0 else { return }

            // The promise closes over the unwrapped local `subscriber`.
            let fulfill: Promise = { outcome in
                switch outcome {
                case let .success(value):
                    // A success is delivered as one value followed by `.finished`.
                    _ = subscriber.receive(value)
                    subscriber.receive(completion: .finished)
                case let .failure(error):
                    subscriber.receive(completion: .failure(error))
                }
            }
            task = Task { await work(fulfill) }

            // The task's closures now hold what they need; release the stored references.
            self.subscriber = nil
            self.work = nil
        }

        /// Cancels any running task and clears all stored state.
        func cancel() {
            task?.cancel()
            task = nil
            subscriber = nil
            work = nil
        }
    }
}
@robertmryan
Copy link

robertmryan commented Jan 9, 2025

@rnapier – Frankly, the whole “future/promise” pattern is wrong with Swift concurrency (notably the “promise” part). With Swift concurrency, we don’t return until we are done awaiting the async routines. We only need promises in legacy patterns (delegates, completion handler closures, etc.). But in Swift concurrency, we don’t return until the async work is done, anyway. I’ve updated my S.O. answer with that pattern.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment