Skip to content

Instantly share code, notes, and snippets.

@vsajip
Last active February 20, 2025 12:53
Show Gist options
  • Save vsajip/4b227eeec43817465ca835ca66f75e2b to your computer and use it in GitHub Desktop.
Save vsajip/4b227eeec43817465ca835ca66f75e2b to your computer and use it in GitHub Desktop.
Run a logging socket receiver in a production setting with logging from an example webapp

This set of files is to help you set up a socket listener for Python logging in a production environment.

The other files are:

Filename Purpose
prepare.sh A Bash script to prepare the environment for testing.
supervisor.conf The Supervisor configuration file, which has entries for the listener and a multi-process web application.
ensure_app.sh A Bash script to ensure that Supervisor is running with the above configuration.
log_listener.py The socket listener program which receives log events and records them to a file.
main.py A simple web application which performs logging via a socket connected to the listener.
webapp.json A JSON configuration file for the web application.
client.py A Python script to exercise the web application.

The web application uses Gunicorn, which is a popular web application server that starts multiple worker processes to handle requests. This example setup shows how the workers can write to the same log file without conflicting with one another — they all go through the socket listener.

To test these files, do the following in a POSIX environment:

  1. Download the Gist as a ZIP archive using the Download ZIP button.

  2. Unzip the above files from the archive into a scratch directory.

  3. In the scratch directory, run bash prepare.sh to get things ready. This creates a run subdirectory to contain Supervisor-related and log files, and a venv subdirectory to contain a virtual environment into which bottle, gunicorn and supervisor are installed.

  4. Run bash ensure_app.sh to ensure that Supervisor is running with the above configuration.

  5. Run venv/bin/python client.py to exercise the web application, which will lead to records being written to the log.

  6. Inspect the log files in the run subdirectory. You should see the most recent log lines in files matching the pattern app.log*. They won’t be in any particular order, since they have been handled concurrently by different worker processes in a non-deterministic way.

You can shut down the listener and the web application by running venv/bin/supervisorctl -c supervisor.conf shutdown.

The default configuration uses a TCP socket on port 9020. You can use a Unix Domain socket instead of a TCP socket by doing the following:

  1. In listener.json, add a socket key with the path to the domain socket you want to use. If this key is present, the listener listens on the corresponding domain socket and not on a TCP socket (the port key is ignored).
  2. In webapp.json, change the socket handler configuration dictionary so that the host value is the path to the domain socket, and set the port value to null.
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import urllib.request
with open('webapp.json', encoding='utf-8') as f:
config = json.loads(f.read())
URLS = [
'http://localhost:%d/?ident=%d' % (config['port'], ident)
for ident in range(1, 1001)
]
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 1): url for url in URLS}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
print('Fetched %s' % url)
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
#!/usr/bin/env bash
PROJECT_DIR=$(dirname $0)
SUCTL=venv/bin/supervisorctl
SUD=venv/bin/supervisord
SUCONF=supervisor.conf
cd "$PROJECT_DIR"
"$SUCTL" -c $SUCONF status > /dev/null
status="$?"
case "$status" in
3)
"$SUCTL" -c "$SUCONF" restart all ;;
4)
"$SUD" -c "$SUCONF" ;;
esac
{
"port": 9020,
"logging": {
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"default": {
"format": "%(asctime)s %(levelname)-8s %(process)s %(name)s %(message)s"
}
},
"handlers": {
"file": {
"class": "logging.handlers.RotatingFileHandler",
"filename": "run/app.log",
"maxBytes": 1024,
"backupCount": 5,
"formatter": "default"
}
},
"root": {
"level": "DEBUG",
"handlers": ["file"]
}
}
}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Red Dove Consultants Limited. BSD-3-Clause licensed.
#
import argparse
import json
import logging
import logging.config
import os
import pickle
import socketserver
import struct
import sys
PRINT_EXC_TYPE = False
class LogRecordStreamHandler(socketserver.BaseRequestHandler):
"""
Handler for a streaming logging request.
This basically logs the record using whatever logging policy is
configured locally.
"""
def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
chunk = self.request.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.request.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.request.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""
allow_reuse_address = True
def __init__(self,
host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.logname = None
class LogRecordDomainSocketReceiver(socketserver.ThreadingUnixStreamServer):
def __init__(self, socket_path, handler=LogRecordStreamHandler):
socketserver.ThreadingUnixStreamServer.__init__(self, socket_path, handler)
self.logname = None
def main():
adhf = argparse.ArgumentDefaultsHelpFormatter
parser = argparse.ArgumentParser(formatter_class=adhf)
aa = parser.add_argument
aa('config', metavar='CONFIG', help='Configuration file to use (JSON)')
options = parser.parse_args()
fn = options.config
if not os.path.exists(fn):
print(f'Configuration file not found: {fn}')
return 1
with open(fn, encoding='utf-8') as f:
config = json.loads(f.read())
logging.config.dictConfig(config['logging'])
logging.getLogger('log_listener').info('Log listener started.')
if 'socket' in config:
socket_path = config['socket']
server = LogRecordDomainSocketReceiver(socket_path)
print(f'About to start domain socket server using {socket_path} ...')
else:
port = config['port']
server = LogRecordSocketReceiver(port=port)
print(f'About to start TCP server on port {port} ...')
server.serve_forever()
if __name__ == '__main__':
try:
rc = main()
except KeyboardInterrupt:
rc = 2
except Exception as e:
if PRINT_EXC_TYPE:
s = ' %s:' % type(e).__name__
else:
s = ''
sys.stderr.write('Failed:%s %s\n' % (s, e))
if 'PY_DEBUG' in os.environ:
import traceback
traceback.print_exc()
rc = 1
sys.exit(rc)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 Red Dove Consultants Limited
#
import argparse
import datetime
import json
import logging
import logging.config
import os
import sys
from bottle import route, run, request
DEBUGGING = 'PY_DEBUG' in os.environ
logger = logging.getLogger(__name__)
@route('/')
def index():
s = datetime.datetime.now().isoformat(' ', 'seconds')
ident = request.query.get('ident')
logger.debug('Request with ident %s handled', ident)
style = '<style>body { font-family: sans-serif; border: 1px solid silver; padding: 1em; }</style>'
return f"{style}<h4>Hello, world! It's {s}</h4>"
def process(options):
host = options.config['host']
port = options.config['port']
run(host=host, port=port, server='gunicorn', workers=4)
def main():
adhf = argparse.ArgumentDefaultsHelpFormatter
ap = argparse.ArgumentParser(formatter_class=adhf, prog='main')
aa = ap.add_argument
aa('-c',
'--config',
default='webapp.json',
help='Logging configuration (JSON)')
options = ap.parse_args()
with open(options.config, encoding='utf-8') as f:
config = json.loads(f.read())
options.config = config
logging.config.dictConfig(config['logging'])
process(options)
if __name__ == '__main__':
try:
rc = main()
except KeyboardInterrupt:
rc = 2
except Exception as e:
if DEBUGGING:
s = ' %s:' % type(e).__name__
else:
s = ''
sys.stderr.write('Failed:%s %s\n' % (s, e))
if DEBUGGING:
import traceback
traceback.print_exc()
rc = 1
sys.exit(rc)
#!/usr/bin/env bash
mkdir -p run
python3 -m venv venv
venv/bin/pip install bottle gunicorn supervisor
;
; Note: shell expansion ("~" or "$HOME") is not supported. Environment
; variables can be expanded using this syntax: "%(ENV_HOME)s".
[unix_http_server]
file=%(here)s/run/supervisor.sock
[supervisord]
logfile=%(here)s/run/supervisord.log
pidfile=%(here)s/run/supervisord.pid
nodaemon=false
nocleanup=true
childlogdir=%(here)s/run
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix://%(here)s/run/supervisor.sock
[program:listener]
command = venv/bin/python log_listener.py listener.json
priority = 500
autostart = true
autorestart = true
directory = %(here)s
stdout_logfile=%(here)s/run/%(program_name)s_stdout.log
stderr_logfile=%(here)s/run/%(program_name)s_stderr.log
[program:app]
command = venv/bin/python main.py
autostart = true
autorestart = true
directory = %(here)s
stdout_logfile=%(here)s/run/%(program_name)s_stdout.log
stderr_logfile=%(here)s/run/%(program_name)s_stderr.log
{
"host": "0.0.0.0",
"port": 32323,
"logging": {
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"default": {
"format": "%(asctime)s %(levelname)-8s %(name)s %(message)s"
}
},
"handlers": {
"socket": {
"class": "logging.handlers.SocketHandler",
"host": "localhost",
"port": 9020
}
},
"root": {
"level": "DEBUG",
"handlers": ["socket"]
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment