Last active
November 9, 2018 22:35
-
-
Save danizen/d410c7ed2dc09be1b0e18dcd9626f4e1 to your computer and use it in GitHub Desktop.
A WSGI middleware to check for leaks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import linecache
import logging
import os
import pathlib
import random
import re
import threading
import tracemalloc
from datetime import datetime, timezone

import portalocker
from django.conf import settings

logger = logging.getLogger(__name__)


class LeakCheckingWSGI:
    """WSGI middleware that samples requests and logs per-line memory growth.

    For a random fraction of requests (paths starting with ``/public`` are
    never sampled) the wrapped application is run between two ``tracemalloc``
    snapshots.  When the filtered snapshots show a net increase, the largest
    differences are appended to ``leakcheck.txt`` in ``settings.LOG_DIR``.

    Class-level configuration is read from the environment at import time:

    * ``LEAKCHECK_SAMPLE_RATE`` -- fraction of requests to trace (default 0.1)
    * ``LEAKCHECK_TOP`` -- number of entries listed individually (default 10)
    * ``LEAKCHECK_FILTERS`` -- path of a CSV file with extra tracemalloc
      filters; defaults to ``leakcheck-filters.csv`` under
      ``settings.BASE_DIR``
    """

    sample_rate = float(os.environ.get('LEAKCHECK_SAMPLE_RATE', 0.1))
    top = int(os.environ.get('LEAKCHECK_TOP', 10))
    filterpath = os.environ.get('LEAKCHECK_FILTERS', None)

    def __init__(self, application):
        """Wrap ``application``, the WSGI callable to be sampled."""
        self.application = application
        configured = type(self).filterpath
        if configured:
            self.filterpath = pathlib.Path(configured)
        else:
            self.filterpath = pathlib.Path(settings.BASE_DIR).joinpath('leakcheck-filters.csv')
        self.filename_expr = re.compile(r'lib[/\\]', re.IGNORECASE)
        # BASE_DIR may be a str or a pathlib.Path; normalise to a str with no
        # trailing separator so prefix stripping works for both.
        self.basedir = str(settings.BASE_DIR).rstrip('/\\')
        self.basedir_len_plus1 = len(self.basedir) + 1
        # Held for the whole duration of a traced request (see __call__).
        self.lock = threading.Lock()
        self.log = pathlib.Path(settings.LOG_DIR, 'leakcheck.txt')

    @property
    def filters(self):
        """Return the tracemalloc filters, loading them on first access.

        Three exclusive filters are always present (the two frozen importlib
        bootstrap modules and this file).  Rows of the optional CSV file at
        ``self.filterpath`` are appended as ``inclusive, pattern, lineno``;
        a header row and blank rows are skipped, ``lineno`` may be empty or
        missing.  Malformed rows are logged and ignored.
        """
        if not hasattr(self, '_filters'):
            filters = [
                tracemalloc.Filter(False, '<frozen importlib._bootstrap>'),
                tracemalloc.Filter(False, '<frozen importlib._bootstrap_external>'),
                tracemalloc.Filter(False, __file__),
            ]
            try:
                handle = self.filterpath.open('r', encoding='utf-8', newline='')
            except FileNotFoundError:
                handle = None
            if handle is not None:
                with handle:
                    for row in csv.reader(handle, dialect='unix'):
                        if not row:
                            continue
                        if [cell.strip() for cell in row] == ['inclusive', 'pattern', 'lineno']:
                            continue
                        if len(row) < 2:
                            logger.warning('Ignoring malformed leakcheck filter row: %r', row)
                            continue
                        lineno_text = row[2].strip() if len(row) > 2 else ''
                        try:
                            # tracemalloc compares Filter.lineno with integer
                            # line numbers, so a string would never match.
                            lineno = int(lineno_text) if lineno_text else None
                        except ValueError:
                            logger.warning('Ignoring leakcheck filter row with bad lineno: %r', row)
                            continue
                        filters.append(tracemalloc.Filter(
                            row[0].strip().lower() == 'true',
                            row[1],
                            lineno,
                        ))
            self._filters = filters
        return self._filters

    def get_filename(self, frame):
        """Return a shortened form of ``frame.filename`` for the report.

        Everything up to and including the first ``lib/`` (or ``lib\\``) is
        dropped; otherwise a leading base directory is dropped; otherwise the
        filename is returned unchanged.
        """
        filename = frame.filename
        match = self.filename_expr.search(filename)
        if match:
            return filename[match.end():]
        base_len = len(self.basedir)
        # Require a separator right after the prefix so that a sibling such
        # as "<base>ication/..." is not mistaken for a path inside the base.
        if filename.startswith(self.basedir) and filename[base_len:base_len + 1] in ('/', '\\'):
            return filename[base_len + 1:]
        return filename

    def _format_report(self, timestamp, method, path, stats):
        """Build the report lines for one request without doing any I/O."""
        lines = ['{} {} {}\n'.format(timestamp, method, path)]
        for index, stat in enumerate(stats[:self.top], 1):
            frame = stat.traceback[0]
            label = ' {}: '.format(index)
            lines.append('{}{}:{}: {:.2f} KB new, {:.2f} KB total\n'.format(
                label, self.get_filename(frame), frame.lineno,
                stat.size_diff / 1024, stat.size / 1024
            ))
            source = linecache.getline(frame.filename, frame.lineno).strip()
            if source:
                # Indent by the label width so the source text lines up
                # under the filename whatever the number of index digits.
                lines.append(' ' * len(label) + source + '\n')
        other = stats[self.top:]
        if other:
            size_diff = sum(stat.size_diff for stat in other)
            size = sum(stat.size for stat in other)
            # Sizes are in bytes; convert like the per-entry lines do.
            lines.append(' {} other: {:.2f} KB new, {:.2f} KB total\n'.format(
                len(other), size_diff / 1024, size / 1024))
        lines.append('\n')
        return lines

    def report(self, method, path, stats):
        """Append a report for one traced request to the leak-check log.

        ``stats`` is the list returned by ``Snapshot.compare_to``.  The log
        file is locked exclusively with portalocker while it is written; the
        lock is released when the file is closed.
        """
        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        logger.info('%s %s', method, path)
        lines = self._format_report(timestamp, method, path, stats)
        with self.log.open('a', encoding='utf-8') as f:
            portalocker.lock(f, portalocker.LOCK_EX)
            f.writelines(lines)

    def __call__(self, environ, start_response):
        """Run the wrapped application, tracing memory for sampled requests.

        tracemalloc state is process-wide, so only one request is traced at
        a time: a request that is sampled while another trace is in progress
        is served untraced rather than made to wait.
        """
        # PEP 3333 allows PATH_INFO to be omitted when it is empty.
        path = environ.get('PATH_INFO', '')
        sampled = (
            not path.startswith('/public')
            and random.uniform(0, 1) < self.sample_rate
            and self.lock.acquire(blocking=False)
        )
        if not sampled:
            return self.iterated_response(environ, start_response)

        try:
            # Leave a trace that was started elsewhere running afterwards.
            already_tracing = tracemalloc.is_tracing()
            if not already_tracing:
                tracemalloc.start()
            try:
                filters = self.filters
                before = tracemalloc.take_snapshot().filter_traces(filters)
                response = self.iterated_response(environ, start_response)
                after = tracemalloc.take_snapshot().filter_traces(filters)
            finally:
                # Also runs when the application raises, so tracing is never
                # left switched on by a failed request.
                if not already_tracing:
                    tracemalloc.stop()
            memdiff = after.compare_to(before, 'lineno')
            if sum(stat.size_diff for stat in memdiff) > 0:
                self.report(environ.get('REQUEST_METHOD', ''), path, memdiff)
        finally:
            self.lock.release()
        return response

    def iterated_response(self, environ, start_response):
        """
        Call the application and consume its whole response into a tuple.

        This forces a lazy response to be produced before the caller takes
        its second snapshot.  Because the original iterable is not handed
        on to the server, its ``close()`` method (if any) is called here, as
        PEP 3333 requires of whoever consumes the iterable.
        """
        iterable = self.application(environ, start_response)
        try:
            return tuple(iterable)
        finally:
            close = getattr(iterable, 'close', None)
            if close is not None:
                close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment