Script to transform Metricbeat system-module events (newline-delimited JSON on stdin) into several formats for benchmarking: anonymized full JSON documents, one-metric-per-document JSON, and full and per-metric InfluxDB line-protocol records.
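For orientation, here is a minimal, self-contained sketch of the transformation the script performs, using an entirely made-up event fragment (the field names, values, tag and timestamp below are hypothetical, not taken from the gist): nested Metricbeat JSON is flattened into dotted keys, and each flattened metric then becomes either a standalone JSON document or an InfluxDB line-protocol point. The real script additionally anonymizes host, process and filesystem identifiers with MD5 and builds the tag set and timestamp from the rest of the event.

import json

def flatten(d, parent_key='', sep='.'):
    # Simplified version of the script's flatten(): join nested keys with '.'
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

# Hypothetical system.memory fragment -- not real data
system = {'memory': {'used': {'pct': 0.37, 'bytes': 1234567}}}
flat = flatten(system, 'system')
print(json.dumps(flat, sort_keys=True))
# {"system.memory.used.bytes": 1234567, "system.memory.used.pct": 0.37}

# One line-protocol point per metric, as influxdb_metrics.txt is written;
# the tag set and timestamp are placeholders for the ones the script derives.
for key in sorted(flat):
    print("{},beat.hostname=abc123 value={} 0".format(key, flat[key]))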
#!/usr/bin/env python

import json
import sys
import md5
import collections
import datetime
import re

epoch = datetime.datetime.utcfromtimestamp(0)

def flatten(d, parent_key='', sep='.'):
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, collections.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

def timestamp_to_ms(ts):
    ts_utc = ts.replace('Z', 'UTC')
    dt = datetime.datetime.strptime(ts_utc, '%Y-%m-%dT%H:%M:%S.%f%Z')
    return int((dt - epoch).total_seconds() * 1000.0)

def is_not_number(s):
    try:
        float(s)
        return False
    except ValueError:
        return True

regexp = re.compile(r'[ ,=]')

# Open output files
mbf = open('metricbeat_full.json', 'w')
mbm = open('metricbeat_metrics.json', 'w')
idf = open('influxdb_full.txt', 'w')
idm = open('influxdb_metrics.txt', 'w')

stats = {'total': {'docs': 0, 'metrics': 0}}

for line in sys.stdin:
    rec = json.loads(line)
    # Remove select fields
    del rec['tags']
    del rec['metricset']['rtt']
    # Obfuscate select fields
    if 'meta' in rec.keys():
        rec['meta']['cloud']['instance_id'] = md5.new(rec['meta']['cloud']['instance_id']).hexdigest()
        meta = rec['meta']
        del rec['meta']
    else:
        meta = {}
    rec['beat']['name'] = md5.new(rec['beat']['name']).hexdigest()
    rec['beat']['hostname'] = md5.new(rec['beat']['hostname']).hexdigest()
    if 'process' in rec['system'].keys():
        rec['system']['process']['name'] = md5.new(rec['system']['process']['name']).hexdigest()
        if 'cmdline' in rec['system']['process'].keys():
            rec['system']['process']['cmdline'] = md5.new(rec['system']['process']['cmdline']).hexdigest()
    if 'network' in rec['system'].keys():
        if 'name' in rec['system']['network'].keys():
            rec['system']['network']['name'] = md5.new(rec['system']['network']['name']).hexdigest()
    if 'filesystem' in rec['system'].keys():
        if 'device_name' in rec['system']['filesystem'].keys():
            rec['system']['filesystem']['device_name'] = md5.new(rec['system']['filesystem']['device_name']).hexdigest()
        if 'mount_point' in rec['system']['filesystem'].keys():
            rec['system']['filesystem']['mount_point'] = md5.new(rec['system']['filesystem']['mount_point']).hexdigest()
    # Process record
    ts = rec['@timestamp']
    system = rec['system']
    # Write anonymized record before further processing
    mbf.write("{}\n".format(json.dumps(rec)))
    # Remove parts not to be flattened
    del rec['@timestamp']
    del rec['system']
    # Extract file system mount point and device name from system
    if 'filesystem' in system.keys():
        if 'mount_point' in system['filesystem'].keys():
            if 'system' not in rec.keys():
                rec['system'] = {}
            rec['system']['filesystem'] = {'mount_point': system['filesystem']['mount_point'], 'device_name': system['filesystem']['device_name']}
            del system['filesystem']['mount_point']
            del system['filesystem']['device_name']
    # Extract process data
    if 'process' in system.keys():
        if 'system' not in rec.keys():
            rec['system'] = {}
        rec['system']['process'] = {'state': system['process']['state'], 'username': system['process']['username'], 'name': system['process']['name'], 'cpu': {'start_time': system['process']['cpu']['start_time']}}
        del system['process']['name']
        del system['process']['username']
        del system['process']['cpu']['start_time']
        del system['process']['state']
        if 'cmdline' in system['process'].keys():
            rec['system']['process']['cmdline'] = system['process']['cmdline']
            del system['process']['cmdline']
        if 'pid' in system['process'].keys():
            rec['system']['process']['pid'] = system['process']['pid']
            del system['process']['pid']
        if 'ppid' in system['process'].keys():
            rec['system']['process']['ppid'] = system['process']['ppid']
            del system['process']['ppid']
        if 'pgid' in system['process'].keys():
            rec['system']['process']['pgid'] = system['process']['pgid']
            del system['process']['pgid']
    # Extract diskio name
    if 'diskio' in system.keys():
        if 'system' not in rec.keys():
            rec['system'] = {}
        rec['system']['diskio'] = {'name': system['diskio']['name']}
        del system['diskio']['name']
        if 'serial_number' in system['diskio'].keys():
            rec['system']['diskio']['serial_number'] = system['diskio']['serial_number']
            del system['diskio']['serial_number']
    # Extract network name
    if 'network' in system.keys():
        if 'system' not in rec.keys():
            rec['system'] = {}
        rec['system']['network'] = {'name': system['network']['name']}
        del system['network']['name']
    # Flatten system and rec dicts
    flat_rec = flatten(rec)
    flat_system = flatten(system, 'system')
    flat_points = flatten(system)
    metricset = "{}.{}".format(rec['metricset']['module'], rec['metricset']['name'])
    # Record stats
    if rec['metricset']['name'] not in stats.keys():
        stats[rec['metricset']['name']] = {'docs': 0, 'metrics': 0}
    stats[rec['metricset']['name']]['docs'] += 1
    stats[rec['metricset']['name']]['metrics'] += len(flat_system.keys())
    stats['total']['docs'] += 1
    stats['total']['metrics'] += len(flat_system.keys())
    # Create InfluxDB key-value (tag) list
    kv_list = []
    for key in flat_rec.keys():
        if isinstance(flat_rec[key], basestring):
            val = flat_rec[key].replace(',', ';').replace('=', ':')
            kv_list.append("{}={}".format(key, val))
        else:
            val = flat_rec[key]
            kv_list.append("{}={}".format(key, val))
    idm_kvstring = ",".join(kv_list)
    idm_ts = timestamp_to_ms(ts)
    # Create points list
    points_list = []
    for key in flat_points.keys():
        points_list.append("{}={}".format(key, flat_points[key]))
        if is_not_number(flat_points[key]):
            print("VALUE ERROR ==> {} => {}".format(key, flat_points[key]))
    idm_pointsstring = ",".join(points_list)
    # Create and write full InfluxDB records
    idfrec = "{},{} {} {}".format(metricset, idm_kvstring, idm_pointsstring, idm_ts)
    idf.write("{}\n".format(idfrec))
    # Write InfluxDB metrics records
    for key in flat_system.keys():
        idm.write("{},{} value={} {}\n".format(key, idm_kvstring, flat_system[key], idm_ts))
    # Write Metricbeat metrics records
    rec['@timestamp'] = ts
    for key in flat_system.keys():
        if isinstance(flat_system[key], float):
            if 'long_value' in rec.keys():
                del rec['long_value']
            rec['metric'] = key
            rec['float_value'] = flat_system[key]
        else:
            if 'float_value' in rec.keys():
                del rec['float_value']
            rec['metric'] = key
            rec['long_value'] = flat_system[key]
        mbm.write("{}\n".format(json.dumps(rec)))

print(json.dumps(stats))
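Usage note (a sketch; the gist itself does not document invocation): the script expects Python 2, since it relies on the md5 module and the basestring built-in, and reads newline-delimited Metricbeat system-module events on stdin, e.g. python transform.py < events.ndjson, where both file names are placeholders. It writes metricbeat_full.json (anonymized full events), metricbeat_metrics.json (one JSON document per metric, with the value in float_value or long_value), influxdb_full.txt (one line-protocol point per event carrying all of its metrics) and influxdb_metrics.txt (one line-protocol point per metric), and finally prints per-metricset document and metric counts as JSON.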