Skip to content

Instantly share code, notes, and snippets.

@pauljohanneskraft
Last active September 8, 2023 09:40
Show Gist options
  • Save pauljohanneskraft/250c2bf54b32e6d33d0c2152552c546f to your computer and use it in GitHub Desktop.
Save pauljohanneskraft/250c2bf54b32e6d33d0c2152552c546f to your computer and use it in GitHub Desktop.
Using async/await from Combine

With the introduction of async/await, you might wonder how you can use the new feature in combination with Combine.

The code posted below allows you to write the following constructs:

let somePublisher = [1, 2, 3].publisher

func someAsyncFunction(_ int: Int) async -> Int {
    return int
}

let newPublisher = somePublisher.asyncMap { someAsyncFunction($0) }

Here's how it is created:

extension Publisher {

    public func asyncMap<O>(
        priority: TaskPriority? = nil,
        operation: @escaping (Output) async -> O
    ) -> Publishers.FlatMap<TaskPublisher<O>, Self> where Failure == Never {

        flatMap { value in
            TaskPublisher(priority: priority) {
                await operation(value)
            }
        }
    }

}

public struct TaskPublisher<Output>: Publisher {

    // MARK: Nested Types

    public typealias Failure = Never

    private class Subscription: Combine.Subscription {

        let task: Task<Output, Error>

        init<S: Subscriber>(subscriber: S, priority: TaskPriority?, operation: @Sendable @escaping () async -> Output) where S.Input == Output, S.Failure == Failure {
            task = Task(priority: priority) {
                let result = await operation()
                _ = subscriber.receive(result)
                subscriber.receive(completion: .finished)
                return result
            }
        }

        func request(_ demand: Subscribers.Demand) {}

        func cancel() {
            task.cancel()
        }

    }

    // MARK: Stored Properties

    public let priority: TaskPriority?
    public let operation: @Sendable () async -> Output

    // MARK: Initialization

    public init(priority: TaskPriority? = nil, operation: @Sendable @escaping () async -> Output) {
        self.priority = priority
        self.operation = operation
    }

    // MARK: Methods

    public func receive<S: Subscriber>(
        subscriber: S
    ) where Failure == S.Failure, Output == S.Input {

        let subscription = Subscription(subscriber: subscriber, priority: priority, operation: operation)
        subscriber.receive(subscription: subscription)
    }

}

The error-throwing equivalent is quite similar:

extension Publisher {

    public func tryAsyncMap<O>(
        priority: TaskPriority? = nil,
        operation: @escaping (Output) async throws -> O
    ) -> Publishers.FlatMap<ThrowingTaskPublisher<O>, Self> where Failure == Error {

        flatMap { value in
            ThrowingTaskPublisher(priority: priority) {
                try await operation(value)
            }
        }
    }
    
}

public struct ThrowingTaskPublisher<Output>: Publisher {

    // MARK: Nested Types

    public typealias Failure = Error

    private class Subscription: Combine.Subscription {

        let task: Task<Output, Error>

        init<S: Subscriber>(
            subscriber: S,
            priority: TaskPriority?,
            operation: @Sendable @escaping () async throws -> Output
        ) where S.Input == Output, S.Failure == Failure {

            task = Task(priority: priority) {
                do {
                    let result = try await operation()
                    _ = subscriber.receive(result)
                    subscriber.receive(completion: .finished)
                    return result
                } catch {
                    subscriber.receive(completion: .failure(error))
                    throw error
                }
            }
        }

        func request(_ demand: Subscribers.Demand) {}

        func cancel() {
            task.cancel()
        }

    }

    // MARK: Stored Properties

    public let priority: TaskPriority?
    public let operation: @Sendable () async throws -> Output

    // MARK: Initialization

    public init(priority: TaskPriority? = nil, operation: @Sendable @escaping () async throws -> Output) {
        self.priority = priority
        self.operation = operation
    }

    // MARK: Methods

    public func receive<S: Subscriber>(
        subscriber: S
    ) where Failure == S.Failure, Output == S.Input {

        let subscription = Subscription(subscriber: subscriber, priority: priority, operation: operation)
        subscriber.receive(subscription: subscription)
    }

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment