Created
February 26, 2025 13:38
-
-
Save s3rius/d33ba2bf6a576e1272cc21ebedc8f1b5 to your computer and use it in GitHub Desktop.
Rustus with kafka example setup
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example stack: a rustus upload server that publishes hook events to a
# two-node Kafka cluster, plus a small consumer ("listener") that prints them.
services:
  # Consumer image is built from the build context in this directory.
  listener:
    build: .
    restart: always
    environment:
      # Comma-separated bootstrap servers read by the consumer at startup.
      SERVERS: kafka-0:9092,kafka-1:9092
    depends_on:
      kafka-0:
        condition: service_healthy
      kafka-1:
        condition: service_healthy

  rustus:
    image: ghcr.io/s3rius/rustus:1.1.2
    ports:
      - "1081:1081"
    environment:
      RUSTUS_SERVER_PORT: "1081"
      RUSTUS_HOOKS_KAFKA_PREFIX: "rustus"
      RUSTUS_HOOKS_KAFKA_URLS: "kafka-0:9092,kafka-1:9092"
      RUSTUS_HOOKS_KAFKA_CLIENT_ID: "rustus-1"
      RUSTUS_HOOKS_KAFKA_EXTRA_OPTIONS: "allow.auto.create.topics=true;security.protocol=plaintext"
    depends_on:
      kafka-0:
        condition: service_healthy
      kafka-1:
        condition: service_healthy

  # Node 0 runs both the controller and broker roles; it is the only quorum voter.
  # The &kafka anchor lets kafka-1 reuse image, healthcheck and restart policy.
  kafka-0: &kafka
    image: bitnami/kafka:3.9-debian-12
    healthcheck:
      # Healthy once the broker answers a topic listing on its client port.
      test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"]
      interval: 1s
      timeout: 3s
      retries: 30
    restart: always
    environment:
      KAFKA_CFG_NODE_ID: "0"
      KAFKA_KRAFT_CLUSTER_ID: "0"
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,INTERNAL://:9093,KRAFT://:9094"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-0:9092,INTERNAL://kafka-0:9093,KRAFT://kafka-0:9094"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT,KRAFT:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka-0:9094"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: KRAFT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "2"

  # Node 1 is broker-only. The merge key is shallow, so this `environment`
  # mapping replaces kafka-0's entirely rather than being merged into it.
  kafka-1:
    <<: *kafka
    environment:
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_KRAFT_CLUSTER_ID: "0"
      KAFKA_CFG_PROCESS_ROLES: "broker"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka-0:9094"
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,INTERNAL://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-1:9092,INTERNAL://kafka-1:9093"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,KRAFT:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "KRAFT"
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "2"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Image for the example Kafka consumer (main.py).
FROM python:3.12-slim-bookworm
# Python block-buffers stdout when it is not attached to a TTY; disable
# buffering so the consumer's print() output shows up in the container logs
# as soon as it is written.
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir aiokafka
COPY main.py /app/main.py
CMD ["python", "/app/main.py"]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from pprint import pprint | |
import json | |
import aiokafka | |
import os | |
async def main():
    """Consume rustus hook events from Kafka and pretty-print each one.

    Subscribes to the ``rustus-post-create`` and ``rustus-post-finish``
    topics on the brokers named by the ``SERVERS`` environment variable
    (default ``kafka-0:9092``). Auto-commit is disabled, so the offset is
    committed explicitly after every message.

    A payload that is not valid JSON (or is empty) is reported and skipped
    instead of raising: raising before the commit would leave the offset
    unchanged, and the same message would be redelivered on every restart.
    """
    print("Starting up")
    consumer = aiokafka.AIOKafkaConsumer(
        "rustus-post-create",
        "rustus-post-finish",
        client_id="my-consumer",
        bootstrap_servers=os.environ.get("SERVERS", "kafka-0:9092"),
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        group_id="my-group",
    )
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print(
                "consumed: ",
                msg.topic,
                msg.partition,
                msg.offset,
                msg.key,
                msg.timestamp,
            )
            try:
                payload = json.loads(msg.value)
            except (TypeError, ValueError) as exc:
                # TypeError: value is None; ValueError covers JSONDecodeError
                # and invalid UTF-8 in the raw bytes.
                print("skipping undecodable message: ", exc)
            else:
                pprint(payload)
            # Commit even for skipped payloads so a bad message is not replayed.
            await consumer.commit()
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
# Script entry point: run the consumer coroutine on a fresh event loop, but
# only when this file is executed directly rather than imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment