Last active
October 20, 2020 19:15
-
-
Save AlmightyOatmeal/c26eb3139d6a483fa71ec23a7cb8a0e3 to your computer and use it in GitHub Desktop.
Wrapper for SignalFx's Python client library for executing SignalFlow programs and returning a dictionary object that can be serialized to JSON. This also features exception handling to retry program text execution which is especially helpful for long-running SignalFlow programs that should be restarted in the event of a failure.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Disclaimer | |
========== | |
This has been designed as a solution for a specific task, or set of tasks, by the code author which is outside | |
the locus of SignalFx supportability. This code is provided as-is and if any support is needed then it should | |
be provided by the original code author on a best-effort only basis; it cannot be supported through break/fix | |
support channels. | |
Synopsis | |
======== | |
# | |
# SignalFlow execute - large result body | |
# | |
pt_parts = [ | |
"HOSTS = filter('host', 'irony')", | |
"data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x).publish('Process Count')", | |
] | |
pt = '\n'.join(pt_parts) | |
now_utc = datetime.datetime.utcnow() | |
now_utc = now_utc.replace(second=0) | |
pt_start = now_utc - datetime.timedelta(days=1) | |
pt_start_ms = convert_dt_to_milliseconds(pt_start) | |
pt_stop = now_utc - datetime.timedelta(minutes=10) | |
pt_stop_ms = convert_dt_to_milliseconds(pt_stop) | |
results = get_signalflow_results( | |
program=pt, | |
start=pt_start_ms, | |
stop=pt_stop_ms, | |
resolution=1000, | |
api_token=api_token, | |
immediate=True | |
) | |
# | |
# SignalFlow execute - generator | |
# | |
pt_parts = [ | |
"HOSTS = filter('host', 'irony')", | |
"data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x).publish('Process Count')", | |
] | |
pt = '\n'.join(pt_parts) | |
now_utc = datetime.datetime.utcnow() | |
now_utc = now_utc.replace(second=0) | |
pt_start = now_utc - datetime.timedelta(days=1) | |
pt_start_ms = convert_dt_to_milliseconds(pt_start) | |
pt_stop = now_utc - datetime.timedelta(minutes=10) | |
pt_stop_ms = convert_dt_to_milliseconds(pt_stop) | |
results = get_signalflow_results( | |
program=pt, | |
start=pt_start_ms, | |
stop=pt_stop_ms, | |
resolution=1000, | |
api_token=api_token, | |
generator=True, | |
immediate=True | |
) | |
for result in results: | |
do_something(result) | |
# | |
# SignalFlow execute - callback | |
# | |
pt_parts = [ | |
"HOSTS = filter('host', 'irony')", | |
"data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x).publish('Process Count')", | |
] | |
pt = '\n'.join(pt_parts) | |
now_utc = datetime.datetime.utcnow() | |
now_utc = now_utc.replace(second=0) | |
pt_start = now_utc - datetime.timedelta(days=1) | |
pt_start_ms = convert_dt_to_milliseconds(pt_start) | |
pt_stop = now_utc - datetime.timedelta(minutes=10) | |
pt_stop_ms = convert_dt_to_milliseconds(pt_stop) | |
results = get_signalflow_results( | |
program=pt, | |
start=pt_start_ms, | |
stop=pt_stop_ms, | |
resolution=1000, | |
api_token=api_token, | |
generator=True, | |
callback=do_something_with_data, | |
immediate=True | |
) | |
# | |
# SignalFlow preflight | |
# | |
pt_parts = [ | |
"HOSTS = filter('host', 'irony')", | |
"d = data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x)", | |
"detect(when(d < 1, lasting='30s')).publish('Service Not Running')", | |
"detect(when(d > 500, lasting='30s')).publish('Process has too many children')" | |
] | |
pt = '\n'.join(pt_parts) | |
results = get_signalflow_results( | |
program=pt, | |
start=1522950265000, | |
stop=1522950736000, | |
max_delay=60000, | |
api_token=api_token, | |
preflight=True | |
) | |
# | |
# Using SignalFlow execute to get more information than preflight | |
# | |
pt_parts = [ | |
"HOSTS = filter('host', 'irony')", | |
"d = data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x)", | |
"detect(when(d < 1, lasting='30s')).publish('Service Not Running')", | |
"detect(when(d > 500, lasting='30s')).publish('Process has too many children')" | |
] | |
pt = '\n'.join(pt_parts) | |
results = get_signalflow_results( | |
program=pt, | |
start=1522950265000, | |
stop=1522950736000, | |
max_delay=60000, | |
api_token=api_token | |
) | |
# | |
# Using SignalFlow execute to get more information than preflight but get *ONLY* events | |
# | |
pt_parts = [ | |
"HOSTS = filter('host', 'irony')", | |
"d = data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x)", | |
"detect(when(d < 1, lasting='30s')).publish('Service Not Running')", | |
"detect(when(d > 500, lasting='30s')).publish('Process has too many children')" | |
] | |
pt = '\n'.join(pt_parts) | |
results = get_signalflow_results( | |
program=pt, | |
start=1522950265000, | |
stop=1522950736000, | |
max_delay=60000, | |
api_token=api_token, | |
events_only=True | |
) | |
Abstract | |
======== | |
Demonstrating executing a SignalFlow program in Python and converting the results into a Python dictionary/list | |
structure that can be iterated, processed, otherwise interacted-with, or serialized to JSON. | |
Features: | |
* Supports passing additional keyword arguments directly into the SignalFlow execute/preflight methods (as long | |
as the keyword arguments are supported by their respective method). | |
* Retry program execution in the event of a failure which can be advantageous when using a dynamic start time | |
and no end time for continuous streaming. | |
* Simple parameter to switch between generating a large result object (useful for short time ranges) or a | |
generator so results can be iterated over as they become available. | |
* Callback functionality to call an outside function and pass in the resulting dictionary object. | |
License | |
======= | |
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated | |
documentation files (the "Software"), to deal in the Software without restriction, including without | |
limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the | |
Software, and to permit persons to whom the Software is furnished to do so, subject to the following | |
conditions: | |
The above copyright notice and this permission notice shall be included in all copies or substantial portions | |
of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED | |
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT | |
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN | |
ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR | |
OTHER DEALINGS IN THE SOFTWARE. | |
:author: Jamie Ivanov <jamie <-at-> signalfx.com> | |
:organization: SignalFx <https://www.signalfx.com> | |
:contact: <jamie <-at-> signalfx.com> | |
:version: 1.0.0 | |
:copyright: Copyright (c): Jamie Ivanov and contributors | |
""" | |
import datetime
import json
import logging
import time
from copy import deepcopy

import signalfx
# OPTIONAL HELPER
def convert_dt_to_milliseconds(dt_obj):
    """Convert a datetime object to a Unix epoch timestamp in milliseconds.

    The input is treated as a naive UTC timestamp and measured against the
    Unix epoch (1970-01-01 00:00:00).

    :param dt_obj: Datetime object to be converted.
    :type dt_obj: instance
    :return: Milliseconds since the Unix epoch.
    :rtype: int or long
    """
    unix_epoch = datetime.datetime(1970, 1, 1)
    elapsed = dt_obj - unix_epoch
    return int(elapsed.total_seconds() * 1000)
def get_signalflow_results(program, start, stop=None, realm=None, resolution=None, max_execution_retries=50,
                           api_token=None, preflight=False, generator=False, data_callback=None, event_callback=None,
                           events_only=False, **kwargs):
    """Use the SignalFlow Python client library to execute/preflight SignalFlow programs.

    Results are dictionaries keyed by timestamp (milliseconds) -> timeseries ID -> datapoint/event
    detail. Expected object structure including the optional "@event"; events without datapoints
    have the same structure but only include the "@event" object data and not "@value" and etc.
    for the datapoint timeseries:

    .. example: json
        {
            "<TIMESTAMP IN MILLISECONDS>": {
                "<TIMESERIES ID>": {
                    "@value": "<DATAPOINT VALUE>",
                    "@job_id": "<SignalFlow job ID>",
                    "@event": {
                        "@job_id": "<SignalFlow job ID>",
                        "_metadata": "<EVENT METADATA>",
                        "_properties": "<EVENT PROPERTIES INCLUDING INPUT TIMESERIES WITH DIMENSIONS, STATUS, ETC>",
                        "_timestamp_ms": "<TIMESTAMP IN MILLISECONDS>",
                        "_tsid": "<TIMESERIES ID (redundant but it is part of the metadata so it was kept)>"
                    },
                    "_properties": "<DATAPOINT PROPERTIES, DIMENSIONS, ETC>",
                    "_tsid": "<TIMESERIES ID (redundant but it is part of the metadata so it was kept)>",
                }
            }
        }

    :param program: Program text to be executed.
    :type program: str or unicode
    :param start: SignalFlow start time in milliseconds.
    :type start: int or long
    :param stop: (optional) SignalFlow stop time in milliseconds. (default: None)
    :type stop: int or long
    :param realm: (optional) Which SignalFx realm to execute against. If the wrong realm is used then
                  execution will fail with an unauthorized response. As of October 2019, the available
                  realms are: us0, us1, us2, eu0, and ap0. (default: 'us0')
    :type realm: str or unicode
    :param resolution: (optional) Data resolution. (default: None)
    :type resolution: int, long, str, or unicode
    :param max_execution_retries: (optional) Number of times to retry in case of an exception, with a
                                  1 minute grace period before the next attempt. (default: 50)
    :type max_execution_retries: int
    :param api_token: SignalFx API token to be used when executing the SignalFlow program.
    :type api_token: str or unicode
    :param preflight: (optional) When true, execute the SignalFlow program via the preflight API
                      otherwise use the execute API. (default: False)
    :type preflight: bool
    :param generator: (optional) When true, returns a generator instance to yield individual result
                      objects rather than one large results object. This is especially useful for
                      low-memory environments, where each result needs to be acted upon, or when
                      continuously streaming program text. When false, generates a large result
                      object. (default: False)
    :type generator: bool
    :param data_callback: (optional) If specified, this function will be called for every DataMessage
                          result and said result will be passed into it. (default: None)
    :type data_callback: function
    :param event_callback: (optional) If specified, this function will be called for every
                           EventMessage result and said result will be passed into it. (default: None)
    :type event_callback: function
    :param events_only: (optional) Return only EventMessage results when using the SignalFlow execute
                        API in the same fashion as the preflight API. This will not apply to the
                        preflight API. (default: False)
    :type events_only: bool
    :param kwargs: Miscellaneous keyword arguments to be passed through into
                   :meth:`signalfx.SignalFx.signalflow().execute()` (or ``.preflight()``).
    :type kwargs: keyword arguments
    :raises ValueError: If ``api_token`` is not supplied.
    :return: Dictionary of results (or a generator when ``generator=True``); an empty dictionary if
             execution retries were exceeded.
    :rtype: dict or generator
    """
    # TODO: Cleanup kwargs that are passed into the SignalFlow executioner for the sake of good data hygiene.
    if api_token is None:
        raise ValueError('Missing SignalFx API token! Please consult the documentation!')

    # FIX: the original gist referenced a ``logger`` global and a ``pretty_json`` helper that were
    # never defined anywhere in the file, so every call raised NameError. Define both here.
    logger = logging.getLogger(__name__)

    def pretty_json(obj):
        # Best-effort pretty-printer used only for logging and debug dumps; ``default=str``
        # stringifies any non-JSON-serializable values found in message objects.
        return json.dumps(obj, indent=4, sort_keys=True, default=str)

    # Setting the default to "us0" here which can be more program-friendly.
    if not realm:
        realm = 'us0'

    # Switch stream URL in order to accommodate the appropriate organization realm.
    endpoint_url = kwargs.pop('endpoint_url', None)
    if not endpoint_url:
        endpoint_url = 'wss://stream.{}.signalfx.com'.format(realm.lower())
    if not endpoint_url.startswith('wss://'):
        endpoint_url = 'wss://{}'.format(endpoint_url)

    results = {}
    argz = {
        'program': program,
        'start': start,
        'stop': stop,
        'resolution': resolution
    }
    argz.update(kwargs)

    logger.debug('endpoint_url=%s', endpoint_url)

    # Executioner selection: preflight and execute share the same streaming interface.
    signalflow_client = signalfx.SignalFx().signalflow(endpoint=endpoint_url, token=api_token)
    if preflight:
        executioner = signalflow_client.preflight(**argz)
    else:
        executioner = signalflow_client.execute(**argz)

    def execute_signalflow():
        # Generator that consumes the executioner stream, yielding one
        # {timestamp_ms: {tsid: {...}}} dict per data/event message, retrying on failure.
        meta_objs = []
        metadata = {}
        try_counter = 0
        while True:
            job_id = None
            if metadata:
                metadata.clear()
            if try_counter >= max_execution_retries:
                return
            try:
                logger.debug('Waiting for events from executioner stream.')
                for msg in executioner.stream():
                    if isinstance(msg, signalfx.signalflow.messages.JobStartMessage):
                        # Remember the job handle so it can be stamped onto every result.
                        job_id = msg.__dict__.get('_handle')
                        logger.info('SignalFlow job: %s', job_id)
                        continue
                    # [ ] TODO: Add option to remove metadata for timeseries that leave the membership.
                    if isinstance(msg, signalfx.signalflow.messages.MetadataMessage):
                        md = msg.__dict__
                        # Cache per-timeseries metadata (dimensions, properties) keyed by tsid so
                        # later DataMessages can be enriched with it.
                        metadata[md.get('_tsid')] = {k: md[k] for k in md.keys() if not k.startswith('__')}
                        meta_objs.append(md)
                        continue
                    if isinstance(msg, signalfx.signalflow.messages.DataMessage):
                        # Datapoints are suppressed in events_only and preflight modes.
                        if not events_only and not preflight:
                            result = {}
                            for tsid, val in msg.data.items():
                                result[tsid] = deepcopy(metadata.get(tsid, {}))
                                result[tsid]['@value'] = val
                                result[tsid]['@job_id'] = job_id
                            logger.info('DataMessage: %s datapoints.', len(msg.data))
                            dispatchable_result = {msg.logical_timestamp_ms: result}
                            if data_callback is not None:
                                data_callback(dispatchable_result)
                            yield dispatchable_result
                        continue
                    if isinstance(msg, signalfx.signalflow.messages.EventMessage):
                        result = {
                            msg._tsid: {
                                '@event': deepcopy(msg.__dict__)
                            }
                        }
                        result[msg._tsid]['@event']['@job_id'] = job_id
                        dispatchable_result = {msg._timestamp_ms: result}
                        logger.info('EVENT FOUND:\n%s', pretty_json(result))
                        if event_callback is not None:
                            event_callback(dispatchable_result)
                        yield dispatchable_result
                        continue
                    if isinstance(msg, signalfx.signalflow.messages.JobProgressMessage):
                        continue
                    # *OPTIONAL*
                    # This is used for exploring objects not yet handled above.
                    logger.info('Received "%s" object.', type(msg))
                    logger.info(dir(msg))
                    logger.info(msg.__dict__)
                    logger.info(pretty_json(msg.__dict__))
                # *OPTIONAL*
                # Dump metadata for your viewing pleasure!
                with open('signalflow_metadata.json', 'w') as f:
                    f.write(pretty_json(metadata))
                with open('signalflow_metadata_raw-ish_objects.json', 'w') as f:
                    f.write(pretty_json(meta_objs))
                return
            except Exception as err:
                # Best-effort logging of the failure; fall back to print() if logging itself fails,
                # then wait out the grace period and retry the stream.
                try:
                    logger.error('HANDLING EXCEPTION; RETRY IN 60 SECONDS')
                    logger.exception(err)
                except Exception as log_err:
                    print('HANDLING EXCEPTION; RETRY IN 60 SECONDS')
                    print(str(log_err))
                try_counter += 1
                time.sleep(60)

    if generator:
        return execute_signalflow()

    # Non-generator mode: drain the stream into one large results dict.
    for item in execute_signalflow():
        results.update(item)
    return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment