Created
January 12, 2017 19:53
-
-
Save droopy4096/90ac6085ad65e9ac166f81e1c72a8e16 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from kafka import KafkaConsumer | |
| from kafka.structs import TopicPartition | |
| import argparse | |
| import os | |
| KAFKA_SERVER='kafka.server.com' | |
| KAFKA_PORT=9092 | |
| SVN_TOPIC='svnevent' | |
| GROUP_NAME='svnbrowser' | |
| LOOKBACK=5 | |
| # topic: svnevent | |
| parser=argparse.ArgumentParser() | |
| parser.add_argument("--group", dest='group',default = GROUP_NAME) | |
| parser.add_argument("--topic", dest='topic',default = SVN_TOPIC ) | |
| args = parser.parse_args() | |
| # consumer = KafkaConsumer(args.topic,group_id=args.group, bootstrap_servers=KAFKA_SERVER+':'+str(KAFKA_PORT), auto_offset_reset='earliest', enable_auto_commit=False) | |
| ## For successfull seek_to_beginning we need to manually assign all the partitions to ourselves | |
| consumer = KafkaConsumer(group_id=args.group, bootstrap_servers=KAFKA_SERVER+':'+str(KAFKA_PORT), auto_offset_reset='earliest', enable_auto_commit=False) | |
| topic_partitions=consumer.partitions_for_topic(args.topic) | |
| # assign all those partitions | |
| assign_partitions=[] | |
| for p in topic_partitions: | |
| assign_partitions.append(TopicPartition(topic=args.topic,partition=p)) | |
| consumer.assign(assign_partitions) | |
| current_offsets={} | |
| for p in assign_partitions: | |
| current_offsets[p.partition]=consumer.committed(p) | |
| for p in assign_partitions: | |
| partition_offset=current_offsets[p.partition] | |
| seek_offset=0 | |
| if partition_offset is None: | |
| continue | |
| elif partition_offset>LOOKBACK: | |
| seek_offset=partition_offset-LOOKBACK | |
| consumer.seek(p,seek_offset) | |
| # consumer.seek_to_beginning(*assign_partitions) | |
| for msg in consumer: | |
| print msg |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment