Last active
August 25, 2021 11:15
-
-
Save gtato/14dea777664239e78ded9b3b82a1d1e6 to your computer and use it in GitHub Desktop.
Pause and resume kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import queue | |
from confluent_kafka import Producer, Consumer, TopicPartition, OFFSET_END, OFFSET_BEGINNING | |
from multiprocessing import Queue | |
from threading import Thread | |
import time | |
import string | |
import random | |
def get_topics(nr=5): | |
return [f'topic.{i}' for i in range(nr)] | |
def get_producer(brokers): | |
producer = Producer({'bootstrap.servers': brokers}) | |
return producer | |
def get_consumer(brokers, group): | |
consumer = Consumer({ | |
'bootstrap.servers': brokers, | |
'group.id': group, | |
'delivery.report.only.error': True, | |
'enable.auto.commit': True, | |
'auto.offset.reset': 'latest', | |
# 'max.poll.interval.ms': 10000 | |
}) | |
return consumer | |
# start producing on a randomly in one of the predefined topics | |
def produce_thread(stop): | |
brokers = 'localhost:9092' | |
p = get_producer(brokers) | |
topics = get_topics() | |
indexes = {t: 0 for t in topics} | |
key = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5)) | |
print(f'producer key: {key}') | |
while True: | |
if stop(): | |
break | |
topic = random.choice(topics) | |
# print(f'producer producing on {topic}') | |
p.produce(topic, f'{key}-{indexes[topic]} on topic {topic}') | |
indexes[topic] += 1 | |
time.sleep(1) | |
# picks randomly if to subscribe or unsubscribe to a topic | |
def topic_selector(queue: Queue, stop, timer=5): | |
topics = get_topics() | |
actions = ['subscribe', 'unsubscribe'] | |
while True: | |
if stop(): | |
break | |
topic = random.choice(topics) | |
action = random.choice(actions) | |
topic_action = {'action': action, 'topic': topic} | |
# print(f'putting {topic_action}') | |
queue.put_nowait(topic_action) | |
time.sleep(timer) | |
def kafka_test(): | |
brokers = 'localhost:9092' | |
c = get_consumer(brokers, '111') | |
topic_queue = Queue() | |
stop_producer = False | |
producer_thread = Thread(target=produce_thread, args=[lambda: stop_producer]) | |
producer_thread.start() | |
topic_selector_thread = Thread(target=topic_selector, args=[topic_queue, lambda: stop_producer, 10]) | |
topic_selector_thread.start() | |
i = 0 | |
nr_topics_subscribed = 0 | |
# the consumer gets directions from the topic selector and subscribes or unsubscribes to specific topics | |
while True: | |
try: | |
topic_action = topic_queue.get_nowait() | |
assignment, changed = handle_subscription(c, topic_action) | |
if changed: | |
print(f'applied {topic_action}') | |
nr_topics_subscribed = len(assignment) | |
except queue.Empty: | |
pass | |
if nr_topics_subscribed == 0: | |
print('not subscribed to any topics, going to sleep') | |
time.sleep(1) | |
continue | |
msg = c.poll(-1.0) | |
if msg: | |
pos = c.position(c.assignment()) | |
print(f'got {msg.value()} ... offset {pos[0].offset}') | |
if i == 30: | |
break | |
i += 1 | |
stop_producer = True | |
producer_thread.join() | |
topic_selector_thread.join() | |
def handle_subscription(consumer: Consumer, topic_action): | |
changed = False | |
assignment = consumer.assignment() | |
assigned_topics = [assig.topic for assig in assignment] | |
subscribed = topic_action['topic'] in assigned_topics | |
subscribe_action = topic_action['action'] == 'subscribe' | |
if not subscribed and subscribe_action: | |
topic_partition = TopicPartition(topic=topic_action['topic'], partition=0, offset=OFFSET_END) | |
assignment.append(topic_partition) | |
changed = True | |
if subscribed and not subscribe_action: | |
assignment.pop(assigned_topics.index(topic_action['topic'])) | |
changed = True | |
if changed: | |
consumer.assign(assignment) | |
return assignment, changed | |
if __name__ == '__main__': | |
kafka_test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment