@shadda
Last active December 18, 2015 03:08
General-purpose Python utility script, copyright (c) 1991 herald zoidberg
import sys, os
import time
import pytz
import math
import traceback
import oursql
import re
import eventlet
from eventlet import GreenPool
import pycurl
import cStringIO
import pickle
import hashlib
import urllib
from lxml import etree
from urllib import urlencode
from app.lib.pyparallelcurl import ParallelCurl
class Tools(object):

    @staticmethod
    def coalesce(*args):
        # Return the first argument that is not None (like SQL's COALESCE).
        for _arg in args:
            if _arg is not None:
                return _arg
        return None

    @staticmethod
    def md5(data):
        m = hashlib.md5()
        if isinstance(data, str):
            m.update(data)
        else:
            # Non-string values are pickled so they can be hashed consistently.
            _tmp = pickle.dumps(data)
            m.update(_tmp)
        return m.hexdigest()

    @staticmethod
    def fetchDataAsync(requests, max_requests=10):
        """Runs a batch of FetchRequest objects through Fetch and returns the
        raw response bodies, in completion order."""
        _results = []

        def _callback(ch, response):
            _results.append(response['result_buffer'].getvalue())

        fetch = Fetch(max_requests)
        for request in requests:
            request.callback = _callback
            fetch.enqueue(request)
        fetch.finishAllRequests()
        return _results
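    # Usage sketch for fetchDataAsync, kept as a comment so the module has no
    # import-time side effects; the URLs below are placeholders, not endpoints
    # used elsewhere in this script:
    #
    #   reqs = [FetchRequest("http://example.com/a"),
    #           FetchRequest("http://example.com/b", timeout=10)]
    #   bodies = Tools.fetchDataAsync(reqs, max_requests=5)
    #   # bodies is a list of raw response strings, in completion order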
    @staticmethod
    def fetchoneAssoc(cursor):
        """Returns the next row of a query as a dict with columns as keys,
        or None when the result set is exhausted."""
        data = cursor.fetchone()
        if data is None:
            return None
        # Get the field descriptors for this row
        desc = cursor.description
        dic = {}
        # zip returns a list of tuples where each nth item
        # contains the nth item of each argument.
        for (name, value) in zip(desc, data):
            dic[name[0].lower()] = value
        return dic

    @staticmethod
    def fetchallAssoc(cursor):
        """Returns all rows of a query as a list of dicts with columns as keys."""
        rows = []
        desc = cursor.description
        for data in cursor:
            if data is None:
                return None
            # Get the field descriptors for this row
            dic = {}
            # zip returns a list of tuples where each nth item
            # contains the nth item of each argument.
            for (name, value) in zip(desc, data):
                dic[name[0].lower()] = value
            rows.append(dic)
        return rows

    @staticmethod
    def fetchAssoc(row, desc):
        """Returns a single row as a dict with columns as keys, given the
        cursor description for that row."""
        # Get the field descriptors for this row
        dic = {}
        # zip returns a list of tuples where each nth item
        # contains the nth item of each argument.
        for (name, value) in zip(desc, row):
            dic[name[0].lower()] = value
        return dic
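    # A sketch of the *Assoc helpers against an oursql cursor; the connection,
    # table, and column names below are illustrative only:
    #
    #   cursor = conn.cursor()
    #   cursor.execute("SELECT id, name FROM users WHERE id = ?", (42,))
    #   row = Tools.fetchoneAssoc(cursor)   # e.g. {'id': 42, 'name': 'zoidberg'}
    #   cursor.execute("SELECT id, name FROM users")
    #   rows = Tools.fetchallAssoc(cursor)  # a list of such dicts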
    @staticmethod
    def mostCommon(data):
        # Return the value that occurs most frequently in the sequence.
        return max(set(data), key=data.count)

    @staticmethod
    def strFromUnicodeRecursive(data):
        """
        json.load() returns unicode strings, which,
        in a sane world, would be fine, but many Python
        libraries check explicitly for 'str'.
        """
        def _decodeDict(data):
            _res = {}
            for k, v in data.iteritems():
                if isinstance(k, unicode):
                    k = str(k)
                if isinstance(v, unicode):
                    v = str(v)
                if isinstance(v, list):
                    v = _decodeList(v)
                if isinstance(v, dict):
                    v = _decodeDict(v)
                _res[k] = v
            return _res

        def _decodeList(data):
            _res = []
            for v in data:
                if isinstance(v, unicode):
                    v = str(v)
                if isinstance(v, list):
                    v = _decodeList(v)
                if isinstance(v, dict):
                    v = _decodeDict(v)
                _res.append(v)
            return _res

        return _decodeDict(data)
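    # strFromUnicodeRecursive is aimed at the output of json.loads(); a sketch
    # (assumes `import json`, which this script does not pull in, and ASCII-safe
    # values, since str() raises UnicodeEncodeError on non-ASCII input):
    #
    #   parsed = json.loads('{"name": "zoidberg", "ids": [1, "a"]}')
    #   clean = Tools.strFromUnicodeRecursive(parsed)
    #   # every key and string value in `clean` is now a byte string (str)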
    @staticmethod
    def getDataPool(data, max_entries):
        """
        The point of this function is that some API calls
        limit the number of values you can pass in a single request.
        This function breaks a list of values into
        pools of at most max_entries items each.
        """
        if type(data) != list:
            raise Exception("Unable to create data pool: invalid parameter")
        _len = len(data)
        _results = []
        if _len >= max_entries:
            _pool = []
            # Float division so ceil() actually rounds up under Python 2's
            # integer division; empty trailing pools are filtered out below.
            _pools = int(math.ceil(float(_len) / max_entries))
            for i in xrange(0, _pools + 1):
                _min = max_entries * i
                _max = min(_min + max_entries, _len)
                _pool = data[_min:_max]
                if len(_pool) > 0:
                    _results.append(_pool)
        elif _len > 0:
            _results = [data]
        return _results
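    # getDataPool example: splitting 7 ids into batches of at most 3 for a
    # hypothetical batched API call:
    #
    #   Tools.getDataPool([1, 2, 3, 4, 5, 6, 7], 3)
    #   # -> [[1, 2, 3], [4, 5, 6], [7]]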
    @staticmethod
    def mutex(pidFile):
        """Creates a pid file that blocks any new processes from spawning"""
        def _writePid():
            _pid = os.getpid()
            with open(pidFile, "w") as pidHandle:
                pidHandle.write("%s" % _pid)
            return _pid

        def _getPid():
            if os.access(pidFile, os.F_OK):
                with open(pidFile, "r") as pidHandle:
                    pidHandle.seek(0)
                    pid = pidHandle.readline()
                    if pid:
                        return pid
                    return None
            return None

        def _running(pid):
            return pid and os.path.exists("/proc/%s" % pid)

        _pid = _getPid()
        if _pid:
            if _running(_pid):
                Tools.log("This process is already running.")
                sys.exit(1)
            else:
                Tools.log("Pid found, but not running. Cleaning up.")
                os.remove(pidFile)
        return _writePid()
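    # Typical mutex usage at the top of a cron-style job; the pid file path is
    # an example, any writable location works, and the liveness check relies on
    # a /proc filesystem being present:
    #
    #   Tools.mutex("/var/run/my_job.pid")
    #   # ... do work; a second invocation exits while the first is running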
    @staticmethod
    def unique(seq, fn=None):
        """Returns the items of seq with duplicates removed, preserving order;
        fn, if given, supplies the key used to compare items."""
        if not fn:
            def fn(x):
                return x
        seen = {}
        result = []
        for item in seq:
            marker = fn(item)
            if marker in seen:
                continue
            seen[marker] = 1
            result.append(item)
        return result

    @staticmethod
    def log(message):
        #logger.Logger.instance().debug(message)
        print message

    @staticmethod
    def notice(message):
        print message

    @staticmethod
    def exception(e):
        # format_exc() reads the current exception from sys.exc_info(); it does
        # not take the exception object itself.
        print traceback.format_exc()
        #logger.Logger.instance().exception(e)

    @staticmethod
    def convertUTCDateToLocal(event_date):
        """Accepts a naive datetime object in UTC and converts it to the local
        timezone (America/Chicago)."""
        # Create our timezone objects
        tz_utc = pytz.utc
        tz_local = pytz.timezone('America/Chicago')
        # Convert our date object (lacking timezone) to UTC (with timezone)
        date_utc = tz_utc.localize(event_date)
        # Convert our new UTC date to localtime
        date_local = date_utc.astimezone(tz_local)
        return date_local
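    # convertUTCDateToLocal expects a naive UTC datetime, e.g.:
    #
    #   from datetime import datetime
    #   local = Tools.convertUTCDateToLocal(datetime.utcnow())
    #   # local is a timezone-aware datetime in America/Chicago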
    @staticmethod
    def striptags(value):
        # Crude tag stripper: drops anything that looks like an HTML/XML tag.
        return re.sub(r'<[^>]*>', '', value)

    @staticmethod
    def boolval(value):
        """Loosely interprets a value as a boolean: 't', 'true' (any case) or 1
        are True, everything else is False."""
        if not value:
            return False
        if str(value).lower() in ('t', 'true'):
            return True
        try:
            if int(value) == 1:
                return True
        except ValueError:
            pass
        return False

class FetchRequest(object):
    def __init__(self, url, method='GET', data=None, options=None, headers=None,
                 verifyssl=True, callback=None, timeout=None, userdata=None):
        self.url = url
        self.method = method
        self.data = data
        # Copy the caller's dicts so mutable default arguments are never shared
        # between requests (updating a shared default would leak options, such
        # as TIMEOUT, from one request into every later one).
        self.headers = dict(headers) if headers else {}
        self.verifyssl = verifyssl
        self.callback = callback
        self.options = dict(options) if options else {}
        self.userdata = userdata
        self.options.update({
            # Honour the verifyssl flag instead of unconditionally disabling
            # certificate checks.
            pycurl.SSL_VERIFYPEER: verifyssl,
            pycurl.SSL_VERIFYHOST: 2 if verifyssl else 0,
            pycurl.USERAGENT: 'Bright API Client',
            pycurl.FOLLOWLOCATION: True,
        })
        if timeout:
            self.options[pycurl.TIMEOUT] = timeout

class Fetch(object):
    _max_requests = 10
    _pending_requests = {}
    _options = {}
    _multi_handle = None

    def __init__(self, max_requests=10, options=None):
        self._max_requests = max_requests
        self._options = options or {}
        # Per-instance state; relying on the class-level dict would share
        # pending requests across every Fetch instance.
        self._pending_requests = {}
        self._multi_handle = pycurl.CurlMulti()

    def __del__(self):
        self.finishAllRequests()

    def setMaxRequests(self, max_requests):
        self._max_requests = max_requests

    def setOptions(self, options):
        self._options = options

    def enqueue(self, request, callback=None):
        if not isinstance(request, FetchRequest):
            raise TypeError('Invalid request passed to Fetch.enqueue')
        if callback:
            request.callback = callback
        if self._max_requests > 0:
            # Block until we are under the concurrency limit.
            self.waitFor(self._max_requests)
        ch = pycurl.Curl()
        for key, value in request.options.items():
            ch.setopt(key, value)
        ch.setopt(pycurl.URL, request.url)
        _headers = ["%s: %s" % (k, v) for k, v in request.headers.items()]
        ch.setopt(pycurl.HTTPHEADER, _headers)
        _buffer = cStringIO.StringIO()
        ch.setopt(pycurl.WRITEFUNCTION, _buffer.write)
        if request.method == 'POST':
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, str(request.data))
        self._multi_handle.add_handle(ch)
        self._pending_requests[ch] = {
            'handle': ch,
            'result_buffer': _buffer,
            'request': request
        }
        self.checkForCompleteRequests()

    def finishAllRequests(self):
        self.waitFor(1)

    def checkForCompleteRequests(self):
        if self._multi_handle.select(1.0) == -1:
            return
        # Drive the transfers until pycurl no longer asks to be called again
        # immediately.
        while True:
            ret, num_handles = self._multi_handle.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        # Drain the queue of finished handles, firing callbacks as we go.
        while True:
            num_q, ok_list, err_list = self._multi_handle.info_read()
            for ch in ok_list:
                if ch not in self._pending_requests:
                    raise RuntimeError("Error - handle wasn't found in requests: %s" % ch)
                response = self._pending_requests[ch]
                if response['request'].callback:
                    response['request'].callback(ch, response)
                self._multi_handle.remove_handle(ch)
                del self._pending_requests[ch]
            for ch, errno, errmsg in err_list:
                if ch not in self._pending_requests:
                    raise RuntimeError("Error - handle wasn't found in requests: %s" % ch)
                response = self._pending_requests[ch]
                if response['request'].callback:
                    response['request'].callback(ch, response)
                self._multi_handle.remove_handle(ch)
                del self._pending_requests[ch]
            if num_q < 1:
                break

    def waitFor(self, num_requests):
        # Poll until the number of in-flight requests drops below num_requests.
        while True:
            self.checkForCompleteRequests()
            if len(self._pending_requests) < num_requests:
                break
            time.sleep(0.01)

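# A minimal sketch of driving Fetch directly with a completion callback, kept
# as a comment so importing this module stays side-effect free; the URL and
# callback below are illustrative:
#
#   def on_done(ch, response):
#       print response['request'].url, len(response['result_buffer'].getvalue())
#
#   fetch = Fetch(max_requests=5)
#   fetch.enqueue(FetchRequest("http://example.com/", callback=on_done))
#   fetch.finishAllRequests()
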
class NamedDictCursor(oursql.DictCursor):
    """
    NamedDictCursor provides named parameter
    binding for oursql prepared statements.
    """
    def _convertNamed(self, sql, params):
        # Pull out the :name placeholders in order of appearance and build the
        # positional parameter list oursql expects.
        matches = re.findall(r':([^\s\W]+)', sql)
        qparams = []
        for match in matches:
            if match in params:
                qparams.append(params[match])
        _sql = re.sub(r':[^\s\W]+', '?', sql)
        return (_sql, qparams)

    def execute(self, sql, params, **args):
        if isinstance(params, dict):
            sql, params = self._convertNamed(sql, params)
        # Pass the keyword arguments through as keywords (the original code
        # unpacked them positionally with *args).
        return super(NamedDictCursor, self).execute(sql, params, **args)
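
# NamedDictCursor usage sketch; the connection details are placeholders:
#
#   conn = oursql.connect(host='localhost', user='app', passwd='secret', db='app')
#   curs = conn.cursor(NamedDictCursor)
#   curs.execute("SELECT * FROM users WHERE id = :id AND status = :status",
#                {'id': 42, 'status': 'active'})
#   print curs.fetchall()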