Last active
February 12, 2025 02:17
-
-
Save kxzk/92155de036a6aaee720778d84192d63a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import csv | |
| import uuid | |
| from datetime import datetime | |
| from fastapi import FastAPI | |
| from fastapi.responses import JSONResponse | |
app = FastAPI()

# Global in-memory queue of pending log entries, plus an event the /log
# endpoint sets to wake the background writer early once enough entries
# have accumulated.
log_queue = asyncio.Queue()
flush_event = asyncio.Event()
def _flush_queue_to_csv() -> None:
    """Drain log_queue and write the entries to a uniquely named CSV file."""
    entries = []
    while True:
        try:
            # get_nowait() cannot block: the original `await log_queue.get()`
            # after an empty() check could hang forever if the queue drained
            # between the check and the get.
            entries.append(log_queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    if not entries:
        return
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # uuid suffix avoids collisions when two flushes land in the same second
    file_name = f"logs_{timestamp}_{uuid.uuid4().hex[:8]}.csv"
    # NOTE(review): blocking file I/O on the event loop; acceptable for small
    # batches, consider asyncio.to_thread for large ones.
    with open(file_name, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["event_ts", "data"])
        writer.writeheader()
        writer.writerows(entries)
    # TODO: upload to s3
    print(f"Flushed {len(entries)} entries to {file_name}")


async def write_csv(flush_interval: float = 10.0, flush_threshold: int = 5):
    """
    Background task: periodically flush log_queue to a CSV file.

    Runs forever, flushing whenever either:
      - flush_event is set (the /log endpoint signals the size threshold), or
      - flush_interval seconds elapse without a signal.

    Args:
        flush_interval: maximum seconds between flushes.
        flush_threshold: retained for backward compatibility; the actual
            size threshold is enforced by the /log endpoint (which sets
            flush_event), so this parameter is currently unused here.
    """
    while True:
        try:
            # Wake early if the producer signals a full-enough queue.
            await asyncio.wait_for(flush_event.wait(), timeout=flush_interval)
        except asyncio.TimeoutError:
            # No signal within the interval; flush whatever accumulated.
            pass
        finally:
            flush_event.clear()
            _flush_queue_to_csv()
@app.on_event("startup")
async def startup_event():
    """
    Schedule the background CSV writer when the app starts.

    The task handle is stored on app.state so the task is not
    garbage-collected mid-flight: asyncio keeps only a weak reference
    to running tasks, so a discarded create_task() result can vanish.
    """
    app.state.csv_writer_task = asyncio.create_task(write_csv())
@app.post("/log")
async def log_data(data: dict):
    """
    Accept a JSON payload {"event_ts": ..., "data": ...} and enqueue it.

    The payload is normalized to exactly the two columns the CSV writer
    emits ("event_ts", "data"); any extra key would otherwise make
    csv.DictWriter raise ValueError inside the background flush task.
    """
    entry = {"event_ts": data.get("event_ts", ""), "data": data.get("data", "")}
    await log_queue.put(entry)
    # Signal the writer once enough entries accumulate (threshold of 5
    # matches write_csv's flush_threshold default).
    if log_queue.qsize() >= 5:
        flush_event.set()
    return JSONResponse({"status": "logged"})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment