HN reply watcher

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "httpx>=0.27",
#     "anyio>=4.4",
# ]
# ///
"""
hn_watch.py — Watch your Hacker News comments for score changes and replies.

Now with:
  - uv-style headers: run with `uv` and it auto-installs deps
  - sqlite state storage: durable, resumable across restarts

Usage:
  uv run hn_watch.py --user <hn_username> [--interval 30] [--recent 50] [--db .hn_watch_state.sqlite]

Notes:
  - The Algolia mirror is eventually consistent; allow a few minutes of lag.
  - "Replies" are counted as *descendants* (direct or nested). New direct reply IDs are printed when detected.
"""

import argparse
import json
import sqlite3
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple

import anyio
import httpx

ALGOLIA_BASE = "https://hn.algolia.com/api/v1"


@dataclass
class CommentState:
    id: int
    points: int
    descendants: int                # total replies (all depths)
    direct_children_ids: List[int]  # only direct replies (for author diffing if desired)
    story_id: Optional[int]
    parent_id: Optional[int]
    created_at_i: int
    url: str
    story_title: Optional[str] = None
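
# Illustrative example (made-up IDs and timestamp) of the state tracked per comment:
#   CommentState(id=40000001, points=3, descendants=2, direct_children_ids=[40000005],
#                story_id=39999999, parent_id=39999990, created_at_i=1724152000,
#                url="https://news.ycombinator.com/item?id=40000001", story_title=None)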


def flatten_descendants(children: List[dict]) -> List[dict]:
    """Flatten a nested Algolia `children` tree into a single list (depth-first)."""
    out = []
    stack = list(children) if children else []
    while stack:
        node = stack.pop()
        out.append(node)
        if node.get("children"):
            stack.extend(node["children"])
    return out


def direct_children_ids(children: List[dict]) -> List[int]:
    """Return IDs of direct replies only (first level of the `children` tree)."""
    if not children:
        return []
    return [c["id"] for c in children if isinstance(c.get("id"), int)]


async def get_json(client: httpx.AsyncClient, url: str, *, retries: int = 3, timeout: float = 15.0):
    """GET `url` and return parsed JSON, retrying with exponential backoff on errors."""
    backoff = 1.0
    for attempt in range(retries):
        try:
            r = await client.get(url, timeout=timeout)
            r.raise_for_status()
            return r.json()
        except (httpx.RequestError, httpx.HTTPStatusError):
            if attempt == retries - 1:
                raise
            await anyio.sleep(backoff)
            backoff *= 2


async def fetch_user_comment_ids(client: httpx.AsyncClient, username: str, limit: int) -> List[Tuple[int, int]]:
    """Return list of (comment_id, created_at_i), newest first, up to limit."""
    ids: List[Tuple[int, int]] = []
    page = 0
    while len(ids) < limit:
        url = (f"{ALGOLIA_BASE}/search_by_date?tags=comment,author_{username}"
               f"&hitsPerPage=100&page={page}")
        data = await get_json(client, url)
        hits = data.get("hits", [])
        if not hits:
            break
        for h in hits:
            if "objectID" in h and "created_at_i" in h:
                try:
                    cid = int(h["objectID"])
                except ValueError:
                    continue
                ids.append((cid, int(h["created_at_i"])))
                if len(ids) >= limit:
                    break
        page += 1
        if page >= data.get("nbPages", 0):
            break
    return ids
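
# Example of the search URL built above (username is illustrative):
#   https://hn.algolia.com/api/v1/search_by_date?tags=comment,author_someuser&hitsPerPage=100&page=0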


async def fetch_item(client: httpx.AsyncClient, item_id: int) -> dict:
    """Fetch a single HN item (comment or story), including its nested children."""
    return await get_json(client, f"{ALGOLIA_BASE}/items/{item_id}")


async def fetch_story_title(client: httpx.AsyncClient, story_id: int) -> Optional[str]:
    """Best-effort lookup of a story title; returns None on any error."""
    try:
        data = await fetch_item(client, story_id)
        return data.get("title")
    except Exception:
        return None


def build_state_from_item(item: dict) -> CommentState:
    """Convert an Algolia item payload into the CommentState snapshot we persist."""
    points = int(item.get("points") or 0)
    children = item.get("children") or []
    desc = flatten_descendants(children)
    return CommentState(
        id=int(item["id"]),
        points=points,
        descendants=len([c for c in desc if c.get("type") == "comment"]),
        direct_children_ids=direct_children_ids(children),
        story_id=item.get("story_id"),
        parent_id=item.get("parent_id"),
        created_at_i=int(item.get("created_at_i") or time.time()),
        url=f"https://news.ycombinator.com/item?id={item['id']}",
        story_title=None,
    )


async def gather_items(client: httpx.AsyncClient, ids: List[int], concurrency: int = 8) -> Dict[int, dict]:
    """Fetch many items concurrently, capped by a semaphore; failed fetches are skipped."""
    results: Dict[int, dict] = {}
    sem = anyio.Semaphore(concurrency)

    async def worker(i: int):
        async with sem:
            try:
                data = await fetch_item(client, i)
                results[i] = data
            except Exception:
                pass

    async with anyio.create_task_group() as tg:
        for i in ids:
            tg.start_soon(worker, i)
    return results


async def maybe_fill_titles(client: httpx.AsyncClient, states: Dict[int, CommentState]) -> None:
    """Fill in story_title for any states that have a story_id but no title yet."""
    to_fetch: Set[int] = {s.story_id for s in states.values() if s.story_id and not s.story_title}
    if not to_fetch:
        return
    titles: Dict[int, str] = {}
    sem = anyio.Semaphore(5)

    async def worker(sid: int):
        async with sem:
            title = await fetch_story_title(client, sid)
            if title:
                titles[sid] = title

    async with anyio.create_task_group() as tg:
        for sid in to_fetch:
            tg.start_soon(worker, sid)
    for s in states.values():
        if s.story_id and s.story_id in titles:
            s.story_title = titles[s.story_id]


# ------------------------
# SQLite persistence layer
# ------------------------

DDL = """
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS comments (
    id INTEGER PRIMARY KEY,
    points INTEGER NOT NULL,
    descendants INTEGER NOT NULL,
    direct_children_ids TEXT NOT NULL,  -- JSON array
    story_id INTEGER,
    parent_id INTEGER,
    created_at_i INTEGER NOT NULL,
    url TEXT NOT NULL,
    story_title TEXT,
    last_seen_at INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_comments_last_seen ON comments(last_seen_at);
"""


def open_db(path: str) -> sqlite3.Connection:
    """Open (or create) the state database and apply the schema above."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA foreign_keys = ON;")
    # Naive split on ';' is fine here: the DDL contains no semicolons inside strings.
    for stmt in DDL.strip().split(";"):
        s = stmt.strip()
        if s:
            conn.execute(s)
    conn.commit()
    return conn


def load_prev_state(conn: sqlite3.Connection, ids: List[int]) -> Dict[int, CommentState]:
    """Load previously persisted CommentState rows for the given comment IDs."""
    if not ids:
        return {}
    q_marks = ",".join("?" for _ in ids)
    cur = conn.execute(
        f"SELECT id, points, descendants, direct_children_ids, story_id, parent_id, created_at_i, url, story_title "
        f"FROM comments WHERE id IN ({q_marks})",
        ids,
    )
    out: Dict[int, CommentState] = {}
    for row in cur.fetchall():
        cid, points, descendants, direct_children_ids_json, story_id, parent_id, created_at_i, url, story_title = row
        try:
            dlist = json.loads(direct_children_ids_json) if direct_children_ids_json else []
        except json.JSONDecodeError:
            dlist = []
        out[cid] = CommentState(
            id=cid,
            points=points,
            descendants=descendants,
            direct_children_ids=dlist,
            story_id=story_id,
            parent_id=parent_id,
            created_at_i=created_at_i,
            url=url,
            story_title=story_title,
        )
    return out


def persist_state(conn: sqlite3.Connection, states: Dict[int, CommentState]) -> None:
    """Upsert the latest snapshot of every tracked comment."""
    now_i = int(time.time())
    with conn:
        for st in states.values():
            conn.execute(
                """
                INSERT INTO comments (id, points, descendants, direct_children_ids, story_id, parent_id, created_at_i, url, story_title, last_seen_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    points=excluded.points,
                    descendants=excluded.descendants,
                    direct_children_ids=excluded.direct_children_ids,
                    story_id=excluded.story_id,
                    parent_id=excluded.parent_id,
                    created_at_i=excluded.created_at_i,
                    url=excluded.url,
                    story_title=excluded.story_title,
                    last_seen_at=excluded.last_seen_at
                """,
                (
                    st.id,
                    st.points,
                    st.descendants,
                    json.dumps(st.direct_children_ids, ensure_ascii=False),
                    st.story_id,
                    st.parent_id,
                    st.created_at_i,
                    st.url,
                    st.story_title,
                    now_i,
                ),
            )


def diff_and_report(old: Dict[int, CommentState], new: Dict[int, CommentState]) -> None:
    """Print a line for every score change, reply-count change, and new direct reply."""
    ts = time.strftime('%Y-%m-%d %H:%M:%S')
    for cid, ns in new.items():
        os_ = old.get(cid)
        if not os_:
            continue  # first observation; baseline only
        if ns.points != os_.points:
            delta = ns.points - os_.points
            direction = "▲" if delta > 0 else "▼"
            print(f"[{ts}] score {direction}{delta:+d} (now {ns.points}) on comment {ns.id} | "
                  f"{(ns.story_title or 'story')} | {ns.url}")
        if ns.descendants != os_.descendants:
            delta = ns.descendants - os_.descendants
            print(f"[{ts}] replies +{delta} (now {ns.descendants}) on comment {ns.id} | "
                  f"{(ns.story_title or 'story')} | {ns.url}")
        new_direct = set(ns.direct_children_ids) - set(os_.direct_children_ids)
        if new_direct:
            print(f"[{ts}] new direct replies on {ns.id}: {sorted(new_direct)} | {ns.url}")


async def watch(username: str, interval: int, recent: int, db_path: str) -> None:
    """Main polling loop: discover recent comments, diff against sqlite state, persist, sleep."""
    limits = httpx.Limits(max_connections=20, max_keepalive_connections=10)
    headers = {"User-Agent": "hn_watch/2.0 (+https://hn.algolia.com/)"}
    async with httpx.AsyncClient(limits=limits, headers=headers) as client:
        conn = open_db(db_path)
        try:
            while True:
                try:
                    discovered = await fetch_user_comment_ids(client, username, recent)
                    ids = [cid for cid, _ in discovered]
                    if not ids:
                        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] no comments found for '{username}'")
                        await anyio.sleep(interval)
                        continue
                    old_state = load_prev_state(conn, ids)
                    items = await gather_items(client, ids, concurrency=8)
                    new_state: Dict[int, CommentState] = {}
                    for cid in ids:
                        item = items.get(cid)
                        if not item:
                            continue
                        try:
                            st = build_state_from_item(item)
                            new_state[cid] = st
                        except Exception:
                            continue
                    await maybe_fill_titles(client, new_state)
                    diff_and_report(old_state, new_state)
                    persist_state(conn, new_state)
                    await anyio.sleep(interval)
                except KeyboardInterrupt:
                    print("Interrupted. Exiting...")
                    break
                except Exception as e:
                    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] error: {e}", file=sys.stderr)
                    await anyio.sleep(min(60, interval))
        finally:
            conn.close()


def parse_args(argv: List[str]) -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Watch your HN comments for score/replies via Algolia API (sqlite state).")
    p.add_argument("--user", required=True, help="Hacker News username to watch")
    p.add_argument("--interval", type=int, default=30, help="Polling interval in seconds (default: 30)")
    p.add_argument("--recent", type=int, default=50, help="How many recent comments to track (default: 50)")
    p.add_argument("--db", default=".hn_watch_state.sqlite", help="Path to sqlite database (default: .hn_watch_state.sqlite)")
    return p.parse_args(argv)


def main(argv: List[str]) -> None:
    args = parse_args(argv)
    try:
        anyio.run(watch, args.user, args.interval, args.recent, args.db)
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main(sys.argv[1:])
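
# The `uv` shebang at the top also lets this run as a standalone executable once uv is
# installed (username below is illustrative):
#   chmod +x hn_watch.py && ./hn_watch.py --user someuser --interval 60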