Skip to content

Instantly share code, notes, and snippets.

@mrbodich
Last active October 19, 2024 04:01
Show Gist options
  • Save mrbodich/05b4e05a0a166b624354ce16da59b3ec to your computer and use it in GitHub Desktop.
Save mrbodich/05b4e05a0a166b624354ce16da59b3ec to your computer and use it in GitHub Desktop.
SubSequenceManager
/// Shares one subscription to `base` among any number of `AsyncStream` consumers.
///
/// Every event produced by the base sequence is recorded in `actions`, and the whole
/// history is replayed to each stream that is registered later, so a late subscriber
/// first receives everything that was emitted before it joined.
fileprivate actor SubSequenceManager<Base: AsyncSequence & Sendable> where Base.Element: Sendable {
    fileprivate typealias Element = Base.Element

    // Private

    /// The upstream sequence. It is iterated by `subscriptionTask` only.
    private let base: Base
    /// Continuations of the live streams, keyed by the id generated in `makeAsyncIterator()`.
    private var continuations: [String: AsyncStream<Base.Element>.Continuation] = [:]
    /// The task iterating `base`; `nil` until the first stream has been registered.
    private var subscriptionTask: Task<Void, Never>?
    /// Every event seen so far, in order, kept for replay to streams registered later.
    private var actions: [StreamAction<Base.Element>] = []

    // MARK: Initialization

    fileprivate init(_ base: Base) {
        self.base = base
    }

    deinit {
        // The subscription task only holds the manager weakly between elements, so this
        // cancellation is what stops the upstream iteration once the manager is released.
        subscriptionTask?.cancel()
    }

    // MARK: API

    /// Creates a new stream and returns its async iterator that emits elements of base async sequence.
    /// - Returns: An instance that conforms to `AsyncIteratorProtocol`.
    nonisolated fileprivate func makeAsyncIterator() -> PassthroughAsyncSequence<Base.Element>.AsyncIterator {
        let id = UUID().uuidString
        // NOTE(review): the build closure awaits, which the standard-library
        // `AsyncStream.init(_:bufferingPolicy:_:)` does not accept; presumably an
        // async-build initializer is declared elsewhere in the project - confirm.
        let sequence = AsyncStream<Element> { continuation in
            // Unregister the continuation as soon as its stream terminates.
            continuation.onTermination = { @Sendable _ in
                self.remove(id)
            }
            await self.add(id: id, continuation: continuation)
        }
        return sequence.makeAsyncIterator()
    }

    // MARK: Sequence management

    /// Schedules removal of the continuation registered under `id`.
    ///
    /// Callable from the synchronous `onTermination` handler; the actual mutation
    /// happens on the actor inside a new task.
    nonisolated private func remove(_ id: String) {
        Task {
            await self._remove(id)
        }
    }

    /// Drops the continuation registered under `id`, if there is one.
    private func _remove(_ id: String) {
        continuations.removeValue(forKey: id)
    }

    /// Registers `continuation`, replays the recorded history into it and makes sure
    /// the base sequence is being iterated.
    private func add(id: String, continuation: AsyncStream<Base.Element>.Continuation) {
        continuations[id] = continuation
        for action in actions {
            deliver(action, to: continuation)
        }
        subscribeToBaseSequenceIfNeeded()
    }

    /// Starts the task that iterates `base`, unless it has already been started.
    private func subscribeToBaseSequenceIfNeeded() {
        guard subscriptionTask == nil else { return }
        subscriptionTask = Task { [weak self, base] in
            guard !Task.isCancelled else {
                await self?.broadcast(.finish)
                return
            }
            do {
                for try await value in base {
                    // Take a strong reference per element only. Holding one for the
                    // whole loop would keep the manager alive until `base` ends and
                    // make the cancellation in `deinit` unreachable.
                    guard let manager = self else { return }
                    await manager.broadcast(.yield(value))
                }
            } catch {
                // The streams are non-throwing, so an upstream failure is surfaced
                // as a plain end of sequence below.
            }
            await self?.broadcast(.finish)
        }
    }

    /// Records `action` and forwards it to every registered continuation.
    ///
    /// Both steps run in one actor-isolated call. If they were separate hops, an
    /// `add(id:continuation:)` scheduled in between would replay the action and then
    /// receive it a second time from the broadcast.
    private func broadcast(_ action: StreamAction<Base.Element>) {
        addAction(action)
        for continuation in continuations.values {
            deliver(action, to: continuation)
        }
    }

    /// Applies a single recorded action to `continuation`.
    private func deliver(
        _ action: StreamAction<Base.Element>,
        to continuation: AsyncStream<Base.Element>.Continuation
    ) {
        switch action {
        case let .yield(value): continuation.yield(value)
        case .finish: continuation.finish()
        }
    }

    /// Appends `action` to the replay history.
    private func addAction(_ action: StreamAction<Base.Element>) {
        actions.append(action)
    }
}
/// An event of the base sequence as recorded by `SubSequenceManager`.
///
/// The manager stores these in order and applies them to stream continuations:
/// `yield` forwards the carried element, `finish` ends the stream.
fileprivate enum StreamAction<Element> {
    /// An element that was produced and should be yielded to a continuation.
    case yield(Element)

    /// The end of the sequence; applying it finishes a continuation.
    case finish
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment