Skip to content

Instantly share code, notes, and snippets.

@kxzk
Last active February 12, 2025 02:17
Show Gist options
  • Select an option

  • Save kxzk/92155de036a6aaee720778d84192d63a to your computer and use it in GitHub Desktop.

Select an option

Save kxzk/92155de036a6aaee720778d84192d63a to your computer and use it in GitHub Desktop.
import asyncio
import csv
import uuid
from datetime import datetime
from fastapi import FastAPI
from fastapi.responses import JSONResponse
# application object; the startup hook and the /log route below register on it
app = FastAPI()
# global in-memory queue and event to signal flush
# - log_queue buffers the payloads posted to /log until write_csv drains it
# - flush_event is set by the /log handler when the backlog reaches the
#   threshold, waking write_csv before its timeout elapses
# NOTE(review): both objects are created at import time, before the server's
# event loop is running; on Python < 3.10 asyncio primitives bind to the loop
# that is current at construction -- confirm they end up on the serving loop.
log_queue = asyncio.Queue()
flush_event = asyncio.Event()
def _write_entries(file_name, entries):
    """Write entries to file_name as a csv file with an event_ts,data header."""
    # utf-8 is pinned so the output does not depend on the platform locale
    with open(file_name, "w", newline="", encoding="utf-8") as f:
        # the /log handler enqueues arbitrary dicts; with the default
        # extrasaction="raise" a single unexpected key would abort the flush,
        # so extra keys are dropped instead (missing keys are written as "")
        writer = csv.DictWriter(
            f, fieldnames=["event_ts", "data"], extrasaction="ignore"
        )
        writer.writeheader()
        writer.writerows(entries)


async def write_csv(flush_interval: float = 10.0, flush_threshold: int = 5):
    """
    periodically flushes the log_queue to a csv file

    it waits until either:
    - flush_event is set (triggered by reaching the threshold), or
    - flush_interval seconds have elapsed.

    each flush drains everything currently queued into a new, uniquely named
    file (logs_<timestamp>_<8 hex chars>.csv) in the working directory.
    the loop never returns; it runs until the task is cancelled.

    Args:
        flush_interval: maximum number of seconds to wait between flush
            attempts when flush_event is not set.
        flush_threshold: kept for interface compatibility; this coroutine does
            not read it -- the size check that sets flush_event happens where
            entries are enqueued.
    """
    loop = asyncio.get_running_loop()
    while True:
        try:
            # wait for the flush event or timeout
            await asyncio.wait_for(flush_event.wait(), timeout=flush_interval)
        except asyncio.TimeoutError:
            # timeout reached; proceed with flushing whatever is in the queue
            pass
        finally:
            flush_event.clear()

        # drain synchronously so the batch is a consistent snapshot; anything
        # enqueued while the file is being written is picked up next round
        entries = []
        while not log_queue.empty():
            entries.append(log_queue.get_nowait())
        if not entries:
            continue

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_name = f"logs_{timestamp}_{uuid.uuid4().hex[:8]}.csv"
        try:
            # file I/O blocks, so run it in the default executor instead of
            # stalling request handling on the event loop
            await loop.run_in_executor(None, _write_entries, file_name, entries)
        except OSError as exc:
            # keep the writer alive and retry on the next wake-up rather than
            # losing the batch; a partially written file may be left behind
            for entry in entries:
                log_queue.put_nowait(entry)
            print(f"Failed to write {file_name}: {exc}; re-queued {len(entries)} entries")
            continue
        # write to s3
        print(f"Flushed {len(entries)} entries to {file_name}")
@app.on_event("startup")
async def startup_event():
    """
    starts the background csv writer when the application starts
    """
    # schedule the background csv writer; the event loop only keeps weak
    # references to tasks, so hold a strong one on app.state to stop the
    # writer from being garbage-collected while it is still running
    app.state.csv_writer_task = asyncio.create_task(write_csv())
@app.post("/log")
async def log_data(data: dict):
    """
    enqueues a json payload shaped like {"event_ts": "", "data": ""} for the
    background writer and replies with a fixed acknowledgement
    """
    await log_queue.put(data)

    # wake the writer early once the backlog has reached the threshold
    backlog = log_queue.qsize()
    if not backlog < 5:
        flush_event.set()

    acknowledgement = {"status": "logged"}
    return JSONResponse(acknowledgement)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment