Skip to content

Instantly share code, notes, and snippets.

@sgade
Created August 15, 2024 19:05
Show Gist options
  • Save sgade/20e7839ec680b2ecb5e439adf30aeb64 to your computer and use it in GitHub Desktop.
Save sgade/20e7839ec680b2ecb5e439adf30aeb64 to your computer and use it in GitHub Desktop.
A simple wrapper to create AsyncStream and AsyncThrowingStream from any Publisher
import Combine
public extension Publisher where Failure == Never {
/// Creates an ``AsyncStream`` for the publisher that publishes values until either the publisher finishes or
/// the stream is terminated.
var asyncStream: AsyncStream<Output> {
AsyncStream { continuation in
let cancellable = sink { completion in
switch completion {
case .finished:
continuation.finish()
}
} receiveValue: { output in
continuation.yield(output)
}
continuation.onTermination = { _ in
cancellable.cancel()
}
}
}
}
public extension Publisher where Failure: Error {
/// Creates an ``AsyncThrowingStream`` for the publisher that publishes values until either publisher finishes,
/// throws and error or the stream is terminated.
var asyncThrowingStream: AsyncThrowingStream<Output, some Error> {
AsyncThrowingStream { continuation in
let cancellable = sink { completion in
switch completion {
case let .failure(error):
continuation.finish(throwing: error)
case .finished:
continuation.finish()
}
} receiveValue: { output in
continuation.yield(output)
}
continuation.onTermination = { _ in
cancellable.cancel()
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment