Skip to content

Instantly share code, notes, and snippets.

@paulwinex
Created December 26, 2024 16:40
Show Gist options
  • Save paulwinex/537e649d96c69ae8afdaf9b312b45412 to your computer and use it in GitHub Desktop.
Save paulwinex/537e649d96c69ae8afdaf9b312b45412 to your computer and use it in GitHub Desktop.
Kafka priority queues
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
PUBLISHER OUTPUT:
Message sent to topic 'low-priority-queue': low priority
Message sent to topic 'high-priority-queue': HI PRIORITY!!!
Message sent to topic 'high-priority-queue': HI PRIORITY!!!
Message sent to topic 'high-priority-queue': HI PRIORITY!!!
Message sent to topic 'high-priority-queue': HI PRIORITY!!!
Message sent to topic 'high-priority-queue': HI PRIORITY!!!
Message sent to topic 'high-priority-queue': HI PRIORITY!!!
Message sent to topic 'low-priority-queue': low priority
Message sent to topic 'high-priority-queue': HI PRIORITY!!!
Message sent to topic 'low-priority-queue': low priority
WORKER OUTPUT
[High Priority queue] Message: HI PRIORITY!!!
[High Priority queue] Message: HI PRIORITY!!!
[High Priority queue] Message: HI PRIORITY!!!
[High Priority queue] Message: HI PRIORITY!!!
[High Priority queue] Message: HI PRIORITY!!!
[High Priority queue] Message: HI PRIORITY!!!
[High Priority queue] Message: HI PRIORITY!!!
[Low Priority queue] Message: low priority
[Low Priority queue] Message: low priority
[Low Priority queue] Message: low priority
import asyncio
from random import random
from aiokafka import AIOKafkaProducer
high_prio_queue = 'high-priority-queue'
low_prio_queue = 'low-priority-queue'
server_url = 'localhost:9092'
async def send_message(topic: str, message: str):
producer = AIOKafkaProducer(bootstrap_servers=server_url)
await producer.start()
try:
await producer.send_and_wait(topic, message.encode('utf-8'))
print(f"Message sent to topic '{topic}': {message}")
finally:
await producer.stop()
async def main():
for i in range(10):
if random() < 0.5:
await send_message(low_prio_queue, "low priority")
else:
await send_message(high_prio_queue, "HI PRIORITY!!!")
if __name__ == '__main__':
asyncio.run(main())
import asyncio
from aiokafka import AIOKafkaConsumer
high_prio_queue = 'high-priority-queue'
low_prio_queue = 'low-priority-queue'
server_url = 'localhost:9092'
async def process_message(priority: str, message):
print(f"[{priority} queue] Message: {message}")
await asyncio.sleep(1)
async def consume():
high_priority_consumer = AIOKafkaConsumer(
high_prio_queue,
bootstrap_servers=server_url,
group_id="priority_group",
)
normal_priority_consumer = AIOKafkaConsumer(
low_prio_queue,
bootstrap_servers=server_url,
group_id="priority_group",
)
await high_priority_consumer.start()
await normal_priority_consumer.start()
try:
while True:
try:
msg = await asyncio.wait_for(high_priority_consumer.getone(), timeout=0.1)
if msg:
await process_message("High Priority", msg.value.decode('utf-8'))
continue
except asyncio.TimeoutError:
pass
try:
msg = await asyncio.wait_for(normal_priority_consumer.getone(), timeout=0.1)
if msg:
await process_message("Low Priority", msg.value.decode('utf-8'))
continue
except asyncio.TimeoutError:
pass
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
finally:
await high_priority_consumer.stop()
await normal_priority_consumer.stop()
async def main():
await consume()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Stopped")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment