Last active
September 22, 2023 13:07
-
-
Save AlphaSheep/0e846c6b6835f8e56394c46b793043cc to your computer and use it in GitHub Desktop.
Averages with outliers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@timeit | |
def calc_averages_naive(times: List[float], window_size: int, cutoff_size: int): | |
divisor = window_size - (cutoff_size * 2) | |
averages = [] | |
for i in range(len(times) - window_size + 1): | |
window = times[i:i+window_size] | |
window.sort() | |
averages.append(sum(window[cutoff_size: -cutoff_size]) / divisor) | |
return averages | |
averages_naive = calc_averages_naive(times, WINDOW_SIZE, CUTOFF_SIZE) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
SAMPLE_SIZE: Final[int] = 100000 | |
WINDOW_SIZE: Final[int] = 1000 | |
THRESHOLD: Final[float] = 0.05 | |
CUTOFF_SIZE: Final[int] = math.ceil(WINDOW_SIZE * THRESHOLD) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from numpy.random import normal | |
def generate_sample_data() -> List[float]: | |
y_0 = 60 | |
y_final = 20 | |
base = 0.5 | |
power = 0.00005 | |
noise = lambda : 1 + 0.2 * normal() | |
trend = lambda x : y_final + (y_0-y_final) * (base ** (power * x)) | |
return [trend(x) * noise() for x in range(SAMPLE_SIZE)] | |
times = generate_sample_data() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pylint: disable=missing-docstring, wrong-import-order, wrong-import-position, unnecessary-lambda-assignment | |
from typing import List, Final | |
import pylab as pl | |
# ------------------------------------------------ | |
# 0. Timing Decorator | |
# ------------------------------------------------ | |
from time import perf_counter | |
import functools | |
def timeit(func): | |
@functools.wraps(func) | |
def timed(*args, **kwargs): | |
start = perf_counter() | |
result = func(*args, **kwargs) | |
end = perf_counter() | |
print(f'{func.__name__} took {end - start:.6} seconds') | |
return result | |
return timed | |
# ------------------------------------------------ | |
# 1. Define some constants | |
# ------------------------------------------------ | |
import math | |
SAMPLE_SIZE: Final[int] = 100000 | |
WINDOW_SIZE: Final[int] = 1000 | |
THRESHOLD: Final[float] = 0.05 | |
CUTOFF_SIZE: Final[int] = math.ceil(WINDOW_SIZE * THRESHOLD) | |
# ------------------------------------------------ | |
# 2. Generate some data with a trend | |
# ------------------------------------------------ | |
from numpy.random import normal | |
def generate_sample_data() -> List[float]: | |
y_0 = 60 | |
y_final = 20 | |
base = 0.5 | |
power = 0.00005 | |
noise = lambda : 1 + 0.2 * normal() | |
trend = lambda x : y_final + (y_0-y_final) * (base ** (power * x)) | |
return [trend(x) * noise() for x in range(SAMPLE_SIZE)] | |
times = generate_sample_data() | |
# ------------------------------------------------ | |
# pl.plot(times, '.') | |
# pl.show() | |
# ------------------------------------------------ | |
# 3. Naive Approach | |
# ------------------------------------------------ | |
# Means | |
@timeit | |
def calc_means_naive(times: List[float], window_size: int) -> List[float]: | |
means = [] | |
for i in range(len(times) - window_size + 1): | |
window = times[i:i+window_size] | |
means.append(sum(window) / window_size) | |
return means | |
means_naive = calc_means_naive(times, WINDOW_SIZE) | |
# Averages | |
@timeit | |
def calc_averages_naive(times: List[float], window_size: int, cutoff_size: int): | |
divisor = window_size - (cutoff_size * 2) | |
averages = [] | |
for i in range(len(times) - window_size + 1): | |
window = times[i:i+window_size] | |
window.sort() | |
averages.append(sum(window[cutoff_size: -cutoff_size]) / divisor) | |
return averages | |
averages_naive = calc_averages_naive(times, WINDOW_SIZE, CUTOFF_SIZE) | |
# ------------------------------------------------ | |
# pl.plot(times, '.', alpha=0.1) | |
# pl.plot(range(WINDOW_SIZE-1, len(times)), means_naive, 'g') | |
# pl.plot(range(WINDOW_SIZE-1, len(times)), averages_naive, 'r') | |
# pl.show() | |
# ------------------------------------------------ | |
# 4. A Smarter Approach for the mean | |
# ------------------------------------------------ | |
class MovingMeanWindow(): | |
def __init__(self, initial_window: List[float]) -> None: | |
self._total: float = sum(initial_window) | |
self._window: List[float] = initial_window | |
self._size: int = len(initial_window) | |
def next(self, value: float) -> float: | |
self._window.append(value) | |
self._total += value - self._window.pop(0) | |
return self._total / self._size | |
@timeit | |
def calc_means_moving_window(times: List[float], window_size: int): | |
initial_window = times[:window_size] | |
window = MovingMeanWindow(times[:window_size]) | |
initial_mean = sum(initial_window) / window_size | |
return [initial_mean] + [window.next(x) for x in times[window_size:]] | |
means_moving_window = calc_means_moving_window(times, WINDOW_SIZE) | |
# ------------------------------------------------ | |
# pl.plot(times, '.', alpha=0.1) | |
# pl.plot(range(WINDOW_SIZE-1, len(times)), means_naive, 'g') | |
# pl.plot(range(WINDOW_SIZE-1, len(times)), averages_naive, 'r') | |
# pl.show() | |
# ------------------------------------------------ | |
# 4. A Smarter Approach for the average | |
# ------------------------------------------------ | |
from bisect import bisect_left | |
class MovingAverageWindow(): | |
def __init__(self, initial_window: List[float], cutoff_size: int) -> None: | |
self._cutoff_size = cutoff_size | |
self._values: List[float] = initial_window | |
self._sorted_values: List[float] = sorted(initial_window) | |
self._total: float = sum(initial_window) | |
self._upper_total: float = sum(self._sorted_values[-self._cutoff_size:]) | |
self._lower_total: float = sum(self._sorted_values[:self._cutoff_size]) | |
self._size: int = len(initial_window) | |
def _insert_new_value(self, value_to_add: float) -> None: | |
low_to_drop = self._sorted_values[self._cutoff_size - 1] | |
high_to_drop = self._sorted_values[-self._cutoff_size] | |
self._values.append(value_to_add) | |
insert_pos = bisect_left(self._sorted_values, value_to_add) | |
self._sorted_values.insert(insert_pos, value_to_add) | |
if insert_pos < self._cutoff_size: | |
self._lower_total += value_to_add - low_to_drop | |
if insert_pos >= (len(self._values) - self._cutoff_size): | |
self._upper_total += value_to_add - high_to_drop | |
self._total += value_to_add | |
def _remove_old_value(self) -> None: | |
value_to_remove = self._values.pop(0) | |
remove_pos = bisect_left(self._sorted_values, value_to_remove) | |
next_lowest = self._sorted_values[self._cutoff_size] | |
next_highest = self._sorted_values[-self._cutoff_size - 1] | |
if remove_pos < self._cutoff_size: | |
self._lower_total += next_lowest - value_to_remove | |
if remove_pos >= (len(self._values) - self._cutoff_size): | |
self._upper_total += next_highest - value_to_remove | |
del self._sorted_values[remove_pos] | |
self._total -= value_to_remove | |
def next(self, value_to_add: float) -> float: | |
self._insert_new_value(value_to_add) | |
self._remove_old_value() | |
middle_total = self._total - self._upper_total - self._lower_total | |
divisor = len(self._values) - (self._cutoff_size * 2) | |
return middle_total / divisor | |
@timeit | |
def calc_averages_moving_window(times: List[float], window_size: int, cutoff_size: int): | |
divisor = window_size - (cutoff_size * 2) | |
initial_window = times[:window_size] | |
window = MovingAverageWindow(times[:window_size], cutoff_size) | |
initial_average = sum(sorted(initial_window)[cutoff_size: -cutoff_size]) / divisor | |
return [initial_average] + [window.next(x) for x in times[window_size:]] | |
averages_moving_window = calc_averages_moving_window(times, WINDOW_SIZE, CUTOFF_SIZE) | |
# ------------------------------------------------ | |
pl.plot(times, '.', alpha=0.25) | |
pl.plot(range(WINDOW_SIZE-1, len(times)), means_naive, 'g') | |
pl.plot(range(WINDOW_SIZE-1, len(times)), averages_naive, 'r') | |
pl.plot(range(WINDOW_SIZE-1, len(times)), means_moving_window, 'g--') | |
pl.plot(range(WINDOW_SIZE-1, len(times)), averages_moving_window, 'r--') | |
pl.show() | |
# ------------------------------------------------ | |
for naive, better in zip(means_naive, means_moving_window): | |
assert abs(naive - better) < 1e-8 | |
for naive, better in zip(averages_naive, averages_moving_window): | |
assert abs(naive - better) < 1e-8 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@timeit | |
def calc_means_naive(times: List[float], window_size: int) -> List[float]: | |
means = [] | |
for i in range(len(times) - window_size + 1): | |
window = times[i:i+window_size] | |
means.append(sum(window) / window_size) | |
return means | |
means_naive = calc_means_naive(times, WINDOW_SIZE) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bisect import bisect_left | |
class MovingAverageWindow(): | |
def __init__(self, initial_window: List[float], cutoff_size: int) -> None: | |
self._cutoff_size = cutoff_size | |
self._values: List[float] = initial_window | |
self._sorted_values: List[float] = sorted(initial_window) | |
self._total: float = sum(initial_window) | |
self._upper_total: float = sum(self._sorted_values[-self._cutoff_size:]) | |
self._lower_total: float = sum(self._sorted_values[:self._cutoff_size]) | |
self._size: int = len(initial_window) | |
def _insert_new_value(self, value_to_add: float) -> None: | |
low_to_drop = self._sorted_values[self._cutoff_size - 1] | |
high_to_drop = self._sorted_values[-self._cutoff_size] | |
self._values.append(value_to_add) | |
insert_pos = bisect_left(self._sorted_values, value_to_add) | |
self._sorted_values.insert(insert_pos, value_to_add) | |
if insert_pos < self._cutoff_size: | |
self._lower_total += value_to_add - low_to_drop | |
if insert_pos >= (len(self._values) - self._cutoff_size): | |
self._upper_total += value_to_add - high_to_drop | |
self._total += value_to_add | |
def _remove_old_value(self) -> None: | |
value_to_remove = self._values.pop(0) | |
remove_pos = bisect_left(self._sorted_values, value_to_remove) | |
next_lowest = self._sorted_values[self._cutoff_size] | |
next_highest = self._sorted_values[-self._cutoff_size - 1] | |
if remove_pos < self._cutoff_size: | |
self._lower_total += next_lowest - value_to_remove | |
if remove_pos >= (len(self._values) - self._cutoff_size): | |
self._upper_total += next_highest - value_to_remove | |
del self._sorted_values[remove_pos] | |
self._total -= value_to_remove | |
def next(self, value_to_add: float) -> float: | |
self._insert_new_value(value_to_add) | |
self._remove_old_value() | |
middle_total = self._total - self._upper_total - self._lower_total | |
divisor = len(self._values) - (self._cutoff_size * 2) | |
return middle_total / divisor |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@timeit | |
def calc_averages_moving_window(times: List[float], window_size: int, cutoff_size: int): | |
divisor = window_size - (cutoff_size * 2) | |
initial_window = times[:window_size] | |
window = MovingAverageWindow(times[:window_size], cutoff_size) | |
initial_average = sum(sorted(initial_window)[cutoff_size: -cutoff_size]) / divisor | |
return [initial_average] + [window.next(x) for x in times[window_size:]] | |
averages_moving_window = calc_averages_moving_window(times, WINDOW_SIZE, CUTOFF_SIZE) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@timeit | |
def calc_means_moving_window(times: List[float], window_size: int): | |
initial_window = times[:window_size] | |
window = MovingMeanWindow(times[:window_size]) | |
initial_mean = sum(initial_window) / window_size | |
return [initial_mean] + [window.next(x) for x in times[window_size:]] | |
means_moving_window = calc_means_moving_window(times, WINDOW_SIZE) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MovingMeanWindow(): | |
def __init__(self, initial_window: List[float]) -> None: | |
self._total: float = sum(initial_window) | |
self._window: List[float] = initial_window | |
self._size: int = len(initial_window) | |
def next(self, value: float) -> float: | |
self._window.append(value) | |
self._total += value - self._window.pop(0) | |
return self._total / self._size |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import perf_counter | |
import functools | |
def timeit(func): | |
@functools.wraps(func) | |
def timed(*args, **kwargs): | |
start = perf_counter() | |
result = func(*args, **kwargs) | |
end = perf_counter() | |
print(f'{func.__name__} took {end - start:.6} seconds') | |
return result | |
return timed |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment