Created May 14, 2026 01:38 (gist lrdcasimir/b8e37407691b4fd8c1fc0a7f5fe5a719)
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Practical Coding: Async Event Coalescer (Debouncer)\n",
    "\n",
    "**Context:** In a large ML fleet, inference nodes often subscribe to a stream of configuration updates (e.g., dynamic model weights, feature flags). When a large configuration change rolls out, a node may receive a rapid burst of updates, and applying each one immediately can cause CPU thrashing and momentary latency spikes. You need an `AsyncEventCoalescer` that batches these updates and applies them only once the stream has \"settled\" for a short period, or forces an apply once a batch has been pending too long.\n",
    "\n",
    "**Problem:** Implement an `AsyncEventCoalescer` that buffers incoming events and flushes them in batches to a provided async callback.\n",
    "\n",
    "**Requirements:**\n",
    "- `__init__(self, idle_timeout: float, max_wait: float, flush_callback: Callable)`: Initialize the coalescer. Both timeouts are in seconds.\n",
    "- `async def add_event(self, event: dict) -> None`: Add an event to the current batch. This method must return immediately (it never awaits the flush itself) but must reset the background idle timer.\n",
    "- `flush_callback(events: list[dict])`: An async callback that must be awaited exactly once per batch, as soon as either of the following holds:\n",
    "  1. `idle_timeout` seconds have passed since the *last* event was added.\n",
    "  2. `max_wait` seconds have passed since the *first* event in the current batch was added.\n",
    "\n",
    "**Constraints:**\n",
    "- Use `asyncio` for background task management.\n",
    "- Ensure thread/task safety when modifying the buffer and managing timers: events can arrive while a flush is still running, and every event must be delivered in exactly one batch."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "from typing import Callable, Coroutine, Any\n",
    "\n",
    "class AsyncEventCoalescer:\n",
    "    def __init__(self, idle_timeout: float, max_wait: float, flush_callback: Callable[[list[dict]], Coroutine[Any, Any, None]]):\n",
    "        self.idle_timeout = idle_timeout\n",
    "        self.max_wait = max_wait\n",
    "        self.flush_callback = flush_callback\n",
    "        # TODO: Initialize buffer, state, and asyncio tasks/timers\n",
    "        pass\n",
    "\n",
    "    async def add_event(self, event: dict) -> None:\n",
    "        \"\"\"\n",
    "        Add an event to the current batch and reset the idle timer.\n",
    "        Must return without awaiting the flush; max_wait must still be respected.\n",
    "        \"\"\"\n",
    "        # TODO: Implement\n",
    "        pass\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- Test Cases ---\n",
    "import time\n",
    "\n",
    "async def run_tests():\n",
    "    flushed_batches = []\n",
    "\n",
    "    async def mock_flush(events: list[dict]):\n",
    "        # Record when each batch arrived (monotonic clock) and its contents\n",
    "        flushed_batches.append((time.monotonic(), events))\n",
    "\n",
    "    coalescer = AsyncEventCoalescer(idle_timeout=0.1, max_wait=0.3, flush_callback=mock_flush)\n",
    "\n",
    "    # Scenario 1: Idle timeout triggers flush\n",
    "    await coalescer.add_event({\"id\": 1})\n",
    "    await coalescer.add_event({\"id\": 2})\n",
    "    await asyncio.sleep(0.15)  # Wait longer than idle_timeout\n",
    "\n",
    "    assert len(flushed_batches) == 1\n",
    "    assert len(flushed_batches[0][1]) == 2, \"Should flush the first 2 events\"\n",
    "\n",
    "    # Scenario 2: Max wait triggers flush despite constant activity\n",
    "    flushed_batches.clear()\n",
    "    for i in range(4):\n",
    "        await coalescer.add_event({\"id\": i + 10})\n",
    "        await asyncio.sleep(0.08)  # Less than idle_timeout (0.1), so idle never triggers\n",
    "\n",
    "    # Events arrive at ~0, 0.08, 0.16 and 0.24 s and the loop ends at ~0.32 s, so\n",
    "    # max_wait (0.3 s after the first event) should already have forced a flush.\n",
    "    # The ~20 ms margin is tight and can be flaky on a heavily loaded machine.\n",
    "    assert len(flushed_batches) == 1, \"Max wait should have forced a flush\"\n",
    "    assert len(flushed_batches[0][1]) > 0, \"Events should be present\"\n",
    "\n",
    "    print(\"All tests passed!\")\n",
    "\n",
    "# In a plain Python script:\n",
    "# asyncio.run(run_tests())\n",
    "# In Jupyter/IPython an event loop is already running, so asyncio.run() raises\n",
    "# RuntimeError; run `await run_tests()` directly in a cell instead.\n"
   ]
  }
 ],
 "metadata": {},
 "nbformat": 4,
 "nbformat_minor": 4
}
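A minimal sketch of one way to fill in the stub, assuming Python 3.10+ and that every call happens on a single event loop. The class name and public signatures come from the exercise; the private attributes (`_buffer`, `_first_at`, `_last_at`, `_tasks`, `_flush_lock`) and the `_deadline` / `_watch` helpers are hypothetical. Rather than cancelling and recreating an idle timer on every event, each batch gets one watcher task that sleeps until `min(last_event + idle_timeout, first_event + max_wait)` and re-checks after waking, because new events can only push that deadline later.

```python
import asyncio
from typing import Any, Callable, Coroutine

FlushCallback = Callable[[list[dict]], Coroutine[Any, Any, None]]


class AsyncEventCoalescer:
    """Sketch: one watcher task per open batch, timed with the loop's monotonic clock."""

    def __init__(self, idle_timeout: float, max_wait: float, flush_callback: FlushCallback):
        self.idle_timeout = idle_timeout
        self.max_wait = max_wait
        self.flush_callback = flush_callback
        self._buffer: list[dict] = []  # events in the currently open batch
        self._first_at = 0.0  # loop time of the batch's first event (max_wait clock)
        self._last_at = 0.0   # loop time of the most recent event (idle clock)
        self._tasks: set[asyncio.Task] = set()  # strong refs to running watchers
        self._flush_lock = asyncio.Lock()  # delivers batches one at a time, in order

    async def add_event(self, event: dict) -> None:
        # There is no await in this method, so these updates cannot
        # interleave with the watcher or other producers on the same loop.
        now = asyncio.get_running_loop().time()
        self._last_at = now
        self._buffer.append(event)
        if len(self._buffer) == 1:
            # This event opens a new batch: start its max_wait clock and watcher.
            self._first_at = now
            task = asyncio.create_task(self._watch())
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    def _deadline(self) -> float:
        # Whichever rule fires first: idle expiry or the max_wait cap.
        return min(self._last_at + self.idle_timeout, self._first_at + self.max_wait)

    async def _watch(self) -> None:
        loop = asyncio.get_running_loop()
        # Events that arrive while sleeping only move the deadline later,
        # so sleep again until the (possibly updated) deadline has passed.
        while (remaining := self._deadline() - loop.time()) > 0:
            await asyncio.sleep(remaining)
        # Detach the batch before awaiting the callback, so events that arrive
        # during a slow flush open a fresh batch instead of joining this one.
        batch, self._buffer = self._buffer, []
        async with self._flush_lock:
            await self.flush_callback(batch)
```

Because safety here comes from the single-threaded event loop, producers on other threads would have to hand events over with `asyncio.run_coroutine_threadsafe(...)`. The sketch also does not drain a still-open batch on shutdown, and an exception raised by `flush_callback` is only reported through asyncio's unretrieved-task-exception logging.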
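The timing arithmetic in the test comments can be checked without an event loop. The hypothetical helper `simulate_flushes` below is a clock-free model of the two flush rules, assuming sorted arrival times in seconds and ignoring scheduler jitter: each batch flushes at `min(last + idle_timeout, first + max_wait)`, and an event arriving exactly at that deadline is assumed to start the next batch.

```python
def simulate_flushes(
    arrivals: list[float], idle_timeout: float, max_wait: float
) -> list[tuple[float, int]]:
    """Return (flush_time, batch_size) pairs for sorted event arrival times."""
    flushes: list[tuple[float, int]] = []
    i, n = 0, len(arrivals)
    while i < n:
        # arrivals[i] opens a new batch; it fixes the max_wait deadline.
        first = last = arrivals[i]
        size = 1
        i += 1
        # Absorb every event that lands strictly before the current deadline;
        # each one pushes the idle deadline later, up to the max_wait cap.
        while i < n and arrivals[i] < min(last + idle_timeout, first + max_wait):
            last = arrivals[i]
            size += 1
            i += 1
        flushes.append((min(last + idle_timeout, first + max_wait), size))
    return flushes


if __name__ == "__main__":
    # Scenario 1 from the tests: two back-to-back events, flushed by idle_timeout.
    print(simulate_flushes([0.0, 0.0], idle_timeout=0.1, max_wait=0.3))  # [(0.1, 2)]
    # Scenario 2: one event every 0.08 s never goes idle, so max_wait flushes at 0.3 s.
    print(simulate_flushes([0.0, 0.08, 0.16, 0.24], idle_timeout=0.1, max_wait=0.3))  # [(0.3, 4)]
```

In this model the scenario 2 batch is forced out at 0.3 s, while the test checks at roughly 0.32 s, which leaves only about 20 ms of slack for real scheduling delays.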