Last active
March 21, 2025 23:28
-
-
Save gtalarico/e478b38e950bb761a4d40c500b8f1608 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from typing import List, Dict | |
class Database:
    """Simple in-memory datastore for meter readings.

    Readings are grouped per meter:
    ``data = {"meter_id": [reading, reading, ...]}``.
    """

    def __init__(self, path):
        """Create an empty datastore.

        Args:
            path: Location of the backing store. It is only remembered on
                the instance; nothing is read from or written to it here.
        """
        # Placeholder: read/write from the datastore at ``path``.
        self.path = path
        self.data: Dict[str, List['Reading']] = defaultdict(list)

    def insert(self, meter_id, reading: 'Reading'):
        """Append a single reading to the history kept for ``meter_id``."""
        self.data[meter_id].append(reading)

    def batch_insert(self, meter_id, readings: 'List[Reading]'):
        """Append each item of ``readings`` to ``meter_id``'s history, in order."""
        # A plain loop: a list comprehension here would only build a
        # throwaway list of ``None`` values.
        for reading in readings:
            self.insert(meter_id, reading)

    def get_readings(self, meter_id):
        """Return the list of readings stored for ``meter_id``.

        The internal list itself is returned (not a copy). Because the
        store is a ``defaultdict(list)``, asking for an unknown meter
        creates and returns an empty list.
        """
        return self.data[meter_id]
class Meter:
    """Placeholder for a meter; it only carries an identifier."""

    def __init__(self, id):
        """Remember ``id`` as the meter's identifier."""
        self.id = id
class Reading:
    """A single meter reading."""

    def __init__(self, flow_rate, ts=None):
        """Store the measured flow rate and an optional timestamp.

        Args:
            flow_rate: Measured flow rate for this reading.
            ts: Timestamp of the reading; ignored for this exercise.
        """
        self.ts = ts  # Ignoring for this exercise
        self.flow_rate: int = flow_rate
class BasicAnomalyDetector:
    """Compare a meter's historical average flow with that of a new batch.

    Simple analysis of the ratio between the historical average and the
    average of the new set of readings. This is very flawed; it is just a
    quick proof of concept.
    """

    def __init__(self, db):
        """Keep the datastore that ``analyse`` reads historical data from."""
        self.db = db

    def analyse(self, meter_id, new_readings: 'List[Reading]') -> 'DetectionResult':
        """Score ``new_readings`` against the stored history of ``meter_id``.

        The ratio is the smaller of the two averages divided by the larger
        one, so identical averages give 1.0.

        Args:
            meter_id: Key of the meter whose stored readings form the history.
            new_readings: Readings to compare against that history; they are
                not inserted into the datastore by this method.

        Returns:
            A ``DetectionResult`` holding ``meter_id`` and the ratio.

        Raises:
            ZeroDivisionError: If the history or ``new_readings`` is empty
                (raised by ``average``).
        """
        # Read from the datastore this detector was built with, not from
        # whatever module-level ``db`` happens to exist.
        existing_avg = average(self.db.get_readings(meter_id))
        # If historic data is too small we may want to discard analysis...
        new_avg = average(new_readings)
        low, high = sorted((existing_avg, new_avg))
        # Equal averages mean no deviation; handling them up front also
        # avoids 0 / 0 when both the history and the new batch average 0.
        ratio = 1.0 if low == high else low / high
        return DetectionResult(meter_id, ratio)
class DetectionResult:
    """Outcome of an anomaly analysis for one meter."""

    def __init__(self, meter_id, ratio):
        """Store the analysed meter's id and the computed average ratio."""
        self.meter_id = meter_id
        self.ratio = ratio

    def possible_leak(self, threshold=0.5):
        """Return True when the ratio is strictly below ``threshold``.

        Args:
            threshold: Cut-off for flagging a result. The default of 0.5 is
                an arbitrary ratio; pass another value to tune sensitivity.
        """
        return self.ratio < threshold

    def __repr__(self):
        # Always reports the verdict for the default threshold.
        return f"<Result meter_id={self.meter_id} ratio={self.ratio} possible_leak={self.possible_leak()}>"
# Analysis Util
# Realistically we would want something that operates on large sets efficiently (e.g. dataframe or numpy)
# and uses models trained on real-world data to do the detection
def average(readings):
    """Return the arithmetic mean of the ``flow_rate`` of ``readings``.

    Raises ZeroDivisionError when ``readings`` is empty.
    """
    total = 0
    for reading in readings:
        total += reading.flow_rate
    return total / len(readings)
# Demo: seed one meter with baseline readings, then score two new batches.
db = Database(path="./db.json")
detector = BasicAnomalyDetector(db)
meters = Meter("meter-1")

base_data = [Reading(rate) for rate in (0, 5, 3, 2, 1, 0)]

# Bootstrap Data
db.batch_insert(meters.id, base_data)

new_readings = [Reading(rate) for rate in (0, 5, 10, 3, 0)]
abnormal_readings = [Reading(rate) for rate in (0, 5, 3, 4, 50)]

rv = detector.analyse(meters.id, new_readings)
print(f"Result: {rv}")

# The first batch becomes part of the history before the second is scored.
db.batch_insert(meters.id, new_readings)

rv2 = detector.analyse(meters.id, abnormal_readings)
print(f"Result: {rv2}")
# Output
# Result: <Result meter_id=meter-1 ratio=0.5092592592592592 possible_leak=False>
# Result: <Result meter_id=meter-1 ratio=0.21260997067448678 possible_leak=True>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment