Last active
November 15, 2015 23:38
-
-
Save kunickiaj/8157aa0a30ed4ffce26f to your computer and use it in GitHub Desktop.
Python script to convert collectd records to influxdb format
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Sample Jython code | |
# | |
# Available Objects: | |
# | |
# records: an array of records to process, depending on Jython processor | |
# processing mode it may have 1 record or all the records in the batch. | |
# | |
# state: a dict that is preserved between invocations of this script. | |
# Useful for caching bits of data e.g. counters. | |
# | |
# log.<loglevel>(msg, obj...): use instead of print to send log messages to the log4j log instead of stdout. | |
# loglevel is any log4j level: e.g. info, error, warn, trace. | |
# | |
# out.write(record): writes a record to processor output | |
# | |
# err.write(record, message): sends a record to error | |
# | |
# Add additional module search paths: | |
#import sys | |
#sys.path.append('/some/other/dir/to/search') | |
base_fields = ['plugin', 'time', 'time_hires'] | |
tag_fields = ['host', 'type', 'plugin_instance', 'type_instance'] | |
for record in records: | |
try: | |
# compute hash of field names | |
record_hash = str(hash('|'.join(record.value.keys()))) | |
#log.info('hash : %s' % record_hash) | |
if record_hash in state: | |
cached = state[record_hash] | |
value_names = cached['value_names'] | |
tags = cached['tags'] | |
timestamp_field = cached['timestamp_field'] | |
else: | |
value_names = filter(lambda f: f not in tag_fields and f not in base_fields, record.value.keys()) | |
tags = [record.value[tag] for tag in tag_fields if tag in record.value] | |
if 'time_hires' in record.value: | |
timestamp_field = 'time_hires' | |
else: | |
timestamp_field = 'time' | |
state[record_hash] = { | |
"value_names": value_names, | |
"tags": tags, | |
"timestamp_field": timestamp_field, | |
} | |
#log.info('Value Names: %s' % str(value_names)) | |
#log.info('Tag Names: %s' % str(tags)) | |
new_records = [] | |
for value_name in value_names: | |
new_records.append( | |
{ | |
"measurement": '_'.join([record.value['plugin'], value_name]), | |
"tags": tags, | |
"value": record.value[value_name], | |
"timestamp": record.value[timestamp_field], | |
} | |
) | |
# Write record to procesor output | |
for new_record in new_records: | |
record.value = new_record | |
out.write(record) | |
except Exception as e: | |
# Send record to error | |
err.write(record, str(e)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment