Last active
March 3, 2025 15:25
-
-
Save erezhod/b0ea8340daf74fd0ce2cf414c7ce87a2 to your computer and use it in GitHub Desktop.
A broadcaster that allows multiple consumers to receive the same updates from a single source. Based on `AsyncStream` but designed for multiple subscribers.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import Foundation | |
| /** | |
| A broadcaster that allows multiple consumers to receive the same updates from a single source. | |
| Similar to `AsyncStream` but designed for multiple subscribers. | |
| Example usage | |
| ```swift | |
| final class AuthRepository { | |
| private let authStateStream = AsyncStreamBroadcast<AuthState>() | |
| init() { | |
| // Initialize with logged out state | |
| authStateStream.broadcast(.loggedOut) | |
| } | |
| func authStateChanges() -> AsyncStream<AuthState> { | |
| authStateStream.subscribe(bufferingPolicy: .bufferingNewest(10)) | |
| } | |
| func updateAuthState(to newState: AuthState) { | |
| authStateStream.broadcast(newState) | |
| } | |
| } | |
| ``` | |
| **/ | |
| final class AsyncStreamBroadcast<Element: Sendable>: Sendable { | |
| private let state: State | |
| /// Creates a new AsyncStreamBroadcast | |
| init(latestValue: Element? = nil) { | |
| self.state = State(latestValue: latestValue) | |
| } | |
| /// Subscribes to the broadcast and returns an AsyncStream of elements | |
| func subscribe(bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) -> AsyncStream<Element> { | |
| AsyncStream<Element>(bufferingPolicy: bufferingPolicy) { continuation in | |
| let consumer = Consumer(continuation: continuation) | |
| // Capture values safely | |
| Task { | |
| if let latestValue = await state.addConsumer(consumer) { | |
| continuation.yield(latestValue) | |
| } | |
| } | |
| continuation.onTermination = { [weak self, consumer] _ in | |
| guard let self else { return } | |
| Task { | |
| await self.state.removeConsumer(id: consumer.id) | |
| } | |
| } | |
| } | |
| } | |
| } | |
| // MARK: - Main APIs | |
| extension AsyncStreamBroadcast { | |
| /// Broadcasts a value to all subscribers | |
| func broadcast(_ value: Element) { | |
| Task { | |
| let consumers = await state.updateLatestValue(value) | |
| for consumer in consumers { | |
| consumer.continuation.yield(value) | |
| } | |
| } | |
| } | |
| /// Terminates all active subscriptions | |
| func finish() { | |
| Task { | |
| let consumers = await state.getAllConsumers() | |
| for consumer in consumers { | |
| consumer.continuation.finish() | |
| } | |
| } | |
| } | |
| } | |
| // MARK: - Private types | |
| private extension AsyncStreamBroadcast { | |
| /// Represents a consumer of the broadcast stream | |
| final class Consumer: Sendable { | |
| let id = UUID() | |
| let continuation: AsyncStream<Element>.Continuation | |
| init(continuation: AsyncStream<Element>.Continuation) { | |
| self.continuation = continuation | |
| } | |
| } | |
| /// Actor to safely manage state | |
| actor State { | |
| var consumers = [UUID: Consumer]() | |
| var latestValue: Element? | |
| init(latestValue: Element? = nil) { | |
| self.latestValue = latestValue | |
| } | |
| func addConsumer(_ consumer: Consumer) -> Element? { | |
| consumers[consumer.id] = consumer | |
| return latestValue | |
| } | |
| func removeConsumer(id: UUID) { | |
| consumers.removeValue(forKey: id) | |
| } | |
| func updateLatestValue(_ value: Element) -> [Consumer] { | |
| latestValue = value | |
| return Array(consumers.values) | |
| } | |
| func getAllConsumers() -> [Consumer] { | |
| let allConsumers = Array(consumers.values) | |
| consumers.removeAll() | |
| return allConsumers | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment