Skip to content

Instantly share code, notes, and snippets.

@s3rius
Created February 26, 2025 13:38
Show Gist options
  • Save s3rius/d33ba2bf6a576e1272cc21ebedc8f1b5 to your computer and use it in GitHub Desktop.
Save s3rius/d33ba2bf6a576e1272cc21ebedc8f1b5 to your computer and use it in GitHub Desktop.
Rustus with Kafka example setup
# Example stack: a Rustus upload server publishing hook events to a two-node
# Kafka (KRaft) cluster, plus a small Python listener that prints those events.
services:
  # Consumer built from the Dockerfile in this directory (runs main.py).
  listener:
    build: .
    restart: always
    environment:
      # Comma-separated bootstrap servers; main.py reads this variable.
      SERVERS: "kafka-0:9092,kafka-1:9092"
    depends_on:
      kafka-0:
        condition: service_healthy
      kafka-1:
        condition: service_healthy

  # Rustus server; hook notifications are sent to Kafka.
  rustus:
    image: ghcr.io/s3rius/rustus:1.1.2
    ports:
      # Quoted so the host:container pair is always read as a string.
      - "1081:1081"
    environment:
      RUSTUS_SERVER_PORT: "1081"
      # The listener subscribes to "rustus-post-create" and
      # "rustus-post-finish" — presumably <prefix>-<hook name>; confirm
      # against the Rustus documentation before changing the prefix.
      RUSTUS_HOOKS_KAFKA_PREFIX: "rustus"
      RUSTUS_HOOKS_KAFKA_URLS: "kafka-0:9092,kafka-1:9092"
      RUSTUS_HOOKS_KAFKA_CLIENT_ID: "rustus-1"
      RUSTUS_HOOKS_KAFKA_EXTRA_OPTIONS: "allow.auto.create.topics=true;security.protocol=plaintext"
    depends_on:
      kafka-0:
        condition: service_healthy
      kafka-1:
        condition: service_healthy

  # Node 0: combined controller + broker. Also serves as the template
  # (anchor "kafka") that kafka-1 merges from.
  kafka-0: &kafka
    image: bitnami/kafka:3.9-debian-12
    healthcheck:
      # Healthy once the broker answers a topic listing on the client port.
      test:
        - CMD
        - kafka-topics.sh
        - --list
        - --bootstrap-server
        - localhost:9092
      interval: 1s
      timeout: 3s
      retries: 30
    restart: always
    environment:
      KAFKA_CFG_NODE_ID: "0"
      KAFKA_KRAFT_CLUSTER_ID: "0"
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
      # 9092 = clients, 9093 = inter-broker, 9094 = KRaft controller.
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,INTERNAL://:9093,KRAFT://:9094"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-0:9092,INTERNAL://kafka-0:9093,KRAFT://kafka-0:9094"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT,KRAFT:PLAINTEXT"
      # Single-voter quorum: kafka-0 is the only controller.
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka-0:9094"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "KRAFT"
      # NOTE(review): every other broker setting here uses the KAFKA_CFG_
      # prefix; confirm the image honours this unprefixed variable.
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "2"

  # Node 1: broker only. The merge key copies image, healthcheck and restart
  # from kafka-0; the merge is shallow, so the environment mapping below
  # replaces kafka-0's environment entirely and must list every variable.
  kafka-1:
    <<: *kafka
    environment:
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_KRAFT_CLUSTER_ID: "0"
      KAFKA_CFG_PROCESS_ROLES: "broker"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka-0:9094"
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,INTERNAL://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:9092,INTERNAL://kafka-1:9093"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,KRAFT:PLAINTEXT"
      # NOTE(review): unprefixed, unlike the KAFKA_CFG_ settings — confirm the
      # image honours this variable.
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "KRAFT"
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "2"
# Image for the example Kafka listener (main.py).
FROM python:3.12-slim-bookworm

# stdout is not a TTY inside a container, so Python would block-buffer it;
# disable buffering so print()/pprint() output reaches the container logs
# as soon as each message is consumed.
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir aiokafka

COPY main.py /app/main.py

CMD ["python", "/app/main.py"]
import asyncio
from pprint import pprint
import json
import aiokafka
import os
async def main():
    """Consume Rustus hook events from Kafka and print them.

    Subscribes to the "rustus-post-create" and "rustus-post-finish" topics
    using the bootstrap servers from the ``SERVERS`` environment variable
    (falling back to ``kafka-0:9092``). Auto-commit is disabled; the offset is
    committed explicitly after each message has been handled.

    A payload that cannot be decoded as JSON is reported and skipped rather
    than raised: because offsets are only committed after handling, an
    uncaught decode error would leave the offset uncommitted and the same
    message would crash the consumer again on every restart.
    """
    print("Starting up")
    consumer = aiokafka.AIOKafkaConsumer(
        "rustus-post-create",
        "rustus-post-finish",
        client_id="my-consumer",
        bootstrap_servers=os.environ.get("SERVERS", "kafka-0:9092"),
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        group_id="my-group",
    )
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print(
                "consumed: ",
                msg.topic,
                msg.partition,
                msg.offset,
                msg.key,
                msg.timestamp,
            )
            try:
                payload = json.loads(msg.value)
            except (TypeError, ValueError) as exc:
                # TypeError: value is None; ValueError: invalid JSON or
                # undecodable bytes. Log the raw value and move on.
                print("skipping undecodable payload: ", repr(exc), msg.value)
            else:
                pprint(payload)
            await consumer.commit()
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
# Script entry point: run the consumer coroutine until it exits or fails.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment