Last active
November 19, 2025 05:13
-
-
Save scriptogre/397a70e04cef84aaffb6ee1c2c818187 to your computer and use it in GitHub Desktop.
Real-time background job status updates with htmx, SSE and PostgreSQL LISTEN/NOTIFY. Database fields change → LISTEN/NOTIFY → SSE → row re-fetches its own data. Each row listens for its own job-specific SSE event.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{% from "jobs/job_block.html" import render_job_rows %}
{# Jobs table. Connects to the /jobs/events SSE stream; each per-job <tbody>
   below re-fetches its own rows when its job-specific event arrives. #}
<table id="job-table" class="..."
       sse-connect="/jobs/events"
       {# Fixes a race condition when a job finishes before the connection to /jobs/events is made:
          the whole table is re-fetched once, 500ms after load. #}
       hx-get="/jobs/table"
       hx-trigger="load once delay:500ms"
       hx-swap="morph"
       hx-disinherit="*"
>
    <thead>
        <tr class="...">
            <th class="..." scope="col">
                Compound
            </th>
        </tr>
    </thead>
    {# One <tbody> per job on the current page. Each requests /jobs/<id>/rows
       when the `job-updated-<id>` SSE event fires. #}
    {% for job in current_page_jobs %}
        {% set model = MODEL_REGISTRY[job.input.model] %}
        <tbody id="job-{{ job.id }}"
               hx-get="/jobs/{{ job.id }}/rows"
               hx-trigger="sse:job-updated-{{ job.id }}">
            {{ render_job_rows(job, model) }}
        </tbody>
    {% endfor %}
    {% if has_next_page %}
        {# Infinite scrolling - Load rows as user scrolls.
           This placeholder <tbody> replaces itself (outerHTML) with the <tbody>
           elements selected from the next page's response, once, when it
           intersects the viewport. #}
        <tbody id="trigger-infinite-scroll"
               hx-get="/jobs/table?page_number={{ page_number + 1 }}"
               hx-target="this"
               hx-swap="outerHTML"
               hx-select="tbody"
               hx-trigger="intersect once">
            <tr>
                <td colspan="1" class="...">
                    Loading more...
                </td>
            </tr>
        </tbody>
    {% endif %}
</table>
{# Out-of-band swap: delivered alongside the table and swapped into the
   element with id "jobs-count". #}
<div hx-swap-oob="true" id="jobs-count" class="...">
    <span>{{ job_counts.total }} jobs</span>
</div>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Trigger function: publishes the affected job's ID on the 'job_updated'
-- notification channel and lets the row change proceed unchanged.
CREATE OR REPLACE FUNCTION notify_job_updated()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
    -- The NOTIFY payload is the job ID rendered as text.
    PERFORM pg_notify('job_updated', CAST(NEW.id AS text));
    RETURN NEW;
END;
$$;
-- Fire notify_job_updated() after any UPDATE on `job` that actually changes
-- the row's status or output (compared as text).
--
-- The function above is created with CREATE OR REPLACE, so the script is meant
-- to be re-runnable; drop any previous trigger first so that re-running it does
-- not fail with "trigger already exists".
DROP TRIGGER IF EXISTS job_update_notify ON job;
CREATE TRIGGER job_update_notify
    AFTER UPDATE ON job
    FOR EACH ROW
    WHEN (OLD.status IS DISTINCT FROM NEW.status OR
          OLD.output::text IS DISTINCT FROM NEW.output::text)
    EXECUTE FUNCTION notify_job_updated();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@web_router.post("/jobs/")
async def submit_job_form(
    request: Request,
    session: SessionDep,
    background_tasks: BackgroundTasks,
    smiles: str = Form(None),
    model: ModelType = Form(...),
):
    """
    Create a prediction job from the submitted form and schedule it to run.

    The job row is created first, then the prediction is queued as a
    background task. The response is a 201 whose ``HX-Location`` header
    points at the ``job_list_page`` route.
    """
    # ... Validation abbreviated
    # NOTE(review): `smiles_text` is not defined in the visible code; presumably
    # the abbreviated validation step derives it from `smiles` -- confirm.
    compounds = [await create_compound(smiles=smiles_text.strip())]

    # `create_prediction_job` takes the model as the keyword-only argument
    # `model_type`; passing `model=` raises TypeError.
    job = await create_prediction_job(
        model_type=model,
        compounds=compounds,
        session=session,
    )

    background_tasks.add_task(run_predictions_and_update_job, job=job)

    return Response(
        status_code=201,
        headers={"HX-Location": str(request.url_for("job_list_page"))},
    )
@web_router.get("/jobs/events")
async def job_status_events(request: Request):
    """
    Server-sent events for job status updates via PostgreSQL LISTEN/NOTIFY.

    Listens to the 'job_updated' channel (the one the database trigger
    notifies on) and generates 'job-updated-{job_id}' SSE events for the
    client. A heartbeat message is sent whenever 30 seconds pass without a
    notification.
    """
    # Must match the channel name passed to pg_notify() by the trigger function.
    channel_name = "job_updated"

    async def generate_events():
        connection = await asyncpg.connect(str(settings.ASYNCPG_RAW_URL))
        # Queue instead of a single slot: several notifications may arrive
        # while one is being processed, and none of them may be dropped.
        pending_payloads = asyncio.Queue()

        def notification_handler(conn, pid, channel, payload):
            pending_payloads.put_nowait(payload)  # Job ID from NOTIFY

        try:
            await connection.add_listener(channel_name, notification_handler)

            while not await request.is_disconnected():
                try:
                    payload = await asyncio.wait_for(
                        pending_payloads.get(), timeout=30.0
                    )
                except asyncio.TimeoutError:
                    yield {"data": "heartbeat"}  # Keep connection alive
                    continue

                try:
                    job_id = int(payload)
                except (TypeError, ValueError):
                    # Empty or malformed payload: skip it rather than
                    # terminating the whole event stream.
                    continue

                # Only announce jobs that still exist.
                async with AsyncSession(db_engine) as session:
                    job = await get_job_by_id(job_id, session)

                if job:
                    yield {
                        "event": f"job-updated-{job_id}",
                        "data": f"Job {job_id} updated",
                    }
        finally:
            await connection.close()

    return EventSourceResponse(generate_events())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def create_prediction_job(
    *, model_type: ModelType, compounds: list[Compound], session: AsyncSession
) -> Job:
    """
    Persist a new job built from the given model type and compounds.

    The job is only stored (added, committed and refreshed on ``session``);
    it is NOT executed here.
    """
    job_input = JobInput(model=model_type, compounds=compounds)
    new_job = Job(input=job_input)

    session.add(new_job)
    await session.commit()
    # Reload the row so the returned object reflects what was stored.
    await session.refresh(new_job)

    return new_job
async def run_predictions_and_update_job(*, job: Job) -> Job:
    """
    Execute the job's prediction request and store the outcome on the job.

    The request is retried (up to 5 attempts, exponential backoff). On
    success the job gets the results and ``Status.SUCCESS``; on any
    exception it gets the error message and ``Status.ERROR``. The job is
    then committed through a fresh session and returned.

    Note:
        To be used as a background task.
    """

    async def _request_once() -> list[PredictionResult]:
        return await request_prediction(
            model=job.input.model, compounds=job.input.compounds
        )

    # Same retry policy as a decorator, applied explicitly.
    predict_with_retry = retry(
        stop=stop_after_attempt(5),  # Retry up to 5 times
        wait=wait_exponential(multiplier=1, min=5, max=30),  # Exponential backoff
        reraise=True,
    )(_request_once)

    try:
        predictions = await predict_with_retry()
        job.output = JobOutput(results=predictions)
        job.status = Status.SUCCESS
    except Exception as exc:
        job.output = JobOutput(error=str(exc))
        job.status = Status.ERROR
        print(f"Error executing prediction(s) for {job.input.compounds}: {exc}")

    # The background task gets its own session
    from main.config import db_engine

    async with AsyncSession(db_engine) as background_session:
        background_session.add(job)
        await background_session.commit()
        await background_session.refresh(job)

    # Note: Database triggers automatically send NOTIFY when job status/output changes
    return job
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment