Skip to content

Instantly share code, notes, and snippets.

@harishb2k
Created December 11, 2021 09:53
Show Gist options
  • Save harishb2k/53e8bdc61ed8720991baae1d324967ca to your computer and use it in GitHub Desktop.
Save harishb2k/53e8bdc61ed8720991baae1d324967ca to your computer and use it in GitHub Desktop.
Python Flask app - record request/response (see end of the file for usage)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
WSGI middleware to record requests and responses.
"""
from __future__ import print_function, unicode_literals
import logging
import time
# Required to be loaded early to avoid hitting deadlock situation when processing requests
# See http://code.google.com/p/modwsgi/wiki/ApplicationIssues (at the bottom, under Non Blocking Module Imports)
import _strptime
import itertools
try:
from StringIO import StringIO ## for Python 2
except ImportError:
from io import StringIO ## for Python 3
from io import BytesIO
class RequestResponseState(object):
"""Capture the data for a request-response."""
def __init__(self, id, method, url, request_headers, content_length, request_body):
self.request_id = id
self.method = method
self.url = url
self.request_headers = request_headers
self.content_length = content_length
self.request_body = request_body
self.status = -1
self.response_headers = None
self.response_chunks = None
self.duration_msecs = 0
self.started_at = time.time()
def start_response(self, status, response_headers):
self.status = status
self.response_headers = response_headers
def finish_response(self, response_chunks):
self.duration_msecs = 1000.0 * (time.time() - self.started_at)
self.response_chunks = response_chunks
return response_chunks
class SessionRecorderMiddleware(object):
"""WSGI Middleware for recording of request-response"""
def __init__(self, app, recorder):
self.app = app
self.recorder = recorder
self.request_counter = itertools.count().__next__ # Threadsafe counter
def __call__(self, environ, start_response):
state = RequestResponseState(
self.request_counter(),
environ['REQUEST_METHOD'],
self.request_url(environ),
[(k, v) for k, v in self.parse_request_headers(environ)],
*self.request_body(environ)
)
def _start_response(status, response_headers, *args):
# Capture status and response_headers for later processing
state.start_response(status, response_headers)
return start_response(status, response_headers, *args)
response_chunks = state.finish_response(self.app(environ, _start_response))
self.recorder(state)
# return data to WSGI server
return response_chunks
def request_url(self, environ):
return '{0}{1}{2}'.format(
environ.get('SCRIPT_NAME', ''),
environ.get('PATH_INFO', ''),
'?' + environ['QUERY_STRING'] if environ.get('QUERY_STRING') else '',
)
_parse_headers_special = {
'HTTP_CGI_AUTHORIZATION': 'Authorization',
'CONTENT_LENGTH': 'Content-Length',
'CONTENT_TYPE': 'Content-Type',
}
def parse_request_headers(self, environ):
try:
for cgi_var, value in environ.iteritems():
if cgi_var in self._parse_headers_special:
yield self._parse_headers_special[cgi_var], value
elif cgi_var.startswith('HTTP_'):
yield cgi_var[5:].title().replace('_', '-'), value
except Exception as e:
pass
def request_body(self, environ):
content_length = environ.get('CONTENT_LENGTH')
body = ''
if content_length:
if content_length == '-1':
# This is a special case, where the content length is basically undetermined
body = environ['wsgi.input'].read(-1)
content_length = len(body)
else:
content_length = int(content_length)
body = environ['wsgi.input'].read(content_length)
try:
environ['wsgi.input'] = StringIO(body) # reset request body for the nested app
except:
environ['wsgi.input'] = BytesIO(body) # reset request body for the nested app
else:
content_length = 0
return content_length, body
def is_binary_content_type(content_type):
type_subtype = content_type.split(';')
_type, subtype = type_subtype.split('/')
if _type == 'text':
return False
elif _type == 'application':
return subtype not in (
'atom+xml', 'ecmascript', 'json', 'javascript', 'rss+xml', 'soap+xml', 'xhtml+xml')
else:
return True
def log_results(state):
print("Request Body", state.request_body, '{0} {1}'.format(state.method, state.url))
print("Response Statue", state.status)
def log_results_1(state):
# TODO: create an HttpArchive
data = [
'SR: {0}'.format(state.request_id),
'{0} {1}'.format(state.method, state.url),
str(state.request_headers),
# TODO: sanitize binary request body => look at request Content-Type
'{0} bytes: {1}'.format(state.content_length, state.request_body or '<EMPTY>'),
'=> {0} :: {1:.3f} ms :: {2}'.format(
state.status, state.duration_msecs, str(state.response_headers)),
] + (
# TODO: sanitize binary response body => look at response Content-Type
state.response_chunks
) + ['========']
logging.info('\n'.join(data))
# TODO: unit tests
# def recorder(state):
# log_results(state)
#
# app.wsgi_app = SessionRecorderMiddleware(app.wsgi_app, recorder)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment