Last active
April 7, 2020 16:50
-
-
Save edenhill/628f4cbfc97f7b213c1938957b2b5d91 to your computer and use it in GitHub Desktop.
Start consumer at latest message
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from confluent_kafka import Consumer, KafkaException, KafkaError, OFFSET_END | |
import sys | |
def start_offset_for(high):
    """Return the offset at which consumption of a partition should start.

    ``high`` is the second value returned by
    ``Consumer.get_watermark_offsets()`` for the partition.  A value of
    zero or less is treated as an empty partition and maps to
    ``OFFSET_END``; any other value maps to ``high - 1``.
    """
    if high <= 0:
        # No previous offset (empty partition): skip to end
        return OFFSET_END
    return high - 1


def do_assign(consumer, partitions):
    """Rebalance callback: pin an explicit start offset on every partition.

    Looks up the watermark offsets of each assigned partition, overwrites
    the partition's ``offset`` with the value chosen by
    ``start_offset_for()`` and then hands the adjusted list back to the
    consumer through ``consumer.assign()``.
    """
    print('Assignment:', partitions)
    for tp in partitions:
        # Only the high watermark matters here; the low one is ignored.
        _, hi = consumer.get_watermark_offsets(tp)
        tp.offset = start_offset_for(hi)
        print('Setting start offset for %s' % tp)
    consumer.assign(partitions)


if __name__ == '__main__':
    broker = "localhost:51895"
    topics = ["test"]
    group = "mygroup"

    # Create Consumer instance
    c = Consumer({'bootstrap.servers': broker, 'group.id': group,
                  'enable.auto.commit': False,  # no need for commits when setting a logical fixed start offset
                  'auto.offset.reset': 'smallest'})

    try:
        # Subscribe to topics; do_assign overrides the start offsets
        # whenever partitions are assigned to this consumer.
        c.subscribe(topics, on_assign=do_assign)

        # Read messages from Kafka, print to stdout
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue

            err = msg.error()
            if err:
                # Error or event
                if err.code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Error
                    raise KafkaException(err)
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        # Close down consumer on every exit path, including the
        # KafkaException raised above, not only on Ctrl-C.
        c.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@edenhill I can't thank you enough for pointing out the power of do_assign(). This marks the end of my 3 days of struggle.