Skip to content

Instantly share code, notes, and snippets.

@ogtega
Created August 2, 2022 09:59
Show Gist options
  • Save ogtega/82216711d8bee39d0dd0043cf7b1c0a6 to your computer and use it in GitHub Desktop.
Save ogtega/82216711d8bee39d0dd0043cf7b1c0a6 to your computer and use it in GitHub Desktop.
A class that initializes a RabbitMQ broker interface
from typing import Any, Callable, Coroutine
import aio_pika
from aiormq.abc import ConfirmationFrameType
from aio_pika.abc import (
AbstractIncomingMessage,
AbstractConnection,
AbstractChannel,
AbstractQueue,
AbstractExchange,
AbstractMessage,
ConsumerTag,
)
class Broker:
    """Convenience wrapper around one ``aio_pika`` connection and channel.

    The broker keeps the exchanges and queues it was given in two
    dictionaries so callers can address them by name when publishing or
    subscribing.  Use :meth:`create` to connect and declare everything in
    one step.
    """

    connection: AbstractConnection
    channel: AbstractChannel
    # Declared exchanges, keyed by the name callers pass to ``publish``.
    exchanges: dict[str, AbstractExchange]
    # Declared queues, keyed by the name callers pass to ``subscribe``.
    queues: dict[str, AbstractQueue]

    def __init__(
        self,
        connection: AbstractConnection,
        channel: AbstractChannel,
        exchanges: dict[str, AbstractExchange],
        queues: dict[str, AbstractQueue],
    ):
        """Store an already-established connection, channel and declarations.

        Args:
            connection: The open broker connection.
            channel: A channel opened on ``connection``.
            exchanges: Declared exchanges keyed by lookup name.
            queues: Declared queues keyed by lookup name.
        """
        self.connection = connection
        self.channel = channel
        self.exchanges = exchanges
        self.queues = queues

    def publish(
        self, ex: str, msg: AbstractMessage, key: str
    ) -> Coroutine[Any, Any, ConfirmationFrameType | None]:
        """Publish ``msg`` to the exchange registered as ``ex``.

        Args:
            ex: Lookup name of the exchange in ``self.exchanges``.
            msg: The message to publish.
            key: Routing key passed to the exchange.

        Returns:
            Whatever the exchange's ``publish`` returns; per the annotation
            this is a coroutine the caller must await.

        Raises:
            KeyError: If no exchange is registered under ``ex``.
        """
        # Index instead of ``.get`` so an unknown name fails with a KeyError
        # naming the exchange rather than an AttributeError on ``None``.
        exchange = self.exchanges[ex]
        return exchange.publish(msg, key)

    def subscribe(
        self,
        queue: str,
        cb: Callable[[AbstractIncomingMessage], Any],
        no_ack: bool = False,
    ) -> Coroutine[Any, Any, ConsumerTag]:
        """Start consuming from the queue registered as ``queue``.

        Args:
            queue: Lookup name of the queue in ``self.queues``.
            cb: Callback invoked with each incoming message.
            no_ack: Forwarded unchanged to the queue's ``consume``.

        Returns:
            Whatever the queue's ``consume`` returns; per the annotation
            this is a coroutine resolving to the consumer tag.

        Raises:
            KeyError: If no queue is registered under ``queue``.
        """
        # Same reasoning as in ``publish``: fail loudly on an unknown name.
        target = self.queues[queue]
        return target.consume(cb, no_ack)

    @classmethod
    async def create(
        cls,
        *args,
        exchanges: dict[str, dict[str, Any]] | None = None,
        queues: dict[str, dict[str, Any]] | None = None,
        **kwargs,
    ) -> "Broker":
        """Connect, open a channel, declare exchanges/queues, build a broker.

        Args:
            *args: Positional arguments forwarded to
                ``aio_pika.connect_robust``.
            exchanges: Maps a lookup name to the keyword arguments for
                ``channel.declare_exchange``.  Omitted or empty means no
                exchanges are declared.
            queues: Maps a lookup name to the keyword arguments for
                ``channel.declare_queue``.  Omitted or empty means no
                queues are declared.
            **kwargs: Keyword arguments forwarded to
                ``aio_pika.connect_robust``.

        Returns:
            A ``Broker`` holding the connection, channel and declarations.
        """
        # ``or {}`` covers both the None default and the legacy ``()``
        # default, which used to crash on ``.items()``; it also avoids a
        # shared mutable default argument.
        exchange_specs = exchanges or {}
        queue_specs = queues or {}

        connection = await aio_pika.connect_robust(*args, **kwargs)
        try:
            channel = await connection.channel()
            declared_exchanges = {
                name: await channel.declare_exchange(**options)
                for name, options in exchange_specs.items()
            }
            declared_queues = {
                name: await channel.declare_queue(**options)
                for name, options in queue_specs.items()
            }
        except Exception:
            # Do not leak the open connection when setup fails part-way.
            await connection.close()
            raise
        return cls(connection, channel, declared_exchanges, declared_queues)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment