Skip to content

Instantly share code, notes, and snippets.

@mgedmin
Created November 19, 2014 08:42
Show Gist options
  • Select an option

  • Save mgedmin/41b0419f61ea464536e3 to your computer and use it in GitHub Desktop.

Select an option

Save mgedmin/41b0419f61ea464536e3 to your computer and use it in GitHub Desktop.
procmail filter to group emails by traceback
#!/usr/bin/python
"""
Parse email messages with Python tracebacks, add email headers to group them.
Usage with procmail:
# Traceback emails!
:0 fw
* ^From: [email protected]
* ^TO_.*[email protected]
| /home/mg/bin/group-traceback-emails
If there's no Python-like traceback in the email, returns the message without
modification (except whatever normalization Python's email module parser and
formatter apply -- I've seen it add spaces after commas in X-Spam-* headers).
If there's a Python-like traceback, adds an X-Traceback-Hash header that
identifies this traceback. The hashing function is based on Zilch.
If the message didn't already have a References header, adds one with a
reference to a fake message ID of the form <tb-hash-XXXX@localhost>. This
makes mail clients like Mutt group together messages with the same kind of
traceback.
Zilch is https://github.com/bbangert/zilch
"""
import email
import hashlib
import os
import re
import sys
def extract_traceback_from_body(body):
    """Extracts a Python traceback from a multiline string.

    ``body`` may be text or, on Python 3, a bytes object such as the value
    returned by ``Message.get_payload(decode=True)``; bytes are decoded as
    UTF-8 with undecodable bytes replaced.  ``None`` and empty input are
    treated as "no traceback".

    Returns None or a list of lines (without trailing newlines).  The list
    starts with the traceback header line and ends with the first following
    non-empty line that does not begin with whitespace (normally the
    exception line), or with the last line of the body if no such line
    exists.

    If there's more than one traceback in the text, returns the first one.
    """
    if not body:
        return None
    if isinstance(body, bytes) and not isinstance(body, str):
        # Python 3 bytes would never compare equal to the str headers below.
        # (On Python 2, bytes *is* str, so this branch is skipped.)
        body = body.decode('utf-8', 'replace')

    headers = (
        'Traceback (most recent call last):',
        # custom ivija.reportgen tracebacks
        'Traceback (innermost last):',
    )

    lines = iter(body.splitlines())
    for line in lines:
        if line.rstrip() in headers:
            tb = [line]
            break
    else:
        # Ran out of lines without seeing a traceback header.
        return None

    # Keep consuming from the same iterator: everything after the header up
    # to and including the first unindented, non-empty line.
    for line in lines:
        tb.append(line)
        if line and not line[0].isspace():
            break
    return tb
def guess_module(filename):
    """Try to guess Python module name from filename.

    The extension is dropped and the path is split on ``/``.  Everything up
    to and including the last path segment that is ``src``, ends in ``.egg``
    or contains a dot is discarded, a trailing ``__init__`` is removed, and
    the remaining segments are joined with dots.
    """
    stem, _extension = os.path.splitext(filename)
    segments = stem.split('/')

    # Positions just past every segment that marks "not part of the module
    # path"; only the last such position matters.
    marker_positions = [
        position + 1
        for position, segment in enumerate(segments)
        if segment == 'src' or segment.endswith('.egg') or '.' in segment
    ]
    first_kept = marker_positions[-1] if marker_positions else 0

    if segments[-1] == '__init__':
        segments = segments[:-1]
    return '.'.join(segments[first_kept:])
def hash_traceback(tb):
    """Compute a hash for a traceback.

    ``tb`` should be a list of strings.

    The hash is computed over the guessed module name and function name of
    every ``File "...", line N, in name`` frame line, followed by the text
    of the last line up to its first colon (normally the exception type).
    Line numbers are deliberately left out.  Frames whose function name is
    not a plain identifier (e.g. ``<module>``) do not match and are not
    included.

    Returns None for an empty traceback, otherwise a string of the form
    ``E-XXXXX``.

    Tries to be compatible with Zilch hashes, but due to heuristics taken,
    there are no guarantees.
    """
    if not tb:
        return None

    # Frame lines in a formatted traceback are indented, so accept any
    # amount of leading whitespace rather than exactly one space.
    frame_re = re.compile(r'^\s+File "([^"]*)", line \d+, in (\w+)$')

    ident = []
    for line in tb:
        m = frame_re.match(line)
        if m is not None:
            filename, funcname = m.groups()
            ident.append(guess_module(filename))
            ident.append(funcname)

    exc_type = tb[-1].partition(':')[0]
    ident.append(exc_type)
    return 'E-' + hash_id(' '.join(ident), 5)
def hash_id(id, length):
    """Hash an identifier.

    ``id`` may be text or bytes; text is encoded as UTF-8 before hashing.
    Returns a string of exactly ``length`` characters drawn from a
    30-character alphabet of digits and upper-case letters.

    A shorter version of paste.exceptions.serial_number_generator that Zilch
    uses.
    """
    if not isinstance(id, bytes):
        # hashlib only accepts bytes on Python 3.
        id = id.encode('utf-8')
    # bytearray yields integers when iterated on both Python 2 and 3.
    digest = bytearray(hashlib.md5(id).digest())

    good_characters = "23456789ABCDEFGHJKMNPQRTUVWXYZ"
    base = len(good_characters)
    modulo = base ** length

    # Interpret the digest as a big-endian integer, reduced mod base**length.
    number = 0
    for byte in digest:
        number = (number * 256 + byte) % modulo

    # Digits are emitted least-significant first and intentionally not
    # reversed; changing the order would change every generated hash.
    digits = []
    while number:
        number, remainder = divmod(number, base)
        digits.append(good_characters[remainder])
    ident = ''.join(digits)

    # Left-pad with the alphabet's "zero" character to the requested length.
    return good_characters[0] * (length - len(ident)) + ident
def _payload_text(part):
    """Return the decoded body of a message part as text, or None.

    Multipart containers and empty parts yield None.
    """
    payload = part.get_payload(decode=True)
    if not payload:
        return None
    if isinstance(payload, str):
        # Python 2: the decoded payload is already a (byte) string.
        return payload
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, 'replace')
    except LookupError:
        # The message declares a charset Python does not know.
        return payload.decode('utf-8', 'replace')


def main():
    """Filter one email message from stdin to stdout.

    Looks for the first Python traceback in any part of the message
    (including nested multipart parts).  If one is found and can be hashed,
    adds an X-Traceback-Hash header and, unless the message already has a
    References header, a References header pointing at a fake message ID
    derived from the hash.  The message is always written back out.
    """
    # Prefer the binary streams where they exist (Python 3) so that mail
    # which is not valid in the locale encoding passes through unharmed.
    binary_stdin = getattr(sys.stdin, 'buffer', None)
    if binary_stdin is not None and hasattr(email, 'message_from_binary_file'):
        msg = email.message_from_binary_file(binary_stdin)
    else:
        msg = email.message_from_file(sys.stdin)

    tb = None
    # walk() yields the message itself and every nested part, so this one
    # loop covers both single-part and (nested) multipart messages.
    for part in msg.walk():
        text = _payload_text(part)
        if text:
            tb = extract_traceback_from_body(text)
            if tb:
                break

    if tb:
        tb_hash = hash_traceback(tb)
        if tb_hash:
            msg['X-Traceback-Hash'] = tb_hash
            if not msg.get('References'):
                msg['References'] = '<tb-hash-%s@localhost>' % tb_hash

    binary_stdout = getattr(sys.stdout, 'buffer', None)
    if binary_stdout is not None and hasattr(msg, 'as_bytes'):
        # unixfrom=True keeps the envelope "From " line, matching what
        # str(msg) produced on Python 2.
        binary_stdout.write(msg.as_bytes(unixfrom=True))
    else:
        sys.stdout.write(str(msg))
# Run the filter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment