Skip to content

Instantly share code, notes, and snippets.

@piotr1212
Created May 30, 2018 20:19
Show Gist options
  • Save piotr1212/0868a415d33aea013021922bb7256f4a to your computer and use it in GitHub Desktop.
Save piotr1212/0868a415d33aea013021922bb7256f4a to your computer and use it in GitHub Desktop.
Trying out Python's asyncio: Minimal statsd thingy
import asyncio
import time
import threading
from time import time
# (host, port) pair passed as ``local_addr`` when the UDP statsd listener is
# created in main().
SERVER_ADDRESS = ('127.0.0.1', 5566)
# Number of seconds Statsd.flush() sleeps between two flushes of the counters.
INTERVAL = 10
class StatsdServer(asyncio.DatagramProtocol):
    """Datagram protocol that splits incoming packets into metric lines.

    Every non-empty line of a received datagram is stripped of surrounding
    whitespace and pushed onto ``queue`` for later processing.

    Args:
        loop: Event loop the protocol belongs to (stored, not used directly).
        queue: Object with a ``put_nowait(item)`` method, e.g. an
            ``asyncio.Queue``, that receives one string per metric line.
    """

    def __init__(self, loop, queue):
        self.loop = loop
        self.queue = queue
        self.transport = None

    def connection_made(self, transport):
        """Remember the transport handed over by the event loop."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Decode one datagram and enqueue each metric line it contains.

        Args:
            data: Raw bytes of the datagram.
            addr: Address of the sender; only used in the diagnostic message.
        """
        try:
            text = data.decode()
        except UnicodeDecodeError:
            # A single garbage packet must not raise out of the callback.
            print('undecodable datagram from {}'.format(addr))
            return
        for line in text.splitlines():
            metric = line.strip()
            # Blank lines (e.g. a trailing "\n\n") carry no metric; skip them
            # instead of handing an empty string to the consumer.
            if metric:
                self.queue.put_nowait(metric)
class Statsd:
    """Aggregate statsd metric lines into buckets and flush the counters.

    Metric lines have the form ``<name>:<value>|<type>``. Only the counter
    type (``c``) is actually aggregated; timings (``ms``), gauges (``g``) and
    sets (``s``) are accepted but ignored.
    """

    def __init__(self):
        # Maps the type suffix of a metric line to its handler; every handler
        # is called as ``handler(name, value)``.
        self.types = {
            'c': self._count,
            'ms': self._timing,
            'g': self._gauge,
            's': self._set,
        }
        self.buckets = {
            'counters': {},
            'timings': {},
            'gauges': {},
            'sets': {},
        }
        # Blocking lock: only ever held around short sections with no await.
        self.lock = threading.Lock()

    async def aggregate(self, queue):
        """Consume metric lines from ``queue`` forever and process each one."""
        while True:
            item = await queue.get()
            self.process_metric(item)

    def unknown(self, a, b):
        """Fallback handler for metric types missing from ``self.types``."""
        print('unknown type')

    def process_metric(self, metric):
        """Parse one ``name:value|type`` line and dispatch it to its handler.

        Lines that cannot be parsed are reported and dropped rather than
        raised, so one bad packet cannot terminate the ``aggregate`` loop.

        Args:
            metric: A single metric line as a string.
        """
        try:
            name, data = metric.split(':', 1)
            val, typ = data.split('|', 1)
            value = float(val)
        except ValueError:
            # Raised by a failed unpack (missing ':' or '|') or by float().
            print('malformed metric: {!r}'.format(metric))
            return
        handler = self.types.get(typ, self.unknown)
        handler(name, value)

    def _timing(self, name, value):
        """ not implemented """

    def _gauge(self, name, value):
        """ not implemented """

    def _set(self, name, value):
        """ not implemented """

    def _count(self, name, value):
        """ Add value to the value in the bucket """
        with self.lock:
            counters = self.buckets['counters']
            counters[name] = counters.get(name, 0) + value

    async def flush(self, protocol):
        """Send all counters through ``protocol`` every ``INTERVAL`` seconds.

        Each counter is sent as a ``"<name> <value> <unix-timestamp>\\n"``
        line via ``protocol.send_message`` and the counter bucket is reset.

        Args:
            protocol: Object exposing an awaitable ``send_message(str)``.
        """
        while True:
            # Swap the bucket out under the lock and release it before any
            # await, so the lock is never held while this coroutine is
            # suspended.
            with self.lock:
                counters = self.buckets['counters']
                self.buckets['counters'] = {}
            now = int(time())
            for name, value in counters.items():
                await protocol.send_message(
                    "{} {} {}\n".format(name, value, now))
            await asyncio.sleep(INTERVAL)
class GraphiteClient(asyncio.Protocol):
    """Stream protocol that writes queued text messages to its transport.

    Messages are handed in through ``send_message`` and written by a
    background task once the connection has been established.

    Args:
        loop: Event loop the protocol belongs to (stored, not used directly).
        queue: ``asyncio.Queue`` used to buffer outgoing messages.
    """

    def __init__(self, loop, queue):
        self._ready = asyncio.Event()
        self.transport = None
        self.loop = loop
        self.queue = queue
        # Keep a reference to the sender task: asyncio itself only holds weak
        # references to tasks, so an unreferenced one may be collected.
        self._sender = asyncio.ensure_future(self._send_message())

    def connection_made(self, transport):
        """Store the transport and let the sender task start writing."""
        print('connection made called')
        self.transport = transport
        self._ready.set()

    def connection_lost(self, exc):
        """Forget the transport so nothing is written to a closed socket."""
        print('connection lost')
        self.transport = None
        self._ready.clear()

    async def _send_message(self):
        """Wait for the connection, then write queued messages forever."""
        print('-- send_message called')
        await self._ready.wait()
        print('we are ready')
        while True:
            item = await self.queue.get()
            transport = self.transport
            if transport is None:
                # The connection went away and there is no reconnect logic,
                # so drop the message visibly.
                print('not connected, dropping message')
                continue
            transport.write(item.encode())

    async def send_message(self, msg):
        """Queue ``msg`` (a str) for delivery by the sender task."""
        await self.queue.put(msg)
def main():
    """Run the statsd listener and the Graphite forwarder until interrupted.

    Binds the UDP listener to ``SERVER_ADDRESS``, connects to Graphite on
    ``127.0.0.1:2000``, starts the aggregate and flush tasks and runs the
    event loop forever. On Ctrl-C, or if start-up fails, the tasks created
    here are cancelled, the transports are closed and the loop is closed.
    """
    # Create the loop explicitly instead of relying on the implicit loop
    # creation of asyncio.get_event_loop(), which is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    recv_queue = asyncio.Queue()
    send_queue = asyncio.Queue()
    statsd = Statsd()

    tasks = []
    transports = []
    try:
        tasks.append(loop.create_task(statsd.aggregate(recv_queue)))

        server_transport, _ = loop.run_until_complete(
            loop.create_datagram_endpoint(
                lambda: StatsdServer(loop, recv_queue),
                local_addr=SERVER_ADDRESS))
        transports.append(server_transport)

        client_transport, proto = loop.run_until_complete(
            loop.create_connection(
                lambda: GraphiteClient(loop, send_queue),
                '127.0.0.1', 2000))
        transports.append(client_transport)

        tasks.append(loop.create_task(statsd.flush(proto)))
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; shut down cleanly.
        pass
    finally:
        for task in tasks:
            task.cancel()
        if tasks:
            # Let the cancelled tasks finish unwinding before closing.
            loop.run_until_complete(
                asyncio.gather(*tasks, return_exceptions=True))
        for transport in transports:
            transport.close()
        # One more loop iteration so the callbacks scheduled by close() run.
        loop.run_until_complete(asyncio.sleep(0))
        loop.close()
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment