Created
August 19, 2023 07:48
-
-
Save mitsuhiko/aaf097181aa3ce0afdf93ff4f16ec017 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import zlib | |
from threading import Lock, Thread | |
class Metric(object): | |
def add(self, value): | |
raise NotImplementedError() | |
def flush(self): | |
raise NotImplementedError() | |
class CounterMetric(Metric): | |
def __init__(self): | |
self.value = 0.0 | |
def add(self, value): | |
self.value += value | |
def flush(self): | |
return self.value | |
class DistributionMetric(Metric): | |
def __init__(self) -> None: | |
self.value = [] | |
def add(self, value: float): | |
self.value.append(value) | |
def flush(self): | |
return self.value | |
class SetMetric(Metric): | |
def __init__(self): | |
self.value = set() | |
def add(self, value): | |
self.value.add(value) | |
def flush(self): | |
def _hash(x) -> int: | |
if isinstance(x, str): | |
return zlib.crc32(x.encode("utf-8")) & 0xFFFFFFFF | |
return int(x) | |
return [_hash(x) for x in self.value] | |
METRIC_TYPES = { | |
"c": CounterMetric, | |
"d": DistributionMetric, | |
"s": SetMetric, | |
} | |
class Aggregator: | |
ROLLUP = 10.0 | |
def __init__(self) -> None: | |
self.buckets = {} | |
self._lock = Lock() | |
self._running = True | |
self._flusher = Thread(target=self._flush) | |
self._flusher.daemon = True | |
self._flusher.start() | |
def _flush(self) -> None: | |
while self._running: | |
cutoff = time.time() - self.ROLLUP | |
cleanup = set() | |
metrics = [] | |
buckets = self.buckets | |
with self._lock: | |
for bucket_key, metric in buckets.items(): | |
ts, ty, name, tags = bucket_key | |
if ts > cutoff: | |
continue | |
m = { | |
"timestamp": ts, | |
"name": name, | |
"type": ty, | |
"value": metric.flush(), | |
} | |
if tags: | |
m["tags"] = dict(tags) | |
metrics.append(m) | |
cleanup.add(bucket_key) | |
for key in cleanup: | |
buckets.pop(key) | |
if metrics: | |
self._emit(metrics) | |
time.sleep(2.0) | |
def _emit(self, metrics): | |
print(metrics) | |
def add(self, ty, key, value, tags, timestamp) -> None: | |
if timestamp is None: | |
timestamp = time.time() | |
bucket_key = ( | |
int((timestamp // self.ROLLUP) * self.ROLLUP), | |
ty, | |
key, | |
tuple(sorted(tuple((tags or {}).items()))), | |
) | |
with self._lock: | |
metric = self.buckets.get(bucket_key) | |
if metric is None: | |
metric = METRIC_TYPES[ty]() | |
self.buckets[bucket_key] = metric | |
metric.add(value) | |
class Client: | |
def __init__(self) -> None: | |
self.aggregator = Aggregator() | |
def incr(self, key, value=1, tags=None, timestamp=None) -> None: | |
self.aggregator.add("c", key, value, tags, timestamp) | |
def timing(self, key, value, tags=None, timestamp=None) -> None: | |
self.aggregator.add("d", key, value, tags, timestamp) | |
def set( | |
self, | |
key, | |
value, | |
tags=None, | |
timestamp=None, | |
): | |
self.aggregator.add("s", key, value, tags, timestamp) | |
def main() -> None: | |
import random | |
c = Client() | |
while True: | |
for x in range(int(random.random() * 500)): | |
c.timing( | |
"foo.bar", | |
random.random() * 4.7 * random.random() / 2.0, | |
tags={"foo": "bar"}, | |
) | |
c.incr( | |
"foo.barilla", | |
tags={"foo": "bar"}, | |
) | |
c.set( | |
"foo.baz", | |
random.choice(["foo", "bar", "baz"]), | |
tags={"foo": "bar"}, | |
) | |
time.sleep(random.random() * 0.2) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment