Forked from linar-jether/simple_python_datasource.py
Last active
December 17, 2018 23:26
-
-
Save hamx0r/0a8ef11f8191dbc6718051ad2f625386 to your computer and use it in GitHub Desktop.
Grafana Python 3 datasource, using pandas for time-series and table data. Inspired by and compatible with the SimpleJson datasource.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Uses `flask_restful` and creates a Blueprint to be used by a parent project (i.e. a larger API project where
`/grafana/` endpoints are used by Grafana's SimpleJson plugin).
"""
from flask import Flask, request, jsonify, json, abort, Blueprint | |
from flask_cors import CORS, cross_origin | |
import flask_restful | |
import pandas as pd | |
# Registries keyed by the "<name>" prefix of a "<name>:<query>" string.
# They start empty and are filled through the add_* helpers in this module.
metric_finders = dict()
metric_readers = dict()
annotation_readers = dict()
panel_readers = dict()

# HTTP verbs accepted by every route registered in this module.
methods = ('GET', 'POST')

# Flask application with CORS enabled. The CORS_HEADERS setting is applied
# after CORS(app), in the same order as before.
app = Flask(__name__)
cors = CORS(app)
app.config['CORS_HEADERS'] = 'Content-Type'
def add_reader(name, reader):
    """Register `reader` in `metric_readers` under `name`.

    The /query route calls the registered object as
    `reader(query_string, ts_range)`. Registering an existing name
    replaces the previous reader.
    """
    metric_readers.update({name: reader})
def add_finder(name, finder):
    """Register `finder` in `metric_finders` under `name`.

    The /search route calls the registered object as `finder(target)` and
    returns whatever it yields as a list. Registering an existing name
    replaces the previous finder.
    """
    metric_finders.update({name: finder})
def add_annotation_reader(name, reader):
    """Register `reader` in `annotation_readers` under `name`.

    The /annotations route calls the registered object as
    `reader(query_string, ts_range)`. Registering an existing name
    replaces the previous reader.
    """
    annotation_readers.update({name: reader})
def add_panel_reader(name, reader):
    """Register `reader` in `panel_readers` under `name`.

    The /panels route calls the registered object as
    `reader(query_string, ts_range)` and returns its result as the HTTP
    response. Registering an existing name replaces the previous reader.
    """
    panel_readers.update({name: reader})
@app.route('/', methods=methods)
@cross_origin()
def hello_world():
    """Return a fixed plain-text banner describing this datasource.

    The request headers and JSON body (if any) are echoed to stdout for
    debugging.
    """
    # silent=True: this route is also reachable with a body-less GET, and the
    # debug echo must never be the reason a request fails.
    # A single pre-formatted argument prints the same on Python 2 and 3.
    print('%s %s' % (request.headers, request.get_json(silent=True)))
    return "Jether's python Grafana datasource, used for rendering HTML panels and timeseries data."
@app.route('/search', methods=methods)
@cross_origin()
def find_metrics():
    """List metric names for the `target` given in the JSON request body.

    * `<finder>:<query>` (or a bare registered finder name) with a non-empty
      query delegates to that finder and returns what it yields.
    * `*` returns the names of every registered finder and reader.
    * Any other target is echoed back as a single-element list.

    A request without a JSON body is treated as `{}`, so the target defaults
    to `*`. A malformed JSON body is still rejected by Flask.
    """
    payload = request.get_json() if request.is_json else None
    print('%s %s' % (request.headers, payload))
    req = payload or {}

    target = req.get('target', '*')
    if ':' in target:
        finder, target = target.split(':', 1)
    else:
        finder = target

    if target and finder in metric_finders:
        return jsonify(list(metric_finders[finder](target)))

    if target == '*':
        # list(...) rather than keys() + keys(): dict views cannot be
        # concatenated on Python 3.
        metrics = list(metric_finders) + list(metric_readers)
    else:
        metrics = [target]
    return jsonify(metrics)
def dataframe_to_response(target, df, freq=None):
    """Convert a Series/DataFrame into a list of timeseries responses.

    :param target: label handed to `_series_to_response` for each series.
    :param df: pandas Series or DataFrame; one response entry is produced
        per Series, or per DataFrame column.
    :param freq: optional resampling rule (e.g. '1000ms'). When given, the
        data is averaged into right-closed, right-labelled bins.
    :return: list of dicts built by `_series_to_response`; empty when `df`
        is empty.
    :raises: aborts with 404 when `df` is neither a Series nor a DataFrame.
    """
    response = []
    if df.empty:
        return response

    if freq is not None:
        orig_tz = df.index.tz
        if orig_tz is None:
            # tz_convert raises on a tz-naive index, so resample it as-is.
            df = df.resample(rule=freq, label='right', closed='right').mean()
        else:
            # Bin in UTC, then restore the original zone. `.mean()` replaces
            # the `how='mean'` keyword that pandas removed.
            df = (df.tz_convert('UTC')
                  .resample(rule=freq, label='right', closed='right')
                  .mean()
                  .tz_convert(orig_tz))

    if isinstance(df, pd.Series):
        response.append(_series_to_response(df, target))
    elif isinstance(df, pd.DataFrame):
        response.extend(_series_to_response(df[col], target) for col in df)
    else:
        abort(404, Exception('Received object is not a dataframe or series.'))
    return response
def dataframe_to_json_table(target, df):
    """Convert a DataFrame into a single-element list holding a table dict.

    :param target: unused; kept so the signature matches
        `dataframe_to_response`.
    :param df: pandas DataFrame to render.
    :return: `[{'type': 'table', 'columns': [{'text': <col>}, ...],
        'rows': [[...], ...]}]`, or `[]` when `df` is empty. Missing values
        are emitted as None.
    :raises: aborts with 404 when a non-empty `df` is not a DataFrame.
    """
    response = []
    if df.empty:
        return response

    if isinstance(df, pd.DataFrame):
        # Cast to object first: on recent pandas, `where(..., None)` keeps NaN
        # in float columns, and NaN is not valid JSON.
        cells = df.astype(object).where(pd.notnull(df), None)
        response.append({'type': 'table',
                         'columns': [{"text": col} for col in df.columns],
                         'rows': cells.values.tolist()})
    else:
        abort(404, Exception('Received object is not a dataframe.'))
    return response
def annotations_to_response(target, df):
    """Convert timestamp-indexed pandas data into a list of annotation dicts.

    :param target: value copied into the "annotation" field of every entry.
    :param df: either a Series whose values become annotation titles, or a
        DataFrame whose rows may carry 'title', 'text' and 'tags' columns.
        Index entries must expose `.value` (nanoseconds since the epoch),
        as pandas Timestamps do.
    :return: list of dicts with "annotation", "time" (epoch milliseconds)
        and "title"; DataFrame rows add "text" / "tags" (stringified) when
        those columns exist.
    :raises: aborts with 404 when `df` is neither a Series nor a DataFrame.
    """
    response = []

    if isinstance(df, pd.Series):
        # Single series: index gives the time, value gives the title.
        # `.items()` replaces `.iteritems()`, which pandas 2.0 removed.
        for timestamp, value in df.items():
            response.append({
                "annotation": target,  # The original annotation sent from Grafana.
                "time": timestamp.value // 10 ** 6,  # Milliseconds since the UNIX epoch. (required)
                "title": value,  # The title for the annotation tooltip. (required)
            })
    elif isinstance(df, pd.DataFrame):
        # One annotation per row, with optional text/tags columns.
        for timestamp, row in df.iterrows():
            annotation = {
                "annotation": target,  # The original annotation sent from Grafana.
                "time": timestamp.value // 10 ** 6,  # Milliseconds since the UNIX epoch. (required)
                "title": row.get('title', ''),  # The title for the annotation tooltip. (required)
            }
            if 'text' in row:
                annotation['text'] = str(row.get('text'))
            if 'tags' in row:
                annotation['tags'] = str(row.get('tags'))
            response.append(annotation)
    else:
        abort(404, Exception('Received object is not a dataframe or series.'))
    return response
def _series_to_annotations(df, target):
    """Convert a datetime-indexed Series into a `{'target', 'datapoints'}` dict.

    :param df: pandas Series; NaN entries are dropped and the rest sorted by
        index before conversion.
    :param target: label used when the series is empty or has no name.
    :return: dict with 'target' (the series name, or `target` as fallback)
        and 'datapoints', a list of (value, epoch-milliseconds) pairs.
    """
    if df.empty:
        return {'target': '%s' % (target),
                'datapoints': []}

    sorted_df = df.dropna().sort_index()
    # 'int64' replaces pd.np.int64: the pd.np alias was removed from pandas.
    # NOTE(review): the // 10 ** 6 assumes a nanosecond-resolution index.
    timestamps = (sorted_df.index.astype('int64') // 10 ** 6).tolist()
    values = sorted_df.values.tolist()

    # An unnamed series would otherwise be labelled "None".
    label = target if df.name is None else df.name
    # (label,) keeps tuple names from being unpacked as format arguments;
    # list(...) keeps the pairs JSON-serialisable on Python 3.
    return {'target': '%s' % (label,),
            'datapoints': list(zip(values, timestamps))}
def _series_to_response(df, target):
    """Convert a datetime-indexed Series into a `{'target', 'datapoints'}` dict.

    :param df: pandas Series; NaN entries are dropped and the rest sorted by
        index before conversion.
    :param target: label used when the series is empty or has no name.
    :return: dict with 'target' (the series name, or `target` as fallback)
        and 'datapoints', a list of (value, epoch-milliseconds) pairs.
    """
    if df.empty:
        return {'target': '%s' % (target),
                'datapoints': []}

    sorted_df = df.dropna().sort_index()
    # One expression replaces the old try / bare-except pair, whose fallback
    # repeated the same failing pd.np lookup. Index.tolist() already returns
    # plain Python ints.
    # NOTE(review): the // 10 ** 6 assumes a nanosecond-resolution index.
    timestamps = (sorted_df.index.astype('int64') // 10 ** 6).tolist()
    values = sorted_df.values.tolist()

    # An unnamed series would otherwise be labelled "None".
    label = target if df.name is None else df.name
    # (label,) keeps tuple names from being unpacked as format arguments;
    # list(...) keeps the pairs JSON-serialisable on Python 3.
    return {'target': '%s' % (label,),
            'datapoints': list(zip(values, timestamps))}
@app.route('/query', methods=methods)
@cross_origin(max_age=600)
def query_metrics():
    """Run every target in the JSON request body and return the merged results.

    The body must provide `range.from`, `range.to` and `targets`; each
    target's `target` string has the form `<reader>:<query>`. An optional
    `intervalMs` becomes the resampling rule for timeseries results. Targets
    whose `type` is 'table' are rendered with `dataframe_to_json_table`, all
    others with `dataframe_to_response`.

    Aborts with 404 when a target lacks the `<reader>:` prefix or names a
    reader that is not registered.
    """
    req = request.get_json()
    print('%s %s' % (request.headers, req))

    ts_range = {'$gt': pd.Timestamp(req['range']['from']).to_pydatetime(),
                '$lte': pd.Timestamp(req['range']['to']).to_pydatetime()}

    # A missing or null intervalMs means "do not resample".
    interval_ms = req.get('intervalMs')
    freq = None if interval_ms is None else str(interval_ms) + 'ms'

    results = []
    for spec in req['targets']:
        # `or ''` also covers an explicit null, so the error message below
        # can always be built.
        raw_target = spec.get('target') or ''
        if ':' not in raw_target:
            abort(404, Exception('Target must be of type: <finder>:<metric_query>, got instead: ' + raw_target))

        req_type = spec.get('type', 'timeserie')
        finder, query = raw_target.split(':', 1)
        if finder not in metric_readers:
            abort(404, Exception('Unknown metric reader: ' + finder))

        query_results = metric_readers[finder](query, ts_range)
        if req_type == 'table':
            results.extend(dataframe_to_json_table(query, query_results))
        else:
            results.extend(dataframe_to_response(query, query_results, freq=freq))
    return jsonify(results)
@app.route('/annotations', methods=methods)
@cross_origin(max_age=600)
def query_annotations():
    """Resolve the annotation query in the JSON request body.

    The body must provide `range.from`, `range.to` and `annotation.query`,
    where the query has the form `<reader>:<query>`. The matching annotation
    reader is called with the query part and the time range, and its result
    is converted by `annotations_to_response`.

    Aborts with 404 when the query lacks the `<reader>:` prefix or names a
    reader that is not registered.
    """
    req = request.get_json()
    print('%s %s' % (request.headers, req))

    ts_range = {'$gt': pd.Timestamp(req['range']['from']).to_pydatetime(),
                '$lte': pd.Timestamp(req['range']['to']).to_pydatetime()}

    query = req['annotation']['query']
    if ':' not in query:
        abort(404, Exception('Target must be of type: <finder>:<metric_query>, got instead: ' + query))

    finder, target = query.split(':', 1)
    if finder not in annotation_readers:
        abort(404, Exception('Unknown annotation reader: ' + finder))

    results = []
    results.extend(annotations_to_response(query, annotation_readers[finder](target, ts_range)))
    return jsonify(results)
@app.route('/panels', methods=methods)
@cross_origin()
def get_panel():
    """Return the output of a panel reader selected by URL query parameters.

    Expects `query` (of the form `<reader>:<query>`) plus `from` and `to`,
    both epoch milliseconds. The reader's return value is used as the HTTP
    response unchanged.

    Aborts with 404 when the query lacks the `<reader>:` prefix or names a
    reader that is not registered.
    """
    # Parameters arrive in the URL, so there is usually no JSON body.
    # silent=True keeps the debug echo from rejecting such requests.
    print('%s %s' % (request.headers, request.get_json(silent=True)))

    req = request.args
    ts_range = {'$gt': pd.Timestamp(int(req['from']), unit='ms').to_pydatetime(),
                '$lte': pd.Timestamp(int(req['to']), unit='ms').to_pydatetime()}

    query = req['query']
    if ':' not in query:
        abort(404, Exception('Target must be of type: <finder>:<metric_query>, got instead: ' + query))

    finder, target = query.split(':', 1)
    if finder not in panel_readers:
        abort(404, Exception('Unknown panel reader: ' + finder))
    return panel_readers[finder](target, ts_range)
if __name__ == '__main__':
    # Readers are addressed as `<reader_name>:<query_string>`, e.g. 'sine_wave:24'.
    #
    # Sample annotation reader:
    #     add_annotation_reader('midnights', lambda query_string, ts_range: pd.Series(index=pd.date_range(ts_range['$gt'], ts_range['$lte'], freq='D', normalize=True)).fillna('Text for annotation - midnight'))
    #
    # Sample timeseries reader:
    #     def get_sine(freq, ts_range):
    #         freq = int(freq)
    #         ts = pd.date_range(ts_range['$gt'], ts_range['$lte'], freq='H')
    #         return pd.Series(np.sin(np.arange(len(ts)) * np.pi * freq * 2 / float(len(ts))), index=ts).to_frame('value')
    #     add_reader('sine_wave', get_sine)
    #
    # NOTE(review): the sine sample uses `np`, which the visible imports do
    # not provide. Add `import numpy as np` before enabling it.
    # NOTE(review): debug=True with host='0.0.0.0' makes the interactive
    # debugger reachable from every network interface. Confirm this entry
    # point is only used for local development.
    app.run(debug=True, port=3003, host='0.0.0.0')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment