# Python Helper Functions
# Gist gregpinero/53ccdb7287c414b7bd46 (last active August 29, 2015 14:10).
# Note: this file contains bidirectional Unicode text that may be interpreted
# or compiled differently than what appears below. To review, open the file in
# an editor that reveals hidden Unicode characters.
import datetime
import functools
import io
import math
import os
import pickle
import re
import sys
import zipfile
from cStringIO import StringIO
from time import time

from dateutil.relativedelta import relativedelta
class AttributeDict(dict):
    """A ``dict`` whose items can also be read and written as attributes.

    ``d.key`` returns ``d['key']`` and ``d.key = value`` stores
    ``d['key'] = value``.  ``__getattr__`` is only consulted when normal
    attribute lookup fails, so names that already exist on ``dict``
    (``items``, ``keys``, ...) still resolve to the dict methods; such keys
    are only reachable with item syntax.
    """

    class MissingAttribute(AttributeError, KeyError):
        """Raised when an attribute name has no matching key.

        Deriving from ``AttributeError`` makes ``getattr(d, name, default)``
        and ``hasattr(d, name)`` follow the attribute protocol.  Deriving
        from ``KeyError`` as well keeps callers working that caught the
        ``KeyError`` this class used to raise.
        """

    def __getattr__(self, attr):
        """Return ``self[attr]``, raising ``MissingAttribute`` if absent."""
        try:
            return self[attr]
        except KeyError:
            raise AttributeDict.MissingAttribute(attr)

    def __setattr__(self, attr, value):
        """Store ``value`` under the key ``attr`` rather than on the instance."""
        self[attr] = value
class InMemoryZip(object):
    """Build a zip archive in memory, one member at a time.

    Inspired from http://stackoverflow.com/questions/2463770/python-in-memory-zip-library

    Sample usage:
        imz = InMemoryZip()
        imz.append("test.txt", "Another test").append("test2.txt", "Still another")
        imz.writetofile("test.zip")
    """

    def __init__(self, level=zipfile.ZIP_DEFLATED):
        """Create an empty archive.

        ``level`` is passed to ``zipfile.ZipFile`` as its compression method
        (for example ``zipfile.ZIP_DEFLATED`` or ``zipfile.ZIP_STORED``).
        """
        # A zip archive is binary data, so keep it in a bytes buffer.
        self.in_memory_zip = io.BytesIO()
        self.level = level

    def append(self, filename_in_zip, file_contents):
        """Add a member named ``filename_in_zip`` holding ``file_contents``.

        Returns ``self`` so calls can be chained.
        """
        # Open the buffer in append mode (allowZip64=False, as before).  The
        # ``with`` block closes the ZipFile, which is what writes the central
        # directory; closing does not close the underlying buffer.
        with zipfile.ZipFile(self.in_memory_zip, "a", self.level, False) as zf:
            zf.writestr(filename_in_zip, file_contents)
            # Mark the files as having been created on Windows so that
            # Unix permissions are not inferred as 0000.  This must happen
            # before the archive is closed, because create_system is stored
            # in the central directory.
            for zfile in zf.filelist:
                zfile.create_system = 0
        return self

    def read(self):
        """Return the bytes of the complete in-memory zip archive."""
        self.in_memory_zip.seek(0)
        return self.in_memory_zip.read()

    def writetofile(self, filename):
        """Write the in-memory zip archive to the file ``filename``."""
        # Binary mode: text mode would translate newline bytes on Windows
        # and corrupt the archive.
        with open(filename, "wb") as f:
            f.write(self.read())
def trace(fn):
    """A decorator to time your functions.

    Each call writes ``<function name>...`` to standard output before the
    call and the elapsed wall-clock time in seconds (three decimals) after
    it, on the same line.  The wrapped function's return value is passed
    through unchanged.  If the function raises, no time is printed and the
    exception propagates.
    """
    @functools.wraps(fn)  # keep fn's __name__ and docstring on the wrapper
    def trace_func(*args, **kwargs):
        sys.stdout.write(fn.__name__ + '...')
        # Flush so the name is visible while a slow function is still running.
        sys.stdout.flush()
        beg = time()
        ret = fn(*args, **kwargs)
        tot = time() - beg
        sys.stdout.write(' %.3f\n' % tot)
        return ret
    return trace_func
def daterange(startdt, enddt, increment_val, increment_type='days'):
    """Return the dates from startdt to enddt, stepping by a fixed increment.

    The result starts at ``startdt`` and includes every step that is not
    later than ``enddt`` (so both ends are inclusive when the steps land on
    them).  An empty list is returned when ``startdt`` is after ``enddt``.

    increment_val must be a positive number of units.
    increment_type can be 'days' or 'months'.
    Month steps use dateutil : http://labix.org/python-dateutil

    NOTE: each step is added to the previous result, so with 'months' a
    day-of-month that gets clamped to a shorter month stays clamped for the
    following steps.

    Raises ValueError if increment_val is not positive (the loop would never
    terminate) or if increment_type is not recognised.

    >>> import datetime; d=datetime.date
    >>> daterange(d(1999,12,29),d(2000,1,2),1,'days')
    [datetime.date(1999, 12, 29), datetime.date(1999, 12, 30), datetime.date(1999, 12, 31), datetime.date(2000, 1, 1), datetime.date(2000, 1, 2)]
    >>> daterange(d(2007,11,28),d(2007,12,1),1,'days')
    [datetime.date(2007, 11, 28), datetime.date(2007, 11, 29), datetime.date(2007, 11, 30), datetime.date(2007, 12, 1)]
    """
    if increment_val <= 0:
        raise ValueError(
            "increment_val must be positive, got %r" % (increment_val,))
    if increment_type == 'days':
        # A plain day offset needs nothing beyond the standard library.
        step = datetime.timedelta(days=increment_val)
    elif increment_type == 'months':
        step = relativedelta(months=increment_val)
    else:
        raise ValueError(
            "increment_type must be 'days' or 'months', got %r"
            % (increment_type,))

    dates = []
    current = startdt
    while current <= enddt:
        dates.append(current)
        current += step
    return dates
def monetize(number, symbol='$', use_comma=True):
    """Format ``number`` as a currency string with two decimal places.

    ``number`` is converted with ``float()`` and rounded to two decimals.
    ``symbol`` is prepended verbatim.  When ``use_comma`` is true the integer
    part is grouped in thousands with commas, e.g. ``'$1,234,567.89'``.
    """
    amount = round(float(number), 2)
    # Group only the number itself; the ',' format option inserts the
    # thousands separators, so digits inside ``symbol`` are never touched.
    spec = ',.2f' if use_comma else '.2f'
    return symbol + format(amount, spec)
#Group a list by common element property
def group_list_items_by_common_attribute(alist, getterfunc):
    """
    Return a dict mapping each key to the list of items that produced it.

    ``getterfunc`` is called once per item and its (hashable) result is used
    as the key.  Items keep their original relative order inside each group.
    ``alist`` may be any iterable, including a generator.

    example:
    list1=[{'a':1},{'a':2},{'a':3},{'a':1}]
    def gfunc1(val):
        return str(val.get('a',''))
    print group_list_items_by_common_attribute(list1,gfunc1)
    >> {'1': [{'a': 1}, {'a': 1}], '3': [{'a': 3}], '2': [{'a': 2}]}
    """
    retdict = {}
    for item in alist:
        # setdefault creates the group on first sight of a key.
        retdict.setdefault(getterfunc(item), []).append(item)
    return retdict
# Latin-1 code point -> ASCII replacement text.  Built once at import time
# instead of on every call.
_LATIN1_TO_ASCII = {
    0xc0: 'A', 0xc1: 'A', 0xc2: 'A', 0xc3: 'A', 0xc4: 'A', 0xc5: 'A',
    0xc6: 'Ae', 0xc7: 'C',
    0xc8: 'E', 0xc9: 'E', 0xca: 'E', 0xcb: 'E',
    0xcc: 'I', 0xcd: 'I', 0xce: 'I', 0xcf: 'I',
    0xd0: 'Th', 0xd1: 'N',
    0xd2: 'O', 0xd3: 'O', 0xd4: 'O', 0xd5: 'O', 0xd6: 'O', 0xd8: 'O',
    0xd9: 'U', 0xda: 'U', 0xdb: 'U', 0xdc: 'U',
    0xdd: 'Y', 0xde: 'th', 0xdf: 'ss',
    0xe0: 'a', 0xe1: 'a', 0xe2: 'a', 0xe3: 'a', 0xe4: 'a', 0xe5: 'a',
    0xe6: 'ae', 0xe7: 'c',
    0xe8: 'e', 0xe9: 'e', 0xea: 'e', 0xeb: 'e',
    0xec: 'i', 0xed: 'i', 0xee: 'i', 0xef: 'i',
    0xf0: 'th', 0xf1: 'n',
    0xf2: 'o', 0xf3: 'o', 0xf4: 'o', 0xf5: 'o', 0xf6: 'o', 0xf8: 'o',
    0xf9: 'u', 0xfa: 'u', 0xfb: 'u', 0xfc: 'u',
    0xfd: 'y', 0xfe: 'th', 0xff: 'y',
    0xa1: '!', 0xa2: '{cent}', 0xa3: '{pound}', 0xa4: '{currency}',
    0xa5: '{yen}', 0xa6: '|', 0xa7: '{section}', 0xa8: '{umlaut}',
    0xa9: '{C}', 0xaa: '{^a}', 0xab: '<<', 0xac: '{not}',
    0xad: '-', 0xae: '{R}', 0xaf: '_', 0xb0: '{degrees}',
    0xb1: '{+/-}', 0xb2: '{^2}', 0xb3: '{^3}', 0xb4: "'",
    0xb5: '{micro}', 0xb6: '{paragraph}', 0xb7: '*', 0xb8: '{cedilla}',
    0xb9: '{^1}', 0xba: '{^o}', 0xbb: '>>',
    0xbc: '{1/4}', 0xbd: '{1/2}', 0xbe: '{3/4}', 0xbf: '?',
    0xd7: '*', 0xf7: '/',
}


def latin1_to_ascii(unicrap):
    """This takes a UNICODE string and replaces Latin-1 characters with
    something equivalent in 7-bit ASCII. It returns a plain ASCII string.

    This function makes a best effort to convert Latin-1 characters into
    ASCII equivalents. It does not just strip out the Latin-1 characters.
    All characters in the standard 7-bit ASCII range are preserved.
    In the 8th bit range all the Latin-1 accented letters are converted
    to unaccented equivalents. Most symbol characters are converted to
    something meaningful. Anything not converted is deleted, including
    every character above the Latin-1 range.

    http://code.activestate.com/recipes/251871/ (r1)
    """
    pieces = []
    for char in unicrap:
        code = ord(char)
        if code in _LATIN1_TO_ASCII:
            pieces.append(_LATIN1_TO_ASCII[code])
        elif code < 0x80:
            # Plain ASCII passes through unchanged.
            pieces.append(str(char))
        # Any other character has no replacement and is dropped.
    return ''.join(pieces)
def todate(datetime_val):
    """Return a ``datetime.date`` with the year, month and day of the argument.

    Any object exposing ``year``, ``month`` and ``day`` attributes is
    accepted; time-of-day information is discarded.
    """
    year, month, day = datetime_val.year, datetime_val.month, datetime_val.day
    return datetime.date(year, month, day)
def memoize(max_hits=None, max_age=None, persist_to_file=None, update_file_freq=500,
            flush_cache_file=False):
    """Return a decorator that caches a function's results by its arguments.

    Arguments must be hashable.  Keyword arguments are matched regardless of
    the order they are passed in.

    max_hits -- how many times a cached value may be returned before the
        function is run again for that set of arguments.
    max_age -- how many seconds a cached value stays valid before the
        function is run again.  max_hits and max_age cannot be combined.
    persist_to_file -- if true, the cache is also pickled to the file
        '<function name>.cache' (a relative path) and loaded from it when the
        function is decorated.
    update_file_freq -- the file is rewritten on every update_file_freq-th
        cache update rather than on each one.
    flush_cache_file -- if true, an existing cache file is ignored and
        overwritten instead of being loaded.

    With neither max_hits nor max_age, cached values never expire.

    NOTE(security): the cache file is read with pickle.load, which can run
    arbitrary code; only use persist_to_file where that file is trusted.

    usage (WARNING: need (), or (x) because of how decorator is set up)

        @memoize()
        def some_func(..

    or

        @memoize(50)
        def some_func(..

    Raises ValueError at decoration time if both max_hits and max_age are set.
    """
    class decorator(object):
        """Callable wrapper that holds the cache for one decorated function."""

        def __init__(self, function):
            if max_hits and max_age:
                raise ValueError("specify max_hits or max_age, not both")
            # Carry the wrapped function's identity over to the wrapper.
            # Missing attributes are tolerated so any callable can be wrapped.
            for name in ('__module__', '__name__', '__doc__'):
                try:
                    setattr(self, name, getattr(function, name))
                except AttributeError:
                    pass
            self.max_hits = max_hits
            self.max_age = max_age
            self.function = function
            # key -> [value], [value, hit_count] or [value, computed_at]
            self.memoized = {}
            self.writes = 0
            self.update_file_freq = update_file_freq
            self.cache_file = None
            if persist_to_file:
                cache_file_path = function.__name__ + '.cache'
                if os.path.exists(cache_file_path) and not flush_cache_file:
                    #read in file (binary mode: pickles are bytes)
                    try:
                        with open(cache_file_path, 'rb') as cached:
                            self.memoized = pickle.load(cached)
                    except EOFError:
                        # Empty or truncated file: start with an empty cache.
                        pass
                #open for writing; the handle stays open for later updates
                self.cache_file = open(cache_file_path, 'wb')
                self.update_disk_cache(force=True)

        def update_disk_cache(self, force=False):
            """Count one cache update and rewrite the cache file when due."""
            self.writes += 1
            if self.cache_file and ((self.writes % self.update_file_freq == 0) or force):
                # Rewind before truncating: truncate() cuts at the current
                # position, which is the end of the previous dump.
                self.cache_file.seek(0)
                self.cache_file.truncate()
                pickle.dump(self.memoized, self.cache_file, pickle.HIGHEST_PROTOCOL)
                self.cache_file.flush()

        def __call__(self, *args, **kwargs):
            """Return the cached result for these arguments, computing it if needed."""
            # Sort keyword arguments so f(a=1, b=2) and f(b=2, a=1) share a key.
            key = (args, tuple(sorted(kwargs.items())))
            if self.max_hits:
                if key in self.memoized and self.memoized[key][1] < self.max_hits:
                    #use the cached value, increment the hit count
                    self.memoized[key][1] += 1
                else:
                    #run the function, reset the hit count
                    self.memoized[key] = [self.function(*args, **kwargs), 0]
                    self.update_disk_cache()
            elif self.max_age:
                now = datetime.datetime.today()
                lifetime = datetime.timedelta(seconds=self.max_age)
                if key not in self.memoized or now >= self.memoized[key][1] + lifetime:
                    #run the function, set the date
                    self.memoized[key] = [self.function(*args, **kwargs),
                                          datetime.datetime.today()]
                    self.update_disk_cache()
            else:
                #Always use value if in cache, never expires
                if key not in self.memoized:
                    self.memoized[key] = [self.function(*args, **kwargs)]
                    self.update_disk_cache()
            return self.memoized[key][0]

    return decorator
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.