With the introduction of async/await, you might wonder how you can use the new feature in combination with Combine.
The code posted below allows you to write the following constructs:
// Example upstream: emits 1, 2, 3 and never fails.
let somePublisher = [1, 2, 3].publisher

/// Stand-in for real asynchronous work; simply returns its argument.
func someAsyncFunction(_ int: Int) async -> Int {
    return int
}

// The closure passed to `asyncMap` is async, so the call to the async
// function inside it must be marked with `await` to compile.
let newPublisher = somePublisher.asyncMap { await someAsyncFunction($0) }
Here's how it is implemented:
extension Publisher {
    /// Transforms every upstream element with an asynchronous closure.
    ///
    /// Each element is wrapped in a `TaskPublisher` that awaits `operation`
    /// for that element, and the wrappers are combined with `flatMap`.
    ///
    /// - Parameters:
    ///   - priority: Priority forwarded to every `TaskPublisher`; `nil` by default.
    ///   - operation: Async transform applied to each upstream element.
    /// - Returns: The `flatMap` of the upstream into per-element task publishers.
    public func asyncMap<O>(
        priority: TaskPriority? = nil,
        operation: @escaping (Output) async -> O
    ) -> Publishers.FlatMap<TaskPublisher<O>, Self> where Failure == Never {
        // Builds the inner publisher for a single upstream element.
        let makeTaskPublisher: (Output) -> TaskPublisher<O> = { element in
            TaskPublisher(priority: priority) { await operation(element) }
        }
        return flatMap(makeTaskPublisher)
    }
}
/// A publisher that runs an async operation for each subscriber and emits
/// the operation's result as a single value followed by `.finished`.
public struct TaskPublisher<Output>: Publisher {
    // MARK: Nested Types

    public typealias Failure = Never

    /// Connects one subscriber to one `Task`.
    ///
    /// The task is created eagerly but stays suspended until the subscriber
    /// requests a value, so nothing can be delivered before the subscriber
    /// has received its subscription, and no value is sent without demand.
    private final class Subscription: Combine.Subscription {
        /// Waits for demand, runs the operation, then delivers the result.
        private let task: Task<Void, Never>

        /// Wakes `task` on the first positive demand. Optional only to avoid a
        /// force unwrap; `AsyncStream` runs its build closure synchronously.
        private let demandSignal: AsyncStream<Void>.Continuation?

        init<S: Subscriber>(
            subscriber: S,
            priority: TaskPriority?,
            operation: @Sendable @escaping () async -> Output
        ) where S.Input == Output, S.Failure == Failure {
            var continuation: AsyncStream<Void>.Continuation?
            // Only the first demand matters, so one buffered signal is enough.
            let demands = AsyncStream<Void>(bufferingPolicy: .bufferingNewest(1)) { continuation = $0 }
            demandSignal = continuation

            task = Task(priority: priority) {
                // `nil` means the subscription was cancelled or dropped before
                // any demand arrived; in that case do not run the operation.
                var pendingDemand = demands.makeAsyncIterator()
                guard await pendingDemand.next() != nil else { return }

                let result = await operation()

                // Task cancellation is cooperative: the operation may finish
                // after `cancel()`. Do not deliver to a cancelled subscriber.
                guard !Task.isCancelled else { return }
                _ = subscriber.receive(result)
                subscriber.receive(completion: .finished)
            }
        }

        deinit {
            // Let a still-waiting task exit instead of suspending forever.
            demandSignal?.finish()
        }

        func request(_ demand: Subscribers.Demand) {
            guard demand > .none else { return }
            demandSignal?.yield()
        }

        func cancel() {
            task.cancel()
            demandSignal?.finish()
        }
    }

    // MARK: Stored Properties

    /// Priority used for the task created per subscriber.
    public let priority: TaskPriority?

    /// The async work whose result is published.
    public let operation: @Sendable () async -> Output

    // MARK: Initialization

    /// Creates a publisher that runs `operation` for each subscriber.
    ///
    /// - Parameters:
    ///   - priority: Priority for the underlying task; `nil` by default.
    ///   - operation: The async work whose result is emitted.
    public init(priority: TaskPriority? = nil, operation: @Sendable @escaping () async -> Output) {
        self.priority = priority
        self.operation = operation
    }

    // MARK: Methods

    /// Creates a subscription for `subscriber` and hands it over; the
    /// operation starts once the subscriber requests a value.
    public func receive<S: Subscriber>(
        subscriber: S
    ) where Failure == S.Failure, Output == S.Input {
        let subscription = Subscription(subscriber: subscriber, priority: priority, operation: operation)
        subscriber.receive(subscription: subscription)
    }
}
The error-throwing equivalent, `tryAsyncMap`, is quite similar; it requires the upstream publisher's `Failure` type to be `Error`:
extension Publisher {
    /// Transforms every upstream element with an asynchronous, throwing closure.
    ///
    /// Each element is wrapped in a `ThrowingTaskPublisher` that awaits
    /// `operation` for that element, and the wrappers are combined with `flatMap`.
    ///
    /// - Parameters:
    ///   - priority: Priority forwarded to every `ThrowingTaskPublisher`; `nil` by default.
    ///   - operation: Async throwing transform applied to each upstream element.
    /// - Returns: The `flatMap` of the upstream into per-element throwing task publishers.
    public func tryAsyncMap<O>(
        priority: TaskPriority? = nil,
        operation: @escaping (Output) async throws -> O
    ) -> Publishers.FlatMap<ThrowingTaskPublisher<O>, Self> where Failure == Error {
        return flatMap { (upstreamValue: Output) -> ThrowingTaskPublisher<O> in
            // The unit of work for this single element.
            let work: @Sendable () async throws -> O = {
                try await operation(upstreamValue)
            }
            return ThrowingTaskPublisher(priority: priority, operation: work)
        }
    }
}
/// A publisher that runs an async throwing operation for each subscriber.
///
/// On success it emits the result as a single value followed by `.finished`;
/// if the operation throws, it completes with `.failure(error)`.
public struct ThrowingTaskPublisher<Output>: Publisher {
    // MARK: Nested Types

    public typealias Failure = Error

    /// Connects one subscriber to one `Task`.
    ///
    /// The task is created eagerly but stays suspended until the subscriber
    /// requests a value, so nothing can be delivered before the subscriber
    /// has received its subscription, and no value is sent without demand.
    private final class Subscription: Combine.Subscription {
        /// Waits for demand, runs the operation, then delivers the outcome.
        private let task: Task<Void, Never>

        /// Wakes `task` on the first positive demand. Optional only to avoid a
        /// force unwrap; `AsyncStream` runs its build closure synchronously.
        private let demandSignal: AsyncStream<Void>.Continuation?

        init<S: Subscriber>(
            subscriber: S,
            priority: TaskPriority?,
            operation: @Sendable @escaping () async throws -> Output
        ) where S.Input == Output, S.Failure == Failure {
            var continuation: AsyncStream<Void>.Continuation?
            // Only the first demand matters, so one buffered signal is enough.
            let demands = AsyncStream<Void>(bufferingPolicy: .bufferingNewest(1)) { continuation = $0 }
            demandSignal = continuation

            task = Task(priority: priority) {
                // `nil` means the subscription was cancelled or dropped before
                // any demand arrived; in that case do not run the operation.
                var pendingDemand = demands.makeAsyncIterator()
                guard await pendingDemand.next() != nil else { return }

                do {
                    let result = try await operation()
                    // Cancellation is cooperative: the operation may finish
                    // after `cancel()`. Do not deliver to a cancelled subscriber.
                    guard !Task.isCancelled else { return }
                    _ = subscriber.receive(result)
                    subscriber.receive(completion: .finished)
                } catch {
                    // A cancelled operation may throw (for example a
                    // cancellation error); a subscriber that already
                    // cancelled must not receive it as a failure.
                    guard !Task.isCancelled else { return }
                    subscriber.receive(completion: .failure(error))
                }
            }
        }

        deinit {
            // Let a still-waiting task exit instead of suspending forever.
            demandSignal?.finish()
        }

        func request(_ demand: Subscribers.Demand) {
            guard demand > .none else { return }
            demandSignal?.yield()
        }

        func cancel() {
            task.cancel()
            demandSignal?.finish()
        }
    }

    // MARK: Stored Properties

    /// Priority used for the task created per subscriber.
    public let priority: TaskPriority?

    /// The async throwing work whose outcome is published.
    public let operation: @Sendable () async throws -> Output

    // MARK: Initialization

    /// Creates a publisher that runs `operation` for each subscriber.
    ///
    /// - Parameters:
    ///   - priority: Priority for the underlying task; `nil` by default.
    ///   - operation: The async throwing work whose outcome is emitted.
    public init(priority: TaskPriority? = nil, operation: @Sendable @escaping () async throws -> Output) {
        self.priority = priority
        self.operation = operation
    }

    // MARK: Methods

    /// Creates a subscription for `subscriber` and hands it over; the
    /// operation starts once the subscriber requests a value.
    public func receive<S: Subscriber>(
        subscriber: S
    ) where Failure == S.Failure, Output == S.Input {
        let subscription = Subscription(subscriber: subscriber, priority: priority, operation: operation)
        subscriber.receive(subscription: subscription)
    }
}