Part of py-helpers. Gzip compression shortcuts. Encoding. Database helpers. Retry decorator.
def namedlist(typename, field_names):
    """Returns a new subclass of list with named fields.

    >>> Point = namedlist('Point', ('x', 'y'))
    >>> Point.__doc__                   # docstring for the new class
    'Point(x, y)'
    >>> p = Point(11, y=22)             # instantiate with positional args or keywords
    >>> p[0] + p[1]                     # indexable like a plain list
    33
    >>> x, y = p                        # unpack like a regular list
    >>> x, y
    (11, 22)
    >>> p.x + p.y                       # fields also accessible by name
    33
    >>> d = p._asdict()                 # convert to a dictionary
    >>> d['x']
    11
    >>> Point(**d)                      # convert from a dictionary
    Point(x=11, y=22)
    >>> p._replace(x=100)               # _replace() is like str.replace() but targets named fields
    Point(x=100, y=22)
    """
    fields_len = len(field_names)
    fields_text = repr(tuple(field_names)).replace("'", "")[1:-1]  # tuple repr without parens or quotes

    class ResultType(list):
        __slots__ = ()
        # Set in the class body: assigning __doc__ after class creation fails in Python 2.
        __doc__ = "{0}({1})".format(typename, fields_text)
        _fields = field_names

        def _fixed_length_error(*args, **kwargs):
            raise TypeError(u"Named list has fixed length")
        append = _fixed_length_error
        insert = _fixed_length_error
        pop = _fixed_length_error
        remove = _fixed_length_error

        def sort(self):
            raise TypeError(u"Sorting named list in place would corrupt field accessors. Use sorted(x)")

        def _asdict(self):
            """Return a new dict mapping field names to their values."""
            return dict(zip(field_names, self))

        def _replace(self, **kwargs):
            # Take each replacement value from kwargs, falling back to the current value.
            values = map(kwargs.pop, field_names, self)
            if kwargs:
                raise TypeError(u"Unexpected field names: {0!r}".format(kwargs.keys()))
            if len(values) != fields_len:
                raise TypeError(u"Expected {e} arguments, got {n}".format(e=fields_len, n=len(values)))
            return ResultType(*values)

        def __repr__(self):
            items_repr = ", ".join("{name}={value!r}".format(name=name, value=value)
                                   for name, value in zip(field_names, self))
            return "{typename}({items})".format(typename=typename, items=items_repr)

    # eval() is used, as in collections.namedtuple, to give __init__ real named parameters.
    ResultType.__init__ = eval("lambda self, {fields}: self.__setitem__(slice(None, None, None), [{fields}])".format(fields=fields_text))
    ResultType.__name__ = typename
    for i, name in enumerate(field_names):
        fget = eval("lambda self: self[{0:d}]".format(i))
        fset = eval("lambda self, value: self.__setitem__({0:d}, value)".format(i))
        setattr(ResultType, name, property(fget, fset))
    return ResultType
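For comparison with `collections.namedtuple`, a small behavior sketch: fields are mutable in place, while the length stays fixed.

Point = namedlist('Point', ('x', 'y'))
p = Point(1, 2)
p.x = 10         # mutate by attribute; a namedtuple would need _replace()
p[1] = 20        # or by index
assert (p.x, p.y) == (10, 20)
try:
    p.append(3)  # length is fixed, so this raises TypeError
except TypeError:
    pass
assert sorted(p) == [10, 20]  # sorted() returns a plain list and is safe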
import datetime


def datetime_to_tuple(dt, precision=None):
    """datetime, precision -> tuple(year, month, day, hour, minute, second, microsecond)[:precision]

    Reverse operation is `datetime(*tuple)`.
    """
    full = (dt.year, dt.month, dt.day, dt.hour,
            dt.minute, dt.second, dt.microsecond)
    return full[:precision] if precision else full


def datetime_to_unix(dt, _epoch_ord=datetime.date(1970, 1, 1).toordinal()):
    """UTC datetime -> UNIX timestamp

    Invariant: `datetime.utcfromtimestamp(datetime_to_unix(dt)) == dt`
    """
    days = dt.date().toordinal() - _epoch_ord
    hours = days * 24 + dt.hour
    minutes = hours * 60 + dt.minute
    seconds = minutes * 60 + dt.second
    return seconds + dt.microsecond / 1e6


def str_to_date(s, format="%Y-%m-%d", _parse=datetime.datetime.strptime):
    """ '2012-11-13' -> date(2012, 11, 13)
    """
    dt = _parse(s, format)
    return dt.date()
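A quick round trip through the three helpers above:

import datetime

dt = datetime.datetime(2012, 11, 13, 14, 30, 0)
assert datetime_to_tuple(dt, precision=3) == (2012, 11, 13)
ts = datetime_to_unix(dt)  # seconds since the UNIX epoch, UTC
assert datetime.datetime.utcfromtimestamp(ts) == dt
assert str_to_date('2012-11-13') == dt.date()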
"""PostgreSQL DB helpers. | |
Uses two pools for PostgreSQL DB connections: one pool for connections in autocommit mode used by execute(), | |
one pool for connections in transaction mode. | |
Interface:: | |
* autocommit() -> context manager, returns psycopg2.Cursor (in autocommit mode) | |
* execute(statement, params=None, repeat=True) -> psycopg2.Cursor | |
* transaction() -> context manager, returns psycopg2.Cursor inside explicit transaction | |
""" | |
import collections | |
import contextlib | |
import eventlet | |
import eventlet.db_pool | |
import logbook | |
# import logging | |
import psycopg2 | |
import psycopg2.extensions | |
import psycopg2.extras | |
import psycopg2.pool | |
import random | |
import time | |
try: | |
import sqlalchemy | |
from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect | |
_sa_class = sqlalchemy.sql.ClauseElement | |
_sa_dialect = PGDialect() | |
except ImportError: | |
_sa_class = None | |
_sa_dialect = None | |
# Select logbook or logging here | |
log = logbook.Logger('db') | |
# log = logging.getLogger('db') | |
class Error(Exception):
    pass


class DictCursor(psycopg2.extras.DictCursor):
    def execute(self, statement, params=None, record_type=None,
                _sa_class=_sa_class, _sa_dialect=_sa_dialect):
        """psycopg2 cursor execute() wrapped with query time logging.

        Returns self, so you can chain it with fetch* methods, etc.
        `record_type` is accepted for interface compatibility with NamedTupleCursor and is ignored.
        """
        if _sa_class is not None and isinstance(statement, _sa_class):
            # Compile SQLAlchemy expressions into a plain statement and params.
            compiled = statement.compile(dialect=_sa_dialect)
            statement, params = compiled.string, compiled.params
        self.connection.notices[:] = []
        error = None
        start = time.time()
        try:
            super(DictCursor, self).execute(statement, params)
        except psycopg2.Error as error:
            # Re-raised below, after notices and query time have been logged.
            pass
        total = round(time.time() - start, 3)
        for notice in self.connection.notices:
            log.notice(notice.strip().decode('utf-8', 'replace'))
            if notice == "WARNING: there is already a transaction in progress\n":
                raise Error(u"Nested BEGIN inside transaction. Aborting possibly broken code.")
        # INSERT statements are logged without mogrify: their parameters may be large.
        sql = (self.mogrify(statement, params)
               if not statement.lower().startswith("insert")
               else statement).decode('utf-8', 'replace')
        sql_id = id(sql)
        log.info(u"Query [{time:.3f}] id={id} {sql}".format(
            time=total, id=sql_id, sql=sql))
        if error is not None:
            raise error
        return self

    def executemany(self, statement, parameters):
        return super(DictCursor, self).executemany(statement, parameters)

    def callproc(self, procname, parameters):
        return super(DictCursor, self).callproc(procname, parameters)

    def scalar(self):
        """First column of the first row, or None if there are no rows."""
        row = self.fetchone()
        if row is None:
            return None
        return row[0]
class NamedTupleCursor(psycopg2.extras.NamedTupleCursor):
    EmptyRecord = collections.namedtuple("Record", ())

    def execute(self, statement, params=None, record_type=None,
                _sa_class=_sa_class, _sa_dialect=_sa_dialect):
        """psycopg2 cursor execute() wrapped with query time logging.

        Returns self, so you can chain it with fetch* methods, etc.
        If `record_type` is given, it is used instead of an auto-generated namedtuple.
        """
        if _sa_class is not None and isinstance(statement, _sa_class):
            # Compile SQLAlchemy expressions into a plain statement and params.
            compiled = statement.compile(dialect=_sa_dialect)
            statement, params = compiled.string, compiled.params
        self.connection.notices[:] = []
        error = None
        start = time.time()
        try:
            super(NamedTupleCursor, self).execute(statement, params)
        except psycopg2.Error as error:
            # Re-raised below, after notices and query time have been logged.
            pass
        total = round(time.time() - start, 3)
        for notice in self.connection.notices:
            log.notice(notice.strip().decode('utf-8', 'replace'))
            if notice == "WARNING: there is already a transaction in progress\n":
                raise Error(u"Nested BEGIN inside transaction. Aborting possibly broken code.")
        # INSERT statements are logged without mogrify: their parameters may be large.
        sql = (self.mogrify(statement, params)
               if not statement.lower().startswith("insert")
               else statement).decode('utf-8', 'replace')
        sql_id = id(sql)
        log.info(u"Query [{time:.3f}] id={id} {sql}".format(
            time=total, id=sql_id, sql=sql))
        if error is not None:
            raise error
        self.Record = record_type
        return self

    def executemany(self, statement, parameters):
        return super(NamedTupleCursor, self).executemany(statement, parameters)

    def callproc(self, procname, parameters):
        return super(NamedTupleCursor, self).callproc(procname, parameters)

    def scalar(self):
        """First column of the first row, or None if there are no rows."""
        row = self.fetchone()
        if row is None:
            return None
        return row[0]

    def _make_nt(self, _namedtuple=collections.namedtuple):
        if not self.description:
            return NamedTupleCursor.EmptyRecord
        columns = [d[0] if d[0] != "?column?" else "column" + str(i)
                   for i, d in enumerate(self.description, 1)]
        return _namedtuple("Record", columns)
# Select default cursor class here
default_cursor_class = DictCursor
# default_cursor_class = NamedTupleCursor


class ReadCursor(object):
    """Read-only cursor-like object over rows that were already fetched.
    """
    rowcount = property(lambda self: self._rowcount)

    def __init__(self, rows, rowcount):
        self._rows = rows
        self._rowcount = rowcount

    def fetchone(self):
        if self._rows is None:
            raise psycopg2.ProgrammingError("no results to fetch")
        if self._rowcount == 0:
            return None
        return self._rows[0]

    def fetchmany(self, size=None):
        if self._rows is None:
            raise psycopg2.ProgrammingError("no results to fetch")
        if size is None:
            return self._rows
        return self._rows[:size]

    def fetchall(self):
        if self._rows is None:
            raise psycopg2.ProgrammingError("no results to fetch")
        return self._rows

    def __iter__(self):
        if self._rows is None:
            raise psycopg2.ProgrammingError("no results to fetch")
        return iter(self._rows)

    def scalar(self):
        if self._rows is None:
            raise psycopg2.ProgrammingError("no results to fetch")
        if self._rowcount == 0:
            return None
        return self._rows[0][0]
class Connection(psycopg2.extensions.connection):
    def commit(self):
        start = time.time()
        super(Connection, self).commit()
        total = time.time() - start
        log.info(u"Commit [{time:.3f}]".format(time=total))

    def rollback(self):
        start = time.time()
        super(Connection, self).rollback()
        total = time.time() - start
        log.info(u"Rollback [{time:.3f}]".format(time=total))

    def cursor(self, _klass=default_cursor_class, *args, **kwargs):
        return super(Connection, self).cursor(
            *args, cursor_factory=_klass, **kwargs)


class EventletConnectionPool(eventlet.db_pool.RawConnectionPool):
    def connect(self, db_module, timeout, *args, **kwargs):
        connection = super(EventletConnectionPool, self).connect(
            db_module, timeout, *args, **kwargs)
        # Note: makes a round-trip to the DB. Only required for new connections.
        connection.autocommit = True
        return connection

    @contextlib.contextmanager
    def item(self):
        close = True
        conn = self.get()
        # Note: makes a round-trip to the DB. Only required for new connections.
        conn.autocommit = True
        try:
            yield conn
            # no error
            close = False
        finally:
            if close:
                conn._base.close()
            self.put(conn)


class ThreadConnectionPool(psycopg2.pool.ThreadedConnectionPool):
    @contextlib.contextmanager
    def item(self):
        close = True
        conn = self.getconn()
        # Note: makes a round-trip to the DB. Only required for new connections.
        conn.autocommit = True
        try:
            yield conn
            # no error
            close = False
        finally:
            self.putconn(conn, close=close or conn.closed)
def is_connection_error(e):
    """Exception object -> True | False
    """
    if not isinstance(e, psycopg2.DatabaseError):
        return False
    error_str = str(e)
    MSG1 = "socket not open"
    MSG2 = "server closed the connection unexpectedly"
    MSG3 = "could not connect to server"
    return MSG1 in error_str or MSG2 in error_str or MSG3 in error_str


# TODO: override this if necessary.
# POSTGRESQL_DSN must be provided by the importing application.
def get_connection_pool(group):
    # the most straightforward threaded pool, built into psycopg2
    return ThreadConnectionPool(
        minconn=0, maxconn=10,
        dsn=POSTGRESQL_DSN, connection_factory=Connection)
    # eventlet pool
    # return EventletConnectionPool(
    #     psycopg2, min_size=0, max_size=10, max_idle=10, max_age=60,
    #     dsn=POSTGRESQL_DSN, connection_factory=Connection)
    # pre-initialized pools for different groups of database servers
    # return group_map[group]
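For reference, `POSTGRESQL_DSN` is assumed to be defined by the importing application. A hypothetical value might look like this (host, database, and credentials are placeholders):

# Hypothetical configuration; all connection details are placeholders.
POSTGRESQL_DSN = "host=localhost port=5432 dbname=mydb user=app password=secret"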
@contextlib.contextmanager
def autocommit(group='default', connection_pool=None):
    """Context manager.

    Executes the block with a new cursor from a pooled connection in autocommit mode.
    Returns the cursor. At the end of the block, the connection is returned to the pool.

    >>> with autocommit() as cursor:
    ...     cursor.execute("select 1")
    ...     cursor.execute("select 2")

    Use it when you run several selects and don't want to waste time on a final ROLLBACK.
    """
    pool = connection_pool or get_connection_pool(group)
    with pool.item() as connection:
        cursor = connection.cursor()
        yield cursor


def execute(statement, params=None, group='default', connection_pool=None, repeat=True, record_type=None):
    """Shortcut for:

    1. get a connection from the pool, create a new cursor
    2. cursor.execute(statement, params)
    3. cursor.fetchall() (if possible)
    4. return the connection to the pool

    Returns a read-only cursor holding the rows.
    On disconnect, if `repeat` is True, reconnects and repeats the query one more time.
    If the second attempt fails, raises the exception.
    """
    pool = connection_pool or get_connection_pool(group)
    with pool.item() as connection:
        try:
            cursor = connection.cursor()
            cursor.execute(statement, params, record_type=record_type)
            rows = None
            rowcount = cursor.rowcount
            try:
                rows = cursor.fetchall()
            except psycopg2.ProgrammingError as e:
                if str(e) != "no results to fetch":
                    raise
            return ReadCursor(rows, rowcount)
        except psycopg2.DatabaseError as e:
            if repeat and is_connection_error(e):
                log.warning(u"execute() DB disconnect, repeating query.")
            else:
                raise
    # Connection lost, repeat once with a fresh connection.
    return execute(statement, params, group=group, connection_pool=connection_pool,
                   repeat=False, record_type=record_type)
@contextlib.contextmanager
def transaction(group='default', connection_pool=None):
    """Context manager.

    Executes the block with a new cursor from a pooled connection inside a transaction.
    Returns the cursor. At the end of the block, the connection is returned to the pool.
    The transaction is committed on success and rolled back on error.

    >>> with transaction() as cursor:
    ...     rows = cursor.execute(...).fetchall()
    ...     process(rows)
    ...     cursor.execute(...)

    Always use it instead of manual BEGINs/ROLLBACKs.
    """
    pool = connection_pool or get_connection_pool(group)
    with pool.item() as connection:
        cursor = connection.cursor()
        cursor.execute("begin")
        try:
            yield cursor
        except Exception:
            cursor.execute("rollback")
            raise
        else:
            cursor.execute("commit")
def str_utf8(x):
    """
    Returns the byte string representation of x.
    Like unicode(x).encode('utf-8') except it also accepts byte strings (Python 2).
    """
    if isinstance(x, str):
        return x
    return unicode(x).encode('utf-8')
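A behavior sketch (Python 2, where `str` is the byte string type):

assert str_utf8(u'caf\xe9') == 'caf\xc3\xa9'    # unicode -> UTF-8 bytes
assert str_utf8('already bytes') == 'already bytes'
assert str_utf8(42) == '42'                     # arbitrary objects go through unicode()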
from cStringIO import StringIO
from gzip import GzipFile
import zlib


def gzip_string(s, level=6):
    """Compress string using gzip.
    Default compression level is 6.
    """
    zbuf = StringIO()
    zfile = GzipFile(mode='wb', compresslevel=level, fileobj=zbuf)
    zfile.write(s)
    zfile.close()
    return zbuf.getvalue()


def gunzip_string(s):
    """Decompress string using gzip.
    The `16 + MAX_WBITS` window flag tells zlib to expect a gzip header.
    See http://stackoverflow.com/questions/2695152/in-python-how-do-i-decode-gzip-encoding/2695466#2695466
    """
    return zlib.decompress(s, 16 + zlib.MAX_WBITS)
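Round trip; note that gunzip_string() accepts any gzip stream, not only the output of gzip_string():

data = "hello " * 1000
packed = gzip_string(data, level=9)
assert len(packed) < len(data)
assert gunzip_string(packed) == data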
import itertools
import sys
from collections import deque


def recursive_sizeof(o, handlers={}, verbose=False, default_size=sys.getsizeof(0)):
    """Returns the approximate memory footprint of an object and all of its contents.

    Automatically finds the contents of the following builtin containers and
    their subclasses: tuple, list, deque, dict, set and frozenset.
    To search other containers, add handlers to iterate over their contents:
        handlers = {SomeContainerClass: iter,
                    OtherContainerClass: OtherContainerClass.get_elements}
    For objects without __sizeof__, `default_size` is used.
    From http://code.activestate.com/recipes/577504/
    """
    dict_handler = lambda d: itertools.chain.from_iterable(d.items())
    iteritems_handler = lambda d: itertools.chain.from_iterable(d.iteritems())
    all_handlers = {
        tuple: iter,
        list: iter,
        deque: iter,
        dict: dict_handler,
        set: iter,
        frozenset: iter,
    }
    all_handlers.update(handlers)  # user handlers take precedence
    seen = set()  # track which object ids have already been seen

    def sizeof(o):
        id_ = id(o)
        if id_ in seen:  # do not double count the same object
            return 0
        seen.add(id_)
        s = sys.getsizeof(o, default_size)
        if verbose:
            print(s, type(o), repr(o))
        handler = None
        for type_ in all_handlers:
            if isinstance(o, type_):
                handler = all_handlers[type_]
                break
        if handler is None and hasattr(o, 'iteritems'):
            handler = iteritems_handler
        if handler is not None:
            s += sum(map(sizeof, handler(o)))
        return s

    return sizeof(o)
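A small demonstration of the difference from plain sys.getsizeof(), which only counts the container header:

d = {'numbers': range(100), 'nested': {'a': [1, 2, 3]}}
assert recursive_sizeof(d) > sys.getsizeof(d)  # contents are counted too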
import functools
# import logbook
import logging
import time

# log = logbook.Logger(__name__)
log = logging.getLogger(__name__)


def repr_func(f):
    """Attempt to get the most useful string representation of a callable.
    """
    name = getattr(f, 'func_name', '<unknown>')
    func_code = getattr(f, 'func_code', None)
    if func_code is not None:
        return u'{name}() @ {fc.co_filename}:{fc.co_firstlineno}'.format(
            name=name,
            fc=func_code)
    return repr(f)


def retry(tries, exceptions=(Exception,), delay=0):
    """
    Decorator for retrying a function if an exception occurs.

    tries -- total number of attempts
    exceptions -- exceptions to catch
    delay -- seconds to wait between attempts
    """
    def wrapper(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            n = tries  # copy to local variable for modification
            while n > 0:
                n -= 1
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if n == 0:
                        raise
                    # logbook
                    # log.error(u'retry: {f} {e}', f=repr_func(func), e=e)
                    # logging
                    log.error(u'retry: %s %s', repr_func(func), e)
                    time.sleep(delay)
        return wrapped
    return wrapper
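A sketch of the decorator in use; `flaky` is a stand-in for any unreliable call:

import random

@retry(tries=3, exceptions=(ValueError,), delay=0.1)
def flaky():
    if random.random() < 0.7:
        raise ValueError("transient failure")
    return "ok"

print(flaky())  # up to 3 attempts; the exception from the last attempt propagates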
Looks like some really handy snippets. Will leave you a shout-out in my commits when I use them. Thanks! :)