Skip to content

Instantly share code, notes, and snippets.

@Ivlyth
Created August 13, 2015 12:19
Show Gist options
  • Save Ivlyth/f8cf086181e7cb5b4325 to your computer and use it in GitHub Desktop.
Save Ivlyth/f8cf086181e7cb5b4325 to your computer and use it in GitHub Desktop.
Using the Python logging module and the Redis pub/sub pattern to save API logs from a Tornado server
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Author : myth
Date : 15-7-22
Email : belongmyth at 163.com
'''
import redis
import threading
import sys
import json
import pymongo
from pymongo.errors import AutoReconnect, ConnectionFailure
from Queue import Queue
import logging
from logging import handlers
# SMTP settings used by handler_error() to mail error reports.
# NOTE(review): the values below look like placeholders - confirm and fill in
# real settings before running.
EMAIL_HOST = u'EMAIL_HOST'
EMAIL_PORT = 25
EMAIL_USER = u'USR'
EMAIL_PASS = u'PASSWD'
# Recipients of the error report e-mails sent by handler_error().
MAILING_LIST = [u'[email protected]']
# Raw payloads handed from receive() to save(); holds at most 1000 items.
queue = Queue(1000)
# exc_info tuples handed from save() to handler_error(); holds at most 1000 items.
error_queue = Queue(1000)
def receive():
    """Subscribe to the ``api_logs`` Redis channel and buffer its payloads.

    Runs forever: for every pub/sub item whose type is ``message`` the
    ``data`` field is pushed onto the module-level ``queue``; items of any
    other type are skipped.
    """
    pubsub = redis.StrictRedis().pubsub()
    pubsub.subscribe(u'api_logs')
    for event in pubsub.listen():
        # Only real messages carry a log payload.
        if event[u'type'] == u'message':
            queue.put(event[u'data'])
def save():
    """Drain ``queue`` and insert each JSON payload into MongoDB.

    Runs forever. Every payload taken from ``queue`` is parsed as JSON and
    inserted into the ``api_logs`` collection of the ``mcaccount_access``
    database.

    Error handling:
      * ``AutoReconnect`` / ``ConnectionFailure``: the exception is pushed to
        ``error_queue`` and the *same* record is retried after a delay that
        doubles from 1 second up to 30 seconds.
      * any other exception (e.g. malformed JSON): the exception is pushed
        to ``error_queue`` and the record is dropped.
    """
    # Local import: the module-level import block does not bring in ``time``.
    import time

    retry_delay_start = 1.0
    retry_delay_max = 30.0

    conn = pymongo.Connection()
    db = conn[u'mcaccount_access']
    col = db[u'api_logs']
    while True:
        data = queue.get()
        retry_delay = retry_delay_start
        while True:
            try:
                api_log = json.loads(data)
                col.insert(api_log)
            except (AutoReconnect, ConnectionFailure):
                # Retry in place instead of re-queuing: this function is the
                # only consumer of ``queue`` visible in this file, so a
                # blocking put() on a full bounded queue would never return.
                # The sleep keeps an outage from turning into a busy loop
                # that floods ``error_queue``.
                error_queue.put(sys.exc_info())
                time.sleep(retry_delay)
                retry_delay = min(retry_delay * 2.0, retry_delay_max)
                continue
            except Exception:
                # Not retryable: report it and move on to the next record.
                error_queue.put(sys.exc_info())
            break
def handler_error():
    """Mail every exception queued on ``error_queue`` to ``MAILING_LIST``.

    Builds a dedicated ``apilogger-error-reporter`` logger with an
    ``SMTPHandler`` configured from the module-level ``EMAIL_*`` settings,
    then loops forever: each ``exc_info`` tuple taken from ``error_queue``
    is logged at FATAL level together with the number of errors still
    waiting in the queue.

    A failure while reporting is written to stderr rather than silently
    discarded; the loop keeps running either way.
    """
    logger = logging.getLogger(u'apilogger-error-reporter')
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        u'%(asctime)s %(name)s %(filename)s:%(lineno)d[%(funcName)s][%(levelname)s] '
        u'- %(message)s', u'%Y/%m/%d %H:%M:%S')
    smtp_handler = handlers.SMTPHandler(
        (EMAIL_HOST, EMAIL_PORT), EMAIL_USER, MAILING_LIST,
        u'Error happened in api log receiver',
        (EMAIL_USER, EMAIL_PASS))
    smtp_handler.setFormatter(formatter)
    smtp_handler.setLevel(logging.FATAL)
    logger.addHandler(smtp_handler)
    while True:
        # Blocking wait stays outside the try: only the reporting step is
        # best-effort.
        exc_info = error_queue.get()
        try:
            # Lazy %-args: the logging module formats the message itself.
            logger.fatal(u'Error happened in api log receiver, %d errors left',
                         error_queue.qsize(), exc_info=exc_info)
        except Exception as e:
            # Keep the reporter alive, but leave a trace of what went wrong.
            sys.stderr.write(
                u'apilogger-error-reporter: could not report error: %r\n' % (e,))
if __name__ == u'__main__':
    # One thread per pipeline stage: Redis subscriber, MongoDB writer and
    # error mailer.
    rthread = threading.Thread(target=receive)
    sthread = threading.Thread(target=save)
    ethread = threading.Thread(target=handler_error)
    # Start them in the same order as the stages feed each other.
    rthread.start()
    sthread.start()
    ethread.start()
# !/usr/bin/env python
# -*- coding:utf8 -*-
'''
Author : myth
Date : 14-7-9
Email : belongmyth at 163.com
'''
import logging
import redis
from redis import ConnectionError
import time
# Adapted from logging.handlers.SocketHandler, switched to a Redis client.
class RedisPublishHandler(logging.Handler):
    """Logging handler that publishes each record's message to a Redis channel.

    The Redis client is created lazily on the first ``send``. When creating
    it fails with ``ConnectionError`` further attempts are postponed with an
    exponential back-off: the wait starts at ``retryStart`` seconds, is
    multiplied by ``retryFactor`` after every failed attempt and is capped
    at ``retryMax`` seconds. Records emitted while no client is available
    are dropped.

    Args:
        channel: name of the Redis channel to publish to.
        host: Redis server host.
        port: Redis server port.
        db: Redis database number.
        password: Redis password, or ``None``.
    """

    def __init__(self, channel, host=u'localhost', port=6379, db=0, password=None):
        logging.Handler.__init__(self)
        self.channel = channel
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.client = None
        self.closeOnError = False
        # Back-off state: ``retryTime`` is the earliest time of the next
        # connection attempt, or None when no back-off is in effect.
        self.retryTime = None
        self.retryStart = 1.0
        self.retryMax = 30.0
        self.retryFactor = 2.0
        # Initialised here so the attribute always exists.
        self.retryPeriod = self.retryStart

    def makeClient(self):
        """Create a Redis client and ping the server to verify the connection."""
        c = redis.StrictRedis(host=self.host, port=self.port, db=self.db,
                              password=self.password)
        c.ping()
        return c

    def createClient(self):
        """Try to create the client unless a back-off period is still running."""
        now = time.time()
        if self.retryTime is not None and now < self.retryTime:
            # Still backing off after an earlier failure.
            return
        try:
            self.client = self.makeClient()
        except ConnectionError:
            if self.retryTime is None:
                # First failure: start the back-off sequence.
                self.retryPeriod = self.retryStart
            else:
                self.retryPeriod = min(self.retryPeriod * self.retryFactor,
                                       self.retryMax)
            self.retryTime = now + self.retryPeriod
        else:
            self.retryTime = None

    def send(self, s):
        """Publish ``s`` on the channel, creating the client first if needed.

        If no client could be created the message is dropped. A
        ``ConnectionError`` while publishing discards the client so that the
        next call tries to reconnect.
        """
        if self.client is None:
            self.createClient()
        if self.client:
            try:
                self.client.publish(self.channel, s)
            except ConnectionError:
                self.client = None

    def makePickle(self, record):
        """Return the payload for ``record``: its formatted message."""
        return record.getMessage()

    def handleError(self, record):
        """Drop the client when ``closeOnError`` is set, else use the default handling."""
        if self.closeOnError and self.client:
            self.client = None  # try to reconnect next time
        else:
            logging.Handler.handleError(self, record)

    def emit(self, record):
        """Publish ``record``; failures are routed to ``handleError``."""
        try:
            s = self.makePickle(record)
            self.send(s)
        except Exception:
            # KeyboardInterrupt and SystemExit are not Exception subclasses,
            # so they still propagate to the caller.
            self.handleError(record)

    def close(self):
        """Forget the Redis client and close the handler."""
        self.acquire()
        try:
            if self.client:
                self.client = None
        finally:
            self.release()
        logging.Handler.close(self)
# Logger that API request logs are written to; its records are published to
# the ``api_logs`` Redis channel.
_logger = logging.getLogger(u'apilogger')
# Set the level explicitly: otherwise this logger inherits the root logger's
# level (WARNING by default) and the INFO records it is meant to carry are
# discarded before they reach the handler.
_logger.setLevel(logging.INFO)
REDIS_HOST = u'127.0.0.1'  # Redis server IP
redis_handler = RedisPublishHandler(u'api_logs', REDIS_HOST)
redis_handler.setLevel(logging.INFO)
_logger.addHandler(redis_handler)
apilogger = _logger
# Override tornado.web.Application so every request also produces an API log.
class Application(tornado.web.Application):
    """Tornado application that also emits a structured API log per request.

    ``url_patterns``, ``server_settings``, ``access_log`` and ``apilogger``
    are expected to be provided by the surrounding module.
    """

    def __init__(self):
        tornado.web.Application.__init__(self, url_patterns, **server_settings)
        self.apilogger = apilogger

    # Override log_request to save the API log.
    def log_request(self, handlerIns):
        """Write the access-log line, then send a JSON API log to ``apilogger``.

        The access-log level depends on the response status: info below 400,
        warning below 500, error otherwise.

        The API log is skipped when the handler has a truthy ``__NO_LOG__``
        attribute or when the request method is listed in the handler's
        ``__NO_LOG_METHODS__``. If the record cannot be serialised to JSON
        the failure is written to ``access_log`` and the record is dropped,
        so logging never raises out of this method.

        Args:
            handlerIns: the request handler instance that served the request.
        """
        request = handlerIns.request
        status = handlerIns.get_status()
        if status < 400:
            log_method = access_log.info
        elif status < 500:
            log_method = access_log.warning
        else:
            log_method = access_log.error
        request_time = 1000.0 * request.request_time()
        log_method("%d %s %.2fms", status,
                   handlerIns._request_summary(), request_time)

        if getattr(handlerIns, u'__NO_LOG__', False):
            return
        method = request.method.upper()
        if method in getattr(handlerIns, u'__NO_LOG_METHODS__', []):
            return

        _, exception_value, exception_traceback = sys.exc_info()
        # Resolve the end time once so the numeric and the formatted value
        # always describe the same instant.
        finish_time = request._finish_time if request._finish_time else time.time()
        time_format = u'%Y-%m-%d %H:%M:%S.%f'
        remote_ip = request.headers.get(
            u'X-Real-Ip', request.headers.get(u'Remote-Ip', request.remote_ip))
        post_data = {
            u'tan14key': request.headers.get(u'Tan14-Key', None),
            u'service': getattr(handlerIns, u'__SERVICE_NAME__', u'NO_SERVICE_NAME'),
            u'handler': handlerIns.__class__.__name__,
            u'method': method,
            u'remote_ip': remote_ip,
            u'path': request.path,
            u'query': request.query,
            u'body': request.body,
            # Plain dict so serialisation does not depend on how the headers
            # container is implemented.
            u'headers': dict(request.headers),
            u'http_code': handlerIns._status_code,
            u'http_reason': handlerIns._reason,
            u'start_time': u'%.3f' % (1000.0 * request._start_time),
            u'start_time_fmt': datetime.fromtimestamp(request._start_time).strftime(time_format),
            u'end_time': u'%.3f' % (1000.0 * finish_time),
            u'end_time_fmt': datetime.fromtimestamp(finish_time).strftime(time_format),
            u'request_time': round(request_time, 3),
            u'exception': str(exception_value) if exception_value else None,
            u'traceback': traceback.extract_tb(exception_traceback) if exception_traceback else None,
        }
        try:
            payload = json.dumps(post_data)
        except (TypeError, ValueError):
            # Covers values json cannot encode, including undecodable body
            # bytes (UnicodeDecodeError is a ValueError).
            access_log.exception(u'could not serialize api log for %s',
                                 handlerIns._request_summary())
            return
        self.apilogger.info(payload)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment