Created
January 29, 2022 11:12
-
-
Save ABridoux/913dfb95fabc22048b0a1c566059105a to your computer and use it in GitHub Desktop.
An AsyncSequence that allows to be consumed several times. Returning the current state as specified in a reduce function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
struct ReducedReplayAsyncStream<Element> { | |
typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void | |
private let storage: _Storage | |
private var originalStream: AsyncStream<Element> | |
init( | |
bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded, | |
initialResult: Element, | |
reduce: @escaping Reduce, | |
build: (AsyncStream<Element>.Continuation) -> Void | |
) { | |
originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build) | |
storage = _Storage(stored: initialResult, reduce: reduce) | |
} | |
private func makeStream() -> AsyncStream<Element> { | |
AsyncStream<Element> { continuation in | |
Task { | |
var isFirst = false | |
if await !storage.didStart { | |
await storage.setDidStart(true) | |
isFirst = true | |
startConsumingOriginalStream() | |
} | |
if !isFirst { | |
await continuation.yield(storage.stored) | |
} | |
await storage.appendContinuation(continuation) | |
} | |
} | |
} | |
private func startConsumingOriginalStream () { | |
Task { | |
for await value in originalStream { | |
await storage.updateWith(value: value) | |
} | |
await storage.continuations.forEach { $0.finish() } | |
} | |
} | |
} | |
extension ReducedReplayAsyncStream { | |
private actor _Storage { | |
private let reduce: ReducedReplayAsyncStream.Reduce | |
var didStart = false | |
var stored: Element | |
var continuations: [AsyncStream<Element>.Continuation] = [] | |
init(stored: Element, reduce: @escaping Reduce) { | |
self.stored = stored | |
self.reduce = reduce | |
} | |
func updateWith(value: Element) { | |
reduce(&stored, value) | |
continuations.forEach { $0.yield(value) } | |
} | |
func setDidStart(_ value: Bool) { | |
didStart = value | |
} | |
func appendContinuation(_ continuation: AsyncStream<Element>.Continuation) { | |
continuations.append(continuation) | |
} | |
} | |
} | |
extension ReducedReplayAsyncStream: AsyncSequence { | |
typealias AsyncIterator = AsyncStream<Element>.AsyncIterator | |
func makeAsyncIterator() -> AsyncIterator { | |
let stream = makeStream() | |
return stream.makeAsyncIterator() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
An example
Considerations
Efficiency
Using an actor as the private storage might not be as efficient as a dispatch queue or a thread lock but I think it feels more natural with the structured concurrency. This could be improved if high performance is needed.
Copy on write
The structure does not implement copy on write. I wonder if it would make sense with an
AsyncSequence
since some sequence I used do not seem to implement it. And for instance anAsyncStream
will assert against two tasks awaiting its next element.