import collections.abc
import datetime
import logging
import os
import socket
from traceback import format_exc

import mmh3
import msgpack
import numpy as np
import pandas as pd
import psutil

# SKYLINE_UDP_PORT, Loggly_Token and send_log_bulk_data are expected to be
# defined elsewhere in the surrounding project.
log = logging.getLogger(__name__)

def get_load_avg():
    load_avg = list(os.getloadavg())
    return {  # "1m": load_avg[0],
        "5m": load_avg[1],
        "15m": load_avg[2],
    }

def get_disk_usage():
    disk_usage = {}
    for part in psutil.disk_partitions(all=False):
        if os.name == 'nt':
            if 'cdrom' in part.opts or part.fstype == '':
                # skip cd-rom drives with no disk in it; they may raise
                # ENOENT, pop-up a Windows GUI error for a non-ready
                # partition or just hang.
                continue
        usage = psutil.disk_usage(part.mountpoint)
        disk_usage.update({part.mountpoint: usage.percent})
    return disk_usage

def get_disk_io():
    # note: psutil reports read_time / write_time in milliseconds
    disk_io_cum = {}
    for k, v in psutil.disk_io_counters(perdisk=True).items():
        disk_io_cum.update({k: {"read_time_sec": v.read_time,
                                "write_time_sec": v.write_time}})
    return disk_io_cum

def get_process_metrics():
    return {"process": {"load_avg": get_load_avg(),
                        "cpu_perc": psutil.cpu_percent(),
                        "disk_usage": get_disk_usage(),
                        # "disk_io_cum": get_disk_io()
                        }}

def build_consolidator_log(actual_timestamp, percent_distinct, percent_compression,
                           step_in_sec, topic):
    msg = {"timestamp": actual_timestamp,
           "infra": {"topic": topic},
           "consolidator": {
               "distinct_ips_perc": percent_distinct,
               "compression_perc": percent_compression,
               "step_in_sec": step_in_sec,
               "distinct_ip_perc_to_time_step":
                   np.round(percent_distinct / float(step_in_sec), 2),
           }
           }
    return msg

def build_process_log(timestamp, processed_timestamp, delay_min, processing_time_ms, topic, volume):
    msg = {"timestamp": timestamp,
           "infra": {"topic": topic},
           "process": {
               "processed_timestamp": processed_timestamp,
               "delay_min": delay_min,
               "processing_time_ms": processing_time_ms,
               "volume": volume
           }
           }
    # merge the host metrics into the existing "process" sub-dict; a plain
    # msg.update() would replace the whole "process" key and drop the fields above
    msg["process"].update(get_process_metrics()["process"])
    return msg

def send_to_skyline(series, metric_set):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Datapoints are {metric: [[timestamp, value], [timestamp, value], ...]}
    for record in series:
        metric = "%s.%s" % (metric_set, record['metric'])
        datapoint = [record['timestamp'], record['value']]
        packet = msgpack.packb((metric, datapoint))
        sock.sendto(packet, (socket.gethostname(), SKYLINE_UDP_PORT))
    return

def flatten(d, parent_key='', sep='.'):
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key).items())
        else:
            items.append((new_key, [v]))
    return dict(items)

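# Illustrative example (not part of the original gist): flatten() produces
# dotted keys and wraps each leaf in a single-element list, so the result can
# be passed straight to pd.DataFrame.from_dict(..., orient='columns'):
#
#   flatten({"infra": {"topic": "nginx"}, "process": {"delay_min": 3}})
#   # -> {"infra.topic": ["nginx"], "process.delay_min": [3]}
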
def log_to_hash(msg):
    df = (pd.DataFrame
          .from_dict(flatten(msg), orient='columns'))
    df.drop(['timestamp'], inplace=True, axis=1)
    df = (df
          .select_dtypes(include=['object'])
          .T
          .reset_index())
    s = ".".join(dict(df.values).values())
    s_hashed = abs(mmh3.hash(s))
    log.info("Mapping %s to %d" % (s, s_hashed))
    return s_hashed

def log_to_series(msg, timestamp):
    """
    Transforms a single log event into a series of datapoints
    """
    df = (pd.DataFrame
          .from_dict(flatten(msg), orient='columns')
          .select_dtypes(exclude=['object', 'datetime64'])
          .T
          .reset_index())
    df.columns = ['metric', 'value']
    df['timestamp'] = timestamp
    return df.to_dict(orient='records')

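# Illustrative example (not part of the original gist): for a message such as
#   {"timestamp": "2015-10-24T19:39:00", "infra": {"topic": "nginx"},
#    "process": {"delay_min": 3, "volume": 120}}
# and a Unix timestamp of 1445715540, log_to_series() returns something like
#   [{"metric": "process.delay_min", "value": 3, "timestamp": 1445715540},
#    {"metric": "process.volume", "value": 120, "timestamp": 1445715540}]
# (string fields such as infra.topic are dropped by the select_dtypes filter).
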
def date_to_unix(timestamp):
    return int((timestamp - datetime.datetime(1970, 1, 1)).total_seconds())

def log_via_anomaly_detector(msg_list, actual_time, tag):
    """
    We hash the categorical features and extract the metrics,
    using the hash to identify each metric
    as well as the log event which generated it
    """
    logs_to_send = []
    for msg in msg_list:
        try:
            series = log_to_series(msg, date_to_unix(actual_time))
            metric_hash = log_to_hash(msg)
            send_to_skyline(series, "id.%d" % metric_hash)
            msg.update({"id": metric_hash})
            logs_to_send.append(msg)
        except Exception:
            log.warning("Exception: %s" % format_exc())
    status = send_log_bulk_data(Loggly_Token, logs_to_send, False, tag)
    return status

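# Minimal usage sketch (illustrative, not part of the original gist): builds a
# process log and prints the flattened series that would be sent to Skyline.
# The parameter values below are made up; actually shipping the data via
# log_via_anomaly_detector() additionally requires SKYLINE_UDP_PORT,
# Loggly_Token and send_log_bulk_data to be defined elsewhere.
if __name__ == "__main__":
    now = datetime.datetime.utcnow()
    sample_msg = build_process_log(timestamp=str(now),
                                   processed_timestamp=str(now),
                                   delay_min=3,
                                   processing_time_ms=250,
                                   topic="nginx_access",
                                   volume=1200)
    for point in log_to_series(sample_msg, date_to_unix(now)):
        print("%s %s %s" % (point['metric'], point['value'], point['timestamp']))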