Skip to content

Instantly share code, notes, and snippets.

@edenhill
Last active April 7, 2020 16:50
Show Gist options
  • Save edenhill/628f4cbfc97f7b213c1938957b2b5d91 to your computer and use it in GitHub Desktop.
Save edenhill/628f4cbfc97f7b213c1938957b2b5d91 to your computer and use it in GitHub Desktop.
Start consumer at latest message
#!/usr/bin/env python
from confluent_kafka import Consumer, KafkaException, KafkaError, OFFSET_END
import sys
if __name__ == '__main__':
broker = "localhost:51895"
topics = ["test"]
group = "mygroup"
# Create Consumer instance
c = Consumer({'bootstrap.servers': broker, 'group.id': group,
'enable.auto.commit': False, # no need for commits when setting a logical fixed start offset
'auto.offset.reset': 'smallest'})
def do_assign(consumer, partitions):
print('Assignment:', partitions)
for tp in partitions:
lo, hi = consumer.get_watermark_offsets(tp)
if hi <= 0:
# No previous offset (empty partition): skip to end
tp.offset = OFFSET_END
else:
tp.offset = hi - 1
print('Setting start offset for %s' % tp)
consumer.assign(partitions)
# Subscribe to topics
c.subscribe(topics, on_assign=do_assign)
# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
# Error or event
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
# Error
raise KafkaException(msg.error())
else:
# Proper message
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
# Close down consumer
c.close()
@MayankSharma-MS
Copy link

@edenhill i can't thank you enough for pointing out powers of do_assign(). This marks end of my 3 days of struggle.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment