Skip to content

Instantly share code, notes, and snippets.

@KaQuMiQ
Last active February 25, 2022 09:09
Show Gist options
  • Save KaQuMiQ/1800b0e55c76a3c99b8b7c2555293392 to your computer and use it in GitHub Desktop.
Save KaQuMiQ/1800b0e55c76a3c99b8b7c2555293392 to your computer and use it in GitHub Desktop.
Combine Publisher to AsyncSequence
import Combine
extension Publisher {
public func asAsyncThrowingSequence() -> StreamedAsyncThrowingSequence<Output> {
StreamedAsyncThrowingSequence(self)
}
}
extension Publisher where Failure == Never {
public func asAsyncSequence() -> StreamedAsyncSequence<Output> {
StreamedAsyncSequence(self)
}
}
public final class StreamedAsyncThrowingSequence<Element> {
private let stream: AsyncThrowingStream<Element, Error>
private var cancellable: AnyCancellable?
public init<Upstream: Publisher>(_ upstream: Upstream)
where Upstream.Output == Element {
var cancellable: AnyCancellable? = nil
self.stream = .init { continuation in
cancellable =
upstream
.handleEvents(
receiveOutput: { output in
continuation.yield(output)
},
receiveCompletion: { completion in
switch completion {
case .finished:
continuation.finish(throwing: nil)
case let .failure(error):
continuation.finish(throwing: error)
}
},
receiveCancel: {
continuation.finish(throwing: CancellationError())
}
)
.sinkDrop()
}
self.cancellable = cancellable
}
}
extension StreamedAsyncThrowingSequence: AsyncSequence {
public typealias AsyncIterator = AsyncThrowingStream<Element, Error>.AsyncIterator
public func makeAsyncIterator() -> AsyncIterator {
self.stream.makeAsyncIterator()
}
}
public final class StreamedAsyncSequence<Element> {
private let stream: AsyncStream<Element>
private var cancellable: AnyCancellable?
public init<Upstream: Publisher>(_ upstream: Upstream)
where Upstream.Output == Element {
var cancellable: AnyCancellable? = nil
self.stream = .init { continuation in
cancellable =
upstream
.handleEvents(
receiveOutput: { output in
continuation.yield(output)
},
receiveCompletion: { completion in
switch completion {
case .finished:
continuation.finish()
case .failure:
unreachable("Failure is Never")
}
},
receiveCancel: {
continuation.finish() // just finishing on cancel
}
)
.sinkDrop()
}
self.cancellable = cancellable
}
}
extension StreamedAsyncSequence: AsyncSequence {
public typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
public func makeAsyncIterator() -> AsyncIterator {
self.stream.makeAsyncIterator()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment