Skip to content

Instantly share code, notes, and snippets.

@luisdelatorre012
Created November 6, 2024 02:16
Show Gist options
  • Save luisdelatorre012/f667e967192de673dc82ac6a314af062 to your computer and use it in GitHub Desktop.
Save luisdelatorre012/f667e967192de673dc82ac6a314af062 to your computer and use it in GitHub Desktop.
Example of using the on_assign callback to start consuming from a specific offset
from confluent_kafka import Consumer, TopicPartition
# Consumer configuration
conf = {
    'bootstrap.servers': 'your_broker',
    'group.id': 'your_group',
    # Automatic offset commits are turned off for this consumer.
    'enable.auto.commit': False,
    # librdkafka (which backs confluent_kafka) does not accept the Java
    # client's 'none' value here; its equivalent is 'error', which surfaces a
    # missing or out-of-range starting offset as a message error instead of
    # silently resetting the position.
    'auto.offset.reset': 'error',
}
consumer = Consumer(conf)

# Topic, partition and starting offset that the rebalance callback targets.
topic = 'your_topic'
desired_partition_number = 0
desired_offset = 100
partition = TopicPartition(topic, desired_partition_number)
def on_assign(consumer, partitions):
    """Rebalance callback that starts the desired partition at ``desired_offset``.

    For each assigned partition whose number equals
    ``desired_partition_number``, the starting offset is set to
    ``desired_offset``. If that offset lies outside the ``[low, high)``
    watermark range reported by the consumer, the low watermark is used
    instead. Other partitions are left as they were handed in.

    Args:
        consumer: The consumer instance receiving the assignment.
        partitions: The list of ``TopicPartition`` objects being assigned;
            the ``offset`` attribute of matching entries is updated in place.
    """
    for p in partitions:
        if p.partition != desired_partition_number:
            continue
        low, high = consumer.get_watermark_offsets(p)
        if desired_offset < low or desired_offset >= high:
            print(f"Desired offset {desired_offset} is out of range; setting to {low}")
            p.offset = low
        else:
            p.offset = desired_offset
    # seek() is only valid for partitions that are already being consumed.
    # Inside the rebalance callback the assignment has not taken effect yet,
    # so the starting offsets are handed over through assign() instead.
    # NOTE(review): assumes the default (eager) rebalance protocol; a
    # cooperative assignor would need incremental_assign() - confirm.
    consumer.assign(partitions)
    print("Partitions assigned and seek applied:", partitions)
# Subscribe with the rebalance callback so the starting offset is applied
# whenever partitions are assigned to this consumer.
consumer.subscribe([topic], on_assign=on_assign)
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the 1 second poll timeout.
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # A record may carry no payload (e.g. a tombstone); value() is then
        # None and must not be decoded.
        payload = msg.value()
        text = payload.decode('utf-8') if payload is not None else None
        print(f"Received message at offset {msg.offset()}: {text}")
finally:
    # Always close the consumer, even when the loop exits via an exception.
    consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment