Last active: September 23, 2019 15:47
-
-
Save oprietop/89848967f1843985e586c1e6f0af48b0 to your computer and use it in GitHub Desktop.
Customizable CurlMulti wrapper with InfluxDB support.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# NOTE: the former "#!/usr/bin/env python -u" shebang failed on Linux, where
# env receives "python -u" as a single program name; the code needs Python 3.
"""Customizable CurlMulti wrapper with InfluxDB support.

Requests are described as plain dicts and run concurrently through a pycurl
``CurlMulti`` handle (``CurlWrapper``). Per-request timing metrics can be
turned into InfluxDB points and written with ``influxDBWrapper``.
"""
# Standard library
import json
from io import BytesIO
from urllib.parse import urlencode, urlparse

# Third party
import pycurl
from influxdb import InfluxDBClient

# Configuration defaults, read by the wrapper classes' constructors.
DEFAULTS = {
    'influxdb': {
        'host': '127.0.0.1',
        'port': 8086,        # InfluxDBClient documents the port as an int
        'user': 'root',
        'pass': 'root',
        'db': 'test',
        # Used both as the retention policy name and as its duration
        'retention': '31d',
    },
    'pycurl': {
        'connect_timeout': 10,   # seconds, CURLOPT_CONNECTTIMEOUT
        'timeout': 10,           # seconds, CURLOPT_TIMEOUT
        'max_conns': 10,         # max concurrent transfers in the multi handle
        'reuse': True,           # False sets CURLOPT_FORBID_REUSE
        # Boolean flags controlling what is printed per request
        'output': {
            'console': True,
            'headers': True,
            'timeline': True,
            'verbose': False,
        },
    },
}
#{{{ Non-configuration constants
# ANSI terminal escape sequences used to colorize console output
# ('end' resets all attributes). The name keeps its historic spelling.
ANSII = {
    'bold': '\033[1m',
    'black': '\033[30m', 'red': '\033[31m', 'green': '\033[32m', 'orange': '\033[33m',
    'blue': '\033[34m', 'purple': '\033[35m', 'cyan': '\033[36m', 'lightgrey': '\033[37m',
    'end': '\033[0m',
    'darkgrey': '\033[90m', 'lightred': '\033[91m', 'lightgreen': '\033[92m', 'yellow': '\033[93m',
    'lightblue': '\033[94m', 'pink': '\033[95m', 'lightcyan': '\033[96m',
}
# pycurl getinfo() constant names, grouped by the InfluxDB measurement they
# are stored under (names are lower-cased to become field keys).
CURLOPTS = {
    'time': ['NAMELOOKUP_TIME', 'CONNECT_TIME',
             'APPCONNECT_TIME', 'PRETRANSFER_TIME',
             'STARTTRANSFER_TIME', 'TOTAL_TIME', 'REDIRECT_TIME'],
    'size': ['SIZE_UPLOAD', 'SIZE_DOWNLOAD'],
    'header': ['HEADER_SIZE', 'REQUEST_SIZE'],
    'length': ['CONTENT_LENGTH_DOWNLOAD', 'CONTENT_LENGTH_UPLOAD'],
    'code': ['HTTP_CODE', 'HTTP_CONNECTCODE'],
    'count': ['NUM_CONNECTS', 'REDIRECT_COUNT'],
    'speed': ['SPEED_UPLOAD', 'SPEED_DOWNLOAD'],
}
# Timeline template, formatted with the lower-cased 'time' values, the
# derived ranges (connect/ssl/server/transfer) in ms, and the ANSII colors.
# The last line's leading backslash is escaped ('\\') — a bare '\-' is an
# invalid escape sequence (SyntaxWarning on Python 3.12+).
# NOTE(review): the PRETRANSFER "+...ms" shows the absolute pretransfer_time,
# not a delta; no pretransfer range is computed for it.
OUTPUT = """Timeline:
|
|--{yellow}NAMELOOKUP{end} {namelookup_time}ms
|--|--{cyan}CONNECT{end} {connect_time}ms ({yellow}CONNECT{end} +{connect}ms)
|--|--|--{cyan}APPCONNECT{end} {appconnect_time}ms ({yellow}SSL{end} +{ssl}ms)
|--|--|--|--{cyan}PRETRANSFER{end} {pretransfer_time}ms ({yellow}PRETRANSFER{end} +{pretransfer_time}ms)
|--|--|--|--|--{cyan}STARTTRANSFER{end} {starttransfer_time}ms ({yellow}WAIT{end} +{server}ms)
|--|--|--|--|--|--{cyan}TOTAL{end} {total_time}ms ({yellow}RECEIVE{end} +{transfer}ms)
\\--|--|--|--|--|--{lightred}REDIRECT{end} {redirect_time}ms
"""
#}}}
class influxDBWrapper(object):
    """Buffer InfluxDB points and write them with an ``InfluxDBClient``.

    Connection settings default to ``DEFAULTS['influxdb']``; each can be
    overridden per instance with a keyword argument. Construction connects,
    creates the database and sets a default retention policy.
    """
    #{{{ def __init__
    def __init__(self, *, host=None, port=None, user=None, password=None, db=None, retention=None):
        """Store connection settings (None means "use DEFAULTS") and connect."""
        cfg = DEFAULTS['influxdb']
        self.points = []  # buffer filled by add(), written by push()
        self.influxdb_host = cfg['host'] if host is None else host
        self.influxdb_port = cfg['port'] if port is None else port
        self.influxdb_user = cfg['user'] if user is None else user
        self.influxdb_pass = cfg['pass'] if password is None else password
        self.influxdb_db = cfg['db'] if db is None else db
        # Used as both the retention policy name and its duration (see connect)
        self.influxdb_retention = cfg['retention'] if retention is None else retention
        self.connect()
    #}}}
    #{{{ def connect
    def connect(self):
        """Create the client, the database and a default retention policy.

        The policy is named after ``influxdb_retention``, lasts that long,
        and uses a replication factor of 1.
        """
        self.influxdb = InfluxDBClient(self.influxdb_host, self.influxdb_port, self.influxdb_user, self.influxdb_pass, self.influxdb_db)
        self.influxdb.create_database(self.influxdb_db)
        self.influxdb.create_retention_policy(self.influxdb_retention, self.influxdb_retention, 1, default=True)
    #}}}
    #{{{ def ping
    def ping(self):
        """Print the response of the server's ``ping`` endpoint (expects HTTP 204)."""
        print(self.influxdb.request('ping', expected_response_code=204))
    #}}}
    #{{{ def add
    def add(self, p):
        """Append the points in the iterable ``p`` to the local buffer."""
        self.points.extend(p)
    #}}}
    #{{{ def push
    def push(self, points=None):
        """Write ``points`` to InfluxDB, or the buffered points when omitted.

        An explicitly passed empty list writes nothing. It is no longer replaced
        by the buffer. When there is nothing to write, no request is made.
        The buffer is not cleared after writing.
        """
        if points is None:
            points = self.points
        if not points:
            return
        self.influxdb.write_points(points)
    #}}}
    #{{{ def print
    def print(self):
        """Print the buffered points."""
        print(self.points)
    #}}}
    #{{{ def drop_db
    def drop_db(self):
        """Drop the configured database."""
        self.influxdb.drop_database(self.influxdb_db)
    #}}}
class CurlWrapper(object):
    """Run many HTTP requests concurrently through a pycurl CurlMulti handle.

    Each request is a dict of keyword arguments for ``prepare_curl_handle``
    (``url`` is required). After a run, every returned Curl handle carries:

    * ``res``: request and response info (body, headers, timeline, ...).
    * ``tags`` and ``measurements``: ready for ``getpoints``.
    """
    #{{{ def __init__
    def __init__(self):
        cfg = DEFAULTS['pycurl']
        self.requests = []  # queued request dicts, consumed by perform()
        self.connect_timeout = cfg['connect_timeout']
        self.timeout = cfg['timeout']
        self.max_conns = cfg['max_conns']
        self.reuse = cfg['reuse']
        # Copy, so per-instance tweaks (cw.output[...] = ...) no longer
        # mutate the shared module-level DEFAULTS dict.
        self.output = dict(cfg['output'])
    #}}}
    #{{{ def fetch
    def fetch(self, r):
        """Run the single request dict ``r`` now; return a one-element handle list."""
        return self.multi_requests([r])
    #}}}
    #{{{ def add
    def add(self, r):
        """Queue the request dict ``r`` for the next perform() call."""
        self.requests.append(r)
    #}}}
    #{{{ def perform
    def perform(self, max_conns=None):
        """Run every queued request (emptying the queue) and return their handles."""
        return self.multi_requests(self.requests, max_conns)
    #}}}
    #{{{ def multi_requests
    def multi_requests(self, requests, max_conns=None):
        """Run ``requests`` with at most ``max_conns`` concurrent transfers.

        ``requests`` is a list of request dicts. It is consumed (popped from the
        front) as transfers are started. Returns the Curl handles in start
        order. Each handle has already been processed by ``get_timings`` and
        printed when its ``console`` output flag is set.
        """
        responses = []
        num_handles = 0
        max_conns = max_conns or self.max_conns
        cm = pycurl.CurlMulti()
        try:
            # Loop while there are requests to launch or transfers still running
            while requests or num_handles:
                if self.output['verbose']:
                    print('\r', 'requests', len(requests), 'responses', len(responses),
                          'num_handles', num_handles, end='', flush=True)
                # Feed the multi handle up to the concurrency limit
                while requests and num_handles < max_conns:
                    c = self.prepare_curl_handle(**requests.pop(0))
                    cm.add_handle(c)
                    num_handles += 1
                    # Keep a reference so the handle is not collected mid-transfer
                    responses.append(c)
                # Drive transfers until libcurl stops asking to be called again
                while True:
                    ret, num_handles = cm.perform()
                    if ret != pycurl.E_CALL_MULTI_PERFORM:
                        break
                cm.select(1.0)
                # Drain every queued completion message, not just one batch
                while True:
                    num_q, ok_list, err_list = cm.info_read()
                    for c in ok_list:
                        cm.remove_handle(c)
                    for c, errno, errmsg in err_list:
                        c.errno, c.errmsg = str(errno), str(errmsg)
                        cm.remove_handle(c)
                    if not num_q:
                        break
        finally:
            # Release the multi handle; the easy handles stay usable for getinfo()
            cm.close()
        # Generate timings and outputs
        for c in responses:
            self.get_timings(c)
            if c.output['console']:
                self.print_info(c)
        return responses
    #}}}
    #{{{ def _append_query
    @staticmethod
    def _append_query(url, query):
        """Append the already-encoded ``query`` to ``url``, keeping any existing query."""
        if '?' not in url:
            return url + '?' + query
        if url.endswith(('?', '&')):
            return url + query
        return url + '&' + query
    #}}}
    #{{{ def prepare_curl_handle
    def prepare_curl_handle(self, url, c=None, method='GET', params=None, headers=None, reuse=None, resolve=None, ua=None, id=None, output=None, tags=None, **kwargs):
        """Create (or reconfigure) a Curl handle for one request.

        :param url: target URL.
        :param c: existing handle to reconfigure; a new one is created when None.
            A passed handle must already carry ``res``/``output``/``errno``
            from an earlier call.
        :param method: HTTP verb. GET and POST use native options; any other
            verb is sent via CUSTOMREQUEST.
        :param params: already-encoded string. It is appended to the query
            string for GET and sent as the request body otherwise.
        :param headers: dict of request headers.
        :param reuse: allow connection reuse; None means ``self.reuse``.
        :param resolve: list passed to CURLOPT_RESOLVE.
        :param ua: User-Agent string.
        :param id: identifier stored in ``res['id']`` (default ``'default'``).
        :param output: per-request overrides of the boolean output flags.
        :param tags: extra tags that override the generated ones in get_timings.
        :param kwargs: merged into ``c.kwargs`` for later use.
        :returns: the configured handle, not yet added to a multi handle.
        """
        output = output or {}
        # Create the Curl handle and set per-handle defaults
        if c is None:
            c = pycurl.Curl()
            c.res = {'id': 'default', 'tags': {}, 'reuse': self.reuse}
            c.errno, c.errmsg = None, None
            c.output = dict(self.output)
            # Original intent: don't share the DNS cache between handles,
            # so each handle gets its own share object.
            share = pycurl.CurlShare()
            share.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS)
            c.setopt(pycurl.SHARE, share)
        # Apply per-request overrides, for known boolean flags only
        for flag, enabled in output.items():
            if flag in self.output and isinstance(enabled, bool):
                c.output[flag] = enabled
        if c.output['verbose']:
            c.setopt(c.VERBOSE, 1)
        # A per-request value wins over the wrapper default (it used to be ignored)
        c.res['reuse'] = self.reuse if reuse is None else reuse
        if not c.res['reuse']:
            c.setopt(pycurl.FORBID_REUSE, 1)
        # Set request method
        method = method.upper()
        if method == 'GET':
            c.setopt(c.HTTPGET, 1)
        elif method == 'POST':
            c.setopt(c.POST, 1)
        else:
            c.setopt(c.CUSTOMREQUEST, method)
        # Set request params
        has_body = False
        if params:
            if not isinstance(params, str):
                print('Request params must be a string.')
            elif method == 'GET':
                url = self._append_query(url, params)
            else:
                c.setopt(c.POSTFIELDS, params)
                has_body = True
        if method == 'POST' and not has_body:
            # With CURLOPT_POST and no body, libcurl reads the body through its
            # read callback (stdin by default); send an explicit empty body.
            c.setopt(c.POSTFIELDS, '')
            c.setopt(pycurl.POSTFIELDSIZE, 0)
        # User Agent
        if ua and isinstance(ua, str):
            c.setopt(c.USERAGENT, ua)
        # Fake DNS entries
        if resolve and isinstance(resolve, list):
            c.setopt(c.RESOLVE, resolve)
        # Request headers
        if headers and isinstance(headers, dict):
            c.setopt(c.HTTPHEADER, ['{}: {}'.format(key, value) for key, value in headers.items()])
        # Response headers: raw text plus a name -> value dict (later responses
        # in a redirect chain overwrite earlier values)
        c.headers_text = ''
        c.headers = {}
        def headers_cb(raw):
            # Header bytes are ISO-8859-1. Decoding as ascii raised inside the
            # callback on any non-ASCII byte, which aborts the transfer.
            line = raw.decode('iso-8859-1')
            c.headers_text += line
            name, sep, value = line.partition(':')
            if sep and not line.startswith('HTTP/'):
                c.headers[name.strip()] = value.strip()
        c.setopt(c.HEADERFUNCTION, headers_cb)
        # Set up the rest of the curl object
        c.buffer = BytesIO()
        c.setopt(c.WRITEDATA, c.buffer)
        c.setopt(c.URL, url)
        c.setopt(c.CONNECTTIMEOUT, self.connect_timeout)
        c.setopt(c.TIMEOUT, self.timeout)
        c.setopt(c.FOLLOWLOCATION, 1)
        c.setopt(c.MAXREDIRS, 5)
        c.setopt(c.NOSIGNAL, 1)
        c.setopt(c.COOKIEJAR, '/dev/null')
        # Keep some info needed later by get_timings/print_info
        c.res['method'] = method
        c.res['url'] = url
        if id:
            c.res['id'] = id
        if tags:
            c.res['tags'] = tags
        # Store kwargs (merged with any from a previous call on this handle)
        c.kwargs = {**getattr(c, 'kwargs', {}), **kwargs}
        return c
    #}}}
    #{{{ def _charset_from_content_type
    @staticmethod
    def _charset_from_content_type(content_type, default='iso-8859-1'):
        """Return the lower-cased ``charset`` parameter of a Content-Type value.

        Returns ``default`` when there is no Content-Type or no charset.
        Surrounding quotes are removed from the charset value.
        """
        if not content_type:
            return default
        for param in content_type.lower().split(';')[1:]:
            key, sep, value = param.partition('=')
            if sep and key.strip() == 'charset':
                value = value.strip().strip('"\'')
                if value:
                    return value
        return default
    #}}}
    #{{{ def get_timings
    def get_timings(self, c):
        """Populate ``c.tags``, ``c.measurements`` and derived ``c.res`` fields.

        ``c`` is a finished handle created by ``prepare_curl_handle``.
        """
        # Tags
        t = c.tags = {}
        t['id'] = c.res['id']
        t['method'] = c.res['method']
        t['primary_ip'] = c.getinfo(c.PRIMARY_IP)
        if hasattr(c, 'errno'):
            t['errno'], t['errmsg'] = c.errno, c.errmsg
        # Dismantle the requested url
        t['url'] = c.res['url']
        parsed = urlparse(t['url'])
        t['scheme'], t['hostname'], t['path'] = parsed.scheme, parsed.hostname, parsed.path or '/'
        # Dismantle the effective url (after redirects)
        t['effective_url'] = c.getinfo(pycurl.EFFECTIVE_URL)
        eparsed = urlparse(t['effective_url'])
        t['effective_scheme'], t['effective_hostname'], t['effective_path'] = eparsed.scheme, eparsed.hostname, eparsed.path or '/'
        # User-supplied tags override the generated ones
        t.update(c.res['tags'])
        # Measurements: one dict of getinfo() values per CURLOPTS group
        m = c.measurements = {}
        for group, names in CURLOPTS.items():
            m[group] = {name.lower(): c.getinfo(getattr(c, name)) for name in names}
        # Ranges between timings
        # http://blog.kenweiner.com/2014/11/http-request-timings-with-curl.html
        times = m['time']
        m['range'] = {
            'dns': times['namelookup_time'],
            'redirect': times['redirect_time'],
            'connect': times['connect_time'] - times['namelookup_time'],
            # appconnect_time stays 0 when no TLS handshake took place
            'ssl': times['appconnect_time'] - times['connect_time'] if times['appconnect_time'] > 0 else 0.0,
            'server': times['starttransfer_time'] - times['pretransfer_time'],
            'transfer': times['total_time'] - times['starttransfer_time'],
        }
        # Extra, non-metric material
        c.res['content_type'] = c.getinfo(c.CONTENT_TYPE) or None
        c.res['encoding'] = self._charset_from_content_type(c.res['content_type'])
        # Timeline, in integer milliseconds
        tr = {k: int(v * 1000) for k, v in {**m['time'], **m['range']}.items()}
        tr.update(ANSII)
        c.res['timeline'] = OUTPUT.format(**tr)
        # Connection, body and header info
        c.res['primary_ip'] = c.getinfo(c.PRIMARY_IP)
        c.res['primary_port'] = c.getinfo(c.PRIMARY_PORT)
        c.res['local_ip'] = c.getinfo(c.LOCAL_IP)
        c.res['local_port'] = c.getinfo(c.LOCAL_PORT)
        body = c.buffer.getvalue()
        try:
            # Invalid bytes must not abort processing of every response
            text = body.decode(c.res['encoding'], errors='replace')
        except LookupError:
            # Unknown charset advertised by the server: use the HTTP default
            c.res['encoding'] = 'iso-8859-1'
            text = body.decode('iso-8859-1')
        c.res['body'] = body
        c.res['body_text'] = text
        c.res['body_size'] = len(text)  # decoded character count
        c.res['headers'] = c.headers
        c.res['headers_text'] = c.headers_text
        c.res['headers_size'] = len(c.headers_text)
    #}}}
    #{{{ def print_info
    def print_info(self, c):
        """Print a colored summary of the finished handle ``c`` to stdout."""
        t, m, res = c.tags, c.measurements, c.res
        print('{}[{}] {} \'{}\' {}:{} -> {}:{} Reuse:{}{}'.format(
            ANSII['bold'], res['id'], res['method'], res['url'],
            res['local_ip'] or None, res['local_port'] or None,
            res['primary_ip'] or None, res['primary_port'] or None,
            res['reuse'], ANSII['end']))
        # Response line: green for 200, red when no HTTP response, yellow otherwise
        code = m['code']['http_code']
        if code == 200:
            resp_color = ANSII['green']
        elif code == 0:
            resp_color = ANSII['red']
        else:
            resp_color = ANSII['yellow']
        rng = m['range']
        print('{}>> {} <{}B> {}ms [DNS(+{}ms) TCP(+{}ms) SSL(+{}ms) TTFB(+{}ms) TTLB(+{}ms)] RED(+{}ms) {}{}'.format(
            resp_color, code,
            int(m['size']['size_download']), int(m['time']['total_time'] * 1000),
            int(rng['dns'] * 1000), int(rng['connect'] * 1000), int(rng['ssl'] * 1000),
            int(rng['server'] * 1000), int(rng['transfer'] * 1000),
            int(rng['redirect'] * 1000),
            res['content_type'], ANSII['end']))
        if c.output['headers']:
            for k, v in c.headers.items():
                print('{lightgrey}{}{end}: {cyan}{}{end}'.format(k, v, **ANSII))
        # Errors can also occur after a status code was received (e.g. timeouts)
        if code == 0 or t.get('errno'):
            print('{}Error {}: {}{}'.format(ANSII['red'], t.get('errno'), t.get('errmsg'), ANSII['end']))
        if c.output['timeline']:
            print(res['timeline'])
    #}}}
    #{{{ def getpoints
    def getpoints(self, res):
        """Turn handles into InfluxDB point dicts, one per measurement group.

        No ``time`` key is set on the points.
        """
        return [{'measurement': name, 'tags': c.tags, 'fields': fields}
                for c in res
                for name, fields in c.measurements.items()]
    #}}}
if __name__ == '__main__':
    # Params to send
    params_dic = {'arg1': 1, 'arg2': 2, 'arg3': 3}
    # Real JSON for the application/json POST below; str(dict) would send a
    # Python repr (single quotes), which is not valid JSON.
    params_str = json.dumps(params_dic)
    params_enc = urlencode(params_dic)
    # Create our CurlWrapper instance
    cw = CurlWrapper()
    # Set some instance-wide options
    cw.output['influxdb'] = True  # NOTE(review): no code in this file reads this flag
    cw.output['console'] = True
    cw.output['headers'] = True
    cw.output['timeline'] = True
    cw.output['verbose'] = False
    cw.connect_timeout = 10
    cw.reuse = True
    # Add some queries
    cw.add({'url': 'notfound'})  # DNS not found
    cw.add({'url': 'lol://what'})  # protocol not found
    cw.add({'url': 'https://expired.badssl.com'})  # Expired SSL Cert
    cw.add({'url': 'http://ip.jsontest.com'})  # Return JSON
    cw.add({'url': 'http://httpbin.org/get'})  # simple get
    cw.add({'url': 'http://httpbin.org/status/404'})  # 404
    cw.add({'url': 'http://httpbin.org/status/500'})  # 500
    cw.add({'url': 'http://httpbin.org/get?arg1=1&arg2=2&arg3=3'})  # get with inline args
    cw.add({'url': 'http://httpbin.org/get', 'params': params_enc})  # get with arg pairs
    cw.add({'url': 'http://httpbin.org/post', 'method': 'POST', 'params': params_str, 'headers': {'Content-type': 'application/json'}})  # post json
    cw.add({'url': 'http://httpbin.org/post', 'method': 'POST', 'params': params_enc})  # post form-urlencoded form data
    cw.add({'url': 'http://httpbin.org/get', 'output': {'timeline': False, 'headers': False}, 'id': 'concrete', 'ua': 'IE6.0', 'reuse': False})  # Changing options for concrete query
    cw.add({'url': 'http://httpbin.org/get', 'tags': {'hostname': 'httpbin_custom'}, 'id': 'changing_tag'})  # Overwriting the hostname tag
    # Run all queries at once
    responses = cw.perform(max_conns=10)
    # Get all the influxDB points
    points = cw.getpoints(responses)
    # Create an influxDBWrapper instance (connects and creates the database)
    iw = influxDBWrapper()
    # Send the points to influxDB
    iw.push(points)
    # Run a one-shot query
    cw.fetch({'url': 'http://ip.jsontest.com'})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.