Skip to content

Instantly share code, notes, and snippets.

@lepinkainen
Created August 20, 2025 14:07
Show Gist options
  • Save lepinkainen/7c0b9227e40bace3451e322b452396ef to your computer and use it in GitHub Desktop.
HN reply watcher
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "httpx>=0.27",
# "anyio>=4.4",
# ]
# ///
"""
hn_watch.py — Watch your Hacker News comments for score changes and replies.
Now with:
- uv-style headers: run with `uv` and it auto-installs deps
- sqlite state storage: durable, resumable across restarts
Usage:
uv run hn_watch.py --user <hn_username> [--interval 30] [--recent 50] [--db .hn_watch_state.sqlite]
Notes:
- The Algolia mirror is eventually consistent; allow a few minutes of lag.
- "Replies" are counted as *descendants* (direct or nested). New direct reply IDs are printed when detected.
"""
import argparse
import asyncio
import json
import os
import sys
import time
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Set, Tuple
import anyio
import httpx
import sqlite3
# Base URL of the Algolia-hosted Hacker News search API.
ALGOLIA_BASE = "https://hn.algolia.com/api/v1"
@dataclass
class CommentState:
    """Snapshot of one HN comment's observable state at a single poll."""

    id: int
    points: int
    descendants: int  # total replies (all depths)
    direct_children_ids: List[int]  # only direct replies (for author diffing if desired)
    story_id: Optional[int]
    parent_id: Optional[int]
    created_at_i: int  # comment creation time (unix seconds, from Algolia)
    url: str  # permalink to the comment on news.ycombinator.com
    story_title: Optional[str] = None  # filled lazily by maybe_fill_titles()
def flatten_descendants(children: List[dict]) -> List[dict]:
    """Collect every node in a comment subtree via an iterative DFS.

    Accepts the `children` list of an Algolia item (may be None/empty) and
    returns all nodes at any depth. Order follows a pop-from-end stack walk.
    """
    collected: List[dict] = []
    pending: List[dict] = list(children) if children else []
    while pending:
        current = pending.pop()
        collected.append(current)
        kids = current.get("children")
        if kids:
            pending.extend(kids)
    return collected
def direct_children_ids(children: List[dict]) -> List[int]:
    """Return the integer ids of first-level children only (skips malformed ids)."""
    return [child["id"] for child in (children or []) if isinstance(child.get("id"), int)]
async def get_json(client: httpx.AsyncClient, url: str, *, retries: int = 3, timeout: float = 15.0):
    """GET *url* and decode the JSON body, retrying transient failures.

    Retries up to *retries* attempts with exponential backoff (1s, 2s, ...).
    The last failure's exception is propagated to the caller.
    """
    delay = 1.0
    for attempt in range(1, retries + 1):
        try:
            resp = await client.get(url, timeout=timeout)
            resp.raise_for_status()
            return resp.json()
        except (httpx.RequestError, httpx.HTTPStatusError):
            if attempt == retries:
                raise  # out of attempts: surface the error
            await anyio.sleep(delay)
            delay *= 2
async def fetch_user_comment_ids(client: httpx.AsyncClient, username: str, limit: int) -> List[Tuple[int, int]]:
    """Return list of (comment_id, created_at_i), newest first, up to limit.

    Pages through Algolia's search_by_date endpoint (100 hits per page) until
    either *limit* ids are collected or the result set is exhausted.
    """
    collected: List[Tuple[int, int]] = []
    page = 0
    while len(collected) < limit:
        url = (f"{ALGOLIA_BASE}/search_by_date?tags=comment,author_{username}"
               f"&hitsPerPage=100&page={page}")
        payload = await get_json(client, url)
        hits = payload.get("hits", [])
        if not hits:
            break
        for hit in hits:
            if "objectID" not in hit or "created_at_i" not in hit:
                continue
            try:
                comment_id = int(hit["objectID"])
            except ValueError:
                continue  # malformed id: skip this hit
            collected.append((comment_id, int(hit["created_at_i"])))
            if len(collected) >= limit:
                break
        page += 1
        if page >= payload.get("nbPages", 0):
            break  # no more pages available
    return collected
async def fetch_item(client: httpx.AsyncClient, item_id: int) -> dict:
    """Fetch a single item (comment or story, with its children) from Algolia."""
    item_url = f"{ALGOLIA_BASE}/items/{item_id}"
    return await get_json(client, item_url)
async def fetch_story_title(client: httpx.AsyncClient, story_id: int) -> Optional[str]:
    """Best-effort lookup of a story's title; any failure yields None."""
    try:
        item = await fetch_item(client, story_id)
        return item.get("title")
    except Exception:
        # Titles are cosmetic only — never let a lookup failure propagate.
        return None
def build_state_from_item(item: dict) -> CommentState:
    """Convert a raw Algolia item payload into a CommentState snapshot.

    Descendants counts only nodes of type "comment" at any depth; missing
    points/timestamps fall back to 0 / now.
    """
    kids = item.get("children") or []
    all_nodes = flatten_descendants(kids)
    reply_total = sum(1 for node in all_nodes if node.get("type") == "comment")
    return CommentState(
        id=int(item["id"]),
        points=int(item.get("points") or 0),
        descendants=reply_total,
        direct_children_ids=direct_children_ids(kids),
        story_id=item.get("story_id"),
        parent_id=item.get("parent_id"),
        created_at_i=int(item.get("created_at_i") or time.time()),
        url=f"https://news.ycombinator.com/item?id={item['id']}",
        story_title=None,  # filled later by maybe_fill_titles()
    )
async def gather_items(client: httpx.AsyncClient, ids: List[int], concurrency: int = 8) -> Dict[int, dict]:
    """Fetch many items concurrently, bounded by *concurrency*.

    Best-effort: items that fail to fetch are simply absent from the result.
    """
    fetched: Dict[int, dict] = {}
    gate = anyio.Semaphore(concurrency)

    async def fetch_one(item_id: int):
        async with gate:
            try:
                fetched[item_id] = await fetch_item(client, item_id)
            except Exception:
                pass  # deliberate: a single failed item must not abort the batch

    async with anyio.create_task_group() as tg:
        for item_id in ids:
            tg.start_soon(fetch_one, item_id)
    return fetched
async def maybe_fill_titles(client: httpx.AsyncClient, states: Dict[int, CommentState]) -> None:
    """Fill in story_title (in place) for states that are missing one.

    Fetches each needed story at most once, with at most 5 concurrent lookups.
    """
    wanted: Set[int] = {st.story_id for st in states.values() if st.story_id and not st.story_title}
    if not wanted:
        return
    found: Dict[int, str] = {}
    gate = anyio.Semaphore(5)

    async def lookup(story_id: int):
        async with gate:
            title = await fetch_story_title(client, story_id)
            if title:
                found[story_id] = title

    async with anyio.create_task_group() as tg:
        for story_id in wanted:
            tg.start_soon(lookup, story_id)
    for st in states.values():
        if st.story_id and st.story_id in found:
            st.story_title = found[st.story_id]
# ------------------------
# SQLite persistence layer
# ------------------------
# Schema for the single `comments` table plus its last_seen index.
DDL = """
PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY,
points INTEGER NOT NULL,
descendants INTEGER NOT NULL,
direct_children_ids TEXT NOT NULL, -- JSON array
story_id INTEGER,
parent_id INTEGER,
created_at_i INTEGER NOT NULL,
url TEXT NOT NULL,
story_title TEXT,
last_seen_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_comments_last_seen ON comments(last_seen_at);
"""

def open_db(path: str) -> sqlite3.Connection:
    """Open (creating if necessary) the sqlite state database at *path*.

    Applies the DDL schema and enables foreign keys. Returns the open
    connection; the caller is responsible for closing it.
    """
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA foreign_keys = ON;")
    # executescript runs the whole multi-statement DDL safely; the previous
    # manual split on ";" would break on any semicolon inside a string literal.
    conn.executescript(DDL)
    conn.commit()
    return conn
def load_prev_state(conn: sqlite3.Connection, ids: List[int]) -> Dict[int, CommentState]:
    """Load previously-persisted CommentState rows for the given comment ids.

    Ids with no stored row are simply absent from the result; a corrupted
    direct_children_ids JSON column degrades to an empty list.
    """
    if not ids:
        return {}
    placeholders = ",".join("?" for _ in ids)
    rows = conn.execute(
        f"SELECT id, points, descendants, direct_children_ids, story_id, parent_id, created_at_i, url, story_title "
        f"FROM comments WHERE id IN ({placeholders})",
        ids,
    ).fetchall()
    loaded: Dict[int, CommentState] = {}
    for (cid, points, descendants, children_json, story_id,
         parent_id, created_at_i, url, story_title) in rows:
        try:
            children = json.loads(children_json) if children_json else []
        except json.JSONDecodeError:
            children = []  # tolerate corrupted state rather than crash
        loaded[cid] = CommentState(
            id=cid,
            points=points,
            descendants=descendants,
            direct_children_ids=children,
            story_id=story_id,
            parent_id=parent_id,
            created_at_i=created_at_i,
            url=url,
            story_title=story_title,
        )
    return loaded
def persist_state(conn: sqlite3.Connection, states: Dict[int, CommentState]) -> None:
    """Upsert each state row into `comments`, stamping last_seen_at with now.

    Runs inside a single transaction: all rows commit together, or none do.
    """
    stamp = int(time.time())
    upsert_sql = """
INSERT INTO comments (id, points, descendants, direct_children_ids, story_id, parent_id, created_at_i, url, story_title, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
points=excluded.points,
descendants=excluded.descendants,
direct_children_ids=excluded.direct_children_ids,
story_id=excluded.story_id,
parent_id=excluded.parent_id,
created_at_i=excluded.created_at_i,
url=excluded.url,
story_title=excluded.story_title,
last_seen_at=excluded.last_seen_at
"""
    rows = (
        (
            st.id,
            st.points,
            st.descendants,
            json.dumps(st.direct_children_ids, ensure_ascii=False),
            st.story_id,
            st.parent_id,
            st.created_at_i,
            st.url,
            st.story_title,
            stamp,
        )
        for st in states.values()
    )
    with conn:  # context manager commits on success, rolls back on error
        conn.executemany(upsert_sql, rows)
def diff_and_report(old: Dict[int, CommentState], new: Dict[int, CommentState]) -> None:
    """Print score changes, reply-count changes, and new direct replies.

    Comments absent from *old* are skipped: the first observation only
    establishes a baseline, so nothing is reported for them.
    """
    ts = time.strftime('%Y-%m-%d %H:%M:%S')
    for cid, ns in new.items():
        prev = old.get(cid)
        if not prev:
            continue  # first observation; baseline only
        if ns.points != prev.points:
            delta = ns.points - prev.points
            direction = "▲" if delta > 0 else "▼"
            print(f"[{ts}] score {direction}{delta:+d} (now {ns.points}) on comment {ns.id} | "
                  f"{(ns.story_title or 'story')} | {ns.url}")
        if ns.descendants != prev.descendants:
            delta = ns.descendants - prev.descendants
            # Bug fix: use {delta:+d} instead of "+{delta}" so a drop in
            # replies prints "replies -1", not the garbled "replies +-1".
            print(f"[{ts}] replies {delta:+d} (now {ns.descendants}) on comment {ns.id} | "
                  f"{(ns.story_title or 'story')} | {ns.url}")
        new_direct = set(ns.direct_children_ids) - set(prev.direct_children_ids)
        if new_direct:
            print(f"[{ts}] new direct replies on {ns.id}: {sorted(new_direct)} | {ns.url}")
async def watch(username: str, interval: int, recent: int, db_path: str) -> None:
    """Main polling loop: discover recent comments, diff against stored state,
    report changes, persist, sleep, repeat — forever (until interrupted).

    One sqlite connection and one HTTP client are held for the whole run.
    """
    limits = httpx.Limits(max_connections=20, max_keepalive_connections=10)
    headers = {"User-Agent": "hn_watch/2.0 (+https://hn.algolia.com/)"}
    async with httpx.AsyncClient(limits=limits, headers=headers) as client:
        conn = open_db(db_path)
        try:
            while True:
                try:
                    # Discover the user's most recent comment ids (newest first).
                    discovered = await fetch_user_comment_ids(client, username, recent)
                    ids = [cid for cid, _ in discovered]
                    if not ids:
                        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] no comments found for '{username}'")
                        await anyio.sleep(interval)
                        continue
                    # Load the previously-seen state BEFORE fetching fresh items,
                    # so the diff compares against the last persisted snapshot.
                    old_state = load_prev_state(conn, ids)
                    items = await gather_items(client, ids, concurrency=8)
                    new_state: Dict[int, CommentState] = {}
                    for cid in ids:
                        item = items.get(cid)
                        if not item:
                            continue  # fetch failed for this id; skip this round
                        try:
                            st = build_state_from_item(item)
                            new_state[cid] = st
                        except Exception:
                            continue  # malformed payload; skip rather than abort the cycle
                    await maybe_fill_titles(client, new_state)
                    diff_and_report(old_state, new_state)
                    persist_state(conn, new_state)
                    await anyio.sleep(interval)
                except KeyboardInterrupt:
                    print("Interrupted. Exiting...")
                    break
                except Exception as e:
                    # Any other error (network, Algolia hiccup) is logged and the
                    # loop backs off briefly instead of dying.
                    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] error: {e}", file=sys.stderr)
                    await anyio.sleep(min(60, interval))
        finally:
            conn.close()
def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse command-line arguments (see the module docstring for usage)."""
    parser = argparse.ArgumentParser(
        description="Watch your HN comments for score/replies via Algolia API (sqlite state)."
    )
    parser.add_argument("--user", required=True, help="Hacker News username to watch")
    parser.add_argument("--interval", type=int, default=30,
                        help="Polling interval in seconds (default: 30)")
    parser.add_argument("--recent", type=int, default=50,
                        help="How many recent comments to track (default: 50)")
    parser.add_argument("--db", default=".hn_watch_state.sqlite",
                        help="Path to sqlite database (default: .hn_watch_state.sqlite)")
    return parser.parse_args(argv)
def main(argv: List[str]) -> None:
    """Entry point: parse arguments and run the watcher until interrupted."""
    opts = parse_args(argv)
    try:
        anyio.run(watch, opts.user, opts.interval, opts.recent, opts.db)
    except KeyboardInterrupt:
        pass  # a clean Ctrl-C exit needs no traceback
if __name__ == "__main__":
    # Script entry: `uv run hn_watch.py --user <name> ...`
    main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment