@lrdcasimir
Created May 14, 2026 01:38
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Practical Coding: Async Event Coalescer (Debouncer)\n",
"\n",
"**Context:** In a large ML fleet, inference nodes often subscribe to a stream of configuration updates (e.g., dynamic model weights, feature flags). When a massive configuration change rolls out, nodes may receive a rapid burst of updates. Applying them immediately causes CPU thrashing and momentary latency spikes. You need an `EventCoalescer` that batches these updates and only applies them when the stream has \"settled\" for a short period, or forces an apply if too much time has passed.\n",
"\n",
"**Problem:** Implement an `AsyncEventCoalescer` that absorbs incoming events and flushes them to a provided async callback.\n",
"\n",
"**Requirements:**\n",
"- `__init__(self, idle_timeout: float, max_wait: float, flush_callback: Callable)`: Initialize the coalescer. \n",
"- `async def add_event(self, event: dict) -> None`: Add an event to the buffer. This function should return immediately (non-blocking) but ensure the background timer is updated.\n",
"- `async def flush_callback(events: list[dict]) -> None`: This coroutine must be awaited exactly once per batch of events when either:\n",
" 1. `idle_timeout` seconds have passed since the *last* event was added.\n",
" 2. `max_wait` seconds have passed since the *first* event in the current batch was added.\n",
"\n",
"**Constraints:**\n",
"- Use `asyncio` for background task management.\n",
"- Ensure thread/task safety when modifying the buffer and managing timers."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"from typing import Callable, Coroutine, Any\n",
"\n",
"class AsyncEventCoalescer:\n",
"    def __init__(self, idle_timeout: float, max_wait: float, flush_callback: Callable[[list[dict]], Coroutine[Any, Any, None]]):\n",
"        self.idle_timeout = idle_timeout\n",
"        self.max_wait = max_wait\n",
"        self.flush_callback = flush_callback\n",
"        # TODO: Initialize buffer, state, and asyncio tasks/timers\n",
"        pass\n",
"\n",
"    async def add_event(self, event: dict) -> None:\n",
"        \"\"\"\n",
"        Add an event to the batch. Update the idle timer.\n",
"        Ensure max_wait is respected.\n",
"        \"\"\"\n",
"        # TODO: Implement\n",
"        pass\n"
]
},
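{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Reference sketch (one possible approach)\n",
"\n",
"A minimal sketch of one way to meet the requirements, not the only correct design. It assumes a single running event loop and uses `time.monotonic()` for deadline arithmetic; the private names (`_buffer`, `_timer_task`, `_flush_after`) are illustrative, not part of the required API. Each `add_event` restarts a single timer task whose delay is the smaller of `idle_timeout` and the time remaining before the batch's `max_wait` deadline, so whichever condition expires first triggers the flush. The lock guarantees that `add_event` either cancels the timer before it swaps the buffer out, or observes an already-emptied buffer and starts a fresh batch."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import time\n",
"\n",
"class AsyncEventCoalescer:\n",
"    def __init__(self, idle_timeout: float, max_wait: float, flush_callback):\n",
"        self.idle_timeout = idle_timeout\n",
"        self.max_wait = max_wait\n",
"        self.flush_callback = flush_callback\n",
"        self._buffer: list[dict] = []\n",
"        self._first_event_time: float | None = None\n",
"        self._timer_task: asyncio.Task | None = None\n",
"        self._lock = asyncio.Lock()\n",
"\n",
"    async def add_event(self, event: dict) -> None:\n",
"        async with self._lock:\n",
"            now = time.monotonic()\n",
"            if not self._buffer:\n",
"                # First event of a new batch: start the max_wait clock.\n",
"                self._first_event_time = now\n",
"            self._buffer.append(event)\n",
"            # Restart the idle timer, but never sleep past the max_wait deadline.\n",
"            if self._timer_task is not None:\n",
"                self._timer_task.cancel()\n",
"            remaining = self._first_event_time + self.max_wait - now\n",
"            delay = min(self.idle_timeout, max(0.0, remaining))\n",
"            self._timer_task = asyncio.create_task(self._flush_after(delay))\n",
"\n",
"    async def _flush_after(self, delay: float) -> None:\n",
"        await asyncio.sleep(delay)\n",
"        async with self._lock:\n",
"            # Swap the batch out under the lock; clear state for the next batch.\n",
"            batch, self._buffer = self._buffer, []\n",
"            self._first_event_time = None\n",
"            self._timer_task = None\n",
"        if batch:\n",
"            await self.flush_callback(batch)\n"
]
},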
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# --- Test Cases ---\n",
"import time\n",
"\n",
"async def run_tests():\n",
"    flushed_batches = []\n",
"\n",
"    async def mock_flush(events: list[dict]):\n",
"        flushed_batches.append((time.time(), events))\n",
"\n",
"    coalescer = AsyncEventCoalescer(idle_timeout=0.1, max_wait=0.3, flush_callback=mock_flush)\n",
"\n",
"    start_time = time.time()\n",
"\n",
"    # Scenario 1: Idle timeout triggers flush\n",
"    await coalescer.add_event({\"id\": 1})\n",
"    await coalescer.add_event({\"id\": 2})\n",
"    await asyncio.sleep(0.15)  # Wait longer than idle_timeout\n",
"\n",
"    assert len(flushed_batches) == 1\n",
"    assert len(flushed_batches[0][1]) == 2, \"Should flush the first 2 events\"\n",
"\n",
"    # Scenario 2: Max wait triggers flush despite constant activity\n",
"    flushed_batches.clear()\n",
"    for i in range(4):\n",
"        await coalescer.add_event({\"id\": i + 10})\n",
"        await asyncio.sleep(0.08)  # Less than idle_timeout (0.1), so idle never triggers\n",
"\n",
"    # 4 * 0.08 = 0.32 seconds elapsed, so max_wait (0.3) should have triggered\n",
"    assert len(flushed_batches) == 1, \"Max wait should have forced a flush\"\n",
"    assert len(flushed_batches[0][1]) > 0, \"Events should be present\"\n",
"\n",
"    print(\"All tests passed!\")\n",
"\n",
"# asyncio.run(run_tests())\n",
"# Note: Jupyter already runs an event loop, so asyncio.run() raises RuntimeError there;\n",
"# inside a notebook cell, `await run_tests()` directly instead.\n"
]
}
],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 4
}