Last active
March 30, 2017 12:42
-
-
Save ysegorov/8947d99a016aa00ace51d9ab4d89c428 to your computer and use it in GitHub Desktop.
Logger server using edge-triggered epoll
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import os | |
import errno | |
import itertools | |
import socket | |
import select | |
import logging | |
import logging.config | |
import pickle | |
import signal | |
import struct | |
import settings | |
# Apply the logging configuration declared in the local ``settings`` module
# before any received record is dispatched.
logging.config.dictConfig(settings.LOGGING)

# Module-wide tally of processed records: every ``next(counter)`` call
# advances it by one.
counter = itertools.count()
def unpickle(data):
    """Deserialize one pickled payload and return the resulting object.

    NOTE(security): ``pickle.loads`` can execute arbitrary code while
    loading, and ``data`` comes straight from client sockets -- only expose
    this server to trusted senders.
    """
    payload = pickle.loads(data)
    return payload
def server(host, port, backlog):
    """Create the non-blocking TCP listening socket of the logger server.

    :param host: address to bind to
    :param port: TCP port to bind to
    :param backlog: value passed to ``listen()``
    :returns: the bound, listening, non-blocking socket
    :raises socket.error: when the socket cannot be configured or bound;
        the half-configured socket is closed before the error propagates
    """
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen(backlog)
        srv.setblocking(0)
        srv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    except Exception:
        # do not leak the descriptor when e.g. the port is already taken
        srv.close()
        raise
    print('Logger server started on {}:{}'.format(host, port))
    return srv
def handle(buffer):
    """Dispatch every complete log record found at the start of ``buffer``.

    Each record is framed as a 4-byte big-endian unsigned length followed by
    that many bytes of pickled ``LogRecord`` attributes. Every complete frame
    is unpickled, turned into a ``LogRecord`` and handed to the logger named
    in the record; the module-level ``counter`` is advanced once per record.

    :param buffer: bytes received so far for one client
    :returns: the unconsumed tail of ``buffer`` (an incomplete frame, or
        ``b''`` when everything was consumed)
    """
    header_len = 4
    total = len(buffer)
    # Walk the buffer with an offset instead of re-slicing it after every
    # record, which would copy the remaining bytes once per record.
    offset = 0
    while total - offset >= header_len:
        slen = struct.unpack_from('>L', buffer, offset)[0]
        end = offset + header_len + slen
        if end > total:
            # payload not fully received yet
            break
        # NOTE(security): the payload is unpickled as received from the
        # client socket.
        obj = unpickle(buffer[offset + header_len:end])
        record = logging.makeLogRecord(obj)
        logger = logging.getLogger(record.name)
        logger.handle(record)
        next(counter)
        offset = end
    return buffer[offset:]
def _accept_pending(srv):
    """Yield each connection currently queued on the non-blocking ``srv``.

    The listener is registered edge-triggered, so the queue is drained until
    ``accept()`` reports EWOULDBLOCK/EAGAIN. Any other error propagates.
    """
    while True:
        try:
            conn, _addr = srv.accept()
        except socket.error as e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                # means all new connections were accepted
                return
            raise
        yield conn


def _drain(conn, bufsize):
    """Read from the non-blocking ``conn`` until no more data is available.

    :returns: ``(data, closed)`` -- ``data`` is everything that was read and
        ``closed`` is true when the peer disconnected (``recv`` returned an
        empty chunk) or the socket failed with anything other than
        EWOULDBLOCK/EAGAIN
    """
    chunks = []
    closed = False
    while True:
        try:
            chunk = conn.recv(bufsize)
        except socket.error as e:
            if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                # e.g. connection reset: treat as a disconnect of this
                # client instead of taking the whole server down
                closed = True
            # otherwise: all data from socket were received
            break
        if not chunk:
            # means client disconnected
            closed = True
            break
        chunks.append(chunk)
    return b''.join(chunks), closed


def ioloop(srv, bufsize=4096):
    """Serve log records from clients of ``srv`` until asked to stop.

    Runs an edge-triggered epoll loop: accepts new clients, appends received
    bytes to a per-client buffer and passes that buffer to ``handle()``,
    keeping whatever ``handle()`` returns as the new buffer. The loop ends
    when ``epoll.poll`` raises ``KeyboardInterrupt``/``IOError`` or when the
    listening socket reports EPOLLHUP. On exit every client socket, the
    epoll object and ``srv`` are closed and the record count is printed.

    :param srv: listening, non-blocking server socket
    :param bufsize: maximum number of bytes requested per ``recv`` call
    """
    epoll = select.epoll()
    srv_fd = srv.fileno()
    epoll.register(srv_fd, select.EPOLLIN | select.EPOLLET)
    clients = {}
    buffers = {}

    def unregister(fileno):
        # forget the client completely and close its socket
        epoll.unregister(fileno)
        buffers.pop(fileno)
        conn = clients.pop(fileno)
        conn.close()

    def shutdown(fileno):
        # stop asking for input; the client is unregistered later, when the
        # loop receives EPOLLHUP/EPOLLERR for it
        epoll.modify(fileno, select.EPOLLET)
        try:
            clients[fileno].shutdown(socket.SHUT_RDWR)
        except socket.error:
            # the socket cannot be shut down (connection already gone):
            # release the client right away instead of crashing the server
            unregister(fileno)

    try:
        while True:
            try:
                events = epoll.poll(0.5)
            except (KeyboardInterrupt, IOError):
                break
            stop_server = False
            for fileno, event in events:
                if fileno == srv_fd:
                    if event & select.EPOLLHUP:
                        # the listening socket itself was shut down
                        stop_server = True
                        break
                    for conn in _accept_pending(srv):
                        conn.setblocking(0)
                        epoll.register(conn.fileno(),
                                       select.EPOLLIN | select.EPOLLET)
                        clients[conn.fileno()] = conn
                        buffers[conn.fileno()] = b''
                elif event & select.EPOLLIN:
                    data, closed = _drain(clients[fileno], bufsize)
                    # process the records before the client may be
                    # unregistered by shutdown() below
                    buffers[fileno] = handle(buffers[fileno] + data)
                    if closed:
                        shutdown(fileno)
                elif event & (select.EPOLLHUP | select.EPOLLERR):
                    unregister(fileno)
            if stop_server:
                break
    finally:
        epoll.unregister(srv_fd)
        for fileno in list(clients):
            unregister(fileno)
        epoll.close()
        srv.close()
        # one next() per handled record, so this call yields their number
        num_records = next(counter)
        print(
            'Logger server stopped, {} records processed'.format(num_records))
def main():
    """Start the logger server configured through environment variables.

    ``HOST`` (default ``127.0.0.1``), ``PORT`` (default 5000) and ``BACKLOG``
    (default 1000) select the listening address and backlog.
    """
    env = os.environ
    srv = server(
        env.get('HOST', '127.0.0.1'),
        int(env.get('PORT', 5000)),
        int(env.get('BACKLOG', 1000)),
    )

    def stop_on_signal(signum, frame):
        # ioloop() stops once it sees EPOLLHUP on the listening socket,
        # which shutting that socket down is meant to trigger
        srv.shutdown(socket.SHUT_RDWR)

    # SIGINT is not hooked here: ioloop() breaks out of its loop when
    # KeyboardInterrupt is raised from epoll.poll().
    signal.signal(signal.SIGTERM, stop_on_signal)
    ioloop(srv)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Create the log_level enum unless a type of that name is already listed
-- in pg_type.
DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'log_level') THEN
        CREATE TYPE log_level AS ENUM (
            'debug', 'info', 'warning', 'error', 'critical'
        );
    END IF;
END $$;

-- One row per log record.
CREATE TABLE IF NOT EXISTS log (
    "id"         serial       PRIMARY KEY,
    "created_at" timestamptz  NOT NULL,
    "level"      log_level    NOT NULL,
    "message"    text         NOT NULL,
    "logger"     varchar(64)  NOT NULL,
    "funcname"   varchar(64)  NOT NULL,
    "filename"   varchar(64)  NOT NULL,
    "pathname"   varchar(255) NOT NULL,
    "lineno"     integer      NOT NULL,
    "exc_info"   text,
    "extra"      jsonb        NOT NULL
);

-- Indexes: jsonb containment lookups on "extra", plus plain b-tree indexes
-- on the columns "created_at", "logger" and "level".
CREATE INDEX IF NOT EXISTS "log_extra_idx"
    ON "log" USING GIN ("extra" jsonb_path_ops);
CREATE INDEX IF NOT EXISTS "log_created_at_idx"
    ON "log" USING BTREE ("created_at");
CREATE INDEX IF NOT EXISTS "log_logger_idx"
    ON "log" USING BTREE ("logger");
CREATE INDEX IF NOT EXISTS "log_level_idx"
    ON "log" USING BTREE ("level");
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import datetime
import logging
import sys
import traceback
import urllib
try:
    import urlparse
except ImportError:  # Python 3: the module lives in urllib.parse
    import urllib.parse as urlparse

import psycopg2
import psycopg2.extensions
import psycopg2.extras
import ujson
class PgJson(psycopg2.extras.Json):
    """``psycopg2.extras.Json`` wrapper that serializes through ``ujson``."""

    def dumps(self, obj):
        """Return ``obj`` as JSON text without escaping non-ASCII characters."""
        encoded = ujson.dumps(obj, ensure_ascii=False)
        return encoded


# Global psycopg2 configuration, applied once when this module is imported.

# unicode typecasters for text values and arrays of them
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)

# dict query parameters are adapted with PgJson
psycopg2.extensions.register_adapter(dict, PgJson)

# json / jsonb values read from the database are decoded with ujson
psycopg2.extras.register_default_json(globally=True, loads=ujson.loads)
psycopg2.extras.register_default_jsonb(globally=True, loads=ujson.loads)

psycopg2.extras.register_uuid()
def pg_uri_to_kwargs(uri): | |
parsed = urlparse.urlparse(uri) | |
if parsed.scheme != 'postgresql': | |
raise ValueError('uri must start with "postgresql://"') | |
mapped = ( | |
('hostname', 'host', lambda x: x), | |
('username', 'user', lambda x: x), | |
('password', 'password', lambda x: x and urllib.unquote(x)), | |
('port', 'port', lambda x: x and int(x) or 5432), | |
('path', 'dbname', lambda x: x and _dbname(x[1:])), | |
) | |
return dict((k, cast(getattr(parsed, pk))) for (pk, k, cast) in mapped) | |
def _dbname(name): | |
return name.partition('.')[0] | |
class PGHandler(logging.Handler):
    """Logging handler that inserts every record into the ``log`` table.

    The PostgreSQL connection is opened lazily on first use, switched to
    autocommit, and a ``save_log`` prepared statement is created on it. A
    connection that psycopg2 reports as closed after a failed insert is
    dropped, so the next record triggers a reconnect.

    :param pg_uri: ``postgresql://`` URI understood by ``pg_uri_to_kwargs``
    """

    # Prepared once per connection; executed for every emitted record.
    query = """
        PREPARE save_log AS
            INSERT INTO log
                (created_at, level, message, logger,
                 funcname, filename, pathname, lineno,
                 exc_info, extra)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    """

    # custom record attributes
    # to be stored in `extra` field as `jsonb`
    # edit (or override in a subclass) as needed
    extra_keys = ('custom_key_1', 'custom_key_2')

    def __init__(self, pg_uri):
        self._pg_uri = pg_uri
        self._conn = None
        super(PGHandler, self).__init__()

    @property
    def conn(self):
        """Return the ready-to-use connection, opening it on first access.

        The connection is only cached once session setup and the PREPARE
        statement succeeded; on failure it is closed and the error
        propagates, so a later access starts from scratch.
        """
        if self._conn is None:
            kwargs = pg_uri_to_kwargs(self._pg_uri)
            conn = psycopg2.connect(**kwargs)
            try:
                conn.set_session(
                    psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
                    autocommit=True)
                conn.set_client_encoding('UTF8')
                cur = conn.cursor()
                try:
                    # emit() sends naive UTC timestamps
                    cur.execute('SET TIME ZONE "UTC";')
                    cur.execute(self.query)
                finally:
                    cur.close()
            except Exception:
                conn.close()
                raise
            self._conn = conn
            print(' -> postgresql connection ready')
        return self._conn

    def _forget_broken_conn(self):
        """Drop the cached connection when psycopg2 marks it as closed."""
        conn = self._conn
        if conn is not None and conn.closed:
            self._conn = None

    def emit(self, record):
        """Insert ``record`` as one row through the prepared statement.

        Failures are written to stderr and passed to ``handleError``; they
        never propagate to the caller.
        """
        level = record.levelname.lower()
        created = datetime.datetime.utcfromtimestamp(record.created)
        try:
            extra = {
                k: getattr(record, k)
                for k in self.extra_keys if hasattr(record, k)}
            args = (
                created.isoformat(),  # 'created_at'
                level,  # 'level'
                record.getMessage(),  # 'message'
                record.name,  # 'logger'
                record.funcName,  # 'funcname'
                record.filename,  # 'filename'
                record.pathname,  # 'pathname'
                record.lineno,  # 'lineno'
                record.exc_text,  # 'exc_info'
                extra,  # 'extra'
            )
            with self.conn.cursor() as cursor:
                cursor.execute(
                    'EXECUTE '
                    'save_log (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', args)
        except Exception:
            sys.stderr.write(traceback.format_exc())
            # without this a lost connection would make every later emit fail
            self._forget_broken_conn()
            self.handleError(record)

    def close(self):
        """Close the database connection (if any) and then the handler."""
        self.acquire()
        try:
            if self._conn:
                self._conn.close()
                self._conn = None
                print(' -> postgresql connection closed')
        finally:
            self.release()
        super(PGHandler, self).close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import os | |
import time | |
import logging | |
# Truthy spellings of the DEBUG environment variable (case-insensitive).
DEBUG = os.environ.get('DEBUG', 'n').lower() in ('y', 'yes', 't', 'true')

# NOTE(review): ``__name__`` is the module name, not a path, so for a
# top-level module BASE_DIR is '' and LOG_DIR resolves to ``logs`` relative
# to the current working directory. ``__file__`` may have been intended --
# confirm before changing, since that would move the log files.
BASE_DIR = os.path.dirname(os.path.dirname(__name__))
LOG_DIR = os.path.join(BASE_DIR, 'logs')

# Create the directory without a check-then-create race: several processes
# importing this module at once must not fail on an already existing dir.
try:
    os.mkdir(LOG_DIR)
except OSError:
    if not os.path.isdir(LOG_DIR):
        raise
class UTCFormatter(logging.Formatter):
    """``logging.Formatter`` whose ``%(asctime)s`` is rendered in UTC."""

    # time-tuple factory used for record timestamps: gmtime instead of the
    # formatter's default
    converter = time.gmtime
# ``logging.config.dictConfig`` schema (version 1) of the service.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        # single line per record with UTC timestamps
        'default': {
            '()': UTCFormatter,
            'format': (u'[%(levelname)1.1s %(asctime)s '
                       u'%(name)s:%(funcName)s:%(lineno)3d] %(message)s'),
            'datefmt': '%Y-%m-%d %H:%M:%S',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'default',
        },
        # rotating file: 100 MiB per file, 60 backups kept
        'logfile': {
            'class': 'logging.handlers.RotatingFileHandler',
            'level': 'DEBUG',
            'formatter': 'default',
            'filename': os.path.join(LOG_DIR, 'service.log'),
            'maxBytes': 100 * 1024 * 1024,
            'backupCount': 60,
            'encoding': 'utf8',
        },
    },
    'loggers': {
        # '' is the root logger: console output when DEBUG is set,
        # the rotating log file otherwise
        '': {
            'handlers': ['console'] if DEBUG else ['logfile'],
            'level': 'DEBUG',
            'propagate': False,
        },
    },
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The server works under Python 2 and Python 3 and is compatible with the standard library's `logging.handlers.SocketHandler`.
The epoll-related code is heavily based on this article by Scot Doyle.