Skip to content

Instantly share code, notes, and snippets.

@ashokarun
Last active July 27, 2021 09:24
Show Gist options
  • Save ashokarun/0997ca58be24f3b4baa6fb5750d21bb4 to your computer and use it in GitHub Desktop.
Save ashokarun/0997ca58be24f3b4baa6fb5750d21bb4 to your computer and use it in GitHub Desktop.
Find LAG on Kafta topics
#!/bin/python3
import sys
import socket
import kafka
"""
Variables defined
"""
TOPICS = ['topic1', 'topic2']
GROUP = 'group'
BOOTSTRAP_SERVERS = ['IP:9092']
LAG_FOUND = []
"""
Check the connector status
"""
s = socket.socket()
address = 'IP'
port = 9092
try:
s.connect((address, port))
except Exception as e:
print("Connectivity check failed %s:%d. Exception is %s" % (address, port, e))
sys.exit(2)
finally:
s.close()
from kafka import KafkaConsumer, TopicPartition
"""
Loop the topics
"""
for topic in TOPICS:
consumer = KafkaConsumer(
bootstrap_servers=BOOTSTRAP_SERVERS,
group_id=GROUP,
enable_auto_commit=False
)
"""
Identify the Lag
"""
for p in consumer.partitions_for_topic(topic):
tp = TopicPartition(topic, p)
consumer.assign([tp])
committed = consumer.committed(tp)
consumer.seek_to_end(tp)
last_offset = consumer.position(tp)
lag = (last_offset - committed)
if lag >= 0:
LAG_FOUND.append(["topic: %s partition: %s committed: %s last: %s lag: %s" % (topic, p, committed, last_offset, lag)])
consumer.close(autocommit=False)
"""
Print the topics with LAG > our threshold
"""
if len(LAG_FOUND) == 0:
print("No considerable lag found on given topics")
else:
print("Following topics have lag greater than the threshold we defined. CRITICAL!!!")
for x in range(len(LAG_FOUND)):
print(LAG_FOUND[x])
sys.exit(2)
"""
Output
[arunlal.a@kafka01 scripts]$ /bin/python3 lag.py
Following topics have lag greater than the threshold we defined. CRITICAL!!!
['topic: topic1: 0 committed: 7 last: 7 lag: 0']
['topic: topic2: 0 committed: 2 last: 8 lag: 6']
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment