Created
November 29, 2024 19:14
-
-
Save wildthink/c4497424a113f33b38cfb78b299bb13f to your computer and use it in GitHub Desktop.
A thread-safe publish-subscribe hub for exchanging packets between producers and consumers.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
// PubSubHub.swift
// Toolchain
//
// Created by Jason Jobe on 11/29/24.
//
import Foundation | |
/// A thread-safe publish-subscribe hub for exchanging packets between producers and consumers.
///
/// `PubSubHub` lets producers post packets and notifies every subscription whose callback
/// accepts the packet's type. Each callback is typed, so a subscriber only receives packets
/// it can handle.
///
/// Example usage:
/// ```swift
/// let hub = PubSubHub("ExampleHub")
/// let subscription = hub.subscribe(for: String.self) { packet in
///     print("Received: \(packet)")
/// }
/// hub.post(packet: "Hello, world!")
/// subscription.cancel()
/// ```
///
/// - Note: A private serial `DispatchQueue` guards the subscriber set. Callbacks are invoked
///   synchronously on the thread that calls `post(packet:)`, *after* the queue has been
///   released, so a callback may safely post, subscribe, or unsubscribe on the same hub.
public class PubSubHub {

    /// A typealias for subscriber callback functions.
    /// - Parameter Packet: The type of packet the callback will handle.
    public typealias Callback<Packet> = @Sendable (Packet) -> Void

    /// The name of the hub, useful for debugging and identification.
    public let name: String

    /// The private queue that serializes every access to the hub's mutable state.
    private let queue: DispatchQueue

    /// The set of active subscriptions. Only touched while on `queue`.
    private var subscribers: Set<Subscription> = []

    /// The most recently issued subscription identifier. Only touched while on `queue`.
    private var lastIssuedID: Int64 = 0

    /// Creates a new `PubSubHub` with a given name.
    /// - Parameter name: An identifier for the hub; also used to label its queue.
    public init(_ name: String) {
        self.name = name
        self.queue = DispatchQueue(label: "\(name).queue")
    }

    /// Posts a packet to all matching subscribers.
    ///
    /// Every current subscription is offered the packet; subscriptions whose callback expects
    /// a different type ignore it. Delivery order is unspecified because subscriptions are
    /// kept in a `Set`.
    ///
    /// - Parameter packet: The packet to post. Only subscribers registered for a type the
    ///   packet can be cast to are invoked.
    /// - Note: Delivery works on a snapshot of the subscriber set, so a subscription cancelled
    ///   while a post is in flight may still receive that one packet.
    public func post<P>(packet: P) {
        // Copy the subscribers while holding the queue, then deliver outside of it.
        // Invoking callbacks inside `queue.sync` would deadlock as soon as a callback
        // called `post`, `subscribe`, or `unsubscribe` on this hub (nested `sync` on a
        // serial queue).
        let snapshot = queue.sync { subscribers }
        for sub in snapshot {
            sub(packet)
        }
    }

    /// Subscribes to a specific type of packet.
    ///
    /// - Parameters:
    ///   - p: The type of packet to subscribe to.
    ///   - callback: The callback function to be invoked when a matching packet is posted.
    /// - Returns: A `Subscription` that can be used to cancel the subscription.
    public func subscribe<P>(for p: P.Type, callback: @escaping Callback<P>) -> Subscription {
        return queue.sync {
            // Identifiers stay timestamp-based but are forced to be strictly increasing.
            // `Subscription` equality is id-based, so two subscriptions created within the
            // same millisecond would otherwise collide and the second `insert` would be
            // silently dropped.
            lastIssuedID = max(Int64.timestamp, lastIssuedID + 1)
            let sub = Subscription(id: lastIssuedID, hub: self, callback: callback)
            subscribers.insert(sub)
            return sub
        }
    }

    /// Unsubscribes a given subscription from the hub.
    ///
    /// Removing a subscription that is not registered is a no-op.
    /// - Parameter sub: The `Subscription` to be removed.
    public func unsubscribe(_ sub: Subscription) {
        queue.sync {
            _ = subscribers.remove(sub)
        }
    }
}
extension Int64 {
    /// The current wall-clock time as whole milliseconds since the Unix epoch.
    ///
    /// Handy as a cheap, timestamp-based identifier. The fractional millisecond is
    /// discarded by the `Int64` conversion.
    static var timestamp: Int64 {
        let secondsSinceEpoch = Date().timeIntervalSince1970
        return Int64(secondsSinceEpoch * 1000)
    }
}
/// A handle to manage a subscription within a `PubSubHub`.
///
/// A subscription pairs a type-erased callback with a weak reference to the hub that
/// issued it. It can be cancelled, or invoked directly with a packet.
///
/// Example usage:
/// ```swift
/// let subscription = hub.subscribe(for: Int.self) { packet in
///     print("Received: \(packet)")
/// }
/// subscription.cancel()
/// ```
public struct Subscription: Identifiable, Hashable {
    // TODO: track init #fileID, #line

    /// The unique identifier for this subscription.
    public let id: Int64

    /// A weak reference to the associated `PubSubHub`.
    ///
    /// Weak so that a subscription handle held by a client does not keep the hub alive.
    private weak var hub: PubSubHub?

    /// The type-erased callback: casts the packet to the subscribed type and reports
    /// whether the typed callback was invoked.
    private let _callback: @Sendable (Any) -> Bool

    /// Cancels the subscription, removing it from the hub.
    ///
    /// Does nothing if the hub has already been deallocated.
    public func cancel() {
        hub?.unsubscribe(self)
    }

    /// Invokes the subscription with a packet.
    ///
    /// - Parameter packet: The packet to deliver to the callback.
    /// - Returns: `true` if the packet could be cast to the subscribed type and the callback
    ///   was invoked, otherwise `false`.
    @discardableResult
    public func callAsFunction<P>(_ packet: P) -> Bool {
        _callback(packet)
    }

    /// Creates a new `Subscription`.
    ///
    /// - Parameters:
    ///   - id: A unique identifier for the subscription.
    ///   - hub: The hub managing this subscription; held weakly.
    ///   - callback: The callback to be invoked when a matching packet is posted.
    public init<P>(id: Int64, hub: PubSubHub, callback: @escaping @Sendable (P) -> Void) {
        self.id = id
        // Without this assignment `hub` stays nil and `cancel()` can never reach the hub.
        self.hub = hub
        self._callback = { anyPacket in
            // Packets of any other type are ignored rather than treated as an error.
            guard let typedPacket = anyPacket as? P else { return false }
            callback(typedPacket)
            return true
        }
    }
}
extension Subscription {

    /// Hashes the subscription by feeding only its identifier into `hasher`.
    /// - Parameter hasher: The hasher to feed.
    public func hash(into hasher: inout Hasher) {
        id.hash(into: &hasher)
    }

    /// Compares two `Subscription` values for equality.
    ///
    /// Only the identifier takes part in the comparison.
    /// - Parameters:
    ///   - lhs: The first subscription.
    ///   - rhs: The second subscription.
    /// - Returns: `true` if both subscriptions have the same identifier, otherwise `false`.
    public static func == (lhs: Subscription, rhs: Subscription) -> Bool {
        let leftID = lhs.id
        let rightID = rhs.id
        return leftID == rightID
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment