Skip to content

Instantly share code, notes, and snippets.

@swhitty
Last active January 8, 2022 03:04
Show Gist options
  • Save swhitty/4913a6e7fc7e983fe3f7c656e64f8281 to your computer and use it in GitHub Desktop.
Save swhitty/4913a6e7fc7e983fe3f7c656e64f8281 to your computer and use it in GitHub Desktop.
import Combine
extension Publisher {
/// Converts publisher to AsyncSequence.
@available(iOS, deprecated: 15.0, message: "Use publisher.values directly")
var valuesAsync: AsyncThrowingPublisher<Self> {
AsyncThrowingPublisher(self)
}
}
/// AsyncSequence from a Publisher.
/// Combine.AsyncThrowingPublisher is used when available, otherwise AsyncThrowingStream is used.
@available(iOS, deprecated: 15.0, message: "Use Combine.AsyncThrowingPublisher directly")
struct AsyncThrowingPublisher<P>: AsyncSequence where P: Publisher {
typealias Element = P.Output
private let publisher: P
init(_ publisher: P) {
self.publisher = publisher
}
func makeAsyncIterator() -> Iterator {
if #available(iOS 15.0, *) {
var iterator = Combine.AsyncThrowingPublisher(publisher).makeAsyncIterator()
return Iterator { try await iterator.next() }
} else {
var iterator = makeAsyncStream().makeAsyncIterator()
return Iterator { try await iterator.next() }
}
}
struct Iterator: AsyncIteratorProtocol {
let _next: () async throws -> P.Output?
mutating func next() async throws -> P.Output? {
try await _next()
}
}
private func makeAsyncStream() -> AsyncThrowingStream<Element, Error> {
AsyncThrowingStream(Element.self, bufferingPolicy: .bufferingOldest(1)) { continuation in
publisher
.mapError { $0 as Error }
.receive(subscriber: Inner(continuation: continuation))
}
}
}
private extension AsyncThrowingPublisher {
final class Inner: Subscriber {
typealias Continuation = AsyncThrowingStream<Input, Error>.Continuation
private var subscription: Subscription?
private let continuation: Continuation
init(continuation: Continuation) {
self.continuation = continuation
continuation.onTermination = cancel
}
func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.max(1))
}
func receive(_ input: Element) -> Subscribers.Demand {
continuation.yield(input)
Task { [subscription] in
// Demand for next value is requested asynchronously allowing
// synchronous publishers like Publishers.Sequence to yield and suspend.
subscription?.request(.max(1))
}
return .none
}
func receive(completion: Subscribers.Completion<Error>) {
subscription = nil
switch completion {
case .failure(let error):
continuation.finish(throwing: error)
case .finished:
continuation.finish(throwing: nil)
}
}
@Sendable
func cancel(_: Continuation.Termination) {
subscription?.cancel()
subscription = nil
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment