Skip to content

Instantly share code, notes, and snippets.

@snowleung
Created July 28, 2015 13:49
Show Gist options
  • Save snowleung/10abbe7d22382d283489 to your computer and use it in GitHub Desktop.
Save snowleung/10abbe7d22382d283489 to your computer and use it in GitHub Desktop.
nope
# coding:utf-8
'''
Aggregation of datapoints
~~~~~~~~~~~~~~~~~~~~~~~~~
Run aggregation operations over datapoint values: sum, avg, max and min.
'''
import json
import unittest
import time
from datetime import datetime
from functools import partial
from pprint import pprint
# Canned KairosDB query response. The `_query_kairdb` stub inside `query`
# returns Q['queries'][0] instead of performing a real request.
Q = {
    "queries": [
        {
            "sample_size": 14368,
            "results": [
                {
                    "name": "abc_123",
                    "tags": {"host": ["server1"], "customer": ["bar"]},
                    "values": [[1364968800000, 11019], [1366351200000, 2843]],
                },
                {
                    "name": "abcq_123",
                    "tags": {"host": ["server1"], "customer": ["bar"]},
                    "values": [[1364968800000, 2], [1366352200000, 3]],
                },
            ],
        }
    ]
}
def generate_tags_node(pk, uid, device_sn=None):
    """
    Build the ``tags`` node of a KairosDB metric query.

    Every tag value is stringified and wrapped in a one-element list.

    :param pk: product_key; stored under ``product_key``.
    :param uid: user_id; stored under ``uid``.
    :param device_sn: optional device serial number; the ``device_sn`` tag
        is only added when this value is truthy.
    :return: dict mapping tag name -> list of str values.
    """
    node = dict(product_key=[str(pk)], uid=[str(uid)])
    if not device_sn:
        return node
    node['device_sn'] = [str(device_sn)]
    return node
def generate_start_end_time(start_abs, end_abs=None):
    """
    Return the absolute time-range part of a KairosDB query.

    :param start_abs: start of the range, the time in milliseconds.
    :param end_abs: optional end of the range, the time in milliseconds;
        left out of the result when None.
    :return: dict with ``start_absolute`` and, when *end_abs* is given,
        ``end_absolute`` (both coerced to int).
    """
    # The KairosDB query API names these keys "start_absolute" and
    # "end_absolute"; the former "..._absoulte" spelling did not match them.
    time_region = {'start_absolute': int(start_abs)}
    # Compare against None so an explicit 0 (the epoch) is still sent.
    if end_abs is not None:
        time_region['end_absolute'] = int(end_abs)
    return time_region
def generate_agg_node(agg_name, unit, value=1):
    """
    Return one entry of a KairosDB request's ``aggregators`` list, e.g.::

        {
            "name": "avg",
            "sampling": {
                "value": 10,
                "unit": "minutes"
            }
        }

    :param agg_name: aggregator name, e.g. sum, avg, max, min.
    :param unit: sampling unit ("milliseconds", "seconds", "minutes",
        "hours", "days", "weeks", "months" or "years"); stringified.
    :param value: optional sampling size, in multiples of *unit*; default 1.
    :return: dict with ``name`` and ``sampling`` keys.
    """
    sampling = {"value": value, "unit": str(unit)}
    return {"name": agg_name, "sampling": sampling}
def generate_metrics(names, tags_node, agg_node):
    '''
    Return the ``metrics`` node of a KairosDB request: one metric per name.

    All metrics share the same *tags_node* object and the same aggregators
    value.

    :param names: iterable of datapoint (metric) names.
    :param tags_node: KairosDB request tags node.
    :param agg_node: a single aggregator dict (as built by
        ``generate_agg_node``) or a list of them. A lone dict is wrapped in a
        list because KairosDB's ``aggregators`` field is an array. Any other
        value (a list, None) is used unchanged.
    :return: list of metric dicts with ``name``, ``tags`` and
        ``aggregators`` keys.
    '''
    if isinstance(agg_node, dict):
        aggregators = [agg_node]
    else:
        aggregators = agg_node
    # Build a real list: on Python 3 ``map`` is a lazy, single-pass iterator
    # that json.dumps cannot serialize.
    return [
        {'name': name, 'tags': tags_node, 'aggregators': aggregators}
        for name in names
    ]
def convert_ts_to_dt(ts, unit='HOURS'):
    '''
    Format a millisecond timestamp as a local-time bucket string.

    Return, e.g.:
        HOURS:  "2015072010"  (%Y%m%d%H)
        DAYS:   "20150720"    (%Y%m%d)
        WEEKS:  "201529"      (%Y%W, week number with Monday as first day)
        MONTHS: "201507"      (%Y%m)

    :param ts: The time in milliseconds.
    :param unit: str, default HOURS; one of HOURS, DAYS, WEEKS, MONTHS
        (case-insensitive).
    :raises ValueError: if *unit* is not one of the supported units
        (previously this surfaced as an UnboundLocalError).
    '''
    formats = {
        'HOURS': '%Y%m%d%H',
        'DAYS': '%Y%m%d',
        'WEEKS': '%Y%W',
        'MONTHS': '%Y%m',
    }
    key = str(unit).upper()
    if key not in formats:
        raise ValueError('unsupported unit: %r' % (unit,))
    # fromtimestamp() interprets the epoch value in the local timezone.
    return datetime.fromtimestamp(ts / 1000).strftime(formats[key])
def group_values(results, convert_to_str):
    '''
    Pivot KairosDB result series into per-time-bucket attribute dicts.

    Return
    {
        'datetime': {
            'attr1': value,
            'attr2': value
        }
    }

    If one series has several points in the same bucket, the last one wins.

    :param results: iterable of result dicts from the KairosDB RESTful query
        response, each with ``name`` and ``values`` ([timestamp, value] pairs).
    :param convert_to_str: function mapping a timestamp to a bucket key,
        e.g. a partial of convert_ts_to_dt.
    :return: dict of bucket key -> {datapoint name: value}.
    '''
    # Plain loop instead of reduce(): reduce is not a builtin on Python 3.
    grouped = {}
    for result in results:
        dp_name = result['name']
        for point in result['values']:
            bucket = convert_to_str(point[0])
            grouped.setdefault(bucket, {})[dp_name] = point[1]
    return grouped
def query(product_key, uid, device_sn, start_ts, end_ts, attrs, aggregator, unit):
    """
    Build a KairosDB aggregation query and reshape its response into rows.

    The KairosDB round-trip is currently stubbed: ``_query_kairdb`` prints
    the request payload and returns the canned response ``Q['queries'][0]``.

    :param product_key: product key; sent as a tag and echoed in every row.
    :param uid: user id; sent as a tag and echoed in every row.
    :param device_sn: optional device serial number tag; echoed in every row.
    :param start_ts: start time in milliseconds.
    :param end_ts: optional end time in milliseconds.
    :param attrs: list of datapoint (metric) names to query.
    :param aggregator: aggregator name, e.g. sum/avg/max/min.
    :param unit: sampling unit; also selects how row ``datetime`` buckets
        are formatted (see convert_ts_to_dt).
    :return: dict with ``data`` (list of rows, one per time bucket) and
        ``query`` (the request parameters echoed back).
    """
    # todo : validate params
    tags_node = generate_tags_node(product_key, uid, device_sn)
    agg_node = generate_agg_node(aggregator, unit)
    payload = {}
    # list() so the payload holds a concrete list even where map() is lazy.
    payload['metrics'] = list(generate_metrics(attrs, tags_node, agg_node))
    payload.update(generate_start_end_time(start_ts, end_ts))

    def _query_kairdb(payload):
        # Stub: print the request and return the canned response instead of
        # calling KairosDB.
        pprint(payload)
        return Q['queries'][0]

    results = _query_kairdb(payload)  # todo: json.loads
    convert_str = partial(convert_ts_to_dt, unit=unit)
    grouped = group_values(results['results'], convert_str)

    # A real list (not a lazy map) so the response is JSON-serializable.
    datas = [
        {
            'datetime': dt,
            'attrs': dt_attrs,
            'product_key': product_key,
            'uid': uid,
            'device_sn': device_sn,
        }
        for dt, dt_attrs in grouped.items()
    ]
    # Named query_node so it does not shadow this function's own name.
    query_node = {
        "start_ts": start_ts,
        "end_ts": end_ts,
        "attrs": attrs,
        "aggregator": aggregator,
        "unit": unit
    }
    resp_content = {'data': datas, 'query': query_node}
    # The encoded string is discarded; the call only fails fast (TypeError)
    # if the response is not JSON-serializable.
    json.dumps(resp_content)
    return resp_content
class TestCore(unittest.TestCase):
    """Unit tests for ``group_values`` and ``query``."""

    def setUp(self):
        # Same shape as ``Q``, but both series share identical timestamps, so
        # every bucket holds a value for each datapoint name.
        self.q = {
            "queries": [
                {
                    "sample_size": 14368,
                    "results": [
                        {
                            "name": "abc_123",
                            "tags": {"host": ["server1"], "customer": ["bar"]},
                            "values": [[1364968800000, 11019],
                                       [1366351200000, 2843]],
                        },
                        {
                            "name": "abcq_123",
                            "tags": {"host": ["server1"], "customer": ["bar"]},
                            "values": [[1364968800000, 2],
                                       [1366351200000, 3]],
                        },
                    ],
                }
            ]
        }
        self.results = self.q['queries'][0]['results']

    @staticmethod
    def _hour_bucket(ms):
        """Expected HOURS bucket for *ms*, computed in local time."""
        return datetime.fromtimestamp(ms // 1000).strftime('%Y%m%d%H')

    def test_group_values(self):
        unit = 'HOURS'
        convert_str = partial(convert_ts_to_dt, unit=unit)
        group_result = group_values(self.results, convert_str)
        pprint(group_result)
        # Bucket keys are computed rather than hard-coded: the former
        # literals ('2013040314', '2013041914') only held in a UTC+8 local
        # timezone, because fromtimestamp() uses local time.
        res = {
            self._hour_bucket(1364968800000): {'abc_123': 11019, 'abcq_123': 2},
            self._hour_bucket(1366351200000): {'abc_123': 2843, 'abcq_123': 3},
        }
        self.assertEqual(res, group_result)

    def test_query(self):
        ts = int(time.mktime(datetime.now().timetuple()) * 1000)
        product_key = 'pk'
        uid = 'uid'
        device_sn = 'sn'
        start_ts = ts
        end_ts = ts + 10000
        attrs = ['dp1', 'dp2']
        aggregator = 'SUM'
        unit = 'HOURS'
        res = query(
            product_key, uid, device_sn, start_ts, end_ts, attrs, aggregator, unit)
        pprint(res)
        self.assertEqual(res['query'], {
            "start_ts": start_ts,
            "end_ts": end_ts,
            "attrs": attrs,
            "aggregator": aggregator,
            "unit": unit,
        })
        datas = list(res['data'])
        self.assertTrue(datas)
        for item in datas:
            self.assertEqual(item['product_key'], product_key)
            self.assertEqual(item['uid'], uid)
            self.assertEqual(item['device_sn'], device_sn)
            self.assertEqual(len(item['datetime']), 10)  # HOURS: YYYYMMDDHH
        # The canned response's values sum to this regardless of how the
        # local timezone splits them into hourly buckets.
        total = sum(v for item in datas for v in item['attrs'].values())
        self.assertEqual(total, 11019 + 2843 + 2 + 3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment