# Gist by @MauricioRoman, created October 24, 2015
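# Imports and module-level settings needed to make the snippet runnable.
# These are assumptions, not part of the original gist: SKYLINE_UDP_PORT is
# set to the default UDP port in etsy/skyline's settings, Loggly_Token is a
# placeholder, and send_log_bulk_data is assumed to come from a Loggly
# helper module elsewhere in the project.
import collections.abc
import datetime
import logging
import os
import socket
from traceback import format_exc

import mmh3
import msgpack
import numpy as np
import pandas as pd
import psutil

log = logging.getLogger(__name__)

SKYLINE_UDP_PORT = 2025             # assumption: Skyline Horizon's default UDP port
Loggly_Token = "YOUR-LOGGLY-TOKEN"  # placeholder: your Loggly customer token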
def get_load_avg():
    load_avg = list(os.getloadavg())
    return {
        # "1m": load_avg[0],
        "5m": load_avg[1],
        "15m": load_avg[2],
    }
def get_disk_usage():
    disk_usage = {}
    for part in psutil.disk_partitions(all=False):
        if os.name == 'nt':
            if 'cdrom' in part.opts or part.fstype == '':
                # skip cd-rom drives with no disk in it; they may raise
                # ENOENT, pop-up a Windows GUI error for a non-ready
                # partition or just hang.
                continue
        usage = psutil.disk_usage(part.mountpoint)
        disk_usage[part.mountpoint] = usage.percent
    return disk_usage
def get_disk_io():
    disk_io_cum = {}
    # psutil reports read_time/write_time in milliseconds; convert to
    # seconds so the values match the "_sec" key names.
    for k, v in psutil.disk_io_counters(perdisk=True).items():
        disk_io_cum[k] = {"read_time_sec": v.read_time / 1000.0,
                          "write_time_sec": v.write_time / 1000.0}
    return disk_io_cum
def get_process_metrics():
    return {"process": {"load_avg": get_load_avg(),
                        "cpu_perc": psutil.cpu_percent(),
                        "disk_usage": get_disk_usage(),
                        # "disk_io_cum": get_disk_io(),
                        }}
def build_consolidator_log(actual_timestamp, percent_distinct, percent_compression,
                           step_in_sec, topic):
    msg = {"timestamp": actual_timestamp,
           "infra": {"topic": topic},
           "consolidator": {
               "distinct_ips_perc": percent_distinct,
               "compression_perc": percent_compression,
               "step_in_sec": step_in_sec,
               "distinct_ip_perc_to_time_step":
                   np.round(percent_distinct / float(step_in_sec), 2),
           }
           }
    return msg
def build_process_log(timestamp, processed_timestamp, delay_min,
                      processing_time_ms, topic, volume):
    msg = {"timestamp": timestamp,
           "infra": {"topic": topic},
           "process": {
               "processed_timestamp": processed_timestamp,
               "delay_min": delay_min,
               "processing_time_ms": processing_time_ms,
               "volume": volume
           }
           }
    # merge into the nested "process" dict rather than msg.update(), which
    # would replace the whole "process" key and clobber the fields above
    msg["process"].update(get_process_metrics()["process"])
    return msg
def send_to_skyline(series, metric_set):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Datapoints are {metric: [[timestamp, value], [timestamp, value], ...]}
    for record in series:
        metric = "%s.%s" % (metric_set, record['metric'])
        datapoint = [record['timestamp'], record['value']]
        packet = msgpack.packb((metric, datapoint))
        sock.sendto(packet, (socket.gethostname(), SKYLINE_UDP_PORT))
    return
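# Note (added for context): the msgpack-packed (metric, [timestamp, value])
# tuple appears to match the format Skyline's Horizon agent reads on its
# UDP listener; verify against your Skyline version's settings.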
def flatten(d, parent_key='', sep='.'):
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key).items())
        else:
            # wrap scalars in one-element lists so the result can feed
            # pd.DataFrame.from_dict(..., orient='columns') directly
            items.append((new_key, [v]))
    return dict(items)
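# For example (hypothetical input):
#   flatten({"infra": {"topic": "t"}, "volume": 7})
# returns {"infra.topic": ["t"], "volume": [7]}.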
def log_to_hash(msg):
    """
    Hashes the categorical (string) fields of a log event into a stable id
    """
    df = (pd.DataFrame
          .from_dict(flatten(msg), orient='columns'))
    df.drop(['timestamp'], inplace=True, axis=1)
    df = (df
          .select_dtypes(include=['object'])
          .T
          .reset_index())
    s = ".".join(dict(df.values).values())
    s_hashed = abs(mmh3.hash(s))
    log.info("Mapping %s to %d" % (s, s_hashed))
    return s_hashed
def log_to_series(msg, timestamp):
    """
    Transforms a single log event into a series of datapoints
    """
    df = (pd.DataFrame
          .from_dict(flatten(msg), orient='columns')
          .select_dtypes(exclude=['object', 'datetime64'])
          .T
          .reset_index())
    df.columns = ['metric', 'value']
    df['timestamp'] = timestamp
    return df.to_dict(orient='records')
def date_to_unix(timestamp):
    return int((timestamp - datetime.datetime(1970, 1, 1)).total_seconds())
def log_via_anomaly_detector(msg_list, actual_time, tag):
    """
    We hash the categorical features and extract the metrics,
    using the hash to identify each metric
    as well as the log event which generated it
    """
    logs_to_send = []
    for msg in msg_list:
        try:
            series = log_to_series(msg, date_to_unix(actual_time))
            metric_hash = log_to_hash(msg)
            send_to_skyline(series, "id.%d" % metric_hash)
            msg.update({"id": metric_hash})
            logs_to_send.append(msg)
        except Exception:
            log.warning("Exception: %s" % format_exc())
    status = send_log_bulk_data(Loggly_Token, logs_to_send, False, tag)
    return status
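# --- Usage sketch (not part of the original gist) -------------------------
# A minimal, hypothetical example of wiring the pieces together; the topic,
# volume, and timing values below are made up for illustration, and
# send_log_bulk_data must be provided by your Loggly helper module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    now = datetime.datetime.utcnow()
    msg = build_process_log(timestamp=str(now),
                            processed_timestamp=str(now),
                            delay_min=0.5,
                            processing_time_ms=120,
                            topic="requests",
                            volume=1000)
    log_via_anomaly_detector([msg], actual_time=now, tag="example")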