Skip to content

Instantly share code, notes, and snippets.

@rnapier
Last active January 11, 2025 19:54
Show Gist options
  • Save rnapier/a5007ff58a3d49cda4ef0039d6d59c21 to your computer and use it in GitHub Desktop.
Save rnapier/a5007ff58a3d49cda4ef0039d6d59c21 to your computer and use it in GitHub Desktop.
AsyncFuture
import Testing
import Combine
// From https://stackoverflow.com/questions/78892734/getting-task-isolated-value-of-type-async-passed-as-a-strongly-trans/78899940#78899940
/// A Combine publisher built around an asynchronous closure that reports its
/// outcome through a promise callback.
///
/// Each call to `receive(subscriber:)` wraps the subscriber and the stored work
/// closure in a new `AsyncSubscription` and hands that subscription to the subscriber.
public final class AsyncFuture<Output, Failure: Error>: Publisher, Sendable {
    /// Callback the work closure invokes with its success value or failure.
    public typealias Promise = @Sendable (Result<Output, Failure>) -> Void

    /// The asynchronous work passed on to every subscription.
    private let work: @Sendable (@escaping Promise) async -> Void

    /// Creates a future around `work`.
    ///
    /// - Parameter work: Asynchronous closure that receives the `Promise` to fulfil.
    public init(_ work: @Sendable @escaping (@escaping Promise) async -> Void) {
        self.work = work
    }

    /// Creates a subscription for `subscriber` and passes it to the subscriber.
    ///
    /// - Parameter subscriber: The subscriber to attach.
    public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input, S: Sendable {
        subscriber.receive(subscription: AsyncSubscription(subscriber: subscriber, work: work))
    }
}
private extension AsyncFuture {
    /// Subscription that runs the future's work once the subscriber asks for a value.
    ///
    /// The work is not started at creation time: starting it eagerly lets the promise
    /// fire before the subscriber has signalled any demand, which delivers a value the
    /// subscriber never requested. Instead the subscriber and work are parked until the
    /// first `request(_:)` with positive demand, and only then moved into a `Task`.
    final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable {
        /// Held until the work starts or the subscription is cancelled.
        private var subscriber: S?
        /// Held until the work starts or the subscription is cancelled.
        private var work: (@Sendable (@escaping Promise) async -> Void)?
        /// The running work; `nil` before the first positive demand and after cancellation.
        private var task: Task<Void, Error>?

        /// - Parameters:
        ///   - subscriber: Receives the value and/or completion produced by `work`.
        ///   - work: Asynchronous closure that fulfils the promise it is given.
        init(subscriber: S, work: @Sendable @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber
            self.work = work
        }

        /// Starts the work on the first request with positive demand; later requests are ignored.
        func request(_ demand: Subscribers.Demand) {
            guard task == nil, demand > .none, let subscriber, let work else { return }

            // Drop the stored references first so a cancelled or already-started
            // subscription can never start the work a second time.
            self.subscriber = nil
            self.work = nil

            task = Task {
                await work { result in
                    switch result {
                    case .success(let output):
                        // A single value is published, so the returned extra demand is irrelevant.
                        _ = subscriber.receive(output)
                        subscriber.receive(completion: .finished)
                    case .failure(let failure):
                        subscriber.receive(completion: .failure(failure))
                    }
                }
            }
        }

        /// Releases the parked subscriber and work, and cancels the task if it has started.
        ///
        /// Note: once the task is running, its closure holds its own reference to the
        /// subscriber, so work that ignores task cancellation can still fulfil the promise.
        func cancel() {
            subscriber = nil
            work = nil
            task?.cancel()
            task = nil
        }
    }
}
/// Swift Testing suite exercising `AsyncFuture` through its `values` async sequence.
struct AsyncFutureTests {
    /// Error used to fail the future in `immediatelyFailingFutureFailsImmediately()`.
    struct TestError: Error {}

    // Run this test 100 times
    /// A future whose work fulfils the promise with 42 right away must yield that
    /// value exactly once and then end the sequence.
    @Test func immediatelySuccessfulFutureSucceedsImmediately() async throws {
        try await confirmation { valueReceived in
            try await confirmation { loopFinished in
                // Future that succeeds as soon as its work runs.
                let future = AsyncFuture<Int, Error> { fulfill in
                    fulfill(.success(42))
                }

                // Drain the sequence; a second element would over-confirm `valueReceived`.
                for try await element in future.values {
                    valueReceived()
                    #expect(element == 42)
                }

                // Reaching this point means the sequence terminated.
                loopFinished()
            }
        }
    }

    /// A future whose work fulfils the promise with a failure right away must throw
    /// that error from the sequence without producing any element.
    @Test func immediatelyFailingFutureFailsImmediately() async throws {
        // Future that fails as soon as its work runs.
        let future = AsyncFuture<Int, Error> { fulfill in
            fulfill(.failure(TestError()))
        }

        await #expect(throws: TestError.self) {
            // Iterating should throw before any element is produced.
            for try await _ in future.values {
                Issue.record("Should not receive any values")
            }
        }
    }
}
// Attempt to correctly handle demand. Not sure if this could be done more simply.
private extension AsyncFuture {
    /// Subscription that starts the work immediately but parks its result in a
    /// one-element `AsyncStream` until the subscriber requests a value.
    final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable {
        /// Cleared on cancellation so later requests become no-ops.
        private var subscriber: S?
        /// Runs the work and feeds its result into `continuation`.
        private let workTask: Task<Void, Error>
        /// Drains `stream` into the subscriber; created at most once.
        private var readTask: Task<Void, Error>?
        /// Buffer between the work and the subscriber. `.bufferingOldest(1)` keeps only
        /// the first result yielded before the stream is read.
        private let (stream, continuation) = AsyncStream.makeStream(of: Result<Output, Failure>.self, bufferingPolicy: .bufferingOldest(1))

        /// - Parameters:
        ///   - subscriber: Receives the value and/or completion once it has requested one.
        ///   - work: Asynchronous closure that fulfils the promise it is given.
        init(subscriber: S, work: sending @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber
            workTask = Task { [continuation] in
                await work { result in
                    // The promise already hands us a `Result`, so it can be buffered as-is.
                    continuation.yield(result)
                    continuation.finish()
                }
            }
        }

        /// Starts forwarding the buffered result on the first request with positive demand.
        func request(_ demand: Subscribers.Demand) {
            // Create the reader only once: a second reader would iterate the same
            // stream concurrently and the first reader's handle would be lost, so
            // `cancel()` could no longer stop it.
            guard readTask == nil, let subscriber, demand > .none else { return }

            readTask = Task {
                for await result in stream {
                    switch result {
                    case .success(let value):
                        // A single value is published, so the returned extra demand is irrelevant.
                        _ = subscriber.receive(value)
                        subscriber.receive(completion: .finished)
                    case .failure(let failure):
                        subscriber.receive(completion: .failure(failure))
                    }
                }
            }
        }

        /// Drops the subscriber and cancels both the work and the reader.
        func cancel() {
            subscriber = nil
            workTask.cancel()
            readTask?.cancel()
        }
    }
}
private extension AsyncFuture {
    /// Subscription that defers the work until the subscriber first signals demand.
    ///
    /// The subscriber and work closure are stored at creation. The first `request(_:)`
    /// with positive demand moves both into a `Task` and clears the stored copies, so
    /// the work runs at most once per subscription.
    final class AsyncSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable {
        private var subscriber: S?
        private var task: Task<Void, Error>?
        private var work: (@Sendable (@escaping Promise) async -> Void)?

        /// - Parameters:
        ///   - subscriber: Receives the value and/or completion produced by `work`.
        ///   - work: Asynchronous closure that fulfils the promise it is given.
        init(subscriber: S, work: @Sendable @escaping (@escaping Promise) async -> Void) {
            self.work = work
            self.subscriber = subscriber
        }

        /// Launches the work if it has not started yet and `demand` is positive.
        func request(_ demand: Subscribers.Demand) {
            guard task == nil,
                  let downstream = subscriber,
                  let operation = work,
                  demand > 0
            else { return }

            task = Task {
                await operation { outcome in
                    switch outcome {
                    case .failure(let error):
                        downstream.receive(completion: .failure(error))
                    case .success(let value):
                        // Only one value is ever sent, so the returned demand is ignored.
                        _ = downstream.receive(value)
                        downstream.receive(completion: .finished)
                    }
                }
            }

            // The task's closure now owns what it needs; release the stored copies.
            subscriber = nil
            work = nil
        }

        /// Clears all stored state and cancels the task if one was started.
        ///
        /// The running task's closure keeps its own reference to the subscriber, so
        /// this only stops delivery when the work itself reacts to task cancellation.
        func cancel() {
            subscriber = nil
            task?.cancel()
            task = nil
            work = nil
        }
    }
}
@robertmryan
Copy link

robertmryan commented Jan 8, 2025

@rnapier

You are absolutely right that the subscription should not send data to the subscriber before the subscriber requests it. Most “implement custom publisher” solutions gloss over this issue entirely (and only work because values are generally published more slowly than they are consumed).

IMHO, the right general pattern (as AsyncStream illustrates) is to buffer the published values and only send them when request indicates sufficient Demand. In the case of a Future (the publishing of a single element), this can be quite simple:

  1. when a publisher sends a value:
    • if the subscriber has previously informed the subscription of a demand, go ahead and send it to the subscriber and you’re done.
    • if the subscriber has not yet called request with a Demand, save the value for later, so we can send it to the subscriber when it’s ready
  2. when a subscriber calls request with a Demand:
    • if there is a saved value to send, just send it
    • if there is no value from the publisher, just update the demand count.

So, this is what I would be inclined to do:

/// Publisher that wraps an asynchronous, promise-fulfilling closure.
///
/// Subscribing creates an `AsyncSubscription` from the subscriber and the stored
/// closure; the subscription is then passed to the subscriber.
public final class AsyncFuture<Output, Failure: Error>: Publisher, Sendable {
    /// Callback used by the work closure to report success or failure.
    public typealias Promise = @Sendable (Result<Output, Failure>) -> Void

    /// Closure supplied at initialisation and forwarded to each subscription.
    private let work: @Sendable (@escaping Promise) async -> Void

    /// - Parameter work: Asynchronous closure that is given a `Promise` to call.
    public init(_ work: @Sendable @escaping (@escaping Promise) async -> Void) {
        self.work = work
    }

    /// Builds a subscription for `subscriber` and delivers it via `receive(subscription:)`.
    public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input, S: Sendable {
        let newSubscription = AsyncSubscription(subscriber: subscriber, work: work)
        subscriber.receive(subscription: newSubscription)
    }
}

private extension AsyncFuture {
    /// Subscription that starts the work immediately, buffers its result, and delivers
    /// it once the subscriber has requested a value.
    ///
    /// All mutable state is guarded by `lock`. Each decision ("buffer or send?",
    /// "is there something to send?") is made inside a single critical section so the
    /// promise and `request(_:)` cannot interleave and strand the result. The subscriber
    /// is claimed exactly once, which prevents a value or completion from being sent twice.
    /// The lock is never held while calling the subscriber or cancelling the task.
    final class AsyncSubscription<S: Subscriber>: Subscription, @unchecked Sendable where S.Input == Output, S.Failure == Failure, S: Sendable {
        private let lock = NSLock()

        /// `nil` once the result has been delivered or the subscription was cancelled.
        private var subscriber: S?
        private var task: Task<Void, Error>?
        /// Result produced before any demand was signalled.
        private var bufferedResult: Result<Output, Failure>?
        /// Demand accumulated from `request(_:)` calls.
        private var pendingDemand: Subscribers.Demand = .none

        /// - Parameters:
        ///   - subscriber: Receives the value and/or completion once it has requested one.
        ///   - work: Asynchronous closure that fulfils the promise it is given.
        init(subscriber: S, work: sending @escaping (@escaping Promise) async -> Void) {
            self.subscriber = subscriber
            task = Task {
                await work { result in
                    // Decide atomically whether to send now or keep the result for later.
                    let hasDemand = self.lock.withLock { () -> Bool in
                        guard self.subscriber != nil else { return false }
                        if self.pendingDemand > .none { return true }
                        // Only the first result is kept; later promise calls are ignored.
                        if self.bufferedResult == nil { self.bufferedResult = result }
                        return false
                    }
                    if hasDemand {
                        self.sendOutputToSubscriber(result: result)
                    }
                }
            }
        }

        /// Drops all state and cancels the work.
        func cancel() {
            let runningTask = lock.withLock { () -> Task<Void, Error>? in
                defer {
                    task = nil
                    subscriber = nil
                    bufferedResult = nil
                }
                return task
            }
            // Cancel outside the lock: cancellation handlers run synchronously and may
            // call back into this subscription.
            runningTask?.cancel()
        }

        /// Delivers `result` to the subscriber, at most once per subscription.
        ///
        /// The subscriber is claimed under the lock and then called without it, so a
        /// subscriber that re-enters `request(_:)` or `cancel()` cannot deadlock.
        func sendOutputToSubscriber(result: Result<Output, Failure>) {
            let recipient = lock.withLock { () -> S? in
                defer {
                    subscriber = nil
                    bufferedResult = nil
                }
                return subscriber
            }
            guard let recipient else { return }

            switch result {
            case .success(let output):
                // A single value is published, so the returned extra demand is irrelevant.
                _ = recipient.receive(output)
                recipient.receive(completion: .finished)
            case .failure(let failure):
                recipient.receive(completion: .failure(failure))
            }
        }

        /// Records the demand and, if a result is already buffered, delivers it.
        func request(_ demand: Subscribers.Demand) {
            let ready = lock.withLock { () -> Result<Output, Failure>? in
                pendingDemand += demand
                return pendingDemand > .none ? bufferedResult : nil
            }
            if let ready {
                sendOutputToSubscriber(result: ready)
            }
        }
    }
}

The only thing that makes this a little messy is that in order to satisfy strict concurrency checking and/or Swift 6, I made this Sendable (and added the necessary synchronization).

But, anyway, this is how I'd be inclined to tackle it: Buffer the value if it has not yet been requested by subscriber, and send it as soon as the subscriber requests it.

@robertmryan
Copy link

Having stewed on this a bit (sitting in the dark in my house in Los Angeles that has no power; we’re at no immediate threat of the wildfires, but have been without power for the last 24 hours and will likely not get power until tomorrow; I’m stumbling along on solar power right now; lol), I didn’t like my synchronization in the above. I also wanted to abstract away my subscriber’s business logic into an extension. So perhaps:

/// Publisher that wraps an asynchronous closure which reports its outcome through a promise.
///
/// Every subscriber gets its own `AsyncFutureSubscription`, created from the
/// subscriber and the stored `Work` closure.

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public final class AsyncFuture<Output, Failure: Error>: Publisher, Sendable {
    /// Callback the work invokes with its success value or failure.
    public typealias Promise = @Sendable (Result<Output, Failure>) -> Void
    /// Asynchronous closure that is handed a `Promise` to fulfil.
    public typealias Work = @Sendable (@escaping Promise) async -> Void

    /// Closure supplied at initialisation and forwarded to each subscription.
    private let work: Work

    /// - Parameter work: The asynchronous work to run for subscribers.
    public init(_ work: @escaping Work) {
        self.work = work
    }

    /// Creates a subscription for `subscriber` and passes it to the subscriber.
    public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input, S: Sendable {
        subscriber.receive(subscription: AsyncFutureSubscription(subscriber: subscriber, work: work))
    }
}

private extension AsyncFuture {
    /// Subscription that starts the work immediately, buffers its result, and delivers
    /// it once the subscriber has signalled demand.
    ///
    /// State changes happen under `lock`; the subscriber itself is always called after
    /// the lock has been released. Calling the subscriber while holding the
    /// non-recursive lock would deadlock as soon as the subscriber re-entered
    /// `request(_:)` or `cancel()` from inside `receive`.
    final class AsyncFutureSubscription<S: Subscriber>: Subscription, @unchecked Sendable where S.Input == Output, S.Failure == Failure, S: Sendable {
        /// A claimed subscriber together with the result it must now receive.
        typealias Delivery = (subscriber: S, result: Result<Output, Failure>)

        private let lock = NSLock()

        private var subscriber: S?                     // nil once delivered or cancelled
        private var task: Task<Void, Error>?
        private var result: Result<Output, Failure>?   // keep buffer of published values; in this case, there is only one value, so a simple variable is sufficient
        private var demand: Subscribers.Demand = .none // keep track of how much pending demand by the subscriber

        /// - Parameters:
        ///   - subscriber: Receives the value and/or completion once it has requested one.
        ///   - work: Asynchronous closure that fulfils the promise it is given.
        init(subscriber: S, work: @escaping Work) {
            self.subscriber = subscriber
            task = Task {
                await work { [self] result in
                    let delivery = lock.withLock { publisherProvided(result) }
                    sendOutputToSubscriber(delivery)
                }
            }
        }

        /// Drops all state and cancels the work.
        func cancel() {
            let runningTask = lock.withLock { () -> Task<Void, Error>? in
                defer {
                    task = nil
                    subscriber = nil
                    result = nil
                }
                return task
            }
            // Cancel outside the lock: cancellation handlers run synchronously and may
            // call back into this subscription.
            runningTask?.cancel()
        }

        /// Records additional demand and delivers the buffered result if there is one.
        func request(_ demand: Subscribers.Demand) {
            let delivery = lock.withLock { subscriberRequested(demand) }
            sendOutputToSubscriber(delivery)
        }
    }
}

private extension AsyncFuture.AsyncFutureSubscription {
    /// Publisher has provided a result. Must be called with `lock` held.
    ///
    /// Only the first result counts; calls after delivery or cancellation are ignored.
    ///
    /// - Returns: The delivery to perform after unlocking, or `nil` if the result was
    ///   buffered or ignored.

    func publisherProvided(_ result: Result<Output, Failure>) -> Delivery? {
        guard subscriber != nil, self.result == nil else { return nil }
        self.result = result
        return claimDeliveryIfReady()
    }

    /// Subscriber has requested values. Must be called with `lock` held.
    ///
    /// - Returns: The delivery to perform after unlocking, or `nil` if no result is
    ///   available yet (the demand is remembered) or the subscription is finished.

    func subscriberRequested(_ additionalDemand: Subscribers.Demand) -> Delivery? {
        guard subscriber != nil else { return nil }
        demand += additionalDemand
        return claimDeliveryIfReady()
    }

    /// Claims the subscriber and buffered result when both a result and demand exist.
    /// Must be called with `lock` held.
    ///
    /// Clearing the stored subscriber and result makes delivery one-shot: no later
    /// request or promise call can send the value or completion a second time.

    func claimDeliveryIfReady() -> Delivery? {
        guard demand > .none, let subscriber, let result else { return nil }
        self.subscriber = nil
        self.result = nil
        return (subscriber: subscriber, result: result)
    }

    /// Send output to subscriber. Must be called WITHOUT `lock` held.
    ///
    /// - Parameter delivery: The claimed subscriber and result, or `nil` for nothing to do.

    func sendOutputToSubscriber(_ delivery: Delivery?) {
        guard let delivery else { return }

        switch delivery.result {
        case .success(let output):
            // A single value is published, so the returned extra demand is irrelevant.
            _ = delivery.subscriber.receive(output)
            delivery.subscriber.receive(completion: .finished)
        case .failure(let failure):
            delivery.subscriber.receive(completion: .failure(failure))
        }
    }
}

@robertmryan
Copy link

robertmryan commented Jan 9, 2025

@rnapier – Frankly, the whole “future/promise” pattern is wrong with Swift concurrency (notably the “promise” part). With Swift concurrency, we don’t return until we are done awaiting the async routines. We only need promises in legacy patterns (delegates, completion handler closures, etc.). But in Swift concurrency, we don’t return until the async work is done, anyway. I’ve updated my S.O. answer with that pattern.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment