Created
May 30, 2018 20:19
-
-
Save piotr1212/0868a415d33aea013021922bb7256f4a to your computer and use it in GitHub Desktop.
Trying out Python's asyncio: Minimal statsd thingy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
import threading | |
from time import time | |
SERVER_ADDRESS = ('127.0.0.1', 5566) | |
INTERVAL = 10 | |
class StatsdServer(asyncio.Protocol): | |
def __init__(self, loop, queue): | |
self.loop = loop | |
self.queue = queue | |
def connection_made(self, transport): | |
self.transport = transport | |
def datagram_received(self, data, addr): | |
for m in data.decode().splitlines(): | |
self.queue.put_nowait(m.strip()) | |
class Statsd: | |
def __init__(self): | |
self.types = { | |
'c': self._count, | |
'ms': self._timing, | |
'g': self._gauge, | |
's': self._set | |
} | |
self.buckets = { | |
'counters': {}, | |
'timings': {}, | |
'gauges': {}, | |
'sets': {} | |
} | |
self.lock = threading.Lock() | |
async def aggregate(self, queue): | |
while True: | |
item = await queue.get() | |
self.process_metric(item) | |
def unknown(self, a, b): | |
print('unknown type') | |
def process_metric(self, metric): | |
name, data = metric.split(':', 1) | |
val, typ = data.split('|', 1) | |
disp = self.types.get(typ, self.unknown) | |
disp(name, float(val)) | |
def _timing(self): | |
""" not implemented """ | |
pass | |
def _gauge(self): | |
""" not implemented """ | |
pass | |
def _set(self): | |
""" not implemented """ | |
pass | |
def _count(self, name, value): | |
""" Add value to the value in the bucket """ | |
with self.lock: | |
try: | |
self.buckets['counters'][name] += value | |
except KeyError: | |
self.buckets['counters'][name] = value | |
async def flush(self, protocol): | |
while True: | |
with self.lock: | |
for name, value in self.buckets['counters'].items(): | |
await protocol.send_message("{} {} {}\n".format( | |
name, value, int(time()))) | |
self.buckets['counters'] = {} | |
await asyncio.sleep(INTERVAL) | |
class GraphiteClient(asyncio.Protocol): | |
def __init__(self, loop, queue): | |
self._ready = asyncio.Event() | |
self.transport = None | |
self.loop = loop | |
self.queue = queue | |
asyncio.ensure_future(self._send_message()) | |
def connection_made(self, transport): | |
print('connection made called') | |
self.transport = transport | |
self._ready.set() | |
async def _send_message(self): | |
print('-- send_message called') | |
await self._ready.wait() | |
print('we are ready') | |
while True: | |
item = await self.queue.get() | |
self.transport.write(item.encode()) | |
async def send_message(self, msg): | |
await self.queue.put(msg) | |
def main(): | |
loop = asyncio.get_event_loop() | |
recv_queue = asyncio.Queue() | |
send_queue = asyncio.Queue() | |
server_factory = lambda: StatsdServer(loop, recv_queue) | |
server = loop.create_datagram_endpoint( | |
server_factory, local_addr=SERVER_ADDRESS) | |
client_factory = lambda: GraphiteClient(loop, send_queue) | |
client = loop.create_connection( | |
client_factory, '127.0.0.1', 2000) | |
s = Statsd() | |
asyncio.ensure_future(s.aggregate(recv_queue)) | |
loop.run_until_complete(server) | |
_, proto = loop.run_until_complete(client) | |
asyncio.ensure_future(s.flush(proto)) | |
loop.run_forever() | |
loop.close() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment