Created
December 8, 2016 07:04
-
-
Save tonyseek/67cf7dd7d1f1604c01c9075bfb159503 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
ZooKeeper Monitor | |
================= | |
This is a simple script to monitor ZooKeeper node by "mntr" command. The | |
command response will be collected to the statsd server. | |
""" | |
import argparse | |
import logging | |
import re | |
import socket | |
import time | |
__version__ = '0.1.2' | |
logger = logging.getLogger(__name__) | |
class ZooKeeperBeat(object): | |
_mntr_re = re.compile(r'^zk_([a-z_]+)\s+(.+)$', re.MULTILINE) | |
def __init__(self, host, port, interval): | |
self.host = host | |
self.port = int(port) | |
self.interval = int(interval) | |
def __call__(self): | |
while True: | |
parsed = {} | |
try: | |
response = self.send('mntr', 10240) | |
except socket.error as e: | |
logger.error('receive mntr failed: %r' % e) | |
else: | |
parsed.update(self.parse_mntr(response)) | |
yield parsed | |
time.sleep(self.interval) | |
def send(self, command, chunk_size): | |
sock = socket.socket() | |
sock.connect((self.host, self.port)) | |
sock.send(command) | |
return sock.recv(chunk_size) | |
def parse_mntr(self, response): | |
for m in self._mntr_re.finditer(response): | |
key, value = m.groups() | |
if value.isdigit(): | |
value = int(value) | |
yield key, value | |
class StatsdClient(object): | |
def __init__(self, host, port, prefix): | |
self.host = host | |
self.port = int(port) | |
self.prefix = prefix.rstrip('.') + '.' | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
def send(self, data): | |
data = '\n'.join( | |
'{0}{1}:{2}'.format(self.prefix, k, '|'.join(map(str, v))) | |
for k, v in data.items()) | |
try: | |
self.sock.sendto(data, (self.host, self.port)) | |
except socket.error as e: | |
logger.error('send to statsd failed: %r' % e) | |
def parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--zk-host', default='127.0.0.1') | |
parser.add_argument('--zk-port', default=2181, type=int) | |
parser.add_argument('-i', '--interval', default=1, type=int) | |
parser.add_argument('-p', '--prefix', required=True) | |
parser.add_argument('--statsd-host', default='127.0.0.1') | |
parser.add_argument('--statsd-port', default=8125, type=int) | |
parser.add_argument('-d', '--debug', action='store_true', default=False) | |
parser.add_argument('-v', '--verbose', action='store_true', default=False) | |
parser.add_argument('--version', action='version', version=__version__) | |
return parser.parse_args() | |
def main(): | |
args = parse_args() | |
if args.debug: | |
logging_level = logging.DEBUG | |
elif args.verbose: | |
logging_level = logging.INFO | |
else: | |
logging_level = logging.WARNING | |
logging.basicConfig( | |
level=logging_level, | |
format='%(asctime)s [%(name)s:%(levelname)s:%(process)d] %(message)s', | |
) | |
beat = ZooKeeperBeat(args.zk_host, args.zk_port, args.interval) | |
statsd = StatsdClient(args.statsd_host, args.statsd_port, args.prefix) | |
for b in beat(): | |
metrics = { | |
'avg_latency': (b.get('avg_latency', 0), 'g'), | |
'max_latency': (b.get('max_latency', 0), 'g'), | |
'min_latency': (b.get('min_latency', 0), 'g'), | |
'num_alive_connections': (b.get('num_alive_connections', 0), 'g'), | |
'outstanding_requests': (b.get('outstanding_requests', 0), 'g'), | |
'packets_received': (b.get('packets_received', 0), 'g'), | |
'packets_sent': (b.get('packets_sent', 0), 'g'), | |
'znode_count': (b.get('znode_count', 0), 'g'), | |
'watch_count': (b.get('watch_count', 0), 'g'), | |
'ephemerals_count': (b.get('ephemerals_count', 0), 'g'), | |
'open_fd_count': (b.get('open_file_descriptor_count', 0), 'g'), | |
} | |
if b.get('server_state') == 'leader': | |
metrics['followers'] = (b.get('followers', 0), 'g') | |
metrics['synced_followers'] = (b.get('synced_followers', 0), 'g') | |
metrics['pending_syncs'] = (b.get('pending_syncs', 0), 'g') | |
statsd.send(metrics) | |
logger.debug(repr(metrics)) | |
if __name__ == '__main__': | |
try: | |
main() | |
except KeyboardInterrupt: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment