Skip to content

Instantly share code, notes, and snippets.

@rajeshpv
Created February 11, 2026 06:02
Show Gist options
  • Select an option

  • Save rajeshpv/ba5e45b51cae5fda7626865813260426 to your computer and use it in GitHub Desktop.

Select an option

Save rajeshpv/ba5e45b51cae5fda7626865813260426 to your computer and use it in GitHub Desktop.
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "confluent-kafka",
# ]
# ///
"""Connect to Kafka and read messages from the fcs topic."""
import os
from confluent_kafka import Consumer
BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
TOPIC = os.getenv("TOPIC_NAME")
conf = {
"bootstrap.servers": BOOTSTRAP_SERVERS,
"group.id": "kafka-test-consumer-1",
"auto.offset.reset": "earliest",
}
consumer = Consumer(conf)
consumer.subscribe([TOPIC])
try:
count = 0
while count < 10:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
key = msg.key().decode() if msg.key() else None
value = msg.value().decode() if msg.value() else None
print(f"partition={msg.partition()} offset={msg.offset()} key={key} value={value}")
count += 1
finally:
consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment