Skip to content

Instantly share code, notes, and snippets.

@shadda
Created September 3, 2013 21:53
Show Gist options
  • Select an option

  • Save shadda/6430097 to your computer and use it in GitHub Desktop.

Select an option

Save shadda/6430097 to your computer and use it in GitHub Desktop.
import sys, os
import time
import pytz
import math
import traceback
import re
import eventlet
from datetime import datetime
from eventlet import GreenPool
import pycurl
import cStringIO
import pickle
import hashlib
import urllib
import pprint
import json
from lxml import etree
from urllib import urlencode
from app.lib.pyparallelcurl import ParallelCurl
class Tools(object):
    """Static helper grab-bag: hashing, DB cursor-row mapping, list pooling,
    pid-file mutex, logging, and assorted conversions."""

    @staticmethod
    def coalesce(*args):
        """Return the first argument that is not None (None if all are)."""
        for _arg in args:
            if _arg is not None:
                return _arg
        return None

    @staticmethod
    def md5(data):
        """Hex MD5 digest of a string, or of pickle.dumps(data) otherwise."""
        m = hashlib.md5()
        if isinstance(data, str):
            m.update(data)
        else:
            # Non-string values are hashed via their pickle serialization.
            m.update(pickle.dumps(data))
        return m.hexdigest()

    @staticmethod
    def fetchDataAsync(requests, max_requests=10):
        """Run all FetchRequest objects concurrently through Fetch.

        Returns the raw response bodies in completion order (which may
        differ from the order of *requests*).
        """
        _results = []

        def _callback(ch, response):
            _results.append(response['result_buffer'].getvalue())

        fetch = Fetch(max_requests)
        for request in requests:
            request.callback = _callback
            fetch.enqueue(request)
        fetch.finishAllRequests()
        return _results

    @staticmethod
    def fetchoneAssoc(cursor):
        """Fetch one row from a DB-API cursor as a dict keyed by lower-cased
        column name, or None when the result set is exhausted."""
        data = cursor.fetchone()
        if data is None:
            return None
        return Tools.fetchAssoc(data, cursor.description)

    @staticmethod
    def fetchallAssoc(cursor):
        """Fetch every remaining cursor row as a list of column-keyed dicts."""
        rows = []
        desc = cursor.description
        for data in cursor:
            if data is None:
                # Preserve the historical contract: a null row aborts the scan.
                return None
            rows.append(Tools.fetchAssoc(data, desc))
        return rows

    @staticmethod
    def fetchAssoc(row, desc):
        """Map one row tuple onto its cursor description, lower-casing the
        column names. *desc* entries are DB-API 7-tuples; element 0 is the
        column name."""
        dic = {}
        for (name, value) in zip(desc, row):
            dic[name[0].lower()] = value
        return dic

    @staticmethod
    def mostCommon(data):
        """Return the most frequent element of *data* (ties broken arbitrarily)."""
        return max(set(data), key=data.count)

    @staticmethod
    def strFromUnicodeRecursive(data):
        """
        json.load() returns unicode strings, which, in a sane world would be
        fine, but many python libraries check explicitly for 'str'.
        Recursively converts every unicode key/value in *data* to str.
        (Python 2 only: relies on the `unicode` builtin and dict.iteritems.)
        """
        def _decodeDict(data):
            _res = {}
            for k, v in data.iteritems():
                if isinstance(k, unicode):
                    k = str(k)
                if isinstance(v, unicode):
                    v = str(v)
                if isinstance(v, list):
                    v = _decodeList(v)
                if isinstance(v, dict):
                    v = _decodeDict(v)
                _res[k] = v
            return _res

        def _decodeList(data):
            _res = []
            for v in data:
                if isinstance(v, unicode):
                    v = str(v)
                if isinstance(v, list):
                    v = _decodeList(v)
                if isinstance(v, dict):
                    v = _decodeDict(v)
                _res.append(v)
            return _res

        return _decodeDict(data)

    @staticmethod
    def getDataPool(data, max_entries):
        """
        Some API calls limit the number of values you can pass in a single
        request; break *data* into chunks of at most *max_entries* items.

        Returns [] for an empty list; raises on non-list input.
        """
        if not isinstance(data, list):
            raise Exception("Unable to create data pool: invalid parameter")
        # Slice stepping replaces the old ceil()/index arithmetic, which
        # leaned on integer division plus an off-by-one loop bound and an
        # empty-chunk filter to come out right.
        return [data[i:i + max_entries]
                for i in range(0, len(data), max_entries)]

    @staticmethod
    def mutex(pidFile):
        """Creates a pid file that blocks any new processes from spawning.

        Exits the process if another live instance holds the pid file;
        otherwise (re)writes the file and returns our pid. The liveness
        check is Linux-specific (/proc/<pid>).
        """
        def _writePid():
            _pid = os.getpid()
            with open(pidFile, "w") as pidHandle:
                pidHandle.write("%s" % _pid)
            return _pid

        def _getPid():
            if os.access(pidFile, os.F_OK):
                with open(pidFile, "r") as pidHandle:
                    pidHandle.seek(0)
                    pid = pidHandle.readline()
                    if pid:
                        return pid
            return None

        def _running(pid):
            return pid and os.path.exists("/proc/%s" % pid)

        _pid = _getPid()
        if _pid:
            if _running(_pid):
                Tools.log("This process is already running.")
                sys.exit(1)
            else:
                Tools.log("Pid found, but not running. Cleaning up.")
                os.remove(pidFile)
        return _writePid()

    @staticmethod
    def unique(seq, fn=None):
        """Order-preserving de-duplication of *seq*; *fn* (optional)
        extracts the comparison key for each item."""
        if not fn:
            def fn(x):
                return x
        seen = set()
        result = []
        for item in seq:
            marker = fn(item)
            if marker in seen:
                continue
            seen.add(marker)
            result.append(item)
        return result

    @staticmethod
    def log(message):
        # logger.Logger.instance().debug(message)
        print(message)

    @staticmethod
    def notice(message):
        print(message)

    @staticmethod
    def exception(e):
        # BUGFIX: traceback.format_exc() takes a *limit* int, not the
        # exception; passing e only worked by accident under py2's
        # anything-compares-to-int rules.
        print(traceback.format_exc())
        # logger.Logger.instance().exception(e)

    @staticmethod
    def convertUTCDateToLocal(event_date):
        """Accepts a naive datetime assumed to be UTC and converts it to
        America/Chicago local time."""
        tz_utc = pytz.utc
        tz_local = pytz.timezone('America/Chicago')
        # Attach the UTC zone to the naive datetime, then shift zones.
        date_utc = tz_utc.localize(event_date)
        date_local = date_utc.astimezone(tz_local)
        return date_local

    @staticmethod
    def striptags(value):
        """Strip anything that looks like an HTML/XML tag from *value*."""
        return re.sub(r'<[^>]*>', '', value)

    @staticmethod
    def boolval(value):
        """Loose truthiness: 't'/'true' (any case) or the integer value 1
        count as True; everything else is False."""
        if not value:
            return False
        if str(value).lower() in ('t', 'true'):
            return True
        try:
            if int(value) == 1:
                return True
        except (ValueError, TypeError):
            # int() raises ValueError for non-numeric strings and TypeError
            # for lists/dicts; both simply mean "not a 1".
            pass
        return False

    @staticmethod
    def parseCookies(cookieString):
        """Parse a 'k=v; k2=v2' Cookie header into a dict.

        BUGFIX: split each part on the first '=' only, so values that
        themselves contain '=' survive intact, and a part with no '='
        maps to '' instead of raising IndexError.
        """
        cookies = {}
        for cookie in cookieString.split(';'):
            parts = cookie.split('=', 1)
            key = parts[0].strip()
            value = parts[1].strip() if len(parts) > 1 else ''
            cookies[key] = value
        return cookies

    @staticmethod
    def dump(*args):
        """Pretty-print each argument (indent 4, max nesting depth 6)."""
        for obj in args:
            pprint.PrettyPrinter(indent=4, depth=6).pprint(obj)
class FetchRequest(object):
    """Value object describing one HTTP transfer for Fetch: URL, method,
    payload, headers, pycurl option overrides, and a completion callback."""

    def __init__(self, url, method='GET', data=None, options=None,
                 headers=None, verifyssl=True, callback=None, timeout=None,
                 userdata=None):
        self.url = url
        self.method = method
        self.data = data
        # BUGFIX: the old mutable {} defaults were shared by every instance,
        # and self.options.update() below mutated the shared default dict —
        # so options set on one request leaked into all later ones.
        self.headers = {} if headers is None else headers
        self.verifyssl = verifyssl
        self.callback = callback
        self.options = {} if options is None else options
        self.userdata = userdata
        # NOTE(review): SSL verification is forced off regardless of the
        # verifyssl flag — confirm whether honoring the flag would break
        # callers before changing this.
        self.options.update({
            pycurl.SSL_VERIFYPEER: False,
            pycurl.SSL_VERIFYHOST: False,
            pycurl.USERAGENT: 'Bright API Client',
            pycurl.FOLLOWLOCATION: True,
        })
        if timeout:
            self.options[pycurl.TIMEOUT] = timeout
class Fetch(object):
    """Thin wrapper around pycurl.CurlMulti that keeps at most
    *max_requests* transfers in flight at once, draining completions
    (and firing request callbacks) as slots are needed."""

    def __init__(self, max_requests=10, options=None):
        # BUGFIX: all of this is now per-instance state. Previously
        # _pending_requests was a class-level dict that __init__ never
        # rebound, so every Fetch instance shared one pending map and
        # concurrent fetchers corrupted each other. options={} was also
        # a shared mutable default.
        self._max_requests = max_requests
        self._options = {} if options is None else options
        self._pending_requests = {}
        self._multi_handle = pycurl.CurlMulti()

    def __del__(self):
        # Best effort: drain outstanding transfers before the multi
        # handle is destroyed.
        self.finishAllRequests()

    def setMaxRequests(self, max_requests):
        self._max_requests = max_requests

    def setOptions(self, options):
        self._options = options

    def enqueue(self, request, callback=None):
        """Register *request* (a FetchRequest) with the multi handle.

        Blocks until the in-flight count drops below the max before
        adding the new transfer. Raises TypeError for non-FetchRequest
        arguments.
        """
        if not isinstance(request, FetchRequest):
            raise TypeError('Invalid request passed to Fetch.enqueue')
        if self._max_requests > 0:
            # Wait for a free slot before adding another transfer.
            self.waitFor(self._max_requests)
        ch = pycurl.Curl()
        for key, value in request.options.items():
            ch.setopt(key, value)
        ch.setopt(pycurl.URL, request.url)
        _headers = ["%s: %s" % (k, v) for k, v in request.headers.items()]
        ch.setopt(pycurl.HTTPHEADER, _headers)
        # Response body accumulates here; handed to the callback via
        # the pending-request record.
        _buffer = cStringIO.StringIO()
        ch.setopt(pycurl.WRITEFUNCTION, _buffer.write)
        if request.method == 'POST':
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, str(request.data))
        self._multi_handle.add_handle(ch)
        self._pending_requests[ch] = {
            'handle': ch,
            'result_buffer': _buffer,
            'request': request
        }
        self.checkForCompleteRequests()

    def finishAllRequests(self):
        """Block until every pending transfer has completed."""
        self.waitFor(1)

    def checkForCompleteRequests(self):
        """Pump the multi handle once and dispatch any finished transfers
        (successful or failed) to their request callbacks."""
        if self._multi_handle.select(1.0) == -1:
            # select() timed out with nothing actionable.
            return
        # Drive transfers until curl stops asking to be called again.
        while True:
            ret, num_handles = self._multi_handle.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        # Drain completion notices; info_read() may need multiple calls.
        while True:
            num_q, ok_list, err_list = self._multi_handle.info_read()
            for ch in ok_list:
                if ch not in self._pending_requests:
                    raise RuntimeError("Error - handle wasn't found in requests: %s" % ch)
                response = self._pending_requests[ch]
                if response['request'].callback:
                    response['request'].callback(ch, response)
                self._multi_handle.remove_handle(ch)
                del self._pending_requests[ch]
            for ch, errno, errmsg in err_list:
                if ch not in self._pending_requests:
                    raise RuntimeError("Error - handle wasn't found in requests: %s" % ch)
                # NOTE(review): failed transfers invoke the same callback
                # with no error details — callers cannot distinguish an
                # error body from a success; confirm before relying on it.
                response = self._pending_requests[ch]
                if response['request'].callback:
                    response['request'].callback(ch, response)
                self._multi_handle.remove_handle(ch)
                del self._pending_requests[ch]
            if num_q < 1:
                break

    def waitFor(self, num_requests):
        """Poll until fewer than *num_requests* transfers remain pending."""
        while True:
            self.checkForCompleteRequests()
            if len(self._pending_requests) < num_requests:
                break
            time.sleep(0.01)
class JsonEncoder(json.JSONEncoder):
    """JSONEncoder that serializes any object exposing __getstate__()
    by encoding the state it returns; everything else falls through to
    the stock encoder (which raises TypeError)."""

    def default(self, obj):
        # Objects that know how to pickle themselves also know how to
        # present a JSON-friendly state value.
        getstate = getattr(obj, '__getstate__', None)
        if getstate is None:
            return super(JsonEncoder, self).default(obj)
        return getstate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment