Last active
February 25, 2022 09:09
-
-
Save KaQuMiQ/1800b0e55c76a3c99b8b7c2555293392 to your computer and use it in GitHub Desktop.
Combine Publisher to AsyncSequence
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
extension Publisher { | |
public func asAsyncThrowingSequence() -> StreamedAsyncThrowingSequence<Output> { | |
StreamedAsyncThrowingSequence(self) | |
} | |
} | |
extension Publisher where Failure == Never { | |
public func asAsyncSequence() -> StreamedAsyncSequence<Output> { | |
StreamedAsyncSequence(self) | |
} | |
} | |
public final class StreamedAsyncThrowingSequence<Element> { | |
private let stream: AsyncThrowingStream<Element, Error> | |
private var cancellable: AnyCancellable? | |
public init<Upstream: Publisher>(_ upstream: Upstream) | |
where Upstream.Output == Element { | |
var cancellable: AnyCancellable? = nil | |
self.stream = .init { continuation in | |
cancellable = | |
upstream | |
.handleEvents( | |
receiveOutput: { output in | |
continuation.yield(output) | |
}, | |
receiveCompletion: { completion in | |
switch completion { | |
case .finished: | |
continuation.finish(throwing: nil) | |
case let .failure(error): | |
continuation.finish(throwing: error) | |
} | |
}, | |
receiveCancel: { | |
continuation.finish(throwing: CancellationError()) | |
} | |
) | |
.sinkDrop() | |
} | |
self.cancellable = cancellable | |
} | |
} | |
extension StreamedAsyncThrowingSequence: AsyncSequence { | |
public typealias AsyncIterator = AsyncThrowingStream<Element, Error>.AsyncIterator | |
public func makeAsyncIterator() -> AsyncIterator { | |
self.stream.makeAsyncIterator() | |
} | |
} | |
public final class StreamedAsyncSequence<Element> { | |
private let stream: AsyncStream<Element> | |
private var cancellable: AnyCancellable? | |
public init<Upstream: Publisher>(_ upstream: Upstream) | |
where Upstream.Output == Element { | |
var cancellable: AnyCancellable? = nil | |
self.stream = .init { continuation in | |
cancellable = | |
upstream | |
.handleEvents( | |
receiveOutput: { output in | |
continuation.yield(output) | |
}, | |
receiveCompletion: { completion in | |
switch completion { | |
case .finished: | |
continuation.finish() | |
case .failure: | |
unreachable("Failure is Never") | |
} | |
}, | |
receiveCancel: { | |
continuation.finish() // just finishing on cancel | |
} | |
) | |
.sinkDrop() | |
} | |
self.cancellable = cancellable | |
} | |
} | |
extension StreamedAsyncSequence: AsyncSequence { | |
public typealias AsyncIterator = AsyncStream<Element>.AsyncIterator | |
public func makeAsyncIterator() -> AsyncIterator { | |
self.stream.makeAsyncIterator() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment