-
-
Save tkersey/b6c83aeb8e20787e9eb8ba4d0d54dba5 to your computer and use it in GitHub Desktop.
An AsyncSequence that allows to be consumed several times. Returning the current state as specified in a reduce function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
struct ReducedReplayAsyncStream<Element> { | |
typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void | |
private let storage: _Storage | |
private var originalStream: AsyncStream<Element> | |
init( | |
bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded, | |
initialResult: Element, | |
reduce: @escaping Reduce, | |
build: (AsyncStream<Element>.Continuation) -> Void | |
) { | |
originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build) | |
storage = _Storage(stored: initialResult, reduce: reduce) | |
} | |
private func makeStream() -> AsyncStream<Element> { | |
AsyncStream<Element> { continuation in | |
Task { | |
var isFirst = false | |
if await !storage.didStart { | |
await storage.setDidStart(true) | |
isFirst = true | |
startConsumingOriginalStream() | |
} | |
if !isFirst { | |
await continuation.yield(storage.stored) | |
} | |
await storage.appendContinuation(continuation) | |
} | |
} | |
} | |
private func startConsumingOriginalStream () { | |
Task { | |
for await value in originalStream { | |
await storage.updateWith(value: value) | |
} | |
await storage.continuations.forEach { $0.finish() } | |
} | |
} | |
} | |
extension ReducedReplayAsyncStream { | |
private actor _Storage { | |
private let reduce: ReducedReplayAsyncStream.Reduce | |
var didStart = false | |
var stored: Element | |
var continuations: [AsyncStream<Element>.Continuation] = [] | |
init(stored: Element, reduce: @escaping Reduce) { | |
self.stored = stored | |
self.reduce = reduce | |
} | |
func updateWith(value: Element) { | |
reduce(&stored, value) | |
continuations.forEach { $0.yield(value) } | |
} | |
func setDidStart(_ value: Bool) { | |
didStart = value | |
} | |
func appendContinuation(_ continuation: AsyncStream<Element>.Continuation) { | |
continuations.append(continuation) | |
} | |
} | |
} | |
extension ReducedReplayAsyncStream: AsyncSequence { | |
typealias AsyncIterator = AsyncStream<Element>.AsyncIterator | |
func makeAsyncIterator() -> AsyncIterator { | |
let stream = makeStream() | |
return stream.makeAsyncIterator() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment