-
-
Save cliffxuan/13ddf0e378ae990e7a42592e32f07180 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time
import zlib
from threading import Event, Lock, Thread
from typing import (
    Literal,
    Optional,
    Protocol,
    Sequence,
    Type,
    TypeAlias,
    TypeVar,
    overload,
)
TNum: TypeAlias = int | float | |
MetricType: TypeAlias = Literal["c", "d", "s"] | |
BucketKey: TypeAlias = tuple[int, MetricType, str, tuple[tuple[str, str], ...]] | |
T = TypeVar("T", contravariant=True) | |
class Metric(Protocol[T]):
    """Interface of a metric accumulator: collect values, then flush them."""

    def add(self, value: T) -> None:
        """Record a single observation."""
        ...

    def flush(self) -> TNum | Sequence[TNum]:
        """Return the accumulated value, or sequence of values, for emission."""
        ...
class CounterMetric(Metric[TNum]):
    """Counter: keeps the running sum of every value added."""

    def __init__(self) -> None:
        self.value: float = 0.0

    def add(self, value: TNum) -> None:
        """Add *value* to the running sum."""
        self.value += value

    def flush(self) -> float:
        """Return the current sum."""
        return self.value
class DistributionMetric(Metric[TNum]):
    """Distribution: keeps every individual value in insertion order."""

    def __init__(self) -> None:
        self.value: list[TNum] = []

    def add(self, value: TNum) -> None:
        """Append *value* to the recorded samples."""
        self.value.append(value)

    def flush(self) -> list[TNum]:
        """Return the recorded samples (the internal list itself, not a copy)."""
        return self.value
class SetMetric(Metric[int | str]):
    """Set: keeps the distinct values added and flushes them as integers."""

    def __init__(self) -> None:
        self.value: set[int | str] = set()

    def add(self, value: int | str) -> None:
        """Remember *value*; adding the same value again has no effect."""
        self.value.add(value)

    def flush(self) -> list[int]:
        """Return one integer per distinct value.

        Strings are replaced by the CRC-32 of their UTF-8 encoding; integers
        are passed through unchanged. The order follows set iteration order.
        """

        def _hash(x: int | str) -> int:
            if isinstance(x, str):
                # Mask keeps the checksum within the unsigned 32-bit range.
                return zlib.crc32(x.encode("utf-8")) & 0xFFFFFFFF
            return int(x)

        return [_hash(x) for x in self.value]
# Maps each metric type code to the accumulator class created for a new bucket.
METRIC_TYPES: dict[MetricType, Type[Metric]] = {
    "c": CounterMetric,
    "d": DistributionMetric,
    "s": SetMetric,
}
class Aggregator:
    """Aggregate metric values into time buckets and flush them periodically.

    Each value lands in a bucket identified by its rollup-aligned timestamp,
    metric type, name and tags. A daemon thread started by the constructor
    repeatedly emits, and then discards, every bucket whose timestamp lies at
    least ``ROLLUP`` seconds in the past. Access to ``buckets`` is guarded by
    an internal lock.
    """

    # Width of one aggregation bucket, in seconds.
    ROLLUP: float = 10.0
    # Pause between two flush passes, in seconds.
    FLUSH_INTERVAL: float = 2.0

    def __init__(self) -> None:
        self.buckets: dict[BucketKey, Metric] = {}
        self._lock = Lock()
        self._running = True
        # Set by stop() so the flusher wakes up instead of finishing its pause.
        self._wakeup = Event()
        self._flusher = Thread(target=self._flush, daemon=True)
        self._flusher.start()

    def stop(self) -> None:
        """Ask the flusher thread to exit; returns without waiting for it.

        Buckets that have not been flushed yet are left in ``buckets``.
        """
        self._running = False
        self._wakeup.set()

    def _collect(self, cutoff: float) -> list[dict[str, object]]:
        """Remove every bucket with timestamp <= *cutoff* and return payloads."""
        metrics: list[dict[str, object]] = []
        with self._lock:
            # Materialise the keys first: the dict is mutated while draining.
            expired = [key for key in self.buckets if key[0] <= cutoff]
            for bucket_key in expired:
                ts, ty, name, tags = bucket_key
                payload: dict[str, object] = {
                    "timestamp": ts,
                    "name": name,
                    "type": ty,
                    "value": self.buckets.pop(bucket_key).flush(),
                }
                if tags:
                    payload["tags"] = dict(tags)
                metrics.append(payload)
        return metrics

    def _flush(self) -> None:
        """Flusher thread body: emit completed buckets until stopped."""
        while self._running:
            metrics = self._collect(time.time() - self.ROLLUP)
            # Emit outside the lock so a slow sink does not block add().
            if metrics:
                self._emit(metrics)
            # Interruptible pause; returns early once stop() sets the event.
            self._wakeup.wait(self.FLUSH_INTERVAL)

    def _emit(self, metrics):
        """Deliver a batch of flushed metric payloads (printed to stdout)."""
        print(metrics)

    @overload
    def add(
        self,
        ty: Literal["c"],
        key: str,
        value: TNum,
        tags: Optional[dict[str, str]] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        ...

    @overload
    def add(
        self,
        ty: Literal["d"],
        key: str,
        value: TNum,
        tags: Optional[dict[str, str]] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        ...

    @overload
    def add(
        self,
        ty: Literal["s"],
        key: str,
        value: int | str,
        tags: Optional[dict[str, str]] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        ...

    def add(
        self,
        ty: MetricType,
        key: str,
        value: TNum | str,
        tags: Optional[dict[str, str]] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        """Record *value* for the metric *key* of type *ty*.

        Args:
            ty: Metric type code; selects the accumulator class.
            key: Metric name.
            value: Observation to record.
            tags: Optional tags; metrics with different tags use different
                buckets.
            timestamp: Time of the observation in seconds; defaults to
                ``time.time()``.

        Raises:
            KeyError: If *ty* is not a key of ``METRIC_TYPES``.
        """
        if timestamp is None:
            timestamp = time.time()
        bucket_key: BucketKey = (
            # Align the timestamp down to the start of its rollup window.
            int((timestamp // self.ROLLUP) * self.ROLLUP),
            ty,
            key,
            # Sorted pairs make the key independent of tag insertion order.
            tuple(sorted((tags or {}).items())),
        )
        with self._lock:
            metric = self.buckets.get(bucket_key)
            if metric is None:
                metric = METRIC_TYPES[ty]()
                self.buckets[bucket_key] = metric
            # Kept under the lock so the flusher cannot drop the bucket
            # between lookup and update.
            metric.add(value)
class Client:
    """Front-end that forwards typed metric calls to its own ``Aggregator``."""

    def __init__(self) -> None:
        self.aggregator = Aggregator()

    def incr(
        self,
        key: str,
        value: TNum = 1,
        tags: Optional[dict[str, str]] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        """Add *value* (default 1) to the counter named *key*."""
        self.aggregator.add("c", key, value, tags, timestamp)

    def timing(
        self,
        key: str,
        value: TNum,
        tags: Optional[dict[str, str]] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        """Record *value* as one sample of the distribution named *key*."""
        self.aggregator.add("d", key, value, tags, timestamp)

    def set(
        self,
        key: str,
        value: int | str,
        tags: Optional[dict[str, str]] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        """Add *value* to the set metric named *key*."""
        self.aggregator.add("s", key, value, tags, timestamp)
def main() -> None:
    """Demo driver: feed random metrics into a ``Client`` forever."""
    import random

    c = Client()
    while True:
        # NOTE(review): the original indentation was lost; the nesting below
        # (only ``timing`` inside the inner loop) is a reconstruction - confirm.
        for _ in range(int(random.random() * 500)):
            c.timing(
                "foo.bar",
                random.random() * 4.7 * random.random() / 2.0,
                tags={"foo": "bar"},
            )
        c.incr(
            "foo.barilla",
            tags={"foo": "bar"},
        )
        c.set(
            "foo.baz",
            random.choice(["foo", "bar", "baz"]),
            tags={"foo": "bar"},
        )
        time.sleep(random.random() * 0.2)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment