shepherd.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-
from collections import defaultdict
import argparse
import itertools
import random
import sys

BOOTSTRAP_SAMPLES = 100       # bootstrap resamples used to estimate percentiles
FACTOR_DECAY = 2              # exponent applied to each transformation's weight
LENGTH_FACTOR_WEIGHT = 0.2    # how quickly confidence grows with series length
MAX_DIVERGENCE = 100          # cap on the divergence score
MIN_DATA_SIZE = 10            # minimum number of points a series must have
PERCENTS = [0.25, 0.5, 0.75]  # quartiles used as (low, median, high)

def get_arguments():
    parser = argparse.ArgumentParser(description='Find anomalies in numerical series.')
    parser.add_argument('-i', '--input-path', type=str,
                        help='path where to read from (default: stdin)')
    parser.add_argument('-d', '--input-delimiter', type=str, default='\t',
                        help='input delimiter character (default: tab)')
    parser.add_argument('-c', '--input-columnar', action='store_true',
                        help='consider each input column as a series')
    parser.add_argument('-o', '--output-path', type=str,
                        help='path where to write to (default: stdout)')
    parser.add_argument('-D', '--output-delimiter', type=str, default='\t',
                        help='output delimiter character (default: tab)')
    parser.add_argument('-H', '--output-human', action='store_true',
                        help='additionally output divergence in human-readable form')
    parser.add_argument('-t', '--threshold', type=float, default=3.0,
                        help='only output anomalies greater than this (default: 3.0)')
    return parser.parse_args()

def read_input(args):
    input_stream = open(args.input_path) if args.input_path else sys.stdin
    input_matrix = [
        line.rstrip('\n').split(args.input_delimiter)
        for line in input_stream
    ]
    if args.input_path:
        input_stream.close()
    if args.input_columnar:
        # transpose the matrix so that each column becomes a series
        input_matrix = list(map(list, zip(*input_matrix)))
    # each series is its name followed by its values, skipping blank cells
    input_matrix = [
        [name.strip()] + [
            float(x) for x in data if x.strip() != ''
        ]
        for (name, *data) in input_matrix
    ]
    return input_matrix
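
# e.g. a row "cpu_load\t1.2\t1.3\t9.8" is read as ['cpu_load', 1.2, 1.3, 9.8]
# ('cpu_load' is just an illustrative series name)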

def write_output(results, args):
    # most divergent series first
    results.sort(key=lambda x: -abs(x[1]))
    output_stream = open(args.output_path, 'w') if args.output_path else sys.stdout
    for (name, divergence) in results:
        if abs(divergence) > args.threshold:
            human_divergence = ''
            if args.output_human:
                # powers of 3 delimit the buckets; 'Within normality' can only
                # be printed when the threshold is lowered below 3
                if abs(divergence) < 3: human_divergence = 'Within normality'
                elif 0 < divergence < 9: human_divergence = 'Slightly above'
                elif -9 < divergence < 0: human_divergence = 'Slightly below'
                elif 0 < divergence < 27: human_divergence = 'Considerably above'
                elif -27 < divergence < 0: human_divergence = 'Considerably below'
                elif divergence > 0: human_divergence = 'Extremely above'
                elif divergence < 0: human_divergence = 'Extremely below'
                human_divergence = args.output_delimiter + human_divergence
            print(
                name + args.output_delimiter + str(round(divergence, 2)) + human_divergence,
                file=output_stream
            )
    if args.output_path:
        output_stream.close()

def generate_transformations_derive(data):
    # analyze both the series itself and its first derivative, each with the
    # full set of periodicity/trend transformations applied
    derived_data = derive(data)
    return {
        **generate_transformations_periodicize(data, ''),
        'derived': derived_data,
        **generate_transformations_periodicize(derived_data, 'derived+')
    }

def generate_transformations_periodicize(data, prefix):
    # try to cancel out periodic patterns via aggregation and cherry-picking
    return {
        **generate_transformations_aggregate(data, prefix),
        **generate_transformations_cherrypick(data, prefix)
    }

def generate_transformations_aggregate(data, prefix):
    transformations = {
        **generate_transformations_regress(data, prefix)
    }
    # average the series in groups of 2, 3, ... while enough points remain
    group_size = 2
    while len(data) // group_size >= MIN_DATA_SIZE:
        aggregated_data = aggregate(data, group_size)
        transformation_name = prefix + 'aggregated(%s)' % group_size
        transformations[transformation_name] = aggregated_data
        transformations.update(
            generate_transformations_regress(aggregated_data, transformation_name + '+')
        )
        group_size += 1
    return transformations

def generate_transformations_cherrypick(data, prefix):
    transformations = {
        **generate_transformations_regress(data, prefix)
    }
    # keep every 2nd, 3rd, ... point while enough points remain
    pick_every = 2
    while (len(data) + pick_every - 1) // pick_every >= MIN_DATA_SIZE:
        cherrypicked_data = cherrypick(data, pick_every)
        transformation_name = prefix + 'cherrypicked(%s)' % pick_every
        transformations[transformation_name] = cherrypicked_data
        transformations.update(
            generate_transformations_regress(cherrypicked_data, transformation_name + '+')
        )
        pick_every += 1
    return transformations

def generate_transformations_regress(data, prefix):
    # detrend the series so steady linear growth is not flagged as anomalous
    return {prefix + 'regressed': regress(data)}
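
# Illustrative: for a 30-point series the generators above yield keys such as
# 'regressed', 'aggregated(2)', 'aggregated(2)+regressed', 'cherrypicked(3)',
# 'derived', 'derived+aggregated(2)', ... each with at least MIN_DATA_SIZE points.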

def derive(data):
    # first differences of consecutive values
    derived_data = []
    for i in range(len(data) - 1):
        derived_data.append(data[i + 1] - data[i])
    return derived_data
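
# e.g. derive([3, 5, 4, 8]) -> [2, -1, 4]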

def aggregate(data, group_size):
    # average consecutive groups of group_size values; when the length is not
    # divisible, the oldest leftover points are dropped so the most recent
    # value always closes the last group
    remainder = len(data) % group_size
    trimmed_data = data[remainder:]
    aggregated_data = []
    aggregated_value = 0
    for i in range(len(trimmed_data)):
        aggregated_value += trimmed_data[i]
        if (i + 1) % group_size == 0:
            aggregated_data.append(aggregated_value / group_size)
            aggregated_value = 0
    return aggregated_data
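
# e.g. aggregate([1, 2, 3, 4, 5], 2) -> [2.5, 4.5]  (the leading 1 is dropped)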

def cherrypick(data, pick_every):
    # keep every pick_every-th value, counting backwards from the end of the
    # series so that the most recent value is always kept
    compress_mask = itertools.cycle([1] + list(itertools.repeat(0, pick_every - 1)))
    return list(reversed(list(itertools.compress(reversed(data), compress_mask))))
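
# e.g. cherrypick([1, 2, 3, 4, 5], 2) -> [1, 3, 5]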

def regress(data):
    # fit a rough trend line through the means of the two halves of the series
    # (excluding the point under test) and return the residuals
    reg_data = data[:-1]
    part1 = reg_data[0:len(reg_data) // 2]
    part2 = reg_data[len(reg_data) // 2:]
    x1 = len(part1) / 2
    y1 = sum(part1) / len(part1)
    x2 = len(part1) + len(part2) / 2
    y2 = sum(part2) / len(part2)
    m = (y2 - y1) / (x2 - x1)
    b = y1 - m * x1
    return [data[i] - (i * m + b) for i in range(len(data))]
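
# e.g. regress([1, 2, 3, 4, 5]) -> [0.5, 0.5, 0.5, 0.5, 0.5]
# (a perfectly linear series detrends to a constant residual)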

def get_bootstrap_percentiles(data, percents):
    # estimate percentiles by resampling with replacement and averaging the
    # per-sample percentiles, which smooths the estimates on short series
    sample_percentiles = []
    for _ in range(BOOTSTRAP_SAMPLES):
        sample = [
            random.choice(data)
            for _ in range(len(data))
        ]
        sample_percentiles.append(get_percentiles(sample, percents))
    percentiles = [
        sum(percentile_collection) / float(len(percentile_collection))
        for percentile_collection in zip(*sample_percentiles)
    ]
    return percentiles

def get_percentiles(data, percents):
    # walk the empirical cumulative distribution and record the first value
    # whose accumulated frequency crosses each requested percent
    freqs = defaultdict(int)
    for value in data:
        freqs[value] += 1
    relative_freqs = [
        [value, freqs[value] / float(len(data))]
        for value in sorted(freqs.keys())
    ]
    accum_freq = 0
    percentiles = [0 for _ in range(len(percents))]
    for value, rel_freq in relative_freqs:
        new_accum_freq = accum_freq + rel_freq
        for i in range(len(percents)):
            if accum_freq < percents[i] and new_accum_freq >= percents[i]:
                percentiles[i] = value
        accum_freq = new_accum_freq
    return percentiles
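
# e.g. get_percentiles([1, 2, 3, 4], [0.25, 0.5, 0.75]) -> [1, 2, 3]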

def get_accuracy_factor(data, percentiles):
    # fraction of the data range not covered by the interquartile range:
    # the tighter the quartiles, the more trustworthy the divergence
    data_gap = max(data) - min(data)
    if data_gap == 0: return 1
    percentile_gap = percentiles[2] - percentiles[0]
    if percentile_gap >= data_gap: return 0
    return (data_gap - percentile_gap) / data_gap

def get_length_factor(data):
    # confidence grows asymptotically towards 1 with the series length
    return -1 / (LENGTH_FACTOR_WEIGHT * len(data) + 1) + 1
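
# e.g. 10 points -> ~0.67, 45 points -> 0.9, approaching 1 as the series grows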

def get_divergence(percentiles, last_value):
    # distance of the last value from the median, measured in units of the
    # median-to-quartile gap on the matching side, capped at MAX_DIVERGENCE
    low, median, high = percentiles
    if last_value == median:
        divergence = 0
    elif last_value > median:
        base = high - median
        if base == 0:
            divergence = MAX_DIVERGENCE
        else:
            divergence = min((last_value - median) / base, MAX_DIVERGENCE)
    else:
        # numerator and base are both negative here, so the ratio is positive
        base = low - median
        if base == 0:
            divergence = -MAX_DIVERGENCE
        else:
            divergence = -min((last_value - median) / base, MAX_DIVERGENCE)
    return divergence
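
# e.g. get_divergence([2, 5, 8], 11) -> 2.0; get_divergence([2, 5, 8], 0) -> ~-1.67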

def get_results(data, transformed_data):
    # score one transformation: how far the last value diverges, and how much
    # weight that divergence should carry in the final average
    percentiles = get_bootstrap_percentiles(transformed_data[:-1], PERCENTS)
    accuracy_factor = get_accuracy_factor(data[:-1], percentiles)
    length_factor = get_length_factor(transformed_data[:-1])
    final_factor = (accuracy_factor * length_factor) ** FACTOR_DECAY
    divergence = get_divergence(percentiles, transformed_data[-1])
    return [final_factor, divergence]

def analyze(data):
    if len(data) < MIN_DATA_SIZE:
        return 0
    transformations = generate_transformations_derive(data)
    transformations['identity'] = data
    results = [
        get_results(data, transformed_data)
        for transformed_data in transformations.values()
    ]
    # final score is the factor-weighted average of all divergences
    divergence_acc, divergence_fac, factor_acc = 0, 0, 0
    for factor, divergence in results:
        divergence_acc += divergence
        divergence_fac += divergence * factor
        factor_acc += factor
    if factor_acc == 0:
        # all weights are zero: fall back to the plain average
        return divergence_acc / len(results)
    return divergence_fac / factor_acc

if __name__ == '__main__':
    args = get_arguments()
    sequences = read_input(args)
    results = [(name, analyze(data)) for (name, *data) in sequences]
    write_output(results, args)
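
# Example usage (file names are hypothetical):
#   ./shepherd.py -i metrics.tsv -H -o anomalies.tsv
#   cat metrics.tsv | ./shepherd.py -c   # treat columns, not rows, as series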