Skip to content

Instantly share code, notes, and snippets.

@mcejp
Created May 9, 2021 10:14
Show Gist options
  • Save mcejp/148397ca415405f69bbc9b8cc0308b47 to your computer and use it in GitHub Desktop.
Save mcejp/148397ca415405f69bbc9b8cc0308b47 to your computer and use it in GitHub Desktop.
from queue import Queue
from threading import Lock
from typing import List, Optional
class Subscription:
    """Context manager that subscribes a private queue to a message bus.

    Entering the ``with`` block registers the queue with the bus; leaving it
    (normally or through an exception) unregisters the queue again.  Inside
    the block, call :meth:`await_message` to receive the messages that the
    bus has put on the queue.
    """

    _bus: "MessageBus"
    _queue: Queue

    # Class can be also gevent.queue.Queue
    def __init__(self, bus: "MessageBus", queue_class=Queue):
        """Create an (initially inactive) subscription.

        Args:
            bus: The bus whose messages should be received.
            queue_class: Zero-argument callable producing the queue that
                buffers messages; defaults to ``queue.Queue``.
        """
        self._bus = bus
        self._queue = queue_class()

    def __enter__(self):
        """Register the queue with the bus and return this subscription."""
        self._bus._subscribe_queue(self._queue)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Unregister the queue; returns ``None`` so exceptions propagate."""
        self._bus._unsubscribe_queue(self._queue)

    def await_message(self, timeout: Optional[float] = None):
        """Return the next queued message, waiting until one is available.

        Args:
            timeout: Maximum number of seconds to wait.  ``None`` (the
                default) waits indefinitely.  A finite value lets a consumer
                loop wake up periodically to re-check its own exit condition.

        Returns:
            The oldest message that has not been consumed yet.

        Raises:
            queue.Empty: With the standard ``queue.Queue``, when ``timeout``
                elapses before a message arrives.
        """
        if timeout is None:
            # Keep the argument-free call so queue classes whose ``get``
            # takes no ``timeout`` keep working exactly as before.
            return self._queue.get()
        return self._queue.get(timeout=timeout)
class MessageBus:
    """Asynchronous (fire-and-forget) message bus that retains the last message.

    Every published message is put on the queue of each current subscriber.
    The most recently published message is remembered and replayed to any
    queue that subscribes later.  All access to the subscriber list and to
    the retained message is serialized by a single lock.
    """

    _subscriptions: List[Queue]
    _subscriptions_lock: Lock
    _last_message: Optional[object]
    # True once publish() has been called.  Tracked separately so that a
    # published ``None`` is still replayed instead of being mistaken for
    # "nothing published yet".
    _has_last_message: bool = False

    def __init__(self):
        self._subscriptions = []
        self._subscriptions_lock = Lock()
        self._last_message = None
        self._has_last_message = False

    def _subscribe_queue(self, queue) -> None:
        """Start delivering messages to ``queue``, replaying the last one.

        Args:
            queue: Object with a ``put(message)`` method that will receive
                every subsequently published message.
        """
        with self._subscriptions_lock:
            self._subscriptions.append(queue)
            # Repeat last message to subscriber.  This happens while the lock
            # is held so a concurrent publish() cannot slip in between the
            # registration and the replay (which would duplicate or reorder
            # messages for the new subscriber).
            if self._has_last_message:
                queue.put(self._last_message)

    def _unsubscribe_queue(self, queue: Queue) -> None:
        """Stop delivering messages to ``queue``.

        Raises:
            ValueError: If ``queue`` is not currently subscribed.
        """
        with self._subscriptions_lock:
            self._subscriptions.remove(queue)

    def publish(self, message):
        """Deliver ``message`` to all subscribers and retain it for replay.

        Args:
            message: Arbitrary object handed to every subscribed queue.
        """
        with self._subscriptions_lock:
            for queue in self._subscriptions:
                queue.put(message)
            self._last_message = message
            self._has_last_message = True
@mcejp
Copy link
Author

mcejp commented May 9, 2021

Usage:

# Shared bus instance: producer_thread() publishes to it and each ws() handler subscribes to it
bus = MessageBus()


# Producer: receive blobs via AMQP and publish internally
def producer_thread():
    """Consume AMQP deliveries and republish each body on the shared bus.

    Connects to the broker on ``localhost``, declares an exclusive queue with
    an empty name, binds it to the exchange named by
    ``app.config["exchange_name"]`` and then loops over incoming deliveries:
    each body is published on ``bus`` first and acknowledged afterwards.
    """
    connection_params = pika.ConnectionParameters("localhost")
    with pika.BlockingConnection(connection_params) as amqp_connection:
        amqp_channel = amqp_connection.channel()

        # The queue is declared with an empty name; the name to use from here
        # on is read back from the declare result.
        declare_ok = amqp_channel.queue_declare(queue="", exclusive=True)
        private_queue = declare_ok.method.queue
        amqp_channel.queue_bind(
            exchange=app.config["exchange_name"],
            queue=private_queue,
        )

        deliveries = amqp_channel.consume(queue=private_queue)
        for delivery, _properties, payload in deliveries:
            bus.publish(payload)
            amqp_channel.basic_ack(delivery.delivery_tag)


# Consumer: forward received blobs into websocket
@sockets.route("/ws")
def ws(socket: WebSocket):
    """Forward every message published on the bus to one websocket client.

    Subscribes to ``bus`` for the lifetime of the handler and relays each
    received message until the socket reports that it is closed.
    """
    with Subscription(bus) as sub:
        while not socket.closed:
            message = sub.await_message()
            # The producer calls bus.publish(body), so the queued item is
            # already the payload itself; send it as-is rather than reading
            # a ``.body`` attribute from it.
            socket.send(message)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment