Tail log files and publish the text stream to a remote Kafka server. Below: a sample agent invocation, la_monitor.py (a small heartbeat/status monitor), and log_agent.py (the tailing agent itself).
#!/bin/bash
log_agent.py publish --file '/home/work/log/weblog/web/pp-stats_*.log' --file '/home/work/log/weblog/donatello/web_*.log' --status ~/log_agent.status --throttling 1000 --monitor 10.161.19.223:12121
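The --monitor flag points the agent's UDP heartbeats at a la_monitor.py instance (next file). A minimal launch sketch, assuming the monitor runs on the 10.161.19.223 host referenced above and keeps the script's default ports (12121 for heartbeats, 12122 for the web view); the output file name is just a placeholder:

#!/bin/bash
# sketch: start the monitor in the background (UDP listener + plain-text web view)
nohup python la_monitor.py --monitor-port 12121 --web-port 12122 > la_monitor.out 2>&1 &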
#!/usr/bin/env python
# -*- coding: utf-8; tab-width: 4; -*-
# @(#) la_monitor.py Time-stamp: <Julian Qian 2015-09-29 15:37:30>
# Copyright 2015 Julian Qian
# Author: Julian Qian <[email protected]>
# Version: $Id: la_monitor.py,v 0.1 2015-09-22 10:11:54 jqian Exp $
#

import argparse
import logging
import json
import SocketServer
import signal
import thread
import threading
import urlparse
from wsgiref.simple_server import make_server

# FIXME thread sync
AGENT_STATUS = {}  # 2d vectors (host, pattern)


def byteify(input):
    if isinstance(input, dict):
        return {byteify(key): byteify(value) for key, value in input.iteritems()}
    elif isinstance(input, list):
        return [byteify(element) for element in input]
    elif isinstance(input, unicode):
        return input.encode('utf-8')
    else:
        return input

class MonitorHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        global AGENT_STATUS
        data = self.request[0].strip()
        # sock = self.request[1]
        logging.debug('recv from client {}'.format(self.client_address[0]))
        # parse data & update agent status
        hosts = byteify(json.loads(data))  # NOTE wsgi can't process unicode
        logging.debug('heartbeat: %s', hosts)
        for hs in hosts:
            host = hs['host']
            patn = hs['pattern']
            if host not in AGENT_STATUS:
                AGENT_STATUS[host] = {}
            AGENT_STATUS[host][patn] = hs

class MonitorSvr(threading.Thread):
    def __init__(self, port=12121):
        super(MonitorSvr, self).__init__()
        self.svr = SocketServer.UDPServer(('0.0.0.0', port), MonitorHandler)

    def stop(self):
        self.svr.shutdown()

    def run(self):
        self.svr.serve_forever()

class WebSvr(object):
    def __init__(self, web_port=12122, monitor_port=12121):
        self.httpd = make_server('0.0.0.0', web_port, self.application)
        self.monitor = MonitorSvr(monitor_port)
        logging.info("web port: %d, monitor port: %d", web_port, monitor_port)
        signal.signal(signal.SIGINT, self._stop)
        signal.signal(signal.SIGTERM, self._stop)

    def _stop(self, signum, frame):
        logging.info('signal %d caught, exiting ...' % signum)
        # shutdown MUST be called in another thread
        thread.start_new_thread(lambda x: x.shutdown(), (self.httpd,))
        self.monitor.stop()
    def application(self, environ, start_response):
        global AGENT_STATUS
        logging.info("query string: %s", environ['QUERY_STRING'])
        # logging.info("agent status: %s", AGENT_STATUS)
        qs = urlparse.parse_qs(environ['QUERY_STRING'],
                               keep_blank_values=1)
        body = '404'
        if 'hosts' in qs:  # list all hosts
            hosts = []
            for host, status in AGENT_STATUS.items():
                logging.debug('host %s => status %s', host, status)
                latestTime = ''
                for patn, info in status.items():
                    if info['timestamp'] > latestTime:
                        latestTime = info['timestamp']
                hosts.append('%s, %s' % (host, latestTime))
            body = '\n'.join(hosts)
        elif 'json' in qs:
            body = json.dumps(AGENT_STATUS)
        else:  # (host, file, update time)
            files = []
            for status in AGENT_STATUS.values():
                for info in status.values():
                    files.append('%s\t%s\t%s' % (info['host'],
                                                 info['filename'],
                                                 info['timestamp']))
            body = '\n'.join(files)
        if not body:
            body = 'something wrong.'
        status = '200 OK'
        headers = [
            ('Content-Type', 'text/plain'),
            ('Content-Length', str(len(body)))
        ]
        start_response(status, headers)
        return [body]

    def run(self):
        logging.info('start web svr ..')
        self.monitor.start()
        self.httpd.serve_forever()
        self.monitor.stop()
        logging.info('end web svr.')

def main():
    parser = argparse.ArgumentParser(description='log agent monitor')
    parser.add_argument('--monitor-port', type=int, default=12121,
                        help='monitor port')
    parser.add_argument('--web-port', type=int, default=12122,
                        help='web view port')
    parser.add_argument('--dry', action='store_true', help='whether dry run')
    parser.add_argument('--verbose', action='store_true', help='print verbose log')
    args = parser.parse_args()
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    svr = WebSvr(args.web_port, args.monitor_port)
    svr.run()
    print "quit monitor..."


if __name__ == "__main__":
    main()
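Once agents are reporting, the monitor's web view can be scraped with plain HTTP. These example queries follow the three branches in application() above (host list, JSON dump, and the default per-file listing); host and port are assumed from the defaults used elsewhere in this gist:

# one line per agent host with its most recent heartbeat time
curl 'http://10.161.19.223:12122/?hosts'
# the full AGENT_STATUS table as JSON
curl 'http://10.161.19.223:12122/?json'
# default view: host <TAB> filename <TAB> last update time, one line per tracked file
curl 'http://10.161.19.223:12122/'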
#!/usr/bin/env python
# -*- coding: utf-8; tab-width: 4; -*-
# @(#) log_agent.py Time-stamp: <Julian Qian 2015-10-20 14:44:04>
# Copyright 2015 Julian Qian
# Author: Julian Qian <[email protected]>
# Version: $Id: log_agent.py,v 0.1 2015-03-31 16:04:48 jqian Exp $
#

import argparse
import collections
import cPickle
import fnmatch
import fcntl
import glob
import json
import logging
import logging.handlers
import os
import Queue
import signal
import socket
import sys
import time
import threading

import kafka.client
import kafka.producer


class NullHandler(logging.Handler):
    def emit(self, record):
        pass

h = NullHandler()
logging.getLogger("kafka").addHandler(h)


def getLogger(logfile=None, logtostderr=False,
              logname="logagent", loglevel=logging.INFO):
    logger = logging.getLogger(logname)
    logger.setLevel(loglevel)
    formatter = logging.Formatter(
        '%(asctime)s %(name)s [%(levelname)s] '
        '<%(filename)s:%(lineno)d> %(funcName)s: %(message)s')
    if logfile:  # append to file
        fh = logging.handlers.RotatingFileHandler(logfile,
                                                  maxBytes=5*1024*1024,
                                                  backupCount=5)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
    if logtostderr:  # append to sys.stderr
        ch = logging.StreamHandler()
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger

logger = getLogger(None, loglevel=logging.INFO, logtostderr=False)

# Global variables
MSG_QUEUE = Queue.Queue(2000)
LOG_FILES = []
RUN_LOOP = True

class FilePosition(object):
    def __init__(self, filename='', offset=0, inode=0):
        self.filename = filename
        self.offset = offset
        self.inode = inode

    def __repr__(self):
        return 'FilePosition<filename {}, offset {}>'.format(
            self.filename, self.offset)

    def switch(self, filename):
        self.filename = filename
        self.offset = 0
        self.inode = os.stat(filename).st_ino

    def update(self, offset):
        self.offset = offset

    def rotate(self):
        self.inode = os.stat(self.filename).st_ino
        self.offset = 0

    def set(self, filename, offset):
        self.filename = filename
        self.offset = offset
        self.inode = os.stat(filename).st_ino


class NomatchPatternError(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


class NoFileError(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


class Throttling(object):
    def __init__(self, limit_per_sec):
        self.limit_per_sec = limit_per_sec
        self.curr_ts = 0
        self.cnt_this_sec = 0
        self.yield_sec = 0.005  # sleep 5 ms

    def check(self):
        ts = int(time.time())
        while ts == self.curr_ts and self.cnt_this_sec >= self.limit_per_sec:
            time.sleep(self.yield_sec)
            ts = int(time.time())
        if ts == self.curr_ts:
            self.cnt_this_sec += 1
        else:
            self.curr_ts = ts
            self.cnt_this_sec = 0

class LogFile(object):
    def __init__(self, pattern):
        self.pattern = pattern
        self.position = FilePosition()
        self.cntr = collections.Counter(['files', 'lines'])
        # TODO integrate fp into position
        self.fp = None  # log file handler
        self.tail_only = False

    def __getstate__(self):
        odict = self.__dict__.copy()
        del odict['fp']
        logger.debug('store file status, pattern: %s, filename: %s, offset: %d',
                     odict['pattern'], odict['position'].filename,
                     odict['position'].offset)
        return odict

    def __setstate__(self, odict):
        logger.debug('reload file status, pattern: %s, filename: %s, offset: %d',
                     odict['pattern'], odict['position'].filename,
                     odict['position'].offset)
        if not fnmatch.fnmatch(odict['position'].filename, odict['pattern']):
            logger.warn('saved file and pattern are mismatched. %s != %s',
                        odict['position'].filename, odict['pattern'])
            # revert position
            odict['position'] = FilePosition()
            odict['cntr'] = collections.Counter(['files', 'lines'])
        # fp = open(odict['position'].filename)
        # fp.seek(odict['position'].offset)
        self.__dict__.update(odict)
        self.fp = None

    def __repr__(self):
        return 'LogFile<file pattern: {}, position: {}, counter: {}>'.format(
            self.pattern, self.position, self.cntr)

    def __eq__(self, other):
        return self.pattern == other.pattern
    def _tell(self, reach_tail=False):
        '''Get the right file position

        return boolean (whether to reopen file)
        '''
        need_open = False
        files = glob.glob(self.pattern)
        if not files:
            raise NomatchPatternError('no matched files for pattern %s' % self.pattern)
        files_sorted = sorted(files)
        if reach_tail:
            latest_file = files_sorted[-1]
            with open(latest_file) as fp:
                fp.seek(0, os.SEEK_END)
                offset = fp.tell()
            self.position.set(latest_file, offset)
            need_open = True
            return need_open
        # assume log files are ordered by time asc
        if not self.position.filename:
            self.position.switch(files_sorted[0])
            need_open = True
        candidate_filename = None
        for filename in files_sorted:
            if filename > self.position.filename:
                candidate_filename = filename
                logger.debug('current file %s, candidate file %s' % (
                    self.position.filename, candidate_filename))
                break  # two candidates are enough
        fstat = os.stat(self.position.filename)
        finode = fstat.st_ino
        if finode != self.position.inode:  # rotated logs
            logger.debug('filename %s is rotated', self.position.filename)
            self.position.rotate()
            need_open = True
        filesize = fstat.st_size
        if filesize == self.position.offset:  # eof
            if candidate_filename:
                logger.debug('current file %s reaches end, switch to %s' % (
                    self.position.filename, candidate_filename))
                self.position.switch(candidate_filename)
                need_open = True
        return need_open

    def reach_tail(self):
        self._tell(True)
        return self.position

    def readline(self):
        if not self.fp:
            # init read position
            self._tell()
            self.fp = open(self.position.filename)
            self.fp.seek(self.position.offset)
        line = self.fp.readline()
        if not line:  # empty string
            need_open = self._tell()
            if need_open:
                self.fp.close()
                logger.info('open new file %s', self.position.filename)
                self.fp = open(self.position.filename)
                line = self.fp.readline()
                self.cntr['files'] += 1
        self.position.update(self.fp.tell())
        if line:
            self.cntr['lines'] += 1
        return line

class TailThread(threading.Thread):
    def __init__(self, fid, throughput=600):
        global LOG_FILES
        super(TailThread, self).__init__()
        self.fid = fid
        self.lf = LOG_FILES[fid]
        self.throttling = Throttling(throughput)

    def run(self):
        global RUN_LOOP
        while RUN_LOOP:
            self.throttling.check()
            line = self.lf.readline()
            if line:
                MSG_QUEUE.put(line)
            else:
                time.sleep(1)
        logger.info('quit thread %d', self.fid)

class Progress(object):
    '''serialize file tail progress
    '''
    def __init__(self, patterns, checkpoint_file):
        global LOG_FILES
        self.files = LOG_FILES
        self.checkpoint_file = checkpoint_file
        saved = []
        if os.path.exists(checkpoint_file):
            with open(checkpoint_file, 'rb') as fp:
                try:
                    saved = cPickle.load(fp)
                except:
                    logger.error('failed to unpickle: %s' % checkpoint_file)
        for f in saved:
            if f.pattern in patterns:
                self.files.append(f)
                patterns.remove(f.pattern)
        for p in patterns:
            self.files.append(LogFile(p))
        logger.info('tracking files: %s.' % self.files)

    def remove(self, logfile):
        logger.debug('remove logfile: %s', logfile)
        self.files.remove(logfile)

    def empty(self):
        return len(self.files) == 0

    def reach_tail(self):
        tails = []
        for f in self.files:
            tails.append(f.reach_tail())
        return tails

    def flush(self):
        with open(self.checkpoint_file, 'wb') as fp:
            cPickle.dump(self.files, fp)

    def json(self):
        files = []
        for f in self.files:
            # FIXME need better serialization
            files.append({'host': socket.gethostname(),
                          'pid': os.getpid(),
                          'pattern': f.pattern,
                          'filename': f.position.filename,
                          'offset': f.position.offset,
                          'files': f.cntr['files'],
                          'lines': f.cntr['lines'],
                          'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')})
        return json.dumps(files)

class HeartBeat(threading.Thread):
    def __init__(self, host, progress, timeout=5):
        super(HeartBeat, self).__init__()
        self.host = host
        self.progress = progress
        self.timeout = timeout
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP

    def run(self):
        global RUN_LOOP
        logger.info('start heartbeat thread ...')
        while RUN_LOOP:
            json = self.progress.json()
            logger.debug('heartbeat -> %s' % json)
            try:
                self.sock.sendto(json, self.host)
            except:
                logger.error('failed to sendto %s', self.host)
            time.sleep(self.timeout)

class LogAgent(object):
    def __init__(self, host, topic, batch_num,
                 patterns, checkpoint_file, dry=False):
        self.dry = dry
        logger.info('kafka producer %s, topic: %s' % (host, topic))
        logger.info('batch: %d, target patterns: %s, progress conf: %s' % (
            batch_num, patterns, checkpoint_file))
        self.host = host
        self.topic = topic
        self.batch = batch_num
        self.progress = Progress(patterns, checkpoint_file)
        self.throttling = None
        self.monitor = None
        self.skip_missing = False
        signal.signal(signal.SIGINT, self._stop)
        signal.signal(signal.SIGTERM, self._stop)

    def __enter__(self):
        if not self.dry:
            self.client = kafka.client.KafkaClient(self.host)
            self.producer = kafka.producer.SimpleProducer(
                self.client, batch_send=True if self.batch > 1 else False,
                batch_send_every_n=self.batch)
        return self

    def __exit__(self, type, value, traceback):
        global RUN_LOOP
        RUN_LOOP = False
        if not self.dry:
            logger.info('stop producer ...')
            self.producer.stop()
        if self.monitor:
            self.monitor.join()
        self.progress.flush()

    def _stop(self, signum, frame):
        global RUN_LOOP
        logger.info('signal %d caught, exiting ...' % signum)
        RUN_LOOP = False
    def throttle(self, throughput):
        '''throughput per second
        '''
        self.throttling = Throttling(throughput)

    def reach_tail(self):
        tails = self.progress.reach_tail()
        for tail in tails:
            logger.info('reach tail, %s' % tail)

    def set_monitor(self, host):
        tmp = host.split(':')
        addr, port = tmp[0], 12121
        if len(tmp) > 1:
            port = int(tmp[1])
        self.monitor = HeartBeat((addr, port), self.progress)
        self.monitor.start()

    def skip_missing_files(self):
        self.skip_missing = True

    def _start_tail_threads(self):
        for i, item in enumerate(self.progress.files):
            t = TailThread(i)
            t.start()
            logger.info('start thread %d for %s', i, item)

    def run(self):
        global RUN_LOOP, MSG_QUEUE
        self._start_tail_threads()
        flushLines = 0
        while RUN_LOOP or not MSG_QUEUE.empty():
            flushLines += 1
            if flushLines >= 1000:
                self.progress.flush()
                flushLines = 0
            try:
                line = MSG_QUEUE.get(True, 1)
                if not self.dry:
                    try:
                        self.producer.send_messages(self.topic, line)
                    except Exception as e:
                        logger.exception('failed to send_message:\n%s' % line)
                else:
                    sys.stdout.write(line)
                MSG_QUEUE.task_done()
            except Queue.Empty:
                logger.debug('queue is empty')
        logger.info('break run loop, and message queue size %d',
                    MSG_QUEUE.qsize())
        MSG_QUEUE.join()

class SingletonLock(object):
    def __init__(self, lockFile):
        self.lock = lockFile
        self.fp = None

    def trylock(self):
        try:
            self.fp = open(self.lock, "a")
            fcntl.flock(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
            self.fp.truncate()
            self.fp.write("{}".format(os.getpid()))
            self.fp.flush()
        except IOError, e:
            self.fp.close()
            self.fp = None
            return False
        return True

    def unlock(self):
        if self.fp:
            fcntl.flock(self.fp, fcntl.LOCK_UN)
            self.fp.close()
            os.unlink(self.lock)

def main():
    parser = argparse.ArgumentParser(description='log agent')
    parser.add_argument('action', choices=('publish',),
                        help='publish: collect logs and publish to kafka')
    parser.add_argument('--host', type=str, default='localhost:9092',
                        help='kafka hosts, format: "host1:port1,host2:port2,..."')
    parser.add_argument('--topic', type=str, help='kafka topic')
    parser.add_argument('--file', type=str, dest='files', action='append',
                        required=True, help='file to be tailed, can specify multiple files.')
    parser.add_argument('--status', type=str, default='./agent.progress',
                        help='file to save upload status')
    parser.add_argument('--only-tail', action='store_true',
                        help='only tail file, ignore saved status')
    parser.add_argument('--throttling', type=int, default=0,
                        help='throttling throughput, lines per second')
    parser.add_argument('--batch', type=int, default=10, help='batch number')
    parser.add_argument('--monitor', type=str, help='monitor host:port')
    parser.add_argument('--skip-missing', action='store_true',
                        help='will not try unmatched patterns, just skip them')
    parser.add_argument('--pidfile', type=str, default='./agent.pid',
                        help='pid file')
    parser.add_argument('--logfile', type=str, default='./agent.log',
                        help='log file path')
    parser.add_argument('--dry', action='store_true', help='whether dry run')
    parser.add_argument('--verbose', action='store_true',
                        help='print verbose log')
    args = parser.parse_args()

    global logger
    if args.verbose:
        logger = getLogger(logfile=args.logfile, loglevel=logging.DEBUG,
                           logtostderr=True)
    else:
        logger = getLogger(logfile=args.logfile, loglevel=logging.INFO)

    sl = SingletonLock(args.pidfile)
    if not sl.trylock():
        logger.warn('another instance is running.')
        sys.exit(-1)
    if args.action == 'publish':
        if not args.dry:
            if not args.topic:
                print "topic is missing"
                sys.exit(1)
            elif not args.host:
                print "kafka host is missing"
                sys.exit(1)
            elif len(args.files) == 0:
                print "log file is not specified"
                sys.exit(1)
logger.info("==== LOG AGENT IS STARTING ====") | |
with LogAgent(args.host, args.topic, args.batch, | |
args.files, args.status, dry=args.dry) as agent: | |
if args.throttling: | |
agent.throttle(args.throttling) | |
logger.info('[agent]throttling %d lines per second', args.throttling) | |
if args.only_tail: | |
agent.reach_tail() | |
            logger.info('[agent] read log from tail of the latest log')
        if args.monitor:
            agent.set_monitor(args.monitor)
            logger.info('[agent] report to monitor %s', args.monitor)
        if args.skip_missing:
            agent.skip_missing_files()
            logger.info('[agent] skip unmatched pattern')
        agent.run()
    logger.info("==== LOG AGENT IS END ====")
    sl.unlock()


if __name__ == "__main__":
    main()
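A convenient way to vet a new pattern before producing to Kafka is the agent's --dry mode, which writes matched lines to stdout and never opens a Kafka connection (see LogAgent.__enter__). A sketch, with placeholder paths, topic, and broker/zookeeper addresses:

#!/bin/bash
# dry run: tail the pattern and echo lines to stdout instead of publishing
python log_agent.py publish --dry --file '/var/log/myapp/app_*.log' --status /tmp/log_agent.status --verbose

# real run, then verify delivery with the console consumer that ships with Kafka
python log_agent.py publish --host kafkahost:9092 --topic weblog --file '/var/log/myapp/app_*.log' --status /tmp/log_agent.status --monitor 10.161.19.223:12121
# bin/kafka-console-consumer.sh --zookeeper zkhost:2181 --topic weblog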