Last active
October 19, 2024 04:01
-
-
Save mrbodich/05b4e05a0a166b624354ce16da59b3ec to your computer and use it in GitHub Desktop.
SubSequenceManager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Fans a single base `AsyncSequence` out to any number of `AsyncStream` consumers.
///
/// The base sequence is iterated at most once, lazily, when the first consumer
/// registers. Every event produced by the base sequence is recorded in `actions`
/// and replayed to consumers that register later, so each consumer observes the
/// full history followed by live elements.
///
/// - Note: The history is unbounded: every element is retained for the
///   lifetime of the manager.
/// - Note: `AsyncStream` cannot carry errors, so an error thrown by the base
///   sequence is not propagated; consumers simply see their stream finish.
fileprivate actor SubSequenceManager<Base: AsyncSequence & Sendable> where Base.Element: Sendable {

    fileprivate typealias Element = Base.Element

    // Private

    /// The upstream sequence that is shared between all consumers.
    private var base: Base
    /// Live consumer continuations keyed by the identifier generated for each stream.
    private var continuations: [String: AsyncStream<Base.Element>.Continuation] = [:]
    /// The single task iterating `base`; `nil` until the first consumer registers.
    private var subscriptionTask: Task<Void, Never>?
    /// Every event emitted so far, in order, replayed to late consumers.
    private var actions: [StreamAction<Base.Element>] = []

    // MARK: Initialization

    /// Creates a manager for `base`. Iteration of `base` does not start here.
    fileprivate init(_ base: Base) {
        self.base = base
    }

    deinit {
        subscriptionTask?.cancel()
    }

    // MARK: API

    /// Creates a new stream and returns its async iterator that emits elements of base async sequence.
    /// - Returns: An instance that conforms to `AsyncIteratorProtocol`.
    nonisolated fileprivate func makeAsyncIterator() -> PassthroughAsyncSequence<Base.Element>.AsyncIterator {
        let id = UUID().uuidString
        let stream = AsyncStream<Element> { continuation in
            // Strong capture on purpose: a live consumer keeps the manager
            // (and therefore the upstream subscription) alive.
            continuation.onTermination = { @Sendable _ in
                self.remove(id)
            }
            // The build closure is synchronous, so the hop onto the actor has
            // to happen in a task. Nothing is lost by registering late because
            // `add` replays the complete history first.
            Task {
                await self.add(id: id, continuation: continuation)
            }
        }
        return stream.makeAsyncIterator()
    }

    // MARK: Sequence management

    /// Schedules removal of the continuation registered under `id`.
    nonisolated private func remove(_ id: String) {
        Task {
            await self._remove(id)
        }
    }

    /// Drops the continuation registered under `id`, if any.
    private func _remove(_ id: String) {
        continuations.removeValue(forKey: id)
    }

    /// Replays the recorded history into `continuation`, registers it for live
    /// events if it is still open, and starts the upstream subscription if needed.
    private func add(id: String, continuation: AsyncStream<Base.Element>.Continuation) {
        var isOpen = true
        for action in actions {
            switch action {
            case let .yield(value):
                if case .terminated = continuation.yield(value) {
                    isOpen = false
                }
            case .finish:
                continuation.finish()
                isOpen = false
            }
        }
        // A stream that is already finished or terminated never needs live events.
        if isOpen {
            continuations[id] = continuation
        }
        subscribeToBaseSequenceIfNeeded()
    }

    /// Starts the one and only task that iterates `base`, unless it already exists.
    private func subscribeToBaseSequenceIfNeeded() {
        guard subscriptionTask == nil else { return }
        subscriptionTask = Task { [weak self, base] in
            do {
                for try await value in base {
                    // Take a strong reference per element only, so the manager
                    // can be released (and cancel this task) between elements.
                    guard !Task.isCancelled, let manager = self else { break }
                    await manager.dispatch(.yield(value))
                }
            } catch {
                // `AsyncStream` cannot fail; an upstream error ends the streams below.
            }
            await self?.dispatch(.finish)
        }
    }

    /// Records `action` and delivers it to every registered consumer in a single
    /// actor-isolated step, so a consumer registering concurrently gets the
    /// event exactly once (either via replay or via this broadcast).
    private func dispatch(_ action: StreamAction<Base.Element>) {
        addAction(action)
        switch action {
        case let .yield(value):
            // Iterates a copy of the dictionary, so pruning inside the loop is safe.
            for (id, continuation) in continuations {
                if case .terminated = continuation.yield(value) {
                    continuations.removeValue(forKey: id)
                }
            }
        case .finish:
            for continuation in continuations.values {
                continuation.finish()
            }
            continuations.removeAll()
        }
    }

    /// Appends `action` to the replay history.
    private func addAction(_ action: StreamAction<Base.Element>) {
        actions.append(action)
    }
}
/// A single event of a stream, stored so it can be replayed later.
fileprivate enum StreamAction<Element> {
    /// An element was produced.
    case yield(Element)

    /// The stream ended; no further events follow.
    case finish
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment