# Source: GitHub gist by @droopy4096, created January 12, 2017
# (gist id 90ac6085ad65e9ac166f81e1c72a8e16).
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import argparse
import os
KAFKA_SERVER='kafka.server.com'
KAFKA_PORT=9092
SVN_TOPIC='svnevent'
GROUP_NAME='svnbrowser'
LOOKBACK=5
# topic: svnevent
parser=argparse.ArgumentParser()
parser.add_argument("--group", dest='group',default = GROUP_NAME)
parser.add_argument("--topic", dest='topic',default = SVN_TOPIC )
args = parser.parse_args()
# consumer = KafkaConsumer(args.topic,group_id=args.group, bootstrap_servers=KAFKA_SERVER+':'+str(KAFKA_PORT), auto_offset_reset='earliest', enable_auto_commit=False)
## For successfull seek_to_beginning we need to manually assign all the partitions to ourselves
consumer = KafkaConsumer(group_id=args.group, bootstrap_servers=KAFKA_SERVER+':'+str(KAFKA_PORT), auto_offset_reset='earliest', enable_auto_commit=False)
topic_partitions=consumer.partitions_for_topic(args.topic)
# assign all those partitions
assign_partitions=[]
for p in topic_partitions:
assign_partitions.append(TopicPartition(topic=args.topic,partition=p))
consumer.assign(assign_partitions)
current_offsets={}
for p in assign_partitions:
current_offsets[p.partition]=consumer.committed(p)
for p in assign_partitions:
partition_offset=current_offsets[p.partition]
seek_offset=0
if partition_offset is None:
continue
elif partition_offset>LOOKBACK:
seek_offset=partition_offset-LOOKBACK
consumer.seek(p,seek_offset)
# consumer.seek_to_beginning(*assign_partitions)
for msg in consumer:
print msg
# (End of gist; the GitHub comment-section footer was not part of the code.)