Skip to content

Instantly share code, notes, and snippets.

@billydh
Last active April 29, 2020 07:42
Show Gist options
  • Select an option

  • Save billydh/ac1a92c969b4c4fd00cec3bf9f09e1f3 to your computer and use it in GitHub Desktop.

Select an option

Save billydh/ac1a92c969b4c4fd00cec3bf9f09e1f3 to your computer and use it in GitHub Desktop.
A Python Avro Consumer
from confluent_kafka.avro import AvroConsumer
from utils.parse_command_line_args import parse_command_line_args
def consume_record(args):
default_group_name = "default-consumer-group"
consumer_config = {"bootstrap.servers": args.bootstrap_servers,
"schema.registry.url": args.schema_registry,
"group.id": default_group_name,
"auto.offset.reset": "earliest"}
consumer = AvroConsumer(consumer_config)
consumer.subscribe([args.topic])
try:
message = consumer.poll(5)
except Exception as e:
print(f"Exception while trying to poll messages - {e}")
else:
if message:
print(f"Successfully poll a record from "
f"Kafka topic: {message.topic()}, partition: {message.partition()}, offset: {message.offset()}\n"
f"message key: {message.key()} || message value: {message.value()}")
consumer.commit()
else:
print("No new messages at this point. Try again later.")
consumer.close()
if __name__ == "__main__":
consume_record(parse_command_line_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment