Last active
August 29, 2015 14:04
-
-
Save msiemens/509416f4e171d213b56e to your computer and use it in GitHub Desktop.
My helpers.py file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
My compilation of small helper functions. | |
""" | |
import errno
import functools
import logging
import os
import sys
import threading
import time
import traceback
import types
from logging.handlers import SMTPHandler

from pygments import highlight
from pygments.formatters import TerminalFormatter
from pygments.lexers import get_lexer_by_name
def install_thread_excepthook():
    """
    Route uncaught exceptions raised in threads through ``sys.excepthook``.

    Workaround for sys.excepthook thread bug
    From
    http://spyced.blogspot.com/2007/06/workaround-for-sysexcepthook-bug.html
    (https://sourceforge.net/tracker/?func=detail&atid=105470&aid=1230540&group_id=5470).

    Call from __main__ before creating any threads: only threads whose
    ``__init__`` runs after this call get their ``run`` wrapped.
    Calling this function more than once is a no-op, so the wrapper is
    never stacked on top of itself.

    If using psyco, call psyco.cannotcompile(threading.Thread.run)
    since this replaces a new-style class method.
    """
    init_old = threading.Thread.__init__

    # Already patched by an earlier call: do not wrap the wrapper again.
    if getattr(init_old, '_thread_excepthook_installed', False):
        return

    def init(self, *args, **kwargs):
        init_old(self, *args, **kwargs)
        # Captured per instance so subclasses overriding run() are honoured.
        run_old = self.run

        def run_with_except_hook(*args, **kw):
            try:
                run_old(*args, **kw)
            except (KeyboardInterrupt, SystemExit):
                # Let interpreter-exit signals propagate untouched.
                raise
            except:  # noqa: E722 - everything else must reach the hook
                sys.excepthook(*sys.exc_info())

        self.run = run_with_except_hook

    # Marker read by the guard above on subsequent calls.
    init._thread_excepthook_installed = True
    threading.Thread.__init__ = init
def highlited_excepthook():
    """
    Install a ``sys.excepthook`` that writes syntax-highlighted tracebacks
    to stderr, then install the thread excepthook workaround.

    Needs Pygments package.
    Source: http://stackoverflow.com/a/14776693/997063
    """
    def excepthook(exc_type, exc_value, exc_tb):
        # Render the traceback as plain text first, then colorize it with
        # the Python-traceback lexer for terminal output.
        plain = ''.join(
            traceback.format_exception(exc_type, exc_value, exc_tb))
        colored = highlight(
            plain,
            get_lexer_by_name("pytb", stripall=True),
            TerminalFormatter(),
        )
        sys.stderr.write(colored)

    sys.excepthook = excepthook
    install_thread_excepthook()
class Struct(object):
    """
    Helper class: expose dict keys as object attributes.

    Nested dict values passed to the constructor are converted to
    ``Struct`` instances recursively.

    Attention: Differs from dict!
    - iter(Struct) yields (key, value) pairs
    """

    def __init__(self, **entries):
        for name, value in entries.items():
            if isinstance(value, dict):
                # Recursion: wrap nested dicts as well.
                value = Struct(**value)
            self.__dict__[name] = value

    def __repr__(self):
        return repr(vars(self))

    def __iter__(self):
        return iter(vars(self).items())

    def __getitem__(self, key):
        return vars(self)[key]

    def __setitem__(self, key, value):
        vars(self)[key] = value
def mail_logging(logger, config):
    """
    Log error messages to mail.

    Attaches an ``SMTPHandler`` with level ``ERROR`` to *logger*. For
    records that carry exception info, the mail subject gets the exception
    class name appended and the location fields in the body point at the
    innermost traceback frame instead of the logging call. Records without
    exception info are mailed with the plain subject and their own
    location fields.

    config = {
        'SMTP_SERVER': '...',
        'SMTP_USERNAME': '...',
        'SMTP_PASSWORD': '...',
        'SMTP_SSL': 1,            # optional; 1 requests a secure connection
        'MAIL_FROM': '...',
        'MAIL_TO': '...',
        'MAIL_SUBJECT': '...'
    }

    :type logger: logging.Logger
    :type config: dict
    """

    def mail_get_subject(self, record):
        # Bound onto the handler below; `self` is the SMTPHandler.
        exc_info = record.exc_info
        if not exc_info or exc_info[0] is None:
            # e.g. logger.error("...") outside an except block
            return self.subject
        return self.subject + ': ' + str(exc_info[0].__name__)

    class MyFormatter(logging.Formatter):
        def format(self, record):
            """
            :type record: logging.LogRecord
            """
            exc = record.exc_info
            frames = []
            if exc and exc[2] is not None:
                frames = traceback.extract_tb(exc[2])

            if frames:
                exc_location = frames[-1]
                #: == (filename, line number, function name, text)
                # NOTE: this rewrites the shared record, so handlers that
                # run after this one see the modified fields too.
                record.pathname = exc_location[0]
                record.lineno = exc_location[1]
                record.funcName = exc_location[2]
                record.module = exc_location[3]  # Misuse module for line content

            return super(MyFormatter, self).format(record)

    # An empty tuple for `secure` asks SMTPHandler for a secure connection;
    # a missing 'SMTP_SSL' key is treated as "not requested".
    secure = tuple() if config.get('SMTP_SSL') == 1 else None

    mail_handler = SMTPHandler(config['SMTP_SERVER'],
                               config['MAIL_FROM'],
                               [config['MAIL_TO']],
                               config['MAIL_SUBJECT'],
                               (config['SMTP_USERNAME'], config['SMTP_PASSWORD']),
                               secure)
    mail_handler.getSubject = types.MethodType(mail_get_subject, mail_handler)
    mail_handler.setFormatter(MyFormatter('''
Message type: %(levelname)s
Location: %(pathname)s:%(lineno)d
Function: %(funcName)s
Line: %(module)s
Time: %(asctime)s
Message: %(message)s
'''))
    mail_handler.setLevel(logging.ERROR)
    logger.addHandler(mail_handler)
class FileLockException(Exception):
    """Raised when a FileLock cannot be acquired within its timeout."""


class FileLock(object):
    """ A file locking mechanism that has context-manager support so
        you can use it in a with statement. This should be relatively cross
        compatible as it doesn't rely on msvcrt or fcntl for the locking.

        The lock is a separate ``<file_name>.lock`` file created exclusively
        (``O_CREAT | O_EXCL``); whoever creates it holds the lock.
    """

    def __init__(self, file_name, timeout=10, delay=.05):
        """ Prepare the file locker. Specify the file to lock and optionally
            the maximum timeout and the delay between each attempt to lock.

            :param file_name: path of the file to guard; relative paths are
                resolved against the current working directory at
                construction time.
            :param timeout: seconds to keep retrying before giving up.
            :param delay: seconds to sleep between attempts.
        """
        self.is_locked = False
        self.lockfile = os.path.join(os.getcwd(), "%s.lock" % file_name)
        self.file_name = file_name
        self.timeout = timeout
        self.delay = delay

    def acquire(self):
        """ Acquire the lock, if possible. If the lock is in use, it checks
            again every `delay` seconds. It does this until it either gets
            the lock or exceeds `timeout` number of seconds, in which case
            it raises FileLockException.

            Any OSError other than "file exists" is re-raised unchanged.
        """
        start_time = time.time()
        while True:
            try:
                self.fd = os.open(self.lockfile,
                                  os.O_CREAT | os.O_EXCL | os.O_RDWR)
                break
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise
                if (time.time() - start_time) >= self.timeout:
                    raise FileLockException("Timeout occured.")
                time.sleep(self.delay)
        self.is_locked = True

    def release(self):
        """ Get rid of the lock by deleting the lockfile.
            When working in a `with` statement, this gets automatically
            called at the end. Does nothing if the lock is not held.
        """
        if not self.is_locked:
            return
        # Flip the flag first so a failure below cannot make __del__ retry
        # with an already-closed descriptor.
        self.is_locked = False
        try:
            os.close(self.fd)
        finally:
            # Remove the lockfile even if closing the descriptor failed,
            # otherwise every later acquire() would time out.
            os.unlink(self.lockfile)

    def __enter__(self):
        """ Activated when used in the with statement.
            Acquires the lock unless this instance already holds it.
        """
        if not self.is_locked:
            self.acquire()
        return self

    def __exit__(self, type, value, traceback):
        """ Activated at the end of the with statement.
            Releases the lock if it is held. Exceptions raised inside the
            with block are not suppressed.
        """
        if self.is_locked:
            self.release()

    def __del__(self):
        """ Make sure that the FileLock instance doesn't leave a lockfile
            lying around.
        """
        self.release()
# Serializes the one-time creation of per-instance locks so two threads
# entering a synchronized method at once cannot each create their own lock.
_SYNCHRONIZED_INIT_GUARD = threading.Lock()


def synchronized(method):
    """
    Make a method call synchronized. Works only on class members!

    All methods of one instance decorated with this share a single lock,
    stored on the instance as ``_lock`` and created on first use. The lock
    is a plain ``threading.Lock`` (not reentrant), so a synchronized method
    must not call another synchronized method on the same instance.

    Implemented with ``functools.wraps``; the third-party decorator
    package is not required.

    :param method: the method to wrap; its first argument must be ``self``.
    :return: a wrapper with the same name and docstring as *method*.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            lock = self._lock
        except AttributeError:
            with _SYNCHRONIZED_INIT_GUARD:
                # Re-check under the guard: another thread may have
                # created the lock while we were waiting.
                try:
                    lock = self._lock
                except AttributeError:
                    lock = self._lock = threading.Lock()
        with lock:
            return method(self, *args, **kwargs)

    return wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment