Skip to content

Instantly share code, notes, and snippets.

@gtalarico
Last active March 21, 2025 23:28
Show Gist options
  • Save gtalarico/e478b38e950bb761a4d40c500b8f1608 to your computer and use it in GitHub Desktop.
Save gtalarico/e478b38e950bb761a4d40c500b8f1608 to your computer and use it in GitHub Desktop.
from collections import defaultdict
from typing import List, Dict
class Database:
    """Simple in-memory datastore for meter readings.

    Data layout: { meter_id: [Reading, ...] } — one list of readings per
    meter, created lazily on first access via defaultdict.
    """

    def __init__(self, path):
        # FIX: the original accepted `path` and silently discarded it.
        # Keep it for a future file-backed implementation; storage is
        # currently in-memory only.
        self.path = path
        self.data: Dict[str, List['Reading']] = defaultdict(list)

    def insert(self, meter_id, reading: 'Reading'):
        """Append a single reading to the given meter's history."""
        self.data[meter_id].append(reading)

    def batch_insert(self, meter_id, readings: List['Reading']):
        """Append many readings at once.

        FIX: the original used a list comprehension purely for its side
        effects, materializing a throwaway list of None — use a plain loop.
        """
        for reading in readings:
            self.insert(meter_id, reading)

    def get_readings(self, meter_id) -> List['Reading']:
        """Return the (possibly empty) list of readings for a meter."""
        return self.data[meter_id]
class Meter:
    """Placeholder for a physical water meter; only carries its identifier."""

    def __init__(self, id):
        self.id = id
class Reading:
    """A single meter reading: a flow rate plus an optional timestamp."""

    def __init__(self, flow_rate, ts=None):
        self.flow_rate: int = flow_rate
        # Timestamp is accepted but deliberately unused in this exercise.
        self.ts = ts
class BasicAnomalyDetector:
    """Flags anomalies via the ratio between the historical average flow
    rate and the average of a new batch of readings.

    Very flawed, quick proof of concept: any large divergence in either
    direction shrinks the ratio, which DetectionResult then thresholds.
    """

    def __init__(self, db):
        # Injected datastore providing get_readings(meter_id).
        self.db = db

    def analyse(self, meter_id, new_readings: List['Reading']) -> 'DetectionResult':
        """Compare new readings for `meter_id` against its stored history."""
        # FIX: the original read the module-level global `db`, ignoring the
        # database injected via the constructor.
        existing_avg = average(self.db.get_readings(meter_id))
        # NOTE(review): if the historic data set is too small we may want to
        # discard the analysis entirely — not handled in this PoC.
        new_avg = average(new_readings)
        low, high = sorted((existing_avg, new_avg))
        # FIX: guard the division — two all-zero averages previously raised
        # ZeroDivisionError; treat identical zero flow as a perfect match.
        ratio = low / high if high else 1.0
        return DetectionResult(meter_id, ratio)
class DetectionResult:
    """Outcome of one anomaly analysis: the meter examined and the ratio of
    the lower to the higher of the two compared averages."""

    def __init__(self, meter_id, ratio):
        self.meter_id = meter_id
        self.ratio = ratio

    def possible_leak(self):
        # Cut-off is arbitrary — chosen for this proof of concept only.
        return self.ratio < 0.5

    def __repr__(self):
        return (
            f"<Result meter_id={self.meter_id} "
            f"ratio={self.ratio} possible_leak={self.possible_leak()}>"
        )
# Analysis Util
# Realistically we would want something that operates on large sets
# efficiently (e.g. a dataframe or numpy) and models trained to detect
# anomalies based on real world data.
def average(readings):
    """Arithmetic mean of the flow rates of `readings`.

    `readings` must be a sized iterable of objects with a `flow_rate`
    attribute. Raises ZeroDivisionError when `readings` is empty.
    """
    # Generator expression — no need to materialize an intermediate list.
    return sum(r.flow_rate for r in readings) / len(readings)
# --- Demo / smoke test -------------------------------------------------------
db = Database(path="./db.json")
detector = BasicAnomalyDetector(db)
meters = Meter("meter-1")

# Bootstrap the datastore with some historical readings.
base_data = [Reading(v) for v in (0, 5, 3, 2, 1, 0)]
db.batch_insert(meters.id, base_data)

# A batch roughly matching the historical profile...
new_readings = [Reading(v) for v in (0, 5, 10, 3, 0)]
# ...and one containing a suspicious spike.
abnormal_readings = [Reading(v) for v in (0, 5, 3, 4, 50)]

rv = detector.analyse(meters.id, new_readings)
print(f"Result: {rv}")

db.batch_insert(meters.id, new_readings)
rv2 = detector.analyse(meters.id, abnormal_readings)
print(f"Result: {rv2}")

# Expected output:
# Result: <Result meter_id=meter-1 ratio=0.5092592592592592 possible_leak=False>
# Result: <Result meter_id=meter-1 ratio=0.21260997067448678 possible_leak=True>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment