Last active
December 18, 2015 03:08
-
-
Save shadda/5715778 to your computer and use it in GitHub Desktop.
General-purpose Python utility script, copyright (c) 1991 Herald Zoidberg
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys, os | |
| import time | |
| import pytz | |
| import math | |
| import traceback | |
| import oursql | |
| import re | |
| import eventlet | |
| from eventlet import GreenPool | |
| import pycurl | |
| import cStringIO | |
| import pickle | |
| import hashlib | |
| import urllib | |
| from lxml import etree | |
| from urllib import urlencode | |
| from app.lib.pyparallelcurl import ParallelCurl | |
class Tools(object):
    """Grab-bag of static helpers: hashing, DB-cursor-to-dict conversion,
    request batching, a pid-file mutex, logging shims, and misc conversions.

    All methods are static; the class is a namespace only.
    NOTE(review): this module is Python 2 code (print syntax aside, it uses
    ``unicode``/``iteritems``/``cStringIO`` elsewhere in the file).
    """

    @staticmethod
    def coalesce(*args):
        """Return the first argument that is not None, or None if all are.

        Mirrors SQL COALESCE; falsy-but-not-None values (0, '', []) count.
        """
        for candidate in args:
            if candidate is not None:   # was `is not None` already; keep identity test
                return candidate
        return None

    @staticmethod
    def md5(data):
        """Return the hex md5 digest of ``data``.

        Strings are hashed directly; anything else is hashed via its
        ``pickle.dumps`` serialization (so equal objects hash equally only
        when their pickles are byte-identical).
        """
        m = hashlib.md5()
        if isinstance(data, str):
            m.update(data)
        else:
            m.update(pickle.dumps(data))
        return m.hexdigest()

    @staticmethod
    def fetchDataAsync(requests, max_requests=10):
        """Fetch a list of FetchRequest objects concurrently.

        Returns the list of response bodies. NOTE: results are appended in
        completion order, which is not necessarily request order.
        """
        results = []

        def _collect(ch, response):
            results.append(response['result_buffer'].getvalue())

        fetch = Fetch(max_requests)
        for request in requests:
            request.callback = _collect
            fetch.enqueue(request)
        fetch.finishAllRequests()
        return results

    @staticmethod
    def fetchoneAssoc(cursor):
        """Fetch one row from ``cursor`` as a dict keyed by lower-cased
        column name, or None when the result set is exhausted."""
        row = cursor.fetchone()
        # BUG FIX: was `row == None`; identity test avoids any custom __eq__.
        if row is None:
            return None
        result = {}
        # cursor.description yields (name, type, ...) tuples per column.
        for (column, value) in zip(cursor.description, row):
            result[column[0].lower()] = value
        return result

    @staticmethod
    def fetchallAssoc(cursor):
        """Fetch every remaining row as a list of dicts keyed by
        lower-cased column name. Returns [] for an empty result set."""
        rows = []
        desc = cursor.description
        for row in cursor:
            # Defensive guard preserved from the original (cursors should
            # not yield None, but bail out the same way if one does).
            if row is None:
                return None
            result = {}
            for (column, value) in zip(desc, row):
                result[column[0].lower()] = value
            rows.append(result)
        return rows

    @staticmethod
    def fetchAssoc(row, desc):
        """Convert a single row tuple plus a cursor description into a dict
        keyed by lower-cased column name."""
        result = {}
        for (column, value) in zip(desc, row):
            result[column[0].lower()] = value
        return result

    @staticmethod
    def mostCommon(data):
        """Return the most frequent element of ``data``.

        Raises ValueError on an empty sequence (inherited from max()).
        Ties are broken arbitrarily by set iteration order.
        """
        return max(set(data), key=data.count)

    @staticmethod
    def strFromUnicodeRecursive(data):
        """Recursively convert unicode strings in a dict/list tree to str.

        json.load() returns unicode strings which some libraries reject.
        Python 2 only (uses ``unicode`` and ``iteritems``).
        """
        def _decodeDict(d):
            out = {}
            for k, v in d.iteritems():
                if isinstance(k, unicode):
                    k = str(k)
                if isinstance(v, unicode):
                    v = str(v)
                elif isinstance(v, list):
                    v = _decodeList(v)
                elif isinstance(v, dict):
                    v = _decodeDict(v)
                out[k] = v
            return out

        def _decodeList(l):
            out = []
            for v in l:
                if isinstance(v, unicode):
                    v = str(v)
                elif isinstance(v, list):
                    v = _decodeList(v)
                elif isinstance(v, dict):
                    v = _decodeDict(v)
                out.append(v)
            return out

        # BUG FIX: the original always called _decodeDict, crashing when the
        # top-level JSON value was a list.
        if isinstance(data, list):
            return _decodeList(data)
        return _decodeDict(data)

    @staticmethod
    def getDataPool(data, max_entries):
        """Split ``data`` (a list) into chunks of at most ``max_entries``.

        Some API calls cap the number of values per request; this breaks a
        list into request-sized pools. Returns [] for an empty list.

        BUG FIX: the original computed pool count with Python-2 integer
        division plus a +1/empty-pool-filter workaround and used ``xrange``;
        plain slice stepping is simpler and version-proof.
        """
        if not isinstance(data, list):
            # Original generic Exception type preserved for callers.
            raise Exception("Unable to create data pool: invalid parameter")
        return [data[start:start + max_entries]
                for start in range(0, len(data), max_entries)]

    @staticmethod
    def mutex(pidFile):
        """Single-instance guard: exit if a live process owns ``pidFile``,
        otherwise (re)write it with our pid and return that pid.

        NOTE(review): liveness is checked via /proc, so this is Linux-only;
        there is also a small TOCTOU window between check and write.
        """
        def _writePid():
            pid = os.getpid()
            with open(pidFile, "w") as handle:
                handle.write("%s" % pid)
            return pid

        def _getPid():
            if not os.access(pidFile, os.F_OK):
                return None
            with open(pidFile, "r") as handle:
                pid = handle.readline().strip()
            return pid or None

        def _running(pid):
            return pid and os.path.exists("/proc/%s" % pid)

        existing = _getPid()
        if existing:
            if _running(existing):
                Tools.log("This process is already running.")
                sys.exit(1)
            Tools.log("Pid found, but not running. Cleaning up.")
            os.remove(pidFile)
        return _writePid()

    @staticmethod
    def unique(seq, fn=None):
        """Return the items of ``seq`` with duplicates removed, preserving
        first-seen order. ``fn``, when given, maps an item to its dedup key
        (keys must be hashable)."""
        if fn is None:
            fn = lambda x: x
        seen = {}
        result = []
        for item in seq:
            marker = fn(item)
            if marker in seen:
                continue
            seen[marker] = 1
            result.append(item)
        return result

    @staticmethod
    def log(message):
        """Log a message to stdout.

        BUG FIX: the class previously defined ``log`` twice; the second
        definition silently shadowed this one. Merged into a single method.
        """
        # logger.Logger.instance().debug(message)
        print(message)

    @staticmethod
    def notice(message):
        """Emit an informational notice to stdout."""
        print(message)

    @staticmethod
    def exception(e):
        """Print the traceback of the exception currently being handled.

        BUG FIX: ``traceback.format_exc(e)`` passed the exception object as
        the ``limit`` argument; format_exc reads the active exception from
        the interpreter state, so no argument is needed.
        """
        print(traceback.format_exc())
        # logger.Logger.instance().exception(e)

    @staticmethod
    def convertUTCDateToLocal(event_date, local_tz='America/Chicago'):
        """Attach UTC to a naive datetime and convert it to ``local_tz``.

        Generalized: the timezone was hard-coded; it is now a parameter
        defaulting to the original 'America/Chicago'.
        """
        date_utc = pytz.utc.localize(event_date)
        return date_utc.astimezone(pytz.timezone(local_tz))

    @staticmethod
    def striptags(value):
        """Strip anything that looks like an HTML/XML tag from ``value``.

        Regex-based: fine for display cleanup, NOT safe as an XSS defense.
        """
        return re.sub(r'<[^>]*>', '', value)

    @staticmethod
    def boolval(value):
        """Loose boolean coercion: falsy -> False; 't'/'true' (any case) or
        anything int()-coercible to 1 -> True; everything else False."""
        if not value:
            return False
        if str(value).lower() in ('t', 'true'):
            return True
        try:
            if int(value) == 1:
                return True
        except ValueError:
            pass
        return False
class FetchRequest(object):
    """Value object describing one HTTP request for the Fetch pool.

    Attributes mirror the constructor arguments; ``callback`` is invoked as
    callback(curl_handle, response_dict) when the transfer finishes.
    """

    def __init__(self, url, method='GET', data=None, options=None, headers=None,
                 verifyssl=True, callback=None, timeout=None, userdata=None):
        self.url = url
        self.method = method
        self.data = data
        # BUG FIX: options/headers previously defaulted to shared mutable {}
        # dicts, and update() below mutated the caller's dict in place.
        # Copying fixes both.
        self.headers = dict(headers) if headers else {}
        self.verifyssl = verifyssl
        self.callback = callback
        self.options = dict(options) if options else {}
        self.userdata = userdata
        self.options.update({
            # BUG FIX: verifyssl was accepted but ignored - peer and host
            # verification were unconditionally disabled. It now controls
            # both (VERIFYHOST wants 2 for full hostname checking).
            pycurl.SSL_VERIFYPEER: bool(verifyssl),
            pycurl.SSL_VERIFYHOST: 2 if verifyssl else False,
            pycurl.USERAGENT: 'Bright API Client',
            pycurl.FOLLOWLOCATION: True,
        })
        if timeout:
            self.options[pycurl.TIMEOUT] = timeout
class Fetch(object):
    """Bounded-concurrency HTTP fetcher built on pycurl.CurlMulti.

    enqueue() blocks while ``max_requests`` transfers are already in flight;
    finishAllRequests() drains the queue. Completion (success OR error)
    fires the request's callback.
    """

    def __init__(self, max_requests=10, options=None):
        self._max_requests = max_requests
        # BUG FIX: _pending_requests (and _options) used to live on the
        # class, so every Fetch instance shared one pending-request dict.
        # They are now per-instance; the mutable {} default is gone too.
        self._options = dict(options) if options else {}
        self._pending_requests = {}
        self._multi_handle = pycurl.CurlMulti()

    def __del__(self):
        # Best effort only: never let an exception escape during
        # interpreter teardown, where module globals may already be gone.
        try:
            self.finishAllRequests()
        except Exception:
            pass

    def setMaxRequests(self, max_requests):
        """Change the in-flight request cap."""
        self._max_requests = max_requests

    def setOptions(self, options):
        """Replace the default option dict."""
        self._options = options

    def enqueue(self, request, callback=None):
        """Add a FetchRequest to the pool, blocking if the pool is full.

        ``callback`` is accepted for interface compatibility but unused;
        set request.callback instead.
        """
        if not isinstance(request, FetchRequest):
            raise TypeError('Invalid request passed to Fetch.enqueue')
        if self._max_requests > 0:
            # Wait until at least one slot is free before adding.
            self.waitFor(self._max_requests)
        ch = pycurl.Curl()
        for key, value in request.options.items():
            ch.setopt(key, value)
        ch.setopt(pycurl.URL, request.url)
        header_lines = ["%s: %s" % (k, v) for k, v in request.headers.items()]
        ch.setopt(pycurl.HTTPHEADER, header_lines)
        body_buffer = cStringIO.StringIO()
        ch.setopt(pycurl.WRITEFUNCTION, body_buffer.write)
        if request.method == 'POST':
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, str(request.data))
        self._multi_handle.add_handle(ch)
        self._pending_requests[ch] = {
            'handle': ch,
            'result_buffer': body_buffer,
            'request': request,
        }
        self.checkForCompleteRequests()

    def finishAllRequests(self):
        """Block until every queued request has completed."""
        self.waitFor(1)

    def checkForCompleteRequests(self):
        """Pump the multi handle once and dispatch finished transfers."""
        if self._multi_handle.select(1.0) == -1:
            return
        # Drive transfers until libcurl stops asking to be called again.
        while True:
            ret, num_handles = self._multi_handle.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        # Drain completion notices; ok and error handles get the same
        # treatment (NOTE(review): errno/errmsg are dropped, so callbacks
        # cannot distinguish success from failure - preserved as-is).
        while True:
            num_q, ok_list, err_list = self._multi_handle.info_read()
            for ch in ok_list:
                self._complete(ch)
            for ch, errno, errmsg in err_list:
                self._complete(ch)
            if num_q < 1:
                break

    def _complete(self, ch):
        """Fire the callback for one finished handle and release it."""
        if ch not in self._pending_requests:
            raise RuntimeError("Error - handle wasn't found in requests: %s" % ch)
        response = self._pending_requests[ch]
        if response['request'].callback:
            response['request'].callback(ch, response)
        self._multi_handle.remove_handle(ch)
        del self._pending_requests[ch]

    def waitFor(self, num_requests):
        """Poll until fewer than ``num_requests`` transfers remain pending."""
        while True:
            self.checkForCompleteRequests()
            if len(self._pending_requests) < num_requests:
                break
            time.sleep(0.01)
class NamedDictCursor(oursql.DictCursor):
    """DictCursor subclass adding :name parameter binding for oursql
    prepared statements (oursql natively supports only ``?`` placeholders).
    """

    def _convertNamed(self, sql, params):
        """Rewrite ':name' placeholders to '?' and return (sql, ordered
        parameter list) built from the ``params`` dict.

        Raises KeyError when the SQL references a name missing from
        ``params``. (Previously missing names were silently skipped, which
        shifted every subsequent positional parameter - a subtle data bug.)
        """
        names = re.findall(r':([^\s\W]+)', sql)
        ordered = []
        for name in names:
            if name not in params:
                raise KeyError("Missing bind parameter: %s" % name)
            ordered.append(params[name])
        rewritten = re.sub(r':[^\s\W]+', '?', sql)
        return (rewritten, ordered)

    def execute(self, sql, params=(), **kwargs):
        """Execute ``sql``; a dict ``params`` enables :name binding.

        BUG FIX: keyword args were collected with ** but forwarded to the
        superclass with a single *, which expanded the dict's KEYS as extra
        positional arguments. ``params`` also gains a backward-compatible
        default so parameterless statements work.
        """
        if isinstance(params, dict):
            sql, params = self._convertNamed(sql, params)
        return super(NamedDictCursor, self).execute(sql, params, **kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment