Skip to content

Instantly share code, notes, and snippets.

@scriptogre
Last active November 19, 2025 05:13
Show Gist options
  • Select an option

  • Save scriptogre/397a70e04cef84aaffb6ee1c2c818187 to your computer and use it in GitHub Desktop.

Select an option

Save scriptogre/397a70e04cef84aaffb6ee1c2c818187 to your computer and use it in GitHub Desktop.
Real-time background job status updates with htmx, SSE and PostgreSQL LISTEN/NOTIFY. Database fields change → LISTEN/NOTIFY → SSE → row re-fetches it's own data. Each row listens for its own job-specific SSE event.
{% from "jobs/job_block.html" import render_job_rows %}
<table id="job-table" class="..."
sse-connect="/jobs/events"
{# Fixes race-condition when the job finishes before connection to /job/events is made #}
hx-get="/jobs/table"
hx-trigger="load once delay:500ms"
hx-swap="morph"
hx-disinherit="*"
>
<thead>
<tr class="...">
<th class="..." scope="col">
Compound
</th>
</tr>
</thead>
{% for job in current_page_jobs %}
{% set model = MODEL_REGISTRY[job.input.model] %}
<tbody id="job-{{ job.id }}"
hx-get="/jobs/{{ job.id }}/rows"
hx-trigger="sse:job-updated-{{ job.id }}">
{{ render_job_rows(job, model) }}
</tbody>
{% endfor %}
{% if has_next_page %}
{# Infinite scrolling - Load rows as user scrolls #}
<tbody id="trigger-infinite-scroll"
hx-get="/jobs/table?page_number={{ page_number + 1 }}"
hx-target="this"
hx-swap="outerHTML"
hx-select="tbody"
hx-trigger="intersect once">
<tr>
<td colspan="1" class="...">
Loading more...
</td>
</tr>
</tbody>
{% endif %}
</table>
<div hx-swap-oob="true" id="jobs-count" class="...">
<span>{{ job_counts.total }} jobs</span>
</div>
CREATE OR REPLACE FUNCTION notify_job_updated() RETURNS TRIGGER AS $$
BEGIN
-- Send notification with job ID as payload
PERFORM pg_notify('job_updated', NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER job_update_notify
AFTER UPDATE ON job
FOR EACH ROW
WHEN (OLD.status IS DISTINCT FROM NEW.status OR
OLD.output::text IS DISTINCT FROM NEW.output::text)
EXECUTE FUNCTION notify_job_updated();
@web_router.post("/jobs/")
async def submit_job_form(
request: Request,
session: SessionDep,
background_tasks: BackgroundTasks,
smiles: str = Form(None),
model: ModelType = Form(...),
):
# ... Validation abbreviated
compounds = [await create_compound(smiles=smiles_text.strip())]
job = await create_prediction_job(
model=model,
compounds=compounds,
session=session
)
background_tasks.add_task(run_predictions_and_update_job, job=job)
return Response(
status_code=201,
headers={"HX-Location": str(request.url_for("job_list_page"))},
)
@web_router.get("/jobs/events")
async def job_status_events(request: Request):
"""
Server-sent events for job status updates via PostgreSQL LISTEN/NOTIFY.
Listens to 'job-updated' channel and generates 'job-updated-{job_id}'
SSE events for the client.
"""
async def generate_events():
connection = await asyncpg.connect(str(settings.ASYNCPG_RAW_URL))
try:
notification_received = asyncio.Event()
updated_job_id = None
async def notification_handler(conn, pid, channel, payload):
nonlocal updated_job_id
updated_job_id = payload # Job ID from NOTIFY
notification_received.set()
await connection.add_listener("job-updated", notification_handler)
while True:
if await request.is_disconnected():
break
try:
await asyncio.wait_for(notification_received.wait(), timeout=30.0)
notification_received.clear()
if updated_job_id:
job_id = int(updated_job_id)
async with AsyncSession(db_engine) as session:
job = await get_job_by_id(job_id, session)
if job:
yield {
"event": f"job-updated-{job_id}",
"data": f"Job {job_id} updated",
}
updated_job_id = None
except asyncio.TimeoutError:
yield {"data": "heartbeat"} # Keep connection alive
finally:
await connection.close()
return EventSourceResponse(generate_events())
async def create_prediction_job(
*, model_type: ModelType, compounds: list[Compound], session: AsyncSession
) -> Job:
"""
Create a new job in the database. This function does NOT run the job.
"""
job = Job(input=JobInput(model=model_type, compounds=compounds))
session.add(job)
await session.commit()
await session.refresh(job)
return job
async def run_predictions_and_update_job(*, job: Job) -> Job:
"""
Run a job and save the result to the database.
Note:
To be used as a background task.
"""
@retry(
stop=stop_after_attempt(5), # Retry up to 5 times
wait=wait_exponential(multiplier=1, min=5, max=30), # Exponential backoff
reraise=True,
)
async def attempt_prediction() -> list[PredictionResult]:
return await request_prediction(
model=job.input.model, compounds=job.input.compounds
)
try:
results = await attempt_prediction()
job.output = JobOutput(results=results)
job.status = Status.SUCCESS
except Exception as e:
job.output = JobOutput(error=str(e))
job.status = Status.ERROR
print(f"Error executing prediction(s) for {job.input.compounds}: {e}")
# Create a new session for the background task
from main.config import db_engine
async with AsyncSession(db_engine) as session:
session.add(job)
await session.commit()
await session.refresh(job)
# Note: Database triggers automatically send NOTIFY when job status/output changes
return job
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment