Tail log files and publish the text stream to a remote Kafka server.
#!/bin/bash
log_agent.py publish --file '/home/work/log/weblog/web/pp-stats_*.log' --file '/home/work/log/weblog/donatello/web_*.log' --status ~/log_agent.status --throttling 1000 --monitor 10.161.19.223:12121
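On the monitor host (10.161.19.223 in the command above), the companion la_monitor.py could be started and queried roughly as follows; the ports are the scripts' defaults and the curl targets are only illustrative:
python la_monitor.py --monitor-port 12121 --web-port 12122 &
# one "host, latest heartbeat" line per reporting agent
curl 'http://10.161.19.223:12122/?hosts'
# raw agent status as JSON
curl 'http://10.161.19.223:12122/?json'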
#!/usr/bin/env python
# -*- coding: utf-8; tab-width: 4; -*-
# @(#) la_monitor.py Time-stamp: <Julian Qian 2015-09-29 15:37:30>
# Copyright 2015 Julian Qian
# Author: Julian Qian <[email protected]>
# Version: $Id: la_monitor.py,v 0.1 2015-09-22 10:11:54 jqian Exp $
#
import argparse
import logging
import json
import SocketServer
import signal
import thread
import threading
import urlparse
from wsgiref.simple_server import make_server
# FIXME thread sync
AGENT_STATUS = {} # agent status keyed by host, then by file pattern
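# Each heartbeat entry mirrors what log_agent.py's Progress.json() emits, e.g.
# (illustrative values):
#   AGENT_STATUS['web01']['/home/work/log/weblog/web/pp-stats_*.log'] = {
#       'host': 'web01', 'pid': 1234,
#       'pattern': '/home/work/log/weblog/web/pp-stats_*.log',
#       'filename': '/home/work/log/weblog/web/pp-stats_20151020.log',
#       'offset': 102400, 'files': 3, 'lines': 56789,
#       'timestamp': '2015-10-20 14:44:04'}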
def byteify(input):
if isinstance(input, dict):
return {byteify(key):byteify(value) for key,value in input.iteritems()}
elif isinstance(input, list):
return [byteify(element) for element in input]
elif isinstance(input, unicode):
return input.encode('utf-8')
else:
return input
class MonitorHandler(SocketServer.BaseRequestHandler):
def handle(self):
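        '''Handle one UDP heartbeat datagram: decode the JSON list of
        per-file status dicts and store each under AGENT_STATUS[host][pattern].
        '''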
global AGENT_STATUS
data = self.request[0].strip()
# sock = self.request[1]
logging.debug('recv from client {}'.format(self.client_address[0]))
# parse data & update agent status
hosts = byteify(json.loads(data)) # NOTE wsgi can't process unicode
        logging.debug('heartbeat: %s', hosts)
for hs in hosts:
host = hs['host']
patn = hs['pattern']
if host not in AGENT_STATUS:
AGENT_STATUS[host] = {}
AGENT_STATUS[host][patn] = hs
class MonitorSvr(threading.Thread):
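    '''UDP server running in its own thread; collects heartbeat packets sent by log agents.'''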
def __init__(self, port=12121):
super(MonitorSvr, self).__init__()
self.svr = SocketServer.UDPServer(('0.0.0.0', port), MonitorHandler)
def stop(self):
self.svr.shutdown()
def run(self):
self.svr.serve_forever()
class WebSvr(object):
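    '''Plain-text WSGI status viewer; also owns the UDP MonitorSvr thread and shuts both down on SIGINT/SIGTERM.'''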
def __init__(self, web_port=12122, monitor_port=12121):
self.httpd = make_server('0.0.0.0', web_port, self.application)
self.monitor = MonitorSvr(monitor_port)
logging.info("web port: %d, minotor port: %d", web_port, monitor_port)
signal.signal(signal.SIGINT, self._stop)
signal.signal(signal.SIGTERM, self._stop)
def _stop(self, signum, frame):
        logging.info('signal %d caught, exiting ...' % signum)
# shutdown MUST be called in another thread
thread.start_new_thread(lambda x: x.shutdown(), (self.httpd,))
self.monitor.stop()
def application(self, environ, start_response):
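        '''WSGI app: "?hosts" lists each host with its latest heartbeat time,
        "?json" dumps the raw status, anything else prints one
        host<TAB>filename<TAB>timestamp line per tracked file.
        '''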
global AGENT_STATUS
logging.info("query string: %s", environ['QUERY_STRING'])
# logging.info("agent status: %s", AGENT_STATUS)
qs = urlparse.parse_qs(environ['QUERY_STRING'],
keep_blank_values=1)
body = '404'
if 'hosts' in qs: # list all hosts
hosts = []
for host, status in AGENT_STATUS.items():
logging.debug('host %s => status %s', host, status)
latestTime = ''
for patn, info in status.items():
if info['timestamp'] > latestTime:
latestTime = info['timestamp']
hosts.append('%s, %s' % (host, latestTime))
body = '\n'.join(hosts)
elif 'json' in qs:
body = json.dumps(AGENT_STATUS)
else: # (host, file, update time)
files = []
for status in AGENT_STATUS.values():
for info in status.values():
files.append('%s\t%s\t%s' % (info['host'],
info['filename'],
info['timestamp']))
body = '\n'.join(files)
if not body:
body = 'something wrong.'
status = '200 OK'
headers = [
('Content-Type', 'text/plain'),
('Content-Length', str(len(body)))
]
start_response(status, headers)
return [body]
def run(self):
logging.info('start web svr ..')
self.monitor.start()
self.httpd.serve_forever()
self.monitor.stop()
logging.info('end web svr.')
def main():
parser = argparse.ArgumentParser(description='log agent monitor')
parser.add_argument('--monitor-port', type=int, default=12121,
help='monitor port')
parser.add_argument('--web-port', type=int, default=12122,
help='web view port')
parser.add_argument('--dry', action='store_true', help='whether dry run')
parser.add_argument('--verbose', action='store_true', help='print verbose log')
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
    svr = WebSvr(args.web_port, args.monitor_port)
svr.run()
print "quit monitor..."
if __name__ == "__main__":
main()
#!/usr/bin/env python
# -*- coding: utf-8; tab-width: 4; -*-
# @(#) log_agent.py Time-stamp: <Julian Qian 2015-10-20 14:44:04>
# Copyright 2015 Julian Qian
# Author: Julian Qian <[email protected]>
# Version: $Id: log_agent.py,v 0.1 2015-03-31 16:04:48 jqian Exp $
#
import argparse
import collections
import cPickle
import fnmatch
import fcntl
import glob
import json
import logging
import logging.handlers
import os
import Queue
import signal
import socket
import sys
import time
import threading
import kafka.client
import kafka.producer
class NullHandler(logging.Handler):
def emit(self, record):
pass
h = NullHandler()
logging.getLogger("kafka").addHandler(h)
def getLogger(logfile=None, logtostderr=False,
logname="logagent", loglevel=logging.INFO):
logger = logging.getLogger(logname)
logger.setLevel(loglevel)
formatter = logging.Formatter(
'%(asctime)s %(name)s [%(levelname)s] '
'<%(filename)s:%(lineno)d> %(funcName)s: %(message)s')
if logfile: # append to file
fh = logging.handlers.RotatingFileHandler(logfile,
maxBytes=5*1024*1024,
backupCount=5)
fh.setFormatter(formatter)
logger.addHandler(fh)
if logtostderr: # append to sys.stderr
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
logger = getLogger(None, loglevel=logging.INFO, logtostderr=False)
# Global variables
MSG_QUEUE = Queue.Queue(2000)
LOG_FILES = []
RUN_LOOP = True
class FilePosition(object):
def __init__(self, filename='', offset=0, inode=0):
self.filename = filename
self.offset = offset
self.inode = inode
def __repr__(self):
return 'FilePosition<filename {}, offset {}>'.format(
self.filename, self.offset)
def switch(self, filename):
self.filename = filename
self.offset = 0
self.inode = os.stat(filename).st_ino
def update(self, offset):
self.offset = offset
def rotate(self):
self.inode = os.stat(self.filename).st_ino
self.offset = 0
def set(self, filename, offset):
self.filename = filename
self.offset = offset
self.inode = os.stat(filename).st_ino
class NomatchPatternError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class NoFileError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class Throttling(object):
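    '''Lines-per-second limiter: check() sleeps in 5 ms steps while the current second's budget is exhausted.'''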
def __init__(self, limit_per_sec):
self.limit_per_sec = limit_per_sec
self.curr_ts = 0
self.cnt_this_sec = 0
self.yield_sec = 0.005 # sleep 5 ms
def check(self):
ts = int(time.time())
while ts == self.curr_ts and self.cnt_this_sec >= self.limit_per_sec:
time.sleep(self.yield_sec)
ts = int(time.time())
if ts == self.curr_ts:
self.cnt_this_sec += 1
else:
self.curr_ts = ts
self.cnt_this_sec = 0
class LogFile(object):
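    '''Tail state for one glob pattern: tracks the current file, byte offset and inode so log rotation and switching to a newer file can be detected.'''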
def __init__(self, pattern):
self.pattern = pattern
self.position = FilePosition()
        self.cntr = collections.Counter({'files': 0, 'lines': 0})
# TODO integrate fp into position
self.fp = None # log file handler
self.tail_only = False
def __getstate__(self):
odict = self.__dict__.copy()
del odict['fp']
logger.debug('store file status, pattern: %s, filename: %s, offset: %d',
odict['pattern'], odict['position'].filename,
odict['position'].offset)
return odict
def __setstate__(self, odict):
logger.debug('reload file status, pattern: %s, filename: %s, offset: %d',
odict['pattern'], odict['position'].filename,
odict['position'].offset)
if not fnmatch.fnmatch(odict['position'].filename, odict['pattern']):
            logger.warn('saved file and pattern are mismatched. %s != %s',
odict['position'].filename, odict['pattern'])
# revert position
odict['position'] = FilePosition()
            odict['cntr'] = collections.Counter({'files': 0, 'lines': 0})
# fp = open(odict['position'].filename)
# fp.seek(odict['position'].offset)
self.__dict__.update(odict)
self.fp = None
def __repr__(self):
return 'LogFile<file pattern: {}, position: {}, counter: {}>'.format(
self.pattern, self.position, self.cntr)
def __eq__(self, other):
return self.pattern == other.pattern
def _tell(self, reach_tail=False):
'''Get the right file position
return boolean (whether to reopen file)
'''
need_open = False
files = glob.glob(self.pattern)
if not files:
raise NomatchPatternError('no matched files for pattern %s' % self.pattern)
files_sorted = sorted(files)
if reach_tail:
latest_file = files_sorted[-1]
with open(latest_file) as fp:
fp.seek(0, os.SEEK_END)
offset = fp.tell()
self.position.set(latest_file, offset)
need_open = True
return need_open
# assume log files are ordered by time asc
if not self.position.filename:
self.position.switch(files_sorted[0])
need_open = True
candidate_filename = None
for filename in sorted(files):
if filename > self.position.filename:
candidate_filename = filename
logger.debug('current file %s, candidate file %s' % (
self.position.filename, candidate_filename))
break # two candidates are enough
fstat = os.stat(self.position.filename)
finode = fstat.st_ino
if finode != self.position.inode: # rotated logs
logger.debug('filename %s is rotated', self.position.filename)
self.position.rotate()
need_open = True
filesize = fstat.st_size
if filesize == self.position.offset: # eof
if candidate_filename:
logger.debug('current file %s reaches end, switch to %s' % (
self.position.filename, candidate_filename))
self.position.switch(candidate_filename)
need_open = True
return need_open
def reach_tail(self):
self._tell(True)
return self.position
def readline(self):
if not self.fp:
# init read position
self._tell()
self.fp = open(self.position.filename)
self.fp.seek(self.position.offset)
line = self.fp.readline()
if not line: # empty string
need_open = self._tell()
if need_open:
self.fp.close()
logger.info('open new file %s', self.position.filename)
self.fp = open(self.position.filename)
line = self.fp.readline()
self.cntr['files'] += 1
self.position.update(self.fp.tell())
if line:
self.cntr['lines'] += 1
return line
class TailThread(threading.Thread):
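    '''Tails one LogFile and pushes every line onto the shared MSG_QUEUE, subject to a per-thread throughput limit.'''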
def __init__(self, fid, throughput=600):
global LOG_FILES
super(TailThread, self).__init__()
self.fid = fid
self.lf = LOG_FILES[fid]
self.throttling = Throttling(throughput)
def run(self):
global RUN_LOOP
while RUN_LOOP:
self.throttling.check()
line = self.lf.readline()
if line:
MSG_QUEUE.put(line)
else:
time.sleep(1)
logger.info('quit thread %d', self.fid)
class Progress(object):
'''serialize file tail progress
'''
def __init__(self, patterns, checkpoint_file):
global LOG_FILES
self.files = LOG_FILES
self.checkpoint_file = checkpoint_file
saved = []
if os.path.exists(checkpoint_file):
with open(checkpoint_file, 'rb') as fp:
try:
saved = cPickle.load(fp)
except:
                    logger.error('failed to unpickle: %s' % checkpoint_file)
for f in saved:
if f.pattern in patterns:
self.files.append(f)
patterns.remove(f.pattern)
for p in patterns:
self.files.append(LogFile(p))
logger.info('tracking files: %s.' % self.files)
def remove(self, logfile):
logger.debug('remove logfile: %s', logfile)
self.files.remove(logfile)
def empty(self):
return len(self.files) == 0
def reach_tail(self):
tails = []
for f in self.files:
tails.append(f.reach_tail())
return tails
def flush(self):
with open(self.checkpoint_file, 'wb') as fp:
cPickle.dump(self.files, fp)
def json(self):
files = []
for f in self.files:
# FIXME need better serialization
files.append({'host': socket.gethostname(),
'pid': os.getpid(),
'pattern': f.pattern,
'filename': f.position.filename,
'offset': f.position.offset,
'files': f.cntr['files'],
'lines': f.cntr['lines'],
                          'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')})
return json.dumps(files)
class HeartBeat(threading.Thread):
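    '''Periodically serializes the tail progress to JSON and sends it to the monitor address over UDP.'''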
def __init__(self, host, progress, timeout=5):
super(HeartBeat, self).__init__()
self.host = host
self.progress = progress
self.timeout = timeout
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
def run(self):
global RUN_LOOP
        logger.info('start heartbeat thread ...')
while RUN_LOOP:
json = self.progress.json()
logger.debug('heartbeat -> %s' % json)
try:
self.sock.sendto(json, self.host)
except:
logger.error('failed to sendto %s', self.host)
time.sleep(self.timeout)
class LogAgent(object):
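    '''Starts one TailThread per tracked pattern, drains MSG_QUEUE and publishes each line to Kafka (or stdout when dry), checkpointing progress roughly every 1000 lines.'''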
def __init__(self, host, topic, batch_num,
patterns, checkpoint_file, dry=False):
self.dry = dry
logger.info('kafka producer %s, topic: %s' % (host, topic))
logger.info('batch: %d, target patterns: %s, progress conf: %s' % (
batch_num, patterns, checkpoint_file))
self.host = host
self.topic = topic
self.batch = batch_num
self.progress = Progress(patterns, checkpoint_file)
self.throttling = None
self.monitor = None
self.skip_missing = False
signal.signal(signal.SIGINT, self._stop)
signal.signal(signal.SIGTERM, self._stop)
def __enter__(self):
if not self.dry:
self.client = kafka.client.KafkaClient(self.host)
self.producer = kafka.producer.SimpleProducer(
self.client, batch_send=True if self.batch > 1 else False,
batch_send_every_n=self.batch)
return self
def __exit__(self, type, value, traceback):
global RUN_LOOP
RUN_LOOP = False
if not self.dry:
logger.info('stop producer ...')
self.producer.stop()
if self.monitor:
self.monitor.join()
self.progress.flush()
def _stop(self, signum, frame):
global RUN_LOOP
        logger.info('signal %d caught, exiting ...' % signum)
RUN_LOOP = False
def throttle(self, throughput):
'''throughput per second
'''
self.throttling = Throttling(throughput)
def reach_tail(self):
tails = self.progress.reach_tail()
for tail in tails:
logger.info('reach tail, %s' % tail)
def set_monitor(self, host):
tmp = host.split(':')
addr, port = tmp[0], 12121
if len(tmp) > 1:
port = int(tmp[1])
self.monitor = HeartBeat((addr, port), self.progress)
self.monitor.start()
def skip_missing_files(self):
self.skip_missing = True
def _start_tail_threads(self):
for i, item in enumerate(self.progress.files):
t = TailThread(i)
t.start()
logger.info('start thread %d for %s', i, item)
def run(self):
global RUN_LOOP, MSG_QUEUE
self._start_tail_threads()
flushLines = 0
while RUN_LOOP or not MSG_QUEUE.empty():
flushLines += 1
if flushLines >= 1000:
self.progress.flush()
flushLines = 0
try:
line = MSG_QUEUE.get(True, 1)
if not self.dry:
try:
self.producer.send_messages(self.topic, line)
except Exception as e:
logger.exception('failed to send_message:\n%s' % line)
else:
sys.stdout.write(line)
MSG_QUEUE.task_done()
except:
logger.debug('queue is empty')
logger.info('break run loop, and message queue size %d',
MSG_QUEUE.qsize())
MSG_QUEUE.join()
class SingletonLock(object):
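    '''Pidfile guarded by an exclusive flock() so only one agent instance runs at a time.'''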
def __init__(self, lockFile):
self.lock = lockFile
self.fp = None
def trylock(self):
try:
self.fp = open(self.lock, "a")
fcntl.flock(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.fp.truncate()
self.fp.write("{}".format(os.getpid()))
self.fp.flush()
except IOError, e:
self.fp.close()
self.fp = None
return False
return True
def unlock(self):
if self.fp:
fcntl.flock(self.fp, fcntl.LOCK_UN)
self.fp.close()
os.unlink(self.lock)
def main():
parser = argparse.ArgumentParser(description='log agent')
parser.add_argument('action', choices=('publish',),
                        help='publish: collect logs and publish them to kafka')
parser.add_argument('--host', type=str, default='localhost:9092',
help='kafka hosts, format: "host1:port1,host2:port2,..."')
parser.add_argument('--topic', type=str, help='kafka topic')
parser.add_argument('--file', type=str, dest='files', action='append',
required=True, help='file to be tailed, can specify multiple files.')
parser.add_argument('--status', type=str, default='./agent.progress',
help='file to save upload status')
parser.add_argument('--only-tail', action='store_true',
help='only tail file, ignore saved status')
parser.add_argument('--throttling', type=int, default=0,
help='throttling throughput, lines per second')
parser.add_argument('--batch', type=int, default=10, help='batch number')
parser.add_argument('--monitor', type=str, help='monitor host:port')
parser.add_argument('--skip-missing', action='store_true',
help='will not try unmatched pattern, just skip them')
parser.add_argument('--pidfile', type=str, default='./agent.pid',
help='pid file')
parser.add_argument('--logfile', type=str, default='./agent.log',
help='log file path')
parser.add_argument('--dry', action='store_true', help='whether dry run')
parser.add_argument('--verbose', action='store_true',
help='print verbose log')
args = parser.parse_args()
global logger
if args.verbose:
logger = getLogger(logfile=args.logfile, loglevel=logging.DEBUG,
logtostderr=True)
else:
logger = getLogger(logfile=args.logfile, loglevel=logging.INFO)
sl = SingletonLock(args.pidfile)
if not sl.trylock():
logger.warn('another instance is running.')
sys.exit(-1)
if args.action == 'publish':
if not args.dry:
            if not args.topic:
                print "topic is missing"
                sys.exit(1)
            elif not args.host:
                print "kafka host is missing"
                sys.exit(1)
            elif len(args.files) == 0:
                print "log file is not specified"
                sys.exit(1)
logger.info("==== LOG AGENT IS STARTING ====")
with LogAgent(args.host, args.topic, args.batch,
args.files, args.status, dry=args.dry) as agent:
if args.throttling:
agent.throttle(args.throttling)
logger.info('[agent]throttling %d lines per second', args.throttling)
if args.only_tail:
agent.reach_tail()
                logger.info('[agent] read log from the tail of the latest log file')
if args.monitor:
agent.set_monitor(args.monitor)
logger.info('[agent] report to monitor %s', args.monitor)
if args.skip_missing:
agent.skip_missing_files()
logger.info('[agent] skip unmatched pattern')
agent.run()
logger.info("==== LOG AGENT IS END ====")
sl.unlock()
if __name__ == "__main__":
main()