Skip to content

Instantly share code, notes, and snippets.

@erezhod
Last active March 3, 2025 15:25
Show Gist options
  • Select an option

  • Save erezhod/b0ea8340daf74fd0ce2cf414c7ce87a2 to your computer and use it in GitHub Desktop.

Select an option

Save erezhod/b0ea8340daf74fd0ce2cf414c7ce87a2 to your computer and use it in GitHub Desktop.
A broadcaster that allows multiple consumers to receive the same updates from a single source. Based on `AsyncStream` but designed for multiple subscribers.
import Foundation
/**
 A broadcaster that allows multiple consumers to receive the same updates from a single source.

 Similar to `AsyncStream` but designed for multiple subscribers: each call to
 `subscribe(bufferingPolicy:)` returns its own `AsyncStream`, and values passed to
 `broadcast(_:)` are yielded to the continuation of every registered subscriber.

 Example usage
 ```swift
 final class AuthRepository {
     private let authStateStream = AsyncStreamBroadcast<AuthState>()

     init() {
         // Initialize with logged out state
         authStateStream.broadcast(.loggedOut)
     }

     func authStateChanges() -> AsyncStream<AuthState> {
         authStateStream.subscribe(bufferingPolicy: .bufferingNewest(10))
     }

     func updateAuthState(to newState: AuthState) {
         authStateStream.broadcast(newState)
     }
 }
 ```
 **/
final class AsyncStreamBroadcast<Element: Sendable>: Sendable {
    /// Actor holding the registered consumers and the most recently broadcast value.
    private let state: State

    /// Creates a new AsyncStreamBroadcast
    ///
    /// - Parameter latestValue: Optional initial value. When non-nil it is stored as the
    ///   latest value and handed to subscribers as soon as they are registered.
    init(latestValue: Element? = nil) {
        self.state = State(latestValue: latestValue)
    }

    /// Subscribes to the broadcast and returns an AsyncStream of elements
    ///
    /// Registration happens asynchronously in a `Task`. Once the consumer has been added,
    /// the stored latest value (if any) is yielded to the new stream. When the stream
    /// terminates, the consumer is removed again.
    ///
    /// - Parameter bufferingPolicy: Buffering policy forwarded to the underlying
    ///   `AsyncStream`. Defaults to `.unbounded`.
    /// - Returns: A stream that is fed by this broadcaster.
    func subscribe(bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) -> AsyncStream<Element> {
        AsyncStream<Element>(bufferingPolicy: bufferingPolicy) { continuation in
            let consumer = Consumer(continuation: continuation)

            // Keep a handle to the registration so that removal can be ordered after it.
            let registration: Task<Void, Never> = Task {
                if let latestValue = await state.addConsumer(consumer) {
                    continuation.yield(latestValue)
                }
            }

            continuation.onTermination = { [weak self, consumer] _ in
                guard let self else { return }
                Task {
                    // The stream may terminate before the registration task has run.
                    // Waiting for it guarantees `removeConsumer` never executes ahead of
                    // `addConsumer`, which would otherwise leave a dead consumer
                    // registered forever.
                    await registration.value
                    await self.state.removeConsumer(id: consumer.id)
                }
            }
        }
    }
}
// MARK: - Main APIs
extension AsyncStreamBroadcast {
    /// Broadcasts a value to all subscribers
    ///
    /// The value is first stored as the latest value in the internal state, then yielded
    /// to the continuation of every consumer registered at that moment. The work runs in
    /// a new `Task`, so this method returns before the value has been delivered.
    ///
    /// - Parameter value: The element to deliver.
    func broadcast(_ value: Element) {
        // NOTE(review): every call spawns its own unstructured Task; back-to-back calls
        // are presumably not guaranteed to be delivered in call order — confirm whether
        // callers rely on ordering.
        Task {
            let continuations = await state.updateLatestValue(value).map(\.continuation)
            for continuation in continuations {
                continuation.yield(value)
            }
        }
    }

    /// Terminates all active subscriptions
    ///
    /// Takes every currently registered consumer out of the internal state and finishes
    /// its continuation. Like `broadcast(_:)`, the work runs in a new `Task`.
    func finish() {
        Task {
            for subscriber in await state.getAllConsumers() {
                subscriber.continuation.finish()
            }
        }
    }
}
// MARK: - Private types
private extension AsyncStreamBroadcast {
    /// One subscriber of the broadcast: the continuation feeding its stream, plus a
    /// unique identifier used as the lookup key in `State`.
    final class Consumer: Sendable {
        let id = UUID()
        let continuation: AsyncStream<Element>.Continuation

        init(continuation: AsyncStream<Element>.Continuation) {
            self.continuation = continuation
        }
    }

    /// Actor that owns the mutable bookkeeping: the registered consumers and the most
    /// recently broadcast value.
    actor State {
        /// Registered consumers, keyed by `Consumer.id`.
        var consumers: [UUID: Consumer] = [:]
        /// Most recent value, or `nil` if none has been stored yet.
        var latestValue: Element?

        init(latestValue: Element? = nil) {
            self.latestValue = latestValue
        }

        /// Registers `consumer` and returns the currently stored latest value, if any.
        func addConsumer(_ consumer: Consumer) -> Element? {
            consumers.updateValue(consumer, forKey: consumer.id)
            return latestValue
        }

        /// Unregisters the consumer stored under `id`; does nothing if there is none.
        func removeConsumer(id: UUID) {
            consumers[id] = nil
        }

        /// Stores `value` as the latest value and returns a snapshot of all consumers.
        func updateLatestValue(_ value: Element) -> [Consumer] {
            latestValue = value
            return [Consumer](consumers.values)
        }

        /// Returns all registered consumers and clears the registry.
        func getAllConsumers() -> [Consumer] {
            // The snapshot is taken by the `return` expression before the deferred clear runs.
            defer { consumers.removeAll() }
            return Array(consumers.values)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment