Created
August 2, 2022 09:59
-
-
Save ogtega/82216711d8bee39d0dd0043cf7b1c0a6 to your computer and use it in GitHub Desktop.
A class that initializes a RabbitMQ broker interface
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, Callable, Coroutine | |
import aio_pika | |
from aiormq.abc import ConfirmationFrameType | |
from aio_pika.abc import ( | |
AbstractIncomingMessage, | |
AbstractConnection, | |
AbstractChannel, | |
AbstractQueue, | |
AbstractExchange, | |
AbstractMessage, | |
ConsumerTag, | |
) | |
class Broker:
    """Bundle one aio_pika connection, one channel, and the exchanges and
    queues declared on that channel.

    Use the ``create`` classmethod to open the connection and declare
    everything in one step; ``__init__`` only stores objects that already
    exist.
    """

    connection: AbstractConnection
    channel: AbstractChannel
    # Declared exchanges and queues, keyed by the caller-chosen lookup name.
    exchanges: dict[str, AbstractExchange]
    queues: dict[str, AbstractQueue]

    def __init__(
        self,
        connection: AbstractConnection,
        channel: AbstractChannel,
        exchanges: dict[str, AbstractExchange],
        queues: dict[str, AbstractQueue],
    ):
        """Store the already-established connection, channel and lookups.

        Args:
            connection: Connection the channel was opened on.
            channel: Channel the exchanges and queues were declared on.
            exchanges: Declared exchanges keyed by lookup name.
            queues: Declared queues keyed by lookup name.
        """
        self.connection = connection
        self.channel = channel
        self.exchanges = exchanges
        self.queues = queues

    def publish(
        self, ex: str, msg: AbstractMessage, key: str
    ) -> Coroutine[Any, Any, ConfirmationFrameType | None]:
        """Publish ``msg`` with routing key ``key`` on the exchange named ``ex``.

        Args:
            ex: Lookup name of the exchange in ``self.exchanges``.
            msg: Message handed to the exchange's ``publish``.
            key: Routing key handed to the exchange's ``publish``.

        Returns:
            Whatever the exchange's ``publish`` returns; this method does not
            await it, so the caller must.

        Raises:
            KeyError: If no exchange is registered under ``ex``.
        """
        try:
            exchange = self.exchanges[ex]
        except KeyError:
            # Name the missing exchange instead of failing later with an
            # opaque "'NoneType' object has no attribute 'publish'".
            raise KeyError(f"unknown exchange: {ex!r}") from None
        return exchange.publish(msg, key)

    def subscribe(
        self,
        queue: str,
        cb: Callable[[AbstractIncomingMessage], Any],
        no_ack: bool = False,
    ) -> Coroutine[Any, Any, ConsumerTag]:
        """Start consuming the queue named ``queue`` with callback ``cb``.

        Args:
            queue: Lookup name of the queue in ``self.queues``.
            cb: Callback handed to the queue's ``consume``.
            no_ack: Forwarded unchanged to the queue's ``consume``.

        Returns:
            Whatever the queue's ``consume`` returns; this method does not
            await it, so the caller must.

        Raises:
            KeyError: If no queue is registered under ``queue``.
        """
        try:
            target = self.queues[queue]
        except KeyError:
            # Same reasoning as in publish(): fail with the offending name.
            raise KeyError(f"unknown queue: {queue!r}") from None
        return target.consume(cb, no_ack)

    @classmethod
    async def create(
        cls,
        *args,
        exchanges: dict[str, dict[str, Any]] | None = None,
        queues: dict[str, dict[str, Any]] | None = None,
        **kwargs,
    ) -> "Broker":
        """Connect, open a channel, declare exchanges and queues, build a Broker.

        Args:
            *args: Positional arguments forwarded to
                ``aio_pika.connect_robust``.
            exchanges: Maps a lookup name to the keyword arguments passed to
                ``channel.declare_exchange``. ``None`` or empty declares
                nothing.
            queues: Maps a lookup name to the keyword arguments passed to
                ``channel.declare_queue``. ``None`` or empty declares nothing.
            **kwargs: Keyword arguments forwarded to
                ``aio_pika.connect_robust``.

        Returns:
            A Broker whose ``exchanges`` and ``queues`` use the same keys as
            the mappings passed in.
        """
        connection = await aio_pika.connect_robust(*args, **kwargs)
        try:
            channel = await connection.channel()
            # ``or {}`` covers both the None default and an explicitly empty
            # value such as the ``()`` the old default used.
            declared_exchanges = {
                name: await channel.declare_exchange(**options)
                for name, options in (exchanges or {}).items()
            }
            declared_queues = {
                name: await channel.declare_queue(**options)
                for name, options in (queues or {}).items()
            }
        except BaseException:
            # Do not leak the open connection when setup fails part-way.
            await connection.close()
            raise
        return cls(connection, channel, declared_exchanges, declared_queues)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment