Last active
July 27, 2021 09:24
-
-
Save ashokarun/0997ca58be24f3b4baa6fb5750d21bb4 to your computer and use it in GitHub Desktop.
Find LAG on Kafta topics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python3 | |
import sys | |
import socket | |
import kafka | |
""" | |
Variables defined | |
""" | |
TOPICS = ['topic1', 'topic2'] | |
GROUP = 'group' | |
BOOTSTRAP_SERVERS = ['IP:9092'] | |
LAG_FOUND = [] | |
""" | |
Check the connector status | |
""" | |
s = socket.socket() | |
address = 'IP' | |
port = 9092 | |
try: | |
s.connect((address, port)) | |
except Exception as e: | |
print("Connectivity check failed %s:%d. Exception is %s" % (address, port, e)) | |
sys.exit(2) | |
finally: | |
s.close() | |
from kafka import KafkaConsumer, TopicPartition | |
""" | |
Loop the topics | |
""" | |
for topic in TOPICS: | |
consumer = KafkaConsumer( | |
bootstrap_servers=BOOTSTRAP_SERVERS, | |
group_id=GROUP, | |
enable_auto_commit=False | |
) | |
""" | |
Identify the Lag | |
""" | |
for p in consumer.partitions_for_topic(topic): | |
tp = TopicPartition(topic, p) | |
consumer.assign([tp]) | |
committed = consumer.committed(tp) | |
consumer.seek_to_end(tp) | |
last_offset = consumer.position(tp) | |
lag = (last_offset - committed) | |
if lag >= 0: | |
LAG_FOUND.append(["topic: %s partition: %s committed: %s last: %s lag: %s" % (topic, p, committed, last_offset, lag)]) | |
consumer.close(autocommit=False) | |
""" | |
Print the topics with LAG > our threshold | |
""" | |
if len(LAG_FOUND) == 0: | |
print("No considerable lag found on given topics") | |
else: | |
print("Following topics have lag greater than the threshold we defined. CRITICAL!!!") | |
for x in range(len(LAG_FOUND)): | |
print(LAG_FOUND[x]) | |
sys.exit(2) | |
""" | |
Output | |
[arunlal.a@kafka01 scripts]$ /bin/python3 lag.py | |
Following topics have lag greater than the threshold we defined. CRITICAL!!! | |
['topic: topic1: 0 committed: 7 last: 7 lag: 0'] | |
['topic: topic2: 0 committed: 2 last: 8 lag: 6'] | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment