Skip to content

Instantly share code, notes, and snippets.

@KolevDarko
Created May 12, 2020 11:16
Show Gist options
  • Save KolevDarko/2134a6efe8312180f77abe8bb37717c9 to your computer and use it in GitHub Desktop.
Save KolevDarko/2134a6efe8312180f77abe8bb37717c9 to your computer and use it in GitHub Desktop.
Moving average calculator
from datetime import datetime
from random import randint
class MovAvgCalculator:
    """Maintain a moving average of prices over a trailing time window.

    Items are ``(timestamp, price)`` pairs held oldest-first in ``my_list``.
    ``sum`` and ``count`` are running totals over the items currently in the
    list, so the average is refreshed without re-scanning every item.
    """

    def __init__(self, starting_list=None, window_duration=3600):
        """Create a calculator, optionally seeded with existing items.

        Args:
            starting_list: Optional iterable of ``(timestamp, price)`` pairs,
                oldest first.
            window_duration: Length of the window, in the same unit as the
                timestamps (seconds when they come from
                ``datetime.timestamp()``).
        """
        self.window_duration = window_duration
        self.moving_average = None
        # Copy the seed so later window updates never mutate the caller's list.
        self.my_list = list(starting_list) if starting_list else []
        # Items are (timestamp, price) pairs: only the price contributes.
        self.sum = sum(el_price for _, el_price in self.my_list)
        self.count = len(self.my_list)
        self.calculate_average()

    def update_window(self, new_items):
        """Append *new_items*, evict expired items and refresh the average.

        Args:
            new_items: Iterable of ``(timestamp, price)`` pairs, oldest first
                and not older than the items already stored.
        """
        self.add_new_items(new_items)
        self.remove_old_items()
        self.calculate_average()

    def add_new_items(self, new_items):
        """Append each ``(timestamp, price)`` pair and add it to the totals."""
        for el_date, el_price in new_items:
            self.my_list.append((el_date, el_price))
            self.sum += el_price
            self.count += 1

    def remove_old_items(self):
        """Evict every item that has fallen out of the window."""
        i = self.detect_old_items_idx()
        if i > 0:
            self.process_old_items(i)

    def process_old_items(self, idx):
        """Subtract the first *idx* items from the totals and drop them.

        The items must also be deleted from ``my_list``; otherwise the next
        update would find the same expired items and subtract them again.
        """
        for _, el_price in self.my_list[:idx]:
            self.count -= 1
            self.sum -= el_price
        # One slice deletion instead of repeated pop(0) calls.
        del self.my_list[:idx]

    def detect_old_items_idx(self):
        """Return the index of the first item that is still in the window.

        The window ends at the newest timestamp and spans
        ``window_duration`` backwards; items strictly older than that cutoff
        are expired. Returns 0 when the list is empty.
        """
        if not self.my_list:
            return 0
        cutoff = self.my_list[-1][0] - self.window_duration
        for idx, (el_date, _) in enumerate(self.my_list):
            if el_date >= cutoff:
                return idx
        # Only reachable with a negative window_duration: everything expired.
        return len(self.my_list)

    def generate_new_items(self):
        """Generate 2-10 random ``(timestamp, price)`` pairs for demos.

        Timestamps start after the newest stored timestamp (or the current
        time when the list is empty) and strictly increase, because the
        window scan relies on chronological order.
        """
        count = randint(2, 10)
        new_prices = []
        el_date = self.get_last_timestamp()
        for _ in range(count):
            new_el = randint(3000, 5000)
            el_date += randint(1, 10)
            new_prices.append((el_date, new_el))
        return new_prices

    def get_last_timestamp(self):
        """Return the newest stored timestamp, or the current time if empty."""
        if self.my_list:
            return self.my_list[-1][0]
        return datetime.now().timestamp()

    def calculate_average(self):
        """Recompute ``moving_average``; it is ``None`` when no items remain."""
        self.moving_average = self.sum / self.count if self.count else None
if __name__ == "__main__":
    # Demo run: feed one batch of randomly generated (timestamp, price)
    # pairs into a fresh calculator and print the resulting average.
    calculator = MovAvgCalculator()
    calculator.update_window(calculator.generate_new_items())
    print(calculator.moving_average)
@inselbuch
Copy link

To get this to work I needed to "pop" the removed items from the list. I suggest this modification:

def remove_old_items(self):
    """Subtract expired items from the totals, then drop them from the list.

    ``detect_old_items_idx`` gives the number of leading items to evict;
    ``process_old_items`` adjusts the running totals for them.
    """
    i = self.detect_old_items_idx()
    if i > 0:
        self.process_old_items(i)
        # One slice deletion replaces i separate pop(0) calls, each of which
        # shifts the whole remaining list.
        del self.my_list[:i]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment