Python Kafka Consumer Examples
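The examples below are collected from open-source projects (each header names the source project, author, file, and license) and show different ways of creating and using kafka-python's KafkaConsumer. As a quick orientation, here is a minimal sketch of the subscribe-and-iterate pattern most of them follow; the broker address, topic name, and group id are placeholders, not values taken from any of the projects.

from kafka import KafkaConsumer

# Minimal sketch: subscribe to one topic and iterate over records.
# 'localhost:9092', 'my-topic' and 'example-group' are hypothetical placeholders.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    group_id='example-group',
    auto_offset_reset='earliest',                     # start from the beginning if no committed offset
    value_deserializer=lambda v: v.decode('utf-8'))   # messages arrive as raw bytes

for record in consumer:
    print(record.topic, record.partition, record.offset, record.value)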
#https://www.programcreek.com/python/example/98440/kafka.KafkaConsumer

##Example 1
##Project: scrapy-cluster Author: istresearch File: kafka_monitor.py License: MIT License
def _create_consumer(self):
    """Tries to establish the Kafka consumer connection"""
    try:
        brokers = self.settings['KAFKA_HOSTS']
        self.logger.debug("Creating new kafka consumer using brokers: " +
                          str(brokers) + ' and topic ' +
                          self.settings['KAFKA_INCOMING_TOPIC'])
        return KafkaConsumer(
            self.settings['KAFKA_INCOMING_TOPIC'],
            group_id=self.settings['KAFKA_GROUP'],
            bootstrap_servers=brokers,
            consumer_timeout_ms=self.settings['KAFKA_CONSUMER_TIMEOUT'],
            auto_offset_reset=self.settings['KAFKA_CONSUMER_AUTO_OFFSET_RESET'],
            auto_commit_interval_ms=self.settings['KAFKA_CONSUMER_COMMIT_INTERVAL_MS'],
            enable_auto_commit=self.settings['KAFKA_CONSUMER_AUTO_COMMIT_ENABLE'],
            max_partition_fetch_bytes=self.settings['KAFKA_CONSUMER_FETCH_MESSAGE_MAX_BYTES'])
    except KeyError as e:
        self.logger.error('Missing setting named ' + str(e),
                          {'ex': traceback.format_exc()})
    except:
        self.logger.error("Couldn't initialize kafka consumer for topic",
                          {'ex': traceback.format_exc(),
                           'topic': self.settings['KAFKA_INCOMING_TOPIC']})
        raise
##Example 2
##Project: scrapy-cluster Author: istresearch File: rest_service.py License: MIT License
def _create_consumer(self):
    """Tries to establish the Kafka consumer connection"""
    if not self.closed:
        try:
            self.logger.debug("Creating new kafka consumer using brokers: " +
                              str(self.settings['KAFKA_HOSTS']) + ' and topic ' +
                              self.settings['KAFKA_TOPIC_PREFIX'] +
                              ".outbound_firehose")
            return KafkaConsumer(
                self.settings['KAFKA_TOPIC_PREFIX'] + ".outbound_firehose",
                group_id=None,
                bootstrap_servers=self.settings['KAFKA_HOSTS'],
                consumer_timeout_ms=self.settings['KAFKA_CONSUMER_TIMEOUT'],
                auto_offset_reset=self.settings['KAFKA_CONSUMER_AUTO_OFFSET_RESET'],
                auto_commit_interval_ms=self.settings['KAFKA_CONSUMER_COMMIT_INTERVAL_MS'],
                enable_auto_commit=self.settings['KAFKA_CONSUMER_AUTO_COMMIT_ENABLE'],
                max_partition_fetch_bytes=self.settings['KAFKA_CONSUMER_FETCH_MESSAGE_MAX_BYTES'])
        except KeyError as e:
            self.logger.error('Missing setting named ' + str(e),
                              {'ex': traceback.format_exc()})
        except:
            self.logger.error("Couldn't initialize kafka consumer for topic",
                              {'ex': traceback.format_exc()})
            raise
##Example 3
##Project: open-nti Author: Juniper File: open_nti_input_syslog_lib.py License: Apache License 2.0
def check_kafka_msg(topic='events', nbr_msg=100):
    # Collect messages from the bus
    consumer = KafkaConsumer(
        bootstrap_servers=get_external_ip() + ':' + str(KAFKA_BROKER_PORT),
        auto_offset_reset='earliest')
    consumer.subscribe([topic])

    counter = 0
    for message in consumer:
        counter = counter + 1
        if counter == nbr_msg:
            break
    return counter
##Example 4
##Project: operations-software-druid_exporter Author: wikimedia File: collector.py License: Apache License 2.0
def pull_datapoints_from_kafka(self, kafka_config, stop_threads):
    log.debug('Kafka datapoints puller thread starting..')

    consumer = KafkaConsumer(
        kafka_config['topic'],
        group_id=kafka_config['group_id'],
        bootstrap_servers=kafka_config['bootstrap_servers'])

    while True and not stop_threads.isSet():
        consumer.poll()
        for message in consumer:
            try:
                json_message = json.loads(message.value.decode())
                log.debug('Datapoint from kafka: %s', json_message)
                if type(json_message) == list:
                    for datapoint in json_message:
                        self.register_datapoint(datapoint)
                else:
                    self.register_datapoint(json_message)
            except json.JSONDecodeError:
                log.exception("Failed to decode message from Kafka, skipping..")
            except Exception as e:
                log.exception("Generic exception while pulling datapoints from Kafka")

    log.debug('Kafka datapoints puller thread shutting down..')
#Example 5
#Project: fooltrader Author: foolcage File: base_bot.py License: MIT License
def run(self):
    self.logger.info("start bot:{}".format(self))

    funcs = set(dir(self)) & self.func_map_topic.keys()

    consumer = KafkaConsumer(bootstrap_servers=[KAFKA_HOST])
    current_topics = consumer.topics()

    for func in funcs:
        topic = self.func_map_topic.get(func)
        if topic not in current_topics:
            self.logger.exception("you implemented func:{}, but the topic:{} for it does not exist".format(func, topic))
            continue
        self.threads.append(
            threading.Thread(target=self.consume_topic_with_func, args=(self.func_map_topic.get(func), func)))

    for the_thread in self.threads:
        the_thread.start()

    self.consume_topic_with_func(self.quote_topic, 'on_event')

    self.logger.info("finish bot:{}".format(self))
#Example 6
#Project: fooltrader Author: foolcage File: bot.py License: MIT License
def run(self):
    self.logger.info("start bot:{}".format(self))

    funcs = set(dir(self)) & self.func_map_topic.keys()

    consumer = KafkaConsumer(bootstrap_servers=[KAFKA_HOST])
    current_topics = consumer.topics()

    for func in funcs:
        topic = self.func_map_topic.get(func)
        if topic not in current_topics:
            self.logger.exception("you implemented func:{}, but the topic:{} for it does not exist".format(func, topic))
            continue
        self._threads.append(
            threading.Thread(target=self.consume_topic_with_func, args=(self.func_map_topic.get(func), func)))

    for the_thread in self._threads:
        the_thread.start()

    self.consume_topic_with_func(self.quote_topic, 'on_event')

    self.logger.info("finish bot:{}".format(self))
#Example 7
#Project: platypush Author: BlackLight File: __init__.py License: MIT License
def run(self):
    from kafka import KafkaConsumer

    super().run()
    self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server)
    self.logger.info('Initialized kafka backend - server: {}, topic: {}'
                     .format(self.server, self.topic))
    try:
        for msg in self.consumer:
            self._on_record(msg)
            if self.should_stop(): break
    except Exception as e:
        self.logger.warning('Kafka connection error, reconnecting in {} seconds'.
                            format(self._conn_retry_secs))
        self.logger.exception(e)
        time.sleep(self._conn_retry_secs)

# vim:sw=4:ts=4:et:
#Example 8
#Project: py-timeexecution Author: kpn-digital File: test_kafka.py License: Apache License 2.0
def _query_backend(self):
    consumer = KafkaConsumer(
        bootstrap_servers=KAFKA_HOST,
        value_deserializer=lambda v: JSONSerializer().loads(v.decode('utf-8'))
    )

    tp = TopicPartition(self.topic, 0)
    consumer.assign([tp])
    count = consumer.position(tp)

    consumer.seek(tp, 0)
    metrics = []
    for i in range(count):
        metrics.append(next(consumer))
    return metrics
#Example 9
#Project: scrapy-kafka-export Author: TeamHG-Memex File: test_extension.py License: MIT License
def setup_class(cls):
    cls.broker = os.getenv('KAFKA_BROKER')
    if not cls.topic:
        topic = "%s-%s" % ('topic_test_', random_string(10))
        cls.topic = topic
    create_topic(cls.topic)
    cls._deserializer = ScrapyJSONDecoder()
    cls.consumer = KafkaConsumer(
        bootstrap_servers=[cls.broker],
        auto_offset_reset='earliest',
        group_id=None,
        value_deserializer=lambda x:
            cls._deserializer.decode(x.decode('utf8'))
    )
    cls.consumer.subscribe([cls.topic])
#Example 10
#Project: kzmonitor Author: tqlihuiqi File: client.py License: MIT License
def getOffsets(self, topic, partitions, group):
    """Return offset data for the given topic, partitions, and group."""
    try:
        # Try the zookeeper-storage API first to fetch the offsets.
        # It raises UnknownTopicOrPartitionError when no offsets exist for the given group.
        tp = self.client.send_offset_fetch_request(group, [OffsetRequestPayload(topic, p, -1, 1) for p in partitions])
        offsets = {p.partition: p.offset for p in tp}
    except UnknownTopicOrPartitionError:
        # On that exception, fall back to the kafka-storage API to fetch the offsets.
        consumer = KafkaConsumer(group_id=group, bootstrap_servers=self.broker, enable_auto_commit=False)
        tp = [TopicPartition(topic, p) for p in partitions]
        consumer.assign(tp)
        offsets = {p.partition: consumer.position(p) for p in tp}
    return offsets
#Example 11
#Project: kq Author: joowani File: test_worker.py License: MIT License
def test_worker_properties(worker, hosts, topic, group):
    assert hosts in repr(worker)
    assert topic in repr(worker)
    assert group in repr(worker)
    assert worker.consumer.config['bootstrap_servers'] == hosts
    assert worker.consumer.config['group_id'] == group

    assert isinstance(worker.hosts, str) and worker.hosts == hosts
    assert isinstance(worker.topic, str) and worker.topic == topic
    assert isinstance(worker.group, str) and worker.group == group
    assert isinstance(worker.consumer, KafkaConsumer)
    assert callable(worker.deserializer)
    assert callable(worker.callback) or worker.callback is None

# noinspection PyTypeChecker
#Example 12
#Project: kq Author: joowani File: test_worker.py License: MIT License
def test_worker_initialization_with_bad_args(hosts, consumer):
    with pytest.raises(AssertionError) as e:
        Worker(topic=True, consumer=consumer)
    assert str(e.value) == 'topic must be a str'

    with pytest.raises(AssertionError) as e:
        Worker(topic='topic', consumer='bar')
    assert str(e.value) == 'bad consumer instance'

    with pytest.raises(AssertionError) as e:
        bad_consumer = KafkaConsumer(bootstrap_servers=hosts)
        Worker(topic='topic', consumer=bad_consumer)
    assert str(e.value) == 'consumer must have group_id'

    with pytest.raises(AssertionError) as e:
        Worker(topic='topic', consumer=consumer, callback=1)
    assert str(e.value) == 'callback must be a callable'

    with pytest.raises(AssertionError) as e:
        Worker(topic='topic', consumer=consumer, deserializer=1)
    assert str(e.value) == 'deserializer must be a callable'

    with pytest.raises(AssertionError) as e:
        Worker(topic='topic', consumer=consumer, logger=1)
    assert str(e.value) == 'bad logger instance'
#Example 13
#Project: Ad-Insertion-Sample Author: OpenVisualCloud File: messaging.py License: BSD 3-Clause "New" or "Revised" License
def debug(self, topic):
    c = KafkaConsumer(bootstrap_servers=kafka_hosts, client_id=self._client_id, group_id=None, api_version=(0, 10))

    # assign/subscribe topic
    partitions = c.partitions_for_topic(topic)
    if not partitions:
        raise Exception("Topic " + topic + " does not exist")
    c.assign([TopicPartition(topic, p) for p in partitions])

    # seek to beginning if needed
    c.seek_to_beginning()

    # fetch messages
    while True:
        partitions = c.poll(100)
        if partitions:
            for p in partitions:
                for msg in partitions[p]:
                    yield msg.value.decode('utf-8')
        yield ""

    c.close()
#Example 14
#Project: karapace Author: aiven File: consumer_manager.py License: Apache License 2.0
async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name, request_data):
    while True:
        try:
            c = KafkaConsumer(
                bootstrap_servers=self.config["bootstrap_uri"],
                client_id=internal_name,
                security_protocol=self.config["security_protocol"],
                ssl_cafile=self.config["ssl_cafile"],
                ssl_certfile=self.config["ssl_certfile"],
                ssl_keyfile=self.config["ssl_keyfile"],
                group_id=group_name,
                fetch_min_bytes=fetch_min_bytes,
                fetch_max_bytes=self.config["consumer_request_max_bytes"],
                request_timeout_ms=request_data["consumer.request.timeout.ms"],
                enable_auto_commit=request_data["auto.commit.enable"],
                auto_offset_reset=request_data["auto.offset.reset"]
            )
            return c
        except:  # pylint: disable=bare-except
            self.log.exception("Unable to create consumer, retrying")
            await asyncio.sleep(1)
#Example 15
#Project: InsightAgent Author: insightfinder File: getmessages_kafka2.py License: Apache License 2.0
def start_data_processing(thread_number):
    # open consumer
    consumer = KafkaConsumer(**agent_config_vars['kafka_kwargs'])
    logger.info('Started consumer number ' + str(thread_number))
    # subscribe to given topics
    consumer.subscribe(agent_config_vars['topics'])
    logger.info('Successfully subscribed to topics: ' + str(agent_config_vars['topics']))
    # start consuming messages
    parse_messages_kafka(consumer)
    consumer.close()
    logger.info('Closed consumer number ' + str(thread_number))
#Example 16
#Project: dino Author: thenetcircle File: kafka_to_rabbitmq.py License: Apache License 2.0
def run(self) -> None:
    self.create_loggers()
    logger.info('sleeping for 3 seconds before consuming')
    time.sleep(3)

    kafka_conf = self.conf.get(ConfigKeys.EXTERNAL_QUEUE)
    bootstrap_servers = kafka_conf.get(ConfigKeys.HOST)
    logger.info('bootstrapping from servers: %s' % (str(bootstrap_servers)))

    topic_name = kafka_conf.get(ConfigKeys.QUEUE)
    logger.info('consuming from topic {}'.format(topic_name))

    group_id = 'dino-kafka-to-rabbitmq'
    logger.info('using Group ID {}'.format(group_id))

    self.consumer = KafkaConsumer(
        topic_name,
        group_id=group_id,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=True,
        connections_max_idle_ms=180 * ONE_MINUTE,  # default: 9min
        max_poll_interval_ms=10 * ONE_MINUTE,  # default: 5min
        session_timeout_ms=ONE_MINUTE,  # default: 10s
        max_poll_records=10  # default: 500
    )

    while True:
        try:
            self.try_to_read()
        except InterruptedError:
            logger.info('got interrupted, shutting down...')
            break
        except Exception as e:
            logger.error('could not read from kafka: {}'.format(str(e)))
            logger.exception(e)
            time.sleep(1)
#Example 17
#Project: scrapy-cluster Author: istresearch File: online.py License: MIT License
def setUp(self):
    self.settings = get_project_settings()
    self.settings.set('KAFKA_TOPIC_PREFIX', "demo_test")
    # set up redis
    self.redis_conn = redis.Redis(host=self.settings['REDIS_HOST'],
                                  port=self.settings['REDIS_PORT'],
                                  db=self.settings['REDIS_DB'])
    try:
        self.redis_conn.info()
    except ConnectionError:
        print("Could not connect to Redis")
        # plugin is essential to functionality
        sys.exit(1)

    # clear out older test keys if any
    keys = self.redis_conn.keys("test-spider:*")
    for key in keys:
        self.redis_conn.delete(key)

    # set up kafka to consume potential results
    self.consumer = KafkaConsumer(
        "demo_test.crawled_firehose",
        bootstrap_servers=self.settings['KAFKA_HOSTS'],
        group_id="demo-id",
        auto_commit_interval_ms=10,
        consumer_timeout_ms=5000,
        auto_offset_reset='earliest'
    )
    time.sleep(1)
#Example 18
#Project: scrapy-cluster Author: istresearch File: online.py License: MIT License
def setUp(self):
    self.redis_monitor = RedisMonitor("localsettings.py")
    self.redis_monitor.settings = self.redis_monitor.wrapper.load("localsettings.py")
    self.redis_monitor.logger = MagicMock()
    self.redis_monitor.settings['KAFKA_TOPIC_PREFIX'] = "demo_test"
    self.redis_monitor.settings['STATS_TOTAL'] = False
    self.redis_monitor.settings['STATS_PLUGINS'] = False
    self.redis_monitor.settings['PLUGINS'] = {
        'plugins.info_monitor.InfoMonitor': None,
        'plugins.stop_monitor.StopMonitor': None,
        'plugins.expire_monitor.ExpireMonitor': None,
        'tests.online.CustomMonitor': 100,
    }
    self.redis_monitor.redis_conn = redis.Redis(
        host=self.redis_monitor.settings['REDIS_HOST'],
        port=self.redis_monitor.settings['REDIS_PORT'],
        db=self.redis_monitor.settings['REDIS_DB'])
    self.redis_monitor._load_plugins()
    self.redis_monitor.stats_dict = {}

    self.consumer = KafkaConsumer(
        "demo_test.outbound_firehose",
        bootstrap_servers=self.redis_monitor.settings['KAFKA_HOSTS'],
        group_id="demo-id",
        auto_commit_interval_ms=10,
        consumer_timeout_ms=5000,
        auto_offset_reset='earliest'
    )
    sleep(1)
#Example 19
#Project: sniffer Author: threathunterX File: kafkadriver.py License: Apache License 2.0
def start(self):
    self.consumer = KafkaConsumer(self.topics, **self.config)
    self.bg_task = run_in_thread(self.bg_processing)
#Example 20
#Project: sniffer Author: threathunterX File: sniffer_nebula_test.py License: Apache License 2.0
def __init__(self, bootstrap_servers, kafkatopic):
    self.kafkatopic = kafkatopic
    self.consumer = KafkaConsumer(self.kafkatopic, bootstrap_servers=bootstrap_servers)
#Example 21
#Project: search-MjoLniR Author: wikimedia File: client.py License: MIT License
def offsets_for_times(consumer, partitions, timestamp):
    """Augment KafkaConsumer.offsets_for_times to not return None

    Parameters
    ----------
    consumer : kafka.KafkaConsumer
        This consumer must only be used for collecting metadata, and not
        consuming. APIs will be used that invalidate consuming.
    partitions : list of kafka.TopicPartition
    timestamp : number
        Timestamp, in seconds since unix epoch, to return offsets for.

    Returns
    -------
    dict from kafka.TopicPartition to integer offset
    """
    # Kafka uses millisecond timestamps
    timestamp_ms = int(timestamp * 1000)
    response = consumer.offsets_for_times({p: timestamp_ms for p in partitions})
    offsets = {}
    for tp, offset_and_timestamp in response.items():
        if offset_and_timestamp is None:
            # No messages exist after timestamp. Fetch latest offset.
            consumer.assign([tp])
            consumer.seek_to_end(tp)
            offsets[tp] = consumer.position(tp)
        else:
            offsets[tp] = offset_and_timestamp.offset
    return offsets
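
# The docstring above describes offsets_for_times in terms of a metadata-only
# consumer. A minimal sketch of how it might be called, assuming the broker
# address and topic name below (both hypothetical placeholders):
#
#     import time
#     import kafka
#
#     consumer = kafka.KafkaConsumer(bootstrap_servers='localhost:9092')
#     partitions = [kafka.TopicPartition('my-topic', p)
#                   for p in consumer.partitions_for_topic('my-topic')]
#     # Per-partition offsets of the first messages at or after one hour ago.
#     offsets = offsets_for_times(consumer, partitions, time.time() - 3600)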
#Example 22
#Project: search-MjoLniR Author: wikimedia File: client.py License: MIT License
def offset_range_for_timestamp_range(brokers, start, end, topic):
    """Determine OffsetRange for a given timestamp range

    Parameters
    ----------
    brokers : list of str
        Kafka bootstrap servers
    start : number
        Unix timestamp in seconds
    end : number
        Unix timestamp in seconds
    topic : str
        Topic to fetch offsets for

    Returns
    -------
    list of OffsetRange or None
        Per-partition ranges of offsets to read
    """
    consumer = kafka.KafkaConsumer(bootstrap_servers=brokers)
    partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        # Topic does not exist.
        return None
    partitions = [kafka.TopicPartition(topic, p) for p in partitions]
    o_start = offsets_for_times(consumer, partitions, start)
    o_end = offsets_for_times(consumer, partitions, end)
    return [OffsetRange(tp, o_start[tp], o_end[tp]) for tp in partitions]
#Example 23
#Project: distributed_framework Author: ydf0509 File: kafka_consumer.py License: Apache License 2.0
def _shedual_task(self):
    self._producer = KafkaProducer(bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS)
    consumer = OfficialKafkaConsumer(self._queue_name, bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS,
                                     group_id='frame_group', enable_auto_commit=True)
    # REMIND: Because consumption here is highly concurrent (many threads, few partitions), auto commit is
    # enabled; otherwise multiple threads committing offsets for the same partition would race ahead of each
    # other and the commits would become meaningless.
    # REMIND: If you need very high reliability and consistency, use rabbitmq instead.
    # REMIND: The upside is high concurrency. A topic can be re-consumed from any offset at any time, like
    # flipping back through a book, and multiple groups consuming the same topic keep independent offsets.
    for message in consumer:
        # Note: message.value is raw bytes and needs to be decoded.
        self.logger.debug(
            f'Message taken from kafka topic [{message.topic}], partition {message.partition}: {message.value.decode()}')
        kw = {'consumer': consumer, 'message': message, 'body': json.loads(message.value)}
        self._submit_task(kw)
#Example 24
#Project: open-nti Author: Juniper File: open_nti_input_syslog_lib.py License: Apache License 2.0
def check_kafka_is_running():
    # Verify we can connect to Kafka
    time.sleep(2)
    consumer = KafkaConsumer(bootstrap_servers=get_external_ip() + ':' + str(KAFKA_BROKER_PORT),
                             auto_offset_reset='earliest')
    mytopic = consumer.topics()
    return 1
#Example 25
#Project: rafiki Author: nginyc File: inference_cache.py License: Apache License 2.0
def take_prediction_for_worker(self, worker_id: str, query_id: str) -> Union[Prediction, None]:
    name = f'workers_{worker_id}_{query_id}_prediction'
    prediction_consumer = KafkaConsumer(name, bootstrap_servers=self.connection_url,
                                        auto_offset_reset='earliest', group_id=PREDICTIONS_QUEUE)
    prediction = None
    try:
        prediction = next(prediction_consumer).value
        prediction_consumer.commit()
        prediction = pickle.loads(prediction)
    except KafkaError:
        pass
    prediction_consumer.close()
    logger.info(f'Took prediction for query "{query_id}" from worker "{worker_id}"')
    return prediction
#Example 26
#Project: rafiki Author: nginyc File: inference_cache.py License: Apache License 2.0
def pop_queries_for_worker(self, worker_id: str, batch_size: int) -> List[Query]:
    name = f'workers_{worker_id}_queries'
    query_consumer = KafkaConsumer(name, bootstrap_servers=self.connection_url,
                                   auto_offset_reset='earliest', group_id=QUERIES_QUEUE)
    partition = TopicPartition(name, 0)
    partitiondic = query_consumer.end_offsets([partition])
    offsetend = partitiondic.get(partition, None)
    if offsetend == 0:
        query_consumer.close()
        return []
    try:
        queries = []
        while True:
            record = next(query_consumer)
            queries.append(record.value)
            query_consumer.commit()
            if record.offset >= offsetend - 1 or len(queries) == batch_size:
                break
        queries = [pickle.loads(x) for x in queries]
        query_consumer.close()
        return queries
    except KafkaError:
        query_consumer.close()
        return []
#Example 27
#Project: fooltrader Author: foolcage File: kafka_connector.py License: MIT License
def list_topics():
    try:
        consumer = KafkaConsumer(bootstrap_servers=[KAFKA_HOST])
        return consumer.topics()
    finally:
        consumer.close()
#Example 28
#Project: fooltrader Author: foolcage File: kafka_utils.py License: MIT License
def get_latest_timestamp_order_from_topic(topic):
    consumer = KafkaConsumer(topic,
                             # client_id='fooltrader',
                             # group_id='fooltrader',
                             value_deserializer=lambda m: json.loads(m.decode('utf8')),
                             bootstrap_servers=[KAFKA_HOST])
    topic_partition = TopicPartition(topic=topic, partition=0)
    end_offset = consumer.end_offsets([topic_partition])[topic_partition]
    if end_offset > 0:
        # partition assigned after poll, and we could seek
        consumer.poll(5, 1)
        consumer.seek(topic_partition, end_offset - 1)
        message = consumer.poll(10000, 500)
        msgs = message[topic_partition]
        if len(msgs) > 0:
            record = msgs[-1]
            timestamp = to_timestamp(record.value['timestamp'])
            order = None
            if 'order' in record.value:
                order = record.value['order']
            return timestamp, order
    return None, None
#Example 29
#Project: scrapy-kafka-export Author: TeamHG-Memex File: writer.py License: MIT License
def _get_consumer(self, **kwargs):
    _kwargs = self.ssl_config.copy()
    _kwargs.update(bootstrap_servers=self.bootstrap_servers)
    _kwargs.update(kwargs)
    return KafkaConsumer(**_kwargs)
#Example 30
#Project: ozymandias Author: pambot File: ozy_app.py License: MIT License
def video_generator(topic):
    """Video streaming generator function."""
    consumer = KafkaConsumer('flask',
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='latest',
                             fetch_max_bytes=15728640,
                             max_partition_fetch_bytes=15728640,
                             group_id=topic)
    for msg in consumer:
        if msg.key == topic:
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + msg.value + b'\r\n')
        time.sleep(0.1)