Created
December 26, 2024 16:40
-
-
Save paulwinex/537e649d96c69ae8afdaf9b312b45412 to your computer and use it in GitHub Desktop.
Kafka priority queues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
services: | |
zookeeper: | |
image: confluentinc/cp-zookeeper:latest | |
container_name: zookeeper | |
environment: | |
ZOOKEEPER_CLIENT_PORT: 2181 | |
ZOOKEEPER_TICK_TIME: 2000 | |
ports: | |
- "2181:2181" | |
kafka: | |
image: confluentinc/cp-kafka:latest | |
container_name: kafka | |
depends_on: | |
- zookeeper | |
environment: | |
KAFKA_BROKER_ID: 1 | |
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 | |
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 | |
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 | |
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | |
ports: | |
- "9092:9092" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
PUBLISHER OUTPUT: | |
Message sent to topic 'low-priority-queue': low priority | |
Message sent to topic 'high-priority-queue': HI PRIORITY!!! | |
Message sent to topic 'high-priority-queue': HI PRIORITY!!! | |
Message sent to topic 'high-priority-queue': HI PRIORITY!!! | |
Message sent to topic 'high-priority-queue': HI PRIORITY!!! | |
Message sent to topic 'high-priority-queue': HI PRIORITY!!! | |
Message sent to topic 'high-priority-queue': HI PRIORITY!!! | |
Message sent to topic 'low-priority-queue': low priority | |
Message sent to topic 'high-priority-queue': HI PRIORITY!!! | |
Message sent to topic 'low-priority-queue': low priority | |
WORKER OUTPUT | |
[High Priority queue] Message: HI PRIORITY!!! | |
[High Priority queue] Message: HI PRIORITY!!! | |
[High Priority queue] Message: HI PRIORITY!!! | |
[High Priority queue] Message: HI PRIORITY!!! | |
[High Priority queue] Message: HI PRIORITY!!! | |
[High Priority queue] Message: HI PRIORITY!!! | |
[High Priority queue] Message: HI PRIORITY!!! | |
[Low Priority queue] Message: low priority | |
[Low Priority queue] Message: low priority | |
[Low Priority queue] Message: low priority |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from random import random | |
from aiokafka import AIOKafkaProducer | |
high_prio_queue = 'high-priority-queue' | |
low_prio_queue = 'low-priority-queue' | |
server_url = 'localhost:9092' | |
async def send_message(topic: str, message: str): | |
producer = AIOKafkaProducer(bootstrap_servers=server_url) | |
await producer.start() | |
try: | |
await producer.send_and_wait(topic, message.encode('utf-8')) | |
print(f"Message sent to topic '{topic}': {message}") | |
finally: | |
await producer.stop() | |
async def main(): | |
for i in range(10): | |
if random() < 0.5: | |
await send_message(low_prio_queue, "low priority") | |
else: | |
await send_message(high_prio_queue, "HI PRIORITY!!!") | |
if __name__ == '__main__': | |
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from aiokafka import AIOKafkaConsumer | |
high_prio_queue = 'high-priority-queue' | |
low_prio_queue = 'low-priority-queue' | |
server_url = 'localhost:9092' | |
async def process_message(priority: str, message): | |
print(f"[{priority} queue] Message: {message}") | |
await asyncio.sleep(1) | |
async def consume(): | |
high_priority_consumer = AIOKafkaConsumer( | |
high_prio_queue, | |
bootstrap_servers=server_url, | |
group_id="priority_group", | |
) | |
normal_priority_consumer = AIOKafkaConsumer( | |
low_prio_queue, | |
bootstrap_servers=server_url, | |
group_id="priority_group", | |
) | |
await high_priority_consumer.start() | |
await normal_priority_consumer.start() | |
try: | |
while True: | |
try: | |
msg = await asyncio.wait_for(high_priority_consumer.getone(), timeout=0.1) | |
if msg: | |
await process_message("High Priority", msg.value.decode('utf-8')) | |
continue | |
except asyncio.TimeoutError: | |
pass | |
try: | |
msg = await asyncio.wait_for(normal_priority_consumer.getone(), timeout=0.1) | |
if msg: | |
await process_message("Low Priority", msg.value.decode('utf-8')) | |
continue | |
except asyncio.TimeoutError: | |
pass | |
await asyncio.sleep(1) | |
except asyncio.CancelledError: | |
pass | |
finally: | |
await high_priority_consumer.stop() | |
await normal_priority_consumer.stop() | |
async def main(): | |
await consume() | |
if __name__ == "__main__": | |
try: | |
asyncio.run(main()) | |
except KeyboardInterrupt: | |
print("Stopped") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment