A "Best of the Best Practices" (BOBP) guide to developing in Python.
- "Build tools for others that you want to be built for you." - Kenneth Reitz
- "Simplicity is always better than functionality." - Pieter Hintjens
import collections | |
import MySQLdb as dbapi | |
__all__ = ['MySql'] | |
class MySql(object):
    """Holds a connection opened through the ``dbapi`` module (MySQLdb)."""

    def __init__(self, **kwargs):
        """Open a connection, forwarding every keyword argument to ``dbapi.connect``.

        The value of the ``host`` keyword is also stored as ``host_name``;
        it is ``None`` when no ``host`` was supplied.
        """
        connection = dbapi.connect(**kwargs)
        self.connection = connection
        self.host_name = kwargs['host'] if 'host' in kwargs else None
# The first item of each cursor.description entry is used as a field name;
# the names are space-separated, a form collections.namedtuple accepts.
names = " ".join(column[0] for column in cursor.description)

# Result type with one field per column.
cls = collections.namedtuple('Results', names)

# Wrap every fetched row in the result type.
rows = map(cls._make, cursor.fetchall())
from pyparsing import * | |
import unittest | |
class Node(list):
    """A list that is equal only to instances of exactly the same class.

    Two nodes compare equal when their items are equal *and* they share the
    same class, so nodes of different subclasses are distinct even when they
    hold identical items.
    """

    def __eq__(self, other):
        """Return True when ``other`` has equal items and the same class."""
        return list.__eq__(self, other) and self.__class__ == other.__class__

    def __ne__(self, other):
        """Return the negation of ``__eq__``.

        ``list`` provides its own ``__ne__`` that compares items only; left
        inherited, two nodes of different classes with equal items would be
        neither ``==`` nor ``!=``.
        """
        return not self.__eq__(other)

    def __repr__(self):
        """Return ``ClassName([...])`` using the list's own item repr."""
        return '%s(%s)' % (self.__class__.__name__, list.__repr__(self))
from pyelasticsearch import ElasticSearch, ElasticHttpNotFoundError | |
from pyparsing import * | |
import unittest | |
# Address of the Elasticsearch server and the name of the index.
ELASTICSEARCH_URL = 'http://localhost:9200/'
ELASTICSEARCH_INDEX = 'myindex'

# Client object constructed from ELASTICSEARCH_URL.
es = ElasticSearch(ELASTICSEARCH_URL)
# from http://pythondoeswhat.blogspot.com/2011/09/namedtupledict.html | |
def namedtupledict(*a, **kw):
    """Create a namedtuple class whose instances also allow lookup by field name.

    All positional and keyword arguments are forwarded unchanged to
    ``collections.namedtuple``. Instances of the returned class accept
    ``obj['field']`` in addition to ``obj.field`` and ordinary tuple
    indexing or slicing.

    A string key is resolved with ``getattr``, so an unknown name raises
    ``AttributeError``.

    Returns:
        The new namedtuple class with ``__getitem__`` replaced.
    """
    record_cls = collections.namedtuple(*a, **kw)

    def getitem(self, key):
        # isinstance rather than an exact type comparison, so str subclasses
        # are treated as field names instead of failing as tuple indices.
        if isinstance(key, str):
            return getattr(self, key)
        # Anything else (integers, slices) keeps plain tuple semantics.
        return tuple.__getitem__(self, key)

    record_cls.__getitem__ = getitem
    return record_cls
#!/usr/bin/env python | |
from gevent import monkey | |
monkey.patch_all() # Patch everything | |
import gevent | |
import time | |
class Hub(object): | |
"""A simple reactor hub... In async!""" |
import pandas as pd | |
# Sample data: one row per (year, manager) with that year's percentage return.
df = pd.DataFrame(
    {
        'year': ['2012', '2012', '2011', '2011'],
        'manager': ['A', 'B', 'A', 'B'],
        'return_pct': [1, 4, 5, 4],
    },
    columns=['year', 'manager', 'return_pct'],
)

# Compound each manager's returns: the last value of the running product of
# (1 + pct / 100) is broadcast to every row of that manager, then 1 is
# subtracted to turn the growth factor back into a return.
df['total_return'] = (
    df.groupby('manager')['return_pct']
    .transform(lambda group: (1 + group / 100.).cumprod().iat[-1])
    .sub(1)
)

# Dense rank, highest total return first (ties share a rank, no gaps).
df['ranking'] = df['total_return'].rank(method='dense', ascending=False)
from MySQLdb.cursors import SSDictCursor | |
def iterate_query(query, connection, arraysize=1): | |
c = connection.cursor(cursorclass=SSDictCursor) | |
c.execute(query) | |
while True: | |
nextrows = c.fetchmany(arraysize) | |
if not nextrows: | |
break |