# coding:utf-8
'''
Aggregation of datapoints
~~~~~~~~~~~~~~~~~~~~~~~~~
Aggregates the values of datapoints, supporting sum, avg, max and min.
'''
import json
import unittest
import time
from datetime import datetime
from functools import partial, reduce  # reduce is a builtin in Python 2; imported for Python 3
from pprint import pprint
# Canned KairosDB query response, used as a stand-in for a real HTTP call.
Q = {
    "queries": [
        {
            "sample_size": 14368,
            "results": [
                {
                    "name": "abc_123",
                    "tags": {"host": ["server1"], "customer": ["bar"]},
                    "values": [[1364968800000, 11019], [1366351200000, 2843]]
                },
                {
                    "name": "abcq_123",
                    "tags": {"host": ["server1"], "customer": ["bar"]},
                    "values": [[1364968800000, 2], [1366352200000, 3]]
                }
            ]
        }
    ]
}
def generate_tags_node(pk, uid, device_sn=None):
    """
    :param pk: product_key, str
    :param uid: user_id, str
    :param device_sn: optional device serial number
    """
    tags_node = {
        "product_key": [str(pk)],
        "uid": [str(uid)]
    }
    if device_sn:
        tags_node['device_sn'] = [str(device_sn)]
    return tags_node
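# Example (illustrative values): generate_tags_node('pk1', 'u1', 'sn1') returns
# {"product_key": ["pk1"], "uid": ["u1"], "device_sn": ["sn1"]}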
def generate_start_end_time(start_abs, end_abs=None):
    """
    Return a dict with start_absolute and, if given, end_absolute.
    :param start_abs: start_absolute, the time in milliseconds.
    :param end_abs: end_absolute, the time in milliseconds.
    """
    time_region = {'start_absolute': int(start_abs)}
    if end_abs:
        time_region['end_absolute'] = int(end_abs)
    return time_region
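# Example (illustrative timestamps): generate_start_end_time(1364968800000)
# returns {'start_absolute': 1364968800000}; passing end_abs adds 'end_absolute'.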
def generate_agg_node(agg_name, unit, value=1):
    """
    Return KairosDB's request aggregator node, e.g.
    {
        "name": "avg",
        "sampling": {
            "value": 10,
            "unit": "minutes"
        }
    }
    :param agg_name: aggregator name; supports sum, avg, max, min
    :param unit: sampling unit; one of "milliseconds", "seconds", "minutes",
        "hours", "days", "weeks", "months", "years"
    :param value: optional sampling value, defaults to 1
    """
    _agg = {
        "name": agg_name,
        "sampling": {
            "value": value,
            "unit": str(unit)
        }
    }
    return _agg
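# Example: generate_agg_node('avg', 'minutes', 10) returns
# {"name": "avg", "sampling": {"value": 10, "unit": "minutes"}}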
def generate_metrics(names, tags_node, agg_node):
    '''
    Return KairosDB's request metrics node.
    :param names: list of datapoint names
    :param tags_node: KairosDB's request tags node
    :param agg_node: KairosDB's request aggregator node
    '''
    def _set_name(n):
        metric = {
            'name': n,
            'tags': tags_node,
            'aggregators': [agg_node]  # KairosDB expects a list of aggregators
        }
        return metric
    return list(map(_set_name, names))  # materialize so the payload is JSON-serializable
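# Example (illustrative names): generate_metrics(['dp1'], tags, agg) returns
# [{'name': 'dp1', 'tags': tags, 'aggregators': [agg]}]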
def convert_ts_to_dt(ts, unit='HOURS'):
    '''
    Return:
        HOURS: "2015072010"
        DAYS: "20150720"
        WEEKS: "201529"
        MONTHS: "201507"
    :param ts: the time in milliseconds.
    :param unit: str, default HOURS; also accepts DAYS, WEEKS, MONTHS
    '''
    dt = datetime.fromtimestamp(ts / 1000)
    if unit == 'HOURS':
        res = dt.strftime('%Y%m%d%H')
    elif unit == 'DAYS':
        res = dt.strftime('%Y%m%d')
    elif unit == 'MONTHS':
        res = dt.strftime('%Y%m')
    elif unit == 'WEEKS':
        res = dt.strftime('%Y%W')
    else:
        raise ValueError('unsupported unit: %s' % unit)
    return res
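# Example: convert_ts_to_dt(1364968800000, 'DAYS') returns '20130403'
# (the exact value depends on the local timezone, since fromtimestamp
# converts to local time; the tests below assume UTC+8).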
def group_values(results, convert_to_str):
    '''
    Return
    {
        'datetime': {
            'attr1': value,
            'attr2': value
        }
    }
    :param results: KairosDB RESTful query response results.
    :param convert_to_str: function, partial of convert_ts_to_dt
    '''
    def group_result(x, y):
        dp_name = y['name']        # KairosDB returns the datapoint name under 'name'
        dp_agg_vals = y['values']  # and the [timestamp, value] pairs under 'values'
        for v in dp_agg_vals:
            _datetime = convert_to_str(v[0])
            _value = v[1]
            x.setdefault(_datetime, {})[dp_name] = _value
        return x
    return reduce(group_result, results, {})
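# Example: with the fixture above and an 'HOURS' converter,
# group_values(Q['queries'][0]['results'], partial(convert_ts_to_dt, unit='HOURS'))
# yields {'2013040314': {'abc_123': 11019, 'abcq_123': 2}, ...} in a UTC+8 timezone.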
def query(product_key, uid, device_sn, start_ts, end_ts, attrs, aggregator, unit):
    # todo: validate params
    tags_node = generate_tags_node(product_key, uid, device_sn)
    agg_node = generate_agg_node(aggregator, unit)
    payload = {}
    payload['metrics'] = generate_metrics(attrs, tags_node, agg_node)
    st_et_node = generate_start_end_time(start_ts, end_ts)
    payload.update(st_et_node)
    def _query_kairdb(payload):
        # Stub: a real implementation would POST the payload to KairosDB
        # and json.loads the response body.
        pprint(payload)
        q = Q['queries'][0]
        return q
    results = _query_kairdb(payload)
    convert_str = partial(convert_ts_to_dt, unit=unit)
    group_result = group_values(results['results'], convert_str)
    def generate_datas(param):
        dt, attrs = param[0], param[1]
        d = {}
        d['datetime'] = dt
        d['attrs'] = attrs
        d['product_key'] = product_key
        d['uid'] = uid
        d['device_sn'] = device_sn
        return d
    datas = list(map(generate_datas, group_result.items()))
    resp_content = {}
    resp_content['data'] = datas
    query_node = {
        "start_ts": start_ts,
        "end_ts": end_ts,
        "attrs": attrs,
        "aggregator": aggregator,
        "unit": unit
    }
    resp_content['query'] = query_node
    json.dumps(resp_content)  # sanity check: the response must be JSON-serializable
    return resp_content
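# A minimal sketch of a real _query_kairdb, assuming KairosDB's documented
# /api/v1/datapoints/query endpoint and the `requests` library; the host and
# port are placeholders:
#
#   import requests
#   def _query_kairdb(payload):
#       resp = requests.post('http://localhost:8080/api/v1/datapoints/query',
#                            data=json.dumps(payload))
#       return json.loads(resp.content)['queries'][0]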
class TestCore(unittest.TestCase):
    """Tests for the aggregation helpers."""
    def setUp(self):
        self.q = {
            "queries": [
                {
                    "sample_size": 14368,
                    "results": [
                        {
                            "name": "abc_123",
                            "tags": {"host": ["server1"], "customer": ["bar"]},
                            "values": [[1364968800000, 11019], [1366351200000, 2843]]
                        },
                        {
                            "name": "abcq_123",
                            "tags": {"host": ["server1"], "customer": ["bar"]},
                            "values": [[1364968800000, 2], [1366351200000, 3]]
                        }
                    ]
                }
            ]
        }
        self.results = self.q['queries'][0]['results']
    def test_group_values(self):
        unit = 'HOURS'
        convert_str = partial(convert_ts_to_dt, unit=unit)
        group_result = group_values(self.results, convert_str)
        pprint(group_result)
        # Expected strings assume a UTC+8 local timezone, since
        # datetime.fromtimestamp converts to local time.
        res = {'2013040314': {'abc_123': 11019, 'abcq_123': 2},
               '2013041914': {'abc_123': 2843, 'abcq_123': 3}}
        self.assertEqual(res, group_result)
    def test_query(self):
        ts = time.mktime(datetime.now().timetuple())
        ts = int(ts * 1000)
        product_key = 'pk'
        uid = 'uid'
        device_sn = 'sn'
        start_ts = ts
        end_ts = ts + 10000
        attrs = ['dp1', 'dp2']
        aggregator = 'SUM'
        unit = 'HOURS'
        res = query(
            product_key, uid, device_sn, start_ts, end_ts, attrs, aggregator, unit)
        pprint(res)
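if __name__ == '__main__':
    # Run the tests when this file is executed directly.
    unittest.main()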