Last active
July 18, 2022 19:26
-
-
Save jaycosaur/890b3e9a9041ba8f441583af1a1a4e89 to your computer and use it in GitHub Desktop.
Async and sync multicasting of messages to multiple queues. This implements a message fan-out strategy for worker threads and processes. Note that it does not create any worker threads or processes; it only manages (in a blocking way) the multicasting of messages to the registered queues.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Type, Set, Any | |
from multiprocessing import Queue | |
import asyncio | |
class MulticastQueue:
    """Fan a message out to every registered subscriber queue.

    ``register`` builds a new queue with the configured constructor and
    tracks it; ``multicast`` puts a message on every tracked queue. This
    class only manages the queues: it starts no threads or processes.
    """

    def __init__(self, queue_constructor: Type[Queue] = Queue) -> None:
        """Create a multicaster with no subscribers.

        Args:
            queue_constructor: Called with no arguments by ``register`` to
                build each subscriber queue. The returned object must be
                hashable (it is stored in a set) and provide
                ``put(message)``. Defaults to ``multiprocessing.Queue``.
        """
        self.subscribers: Set[Queue] = set()
        self.constructor = queue_constructor

    def register(self) -> Queue:
        """Create a new subscriber queue, start tracking it and return it."""
        queue = self.constructor()
        self.subscribers.add(queue)
        return queue

    def unregister(self, subscriber: Queue) -> None:
        """Stop delivering messages to ``subscriber``.

        Raises:
            KeyError: If ``subscriber`` is not currently registered.
        """
        self.subscribers.remove(subscriber)

    def multicast(self, message: Any) -> None:
        """Put ``message`` on every currently registered subscriber queue.

        Delivery goes to the subscribers registered at the moment the call
        starts: a queue unregistered while delivery is in progress may still
        receive this message, and one registered in the meantime will not.
        """
        # Iterate over a snapshot rather than the live set. If the set is
        # modified while delivery is in progress (e.g. during a ``put``),
        # iterating it directly aborts with
        # "RuntimeError: Set changed size during iteration" and the
        # remaining subscribers never receive the message.
        for subscriber in tuple(self.subscribers):
            subscriber.put(message)
class AsyncMulticastQueue:
    """Fan a message out to every registered ``asyncio.Queue`` subscriber.

    ``register`` creates and tracks a new ``asyncio.Queue``; ``multicast``
    awaits a ``put`` of the message on every tracked queue, one after the
    other. This class only manages the queues: it starts no tasks.
    """

    def __init__(self) -> None:
        """Create a multicaster with no subscribers."""
        self.subscribers: Set[asyncio.Queue] = set()

    def register(self) -> asyncio.Queue:
        """Create a new subscriber queue, start tracking it and return it."""
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.add(queue)
        return queue

    def unregister(self, subscriber: asyncio.Queue) -> None:
        """Stop delivering messages to ``subscriber``.

        Raises:
            KeyError: If ``subscriber`` is not currently registered.
        """
        self.subscribers.remove(subscriber)

    async def multicast(self, message: Any) -> None:
        """Put ``message`` on every currently registered subscriber queue.

        Delivery goes to the subscribers registered at the moment the call
        starts: a queue unregistered while delivery is in progress may still
        receive this message, and one registered in the meantime will not.
        """
        # Iterate over a snapshot rather than the live set. Each ``await`` is
        # a point where other code may register or unregister subscribers;
        # iterating the live set would then abort with
        # "RuntimeError: Set changed size during iteration" and skip the
        # remaining subscribers.
        for subscriber in tuple(self.subscribers):
            await subscriber.put(message)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment