Skip to content

Instantly share code, notes, and snippets.

@wildthink
Created November 29, 2024 19:14
Show Gist options
  • Save wildthink/c4497424a113f33b38cfb78b299bb13f to your computer and use it in GitHub Desktop.
Save wildthink/c4497424a113f33b38cfb78b299bb13f to your computer and use it in GitHub Desktop.
A thread-safe publish-subscribe hub for exchanging packets between producers and consumers.
//
// PubSubHub.swift
// Toolchain
//
// Created by Jason Jobe on 11/29/24.
//
import Foundation
/// A thread-safe publish-subscribe hub for exchanging packets between producers and consumers.
///
/// `PubSubHub` lets producers post packets and notifies subscribed callbacks. Each
/// callback is registered for one packet type and is only invoked for packets that
/// can be cast to that type.
///
/// Example usage:
/// ```swift
/// let hub = PubSubHub("ExampleHub")
/// let subscription = hub.subscribe(for: String.self) { packet in
///     print("Received: \(packet)")
/// }
/// hub.post(packet: "Hello, world!")
/// subscription.cancel()
/// ```
///
/// - Note: A private `DispatchQueue` guards the subscriber set. Callbacks are invoked
///   on the posting thread *after* that queue has been released, so a callback may
///   itself call `post`, `subscribe`, or `unsubscribe` on the same hub without
///   deadlocking. A consequence is that a subscription removed while a post is being
///   delivered may still receive that one packet.
public class PubSubHub {
    /// A typealias for subscriber callback functions.
    /// - Parameter Packet: The type of packet the callback will handle.
    public typealias Callback<Packet> = @Sendable (Packet) -> Void

    /// The name of the hub, useful for debugging and identification.
    public let name: String

    /// The private queue that serializes every access to `subscribers` and `lastID`.
    private let queue: DispatchQueue

    /// The set of active subscriptions. Only touched inside `queue.sync`.
    private var subscribers: Set<Subscription> = []

    /// The most recently issued subscription identifier. Only touched inside `queue.sync`.
    private var lastID: Int64 = 0

    /// Creates a new `PubSubHub` with a given name.
    /// - Parameter name: An identifier for the hub; also used to label its queue.
    public init(_ name: String) {
        self.name = name
        self.queue = DispatchQueue(label: "\(name).queue")
    }

    /// Posts a packet to all matching subscribers.
    ///
    /// Every current subscription is offered the packet; a subscription ignores
    /// packets that cannot be cast to the type it was registered for.
    ///
    /// - Parameter packet: The packet to post.
    public func post<P>(packet: P) {
        // Copy the subscriber set while holding the queue, then deliver outside of it.
        // Invoking callbacks inside `queue.sync` would deadlock as soon as a callback
        // re-entered the hub (post / subscribe / unsubscribe / Subscription.cancel).
        let snapshot = queue.sync { subscribers }
        for subscription in snapshot {
            subscription(packet)
        }
    }

    /// Subscribes to a specific type of packet.
    ///
    /// - Parameters:
    ///   - p: The type of packet to subscribe to.
    ///   - callback: The callback function to be invoked when a matching packet is posted.
    /// - Returns: A `Subscription` that can be used to cancel the subscription.
    public func subscribe<P>(for p: P.Type, callback: @escaping Callback<P>) -> Subscription {
        queue.sync { () -> Subscription in
            // `Subscription` equality and hashing use only `id`, so two subscriptions
            // sharing an id would collapse into one entry of the set. A raw millisecond
            // timestamp repeats for subscriptions created in the same millisecond;
            // taking the max with `lastID + 1` keeps ids strictly increasing per hub
            // while still being timestamp-based.
            lastID = max(Int64.timestamp, lastID + 1)
            let subscription = Subscription(id: lastID, hub: self, callback: callback)
            subscribers.insert(subscription)
            return subscription
        }
    }

    /// Unsubscribes a given subscription from the hub.
    ///
    /// Removing a subscription that is not registered is a no-op.
    ///
    /// - Parameter sub: The `Subscription` to be removed.
    public func unsubscribe(_ sub: Subscription) {
        queue.sync {
            _ = subscribers.remove(sub)
        }
    }
}
extension Int64 {
    /// The current wall-clock time expressed as milliseconds since the Unix epoch,
    /// with any fractional millisecond dropped by the `Int64` conversion.
    static var timestamp: Int64 {
        let secondsSinceEpoch = Date().timeIntervalSince1970
        let millisecondsSinceEpoch = secondsSinceEpoch * 1000
        return Int64(millisecondsSinceEpoch)
    }
}
/// A handle to manage a subscription within a `PubSubHub`.
///
/// A subscription pairs a type-erased callback with a weak reference to the hub
/// that issued it. It can be cancelled, or invoked directly with a packet.
///
/// Example usage:
/// ```swift
/// let subscription = hub.subscribe(for: Int.self) { packet in
///     print("Received: \(packet)")
/// }
/// subscription.cancel()
/// ```
public struct Subscription: Identifiable, Hashable {
    // TODO: track init #fileID, #line

    /// The unique identifier for this subscription.
    public let id: Int64

    /// A weak reference to the associated `PubSubHub`, so a subscription handle
    /// does not keep its hub alive.
    private weak var hub: PubSubHub?

    /// The type-erased callback. Returns `true` when the packet had the expected
    /// type and was delivered, `false` when it was ignored.
    private let _callback: @Sendable (Any) -> Bool

    /// Cancels the subscription, removing it from the hub.
    ///
    /// Does nothing if the hub has already been deallocated.
    public func cancel() {
        hub?.unsubscribe(self)
    }

    /// Invokes the subscription with a packet.
    ///
    /// - Parameter packet: The packet to deliver to the callback.
    /// - Returns: `true` if the packet could be cast to the subscribed type and the
    ///   callback was invoked, otherwise `false`.
    @discardableResult
    public func callAsFunction<P>(_ packet: P) -> Bool {
        _callback(packet)
    }

    /// Creates a new `Subscription`.
    ///
    /// - Parameters:
    ///   - id: A unique identifier for the subscription.
    ///   - hub: The hub managing this subscription; held weakly.
    ///   - callback: The callback to be invoked when a matching packet is posted.
    public init<P>(id: Int64, hub: PubSubHub, callback: @escaping @Sendable (P) -> Void) {
        self.id = id
        // Without this assignment the weak property keeps its implicit `nil`
        // default and `cancel()` can never reach the hub.
        self.hub = hub
        self._callback = { packet in
            // Packets of any other type are silently ignored.
            guard let typed = packet as? P else { return false }
            callback(typed)
            return true
        }
    }
}
extension Subscription {
    /// Feeds only the subscription's identifier into the hasher, matching `==`.
    /// - Parameter hasher: The hasher to mix the identifier into.
    public func hash(into hasher: inout Hasher) {
        id.hash(into: &hasher)
    }

    /// Compares two `Subscription` values by identifier alone.
    /// - Parameters:
    ///   - lhs: The first subscription.
    ///   - rhs: The second subscription.
    /// - Returns: `true` if both subscriptions carry the same identifier, otherwise `false`.
    public static func == (lhs: Subscription, rhs: Subscription) -> Bool {
        let leftID = lhs.id
        let rightID = rhs.id
        return leftID == rightID
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment