To try it, run:

    ./summarize-udp-async.py 29559 stats.json

then send sample messages:

    ./send-sample-messages.py

and inspect the stats.json file.
| /stats.json |
| #!/usr/bin/env python3 | |
| """Send sample udp messages to sanity-check the `summarize-udp` script.""" | |
| import json | |
| import socket | |
| import time | |
# destination of the sample messages (must match the summarizer's port)
host = "localhost"
port = 29559


def send_udp_message(message: bytes):
    """Send *message* as a single UDP datagram to (host, port)."""
    destination = (host, port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with sock:  # a fresh socket per message, closed right after sending
        sock.sendto(message, destination)
def main():
    """Send six sample messages: three one second apart, then three back-to-back."""

    def send_json(obj):
        # serialize to json text, then to bytes for the datagram
        send_udp_message(json.dumps(obj).encode())

    spaced_out = (
        {"a": 1, "b": 2, "c": 3},
        {"a": 2, "b": 4, "c": 9},
        {"a": 3, "b": 8, "c": 27},
    )
    back_to_back = (
        {"a": 4, "b": 16, "c": 81},
        {"a": 5, "b": 32, "c": 243},
        {"a": 6, "b": 64, "c": 729},
    )
    for obj in spaced_out:
        send_json(obj)
        time.sleep(1)  # pause after each message in the first batch
    for obj in back_to_back:
        send_json(obj)


if __name__ == "__main__":
    main()
| #!/usr/bin/env python3 | |
| """Receive udp messages and write summary info to file. | |
| Usage: summarize-udp <udp-port> <output-file> | |
| Input message format: json object with 3 integer fields (KEYS) e.g.: | |
| {"a":1, "b":2, "c":3} | |
| Output file contains stats (sum,max,min correspondingly for | |
| STATS_INTERVAL_SEC seconds). | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import sys | |
| import time | |
| from collections import defaultdict | |
| from operator import itemgetter | |
| from pathlib import Path | |
| from typing import DefaultDict, List, Tuple | |
#: intervals (in seconds) over which stats are summarized
STATS_INTERVAL_SEC = (1, 6)
#: keys recognized in the incoming json messages
KEYS = ("a", "b", "c")
#: aggregate applied to each key, position by position (a->sum, b->max, c->min)
STATS = (sum, max, min)

logger = logging.getLogger("summarize-udp")
class UDPProtocol(asyncio.DatagramProtocol):
    """Call given coroutine, passing it the decoded json message.

    Each datagram is decoded as UTF-8 json; on success the handler coroutine
    is scheduled as a task, on failure the error is logged and the datagram
    is dropped.
    """

    def __init__(self, coro):
        # coroutine function, called as coro(message) for every decoded message
        self.handler = coro
        # strong references to in-flight handler tasks: the event loop keeps
        # only weak references, so an unreferenced task could disappear
        # mid-execution
        self.background_tasks = set()

    def datagram_received(self, data, addr):
        try:
            message = json.loads(data.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # a non-UTF-8 payload is rejected just like malformed json instead
            # of letting the exception escape the transport's read callback
            logger.error(
                "Failed to decode %r from %r, reason: %s", data, addr, e
            )
        else:
            task = asyncio.create_task(self.handler(message))
            self.background_tasks.add(task)
            task.add_done_callback(self._task_done)

    def _task_done(self, task):
        """Drop the reference to a finished handler task and log its failure."""
        self.background_tasks.discard(task)
        # retrieve the exception so it is reported now, with context, rather
        # than as "Task exception was never retrieved" at garbage collection
        if not task.cancelled() and task.exception() is not None:
            logger.error("Message handler failed", exc_info=task.exception())
class UDPSummarizer:
    """Receive udp messages and write summary info to file.

    Every accepted message contributes one row of KEYS values to each
    interval in STATS_INTERVAL_SEC. At each interval boundary the rows are
    aggregated column-wise with STATS and appended to the output file as one
    json line.
    """

    def __init__(self, port: int, output_filename: str):
        self.port = port
        self.path = Path(output_filename)
        # interval (seconds) -> rows of KEYS values received since last write
        self.stats: DefaultDict[int, List[Tuple[int, ...]]] = defaultdict(list)

    async def run(self):
        """Entry point for UDPSummarizer.

        Listen for udp messages on *self.port* on all IPv4 interfaces
        ("0.0.0.0") and write stats to *self.path* until cancelled.
        """
        loop = asyncio.get_running_loop()
        transport, _protocol = await loop.create_datagram_endpoint(
            lambda: UDPProtocol(self.process_message),
            local_addr=("0.0.0.0", self.port),
        )
        try:
            await asyncio.gather(
                *(self.write_stats(interval) for interval in STATS_INTERVAL_SEC)
            )
        finally:
            transport.close()

    async def process_message(self, message: dict):
        """Record the KEYS values of *message* for each interval.

        Messages that are not json objects, lack one of KEYS, or carry a
        non-numeric value are logged and dropped: such a value stored here
        would make the aggregation in write_stats() raise and stop the
        whole summarizer.
        """
        if not isinstance(message, dict):
            logger.warning("Ignoring non-object message %r", message)
            return
        try:
            # tuple(...) rather than itemgetter(*KEYS): stays a tuple even
            # when there is a single key
            values = tuple(message[key] for key in KEYS)
        except KeyError as e:
            logger.warning("Ignoring message %r: missing key %s", message, e)
            return
        if not all(isinstance(value, (int, float)) for value in values):
            logger.warning("Ignoring message %r: non-numeric value", message)
            return
        for interval in STATS_INTERVAL_SEC:
            self.stats[interval].append(values)

    @staticmethod
    def _summarize(rows, keys, aggregates):
        """Aggregate *rows* column-wise; column i is summarized by aggregates[i].

        *rows* holds one tuple per message, ordered like *keys*. The result
        maps "<key>_<aggregate name>" to the aggregated value. With no rows
        each aggregate is computed over (0,), so a zero record is produced.
        """
        # transpose message rows into per-key columns
        columns = list(zip(*rows)) if rows else [(0,)] * len(keys)
        return {
            f"{key}_{aggregate.__name__}": aggregate(column)
            for key, column, aggregate in zip(keys, columns, aggregates)
        }

    async def write_stats(self, interval: int):
        """Append a stats record for *interval* at every *interval*-second boundary.

        Boundaries are multiples of *interval* on the event loop's monotonic
        clock (loop.time()); the record's "timestamp" is wall-clock time.
        """
        timer = asyncio.get_running_loop().time
        # boundary alignment idea, see: https://ru.stackoverflow.com/a/577378/23044
        next_boundary = (timer() // interval + 1) * interval
        while True:
            # the loop may fire a timer slightly early (within its clock
            # resolution); sleeping until an explicitly tracked boundary avoids
            # recomputing a near-zero delay and writing a duplicate record
            await asyncio.sleep(next_boundary - timer())
            rows = self.stats[interval]
            data = self._summarize(rows, KEYS, STATS)
            rows.clear()
            # skip boundaries missed while busy instead of bursting writes
            next_boundary = max(
                next_boundary + interval, (timer() // interval + 1) * interval
            )
            # TODO consider async. I/O if it is a bottleneck
            with self.path.open("a", encoding="utf-8") as file:
                json.dump(
                    dict(
                        data,
                        timestamp=int(time.time()),
                        count_type=f"{interval}s",
                    ),
                    file,
                )
                file.write("\n")
async def main():
    """Parse ``<udp-port> <output-file>`` from argv and run the summarizer.

    Exits with the module usage text when the arguments are missing or the
    port is not an integer.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if len(sys.argv) != 3:
        sys.exit(__doc__)
    try:
        port = int(sys.argv[1])
    except ValueError:
        sys.exit(__doc__)
    filename = sys.argv[2]
    summarizer = UDPSummarizer(port, filename)
    await summarizer.run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is caught around asyncio.run(): since Python 3.11 the runner
        # cancels the main task and raises KeyboardInterrupt from run(), so an
        # except clause inside main() would not see it
        logger.warning("Got Ctrl-C")