Skip to content

Instantly share code, notes, and snippets.

@s3rius
Created July 23, 2023 10:19
Show Gist options
  • Save s3rius/bb334fbafc7131629115fc96aaf3f473 to your computer and use it in GitHub Desktop.
Save s3rius/bb334fbafc7131629115fc96aaf3f473 to your computer and use it in GitHub Desktop.
Taskiq router for multiple brokers

This router can operate with multiple brokers. The idea is pretty straightforward: you define which broker runs on which node; the brokers use the same URL but different queues.

Running the default worker command will not work, since it would not be able to decide which broker to run. To work around this issue, we can define a custom receiver class, which picks a broker based on the node parameter.

Now instead of

taskiq worker a:router

We should run

taskiq worker a:router --receiver router:RoutedReceiver --receiver_arg node=NODE1

Of course, on different nodes you would have to specify different node arguments to our receiver.

import asyncio
import enum
from taskiq import Context, TaskiqDepends
from taskiq_redis import ListQueueBroker
from router import TaskiqRouter
class Nodes(enum.StrEnum):
NODE1 = "NODE1"
NODE2 = "NODE2"
NODE3 = "NODE3"
# One Redis list-queue broker per node: every broker points at the same
# Redis URL and differs only in the queue name it uses.
router = TaskiqRouter(
    brokers={
        node.value: ListQueueBroker("redis://localhost/0", queue_name=queue)
        for node, queue in zip(Nodes, ("node1", "node2", "node3"))
    }
)
@router.task
async def my_task(context: Context = TaskiqDepends()):
    """Print the queue name of the broker found in the task context."""
    current_broker = context.broker
    print(current_broker.queue_name)
async def main() -> None:
    """Send ``my_task`` once to every node through the router.

    The router is started before the first message is sent and is shut down
    afterwards, even when sending one of the messages fails.
    """
    await router.startup()
    try:
        # Nodes iterates in definition order: NODE1, NODE2, NODE3.
        for node in Nodes:
            await my_task.kicker().with_labels(node=node.value).kiq()
    finally:
        # Always release the brokers started above, also on errors.
        await router.shutdown()
# Run the demo only when this file is executed directly as a script.
if __name__ == "__main__":
    asyncio.run(main())
from concurrent.futures import Executor
from typing import Any, AsyncGenerator, Dict, Optional

from taskiq import AckableMessage, AsyncBroker, BrokerMessage
from taskiq.receiver import Receiver
class TaskiqRouter(AsyncBroker):
    """Broker facade that forwards each message to one of several brokers.

    The target broker is selected by the ``node`` label of the message.
    The router cannot be listened to directly; ``listen`` always raises and
    tells the user how to run a worker against a single node's broker.
    """

    def __init__(self, brokers: Dict[str, AsyncBroker]) -> None:
        """Store the node-to-broker mapping.

        :param brokers: mapping from node name to the broker for that node.
        """
        self.brokers = brokers
        super().__init__()

    async def startup(self) -> None:
        """Start the router itself and then every registered broker."""
        # Run the base-class startup too, so that whatever the parent broker
        # does on startup is not skipped for the router.
        await super().startup()
        for broker in self.brokers.values():
            await broker.startup()

    async def shutdown(self) -> None:
        """Shut down every registered broker and then the router itself."""
        for broker in self.brokers.values():
            await broker.shutdown()
        # Mirror startup(): let the base class run its own shutdown logic.
        await super().shutdown()

    async def kick(self, message: BrokerMessage) -> None:
        """Forward the message to the broker assigned to its ``node`` label.

        :param message: message to send; its labels must contain ``node``.
        :raises ValueError: if the ``node`` label is missing, or if no broker
            is registered under that node name.
        """
        node = message.labels.get("node")
        if node is None:
            raise ValueError("Node cannot be empty")
        broker = self.brokers.get(node)
        if broker is None:
            raise ValueError("Broker not assigned for this node")
        await broker.kick(message)

    def listen(self) -> AsyncGenerator[bytes | AckableMessage, None]:
        """Always raise, because the router has no queue of its own.

        :raises ValueError: always; the message shows the worker options
            needed to listen to the broker of one specific node.
        """
        # RoutedReceiver is defined in this same module, so __name__ is the
        # import path the worker needs; its keyword argument is ``node``.
        raise ValueError(
            "This is router. To listen to specific broker, use "
            f"--receiver {__name__}:RoutedReceiver "
            "--receiver_arg node=<node>"
        )
class RoutedReceiver(Receiver):
    """Receiver that serves the one broker a router holds for a given node."""

    def __init__(
        self,
        broker: TaskiqRouter,
        executor: Executor | None = None,
        validate_params: bool = True,
        max_async_tasks: int | None = None,
        max_prefetch: int = 0,
        propagate_exceptions: bool = True,
        node: Optional[str] = None,
        **receiver_kwargs: Any,
    ) -> None:
        """Pick the broker registered for ``node`` and build a receiver for it.

        :param broker: router whose ``brokers`` mapping is searched.
        :param executor: passed through to the parent receiver.
        :param validate_params: passed through to the parent receiver.
        :param max_async_tasks: passed through to the parent receiver.
        :param max_prefetch: passed through to the parent receiver.
        :param propagate_exceptions: passed through to the parent receiver.
        :param node: name of the node this worker serves, e.g. supplied with
            ``--receiver_arg node=NODE1``.
        :param receiver_kwargs: any further keyword arguments, forwarded
            unchanged to the parent receiver so that options this class does
            not name explicitly are not rejected.
        :raises ValueError: if ``node`` is missing or not known to the router.
        """
        if node is None:
            raise ValueError("Please specify node")
        # Keep the router parameter intact; the selected broker gets its own
        # name because it is a different type (a plain broker, not a router).
        node_broker = broker.brokers.get(node)
        if node_broker is None:
            raise ValueError("Unknown node.")
        super().__init__(
            broker=node_broker,
            executor=executor,
            validate_params=validate_params,
            max_async_tasks=max_async_tasks,
            max_prefetch=max_prefetch,
            propagate_exceptions=propagate_exceptions,
            **receiver_kwargs,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment