@jag-k · Last active September 8, 2025
Convert Messages from VK Archive to JSON and Markdown
#!/usr/bin/env uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "selectolax",
#     "pathvalidate",
# ]
# ///
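#
# Usage sketch ("vk_export.py" stands in for whatever you saved this file as):
#   uv run vk_export.py
#   ARCHIVE_DIR=~/Downloads/Archive PARSE_WORKERS=16 PARSE_PREFETCH=8 uv run vk_export.py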
import datetime
import json
import logging
import os
from collections.abc import Callable, Iterator
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from contextlib import contextmanager
from pathlib import Path
from typing import Literal, NotRequired, TypedDict

from pathvalidate import sanitize_filename
from selectolax.parser import HTMLParser

CLEAR_LINE = "\r\033[K"  # ANSI: return to column 0 and erase to end of line
GROUP_CHAT_ID_THRESHOLD = 2e9  # ids at or above this are treated as group chats
MAX_PREFETCH_LIMIT = 8192  # hard cap on in-flight parse tasks
DEFAULT_PREFETCH_MULTIPLIER = 4  # in-flight tasks per worker when PARSE_PREFETCH is unset
NAME_MIN_PARTS = 4  # header fallback: trailing tokens dropped when recovering the name
DATE_PARTS = 3  # header fallback: trailing tokens rejoined as the date string

logger = logging.getLogger(__name__)

_MONTHS_RU = {
    "янв": 1,
    "января": 1,
    "январь": 1,
    "фев": 2,
    "февраля": 2,
    "февраль": 2,
    "мар": 3,
    "марта": 3,
    "март": 3,
    "апр": 4,
    "апреля": 4,
    "апрель": 4,
    "май": 5,
    "мая": 5,
    "июн": 6,
    "июня": 6,
    "июнь": 6,
    "июл": 7,
    "июля": 7,
    "июль": 7,
    "авг": 8,
    "августа": 8,
    "август": 8,
    "сен": 9,
    "сентября": 9,
    "сентябрь": 9,
    "окт": 10,
    "октября": 10,
    "октябрь": 10,
    "ноя": 11,
    "ноября": 11,
    "ноябрь": 11,
    "дек": 12,
    "декабря": 12,
    "декабрь": 12,
}


class ChatMessage(TypedDict):
    id: int
    user_url: NotRequired[str]
    user_name: str
    message: str
    datetime: datetime.datetime
    redacted: bool


type ChatType = Literal["private", "channel", "group"]


class Chat(TypedDict):
    name: str
    type: ChatType
    messages: list[ChatMessage]


def _now() -> datetime.datetime:
    """
    Return the current time in UTC.
    """
    return datetime.datetime.now(datetime.UTC)


def _clean_print(text: str, *args: str, **kwargs):
    """
    Print over the current terminal line (used for progress output).
    """
    kwargs["flush"] = True
    kwargs["end"] = ""
    print(f"{CLEAR_LINE}{text}", *args, **kwargs)


def _default_datetime_ru(s: str) -> datetime.datetime:
    """
    Fallback parser for ``DD <month> YYYY в HH:MM:SS``.

    Note that ``%b`` is locale-dependent: it only matches Russian month names
    when a Russian locale is active. If it raises ``ValueError``, the error
    propagates to the per-message handler in ``process_html_file``.
    """
    return datetime.datetime.strptime(s, "%d %b %Y в %H:%M:%S").replace(tzinfo=datetime.UTC)


def parse_datetime_ru(raw: str) -> tuple[datetime.datetime, bool]:
    """
    Parse Russian datetime string of the form ``DD <month> YYYY в HH:MM:SS``.

    Returns a tuple ``(dt, redacted)`` where ``dt`` is timezone-aware (UTC).
    """
    redacted = False
    s = raw.strip()
    if s.endswith(" (ред.)"):
        redacted = True
        s = s[: -len(" (ред.)")].rstrip()
    dt: datetime.datetime
    s_norm = s.replace("мая", "май")
    date_part, time_part = s_norm.split(" в ", 1)
    day_str, month_str, year_str = date_part.split()
    month_token = month_str.lower().strip(".")
    month = _MONTHS_RU.get(month_token) or _MONTHS_RU.get(month_token[:3])
    if month is None:
        dt = _default_datetime_ru(s_norm)
        return dt, redacted
    try:
        day = int(day_str)
        year = int(year_str)
        hh, mm, ss = (int(x) for x in time_part.split(":"))
        dt = datetime.datetime(year, month, day, hh, mm, ss, tzinfo=datetime.UTC)
    except Exception:
        dt = _default_datetime_ru(s_norm)
    return dt, redacted
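
# Illustrative behaviour (comments only, not executed; the date is made up):
#   parse_datetime_ru("8 сентября 2025 в 03:29:00")
#     -> (datetime.datetime(2025, 9, 8, 3, 29, tzinfo=datetime.UTC), False)
#   parse_datetime_ru("8 сентября 2025 в 03:29:00 (ред.)")
#     -> (same instant, True)  # the " (ред.)" suffix marks an edited message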


@contextmanager
def _timeit(done_text: str = " Done", interrupt_callback: Callable[[], None] | None = None) -> Iterator[None]:
    """
    Context manager that reports elapsed time, cancelling cleanly on Ctrl-C.
    """
    start_time = _now()
    try:
        yield
    except KeyboardInterrupt:
        if interrupt_callback:
            interrupt_callback()
        print(f"\nProcessing cancelled (processing time: {_now() - start_time})")
        raise SystemExit(1) from None
    else:
        print(f"{done_text} (processing time: {_now() - start_time})")


def get_chat_type(chat_id: int) -> ChatType:
    """
    Determine chat type from a numeric identifier: negative ids map to
    ``"channel"``, ids below ``GROUP_CHAT_ID_THRESHOLD`` to ``"private"``,
    and everything else to ``"group"``.
    """
    if chat_id < 0:
        return "channel"
    if chat_id < GROUP_CHAT_ID_THRESHOLD:
        return "private"
    return "group"


def resolve_archive_dir() -> Path:
    """
    Resolve the archive directory, honoring the ``ARCHIVE_DIR`` environment variable.
    Falls back to ``~/Downloads/Archive``, then to the script's own directory.
    """
    archive_dir = Path("~/Downloads/Archive").expanduser()
    env_archive = os.getenv("ARCHIVE_DIR")
    if env_archive:
        p = Path(env_archive).expanduser()
        if p.is_dir():
            archive_dir = p
    if not archive_dir.is_dir():
        archive_dir = Path(__file__).parent
    return archive_dir


def build_chat_map(messages_dir: Path) -> dict[int, str]:
    """
    Read ``index-messages.html`` and build a map ``chat_id -> chat_name``.
    """
    chat_map: dict[int, str] = {}
    try:
        index_tree = HTMLParser(
            (messages_dir / "index-messages.html").read_bytes(),
            detect_encoding=True,
            decode_errors="ignore",
        )
        for a in index_tree.css("a[href]"):
            href_value = a.attributes.get("href")
            if not isinstance(href_value, str):
                continue
            if not href_value.endswith("/messages0.html"):
                continue
            chat_id_str = href_value.split("/", 1)[0]
            if not chat_id_str.lstrip("-").isdigit():
                continue
            chat_id_int = int(chat_id_str)
            chat_map[chat_id_int] = a.text(strip=True)
    except Exception as e:
        logger.warning("Failed to build chat_map: %s", e)
    return chat_map
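
# The index page is expected to link each chat's first page roughly like this
# (a sketch, not the exact VK markup):
#   <a href="123456/messages0.html">Chat name</a>
# Links whose href does not match that shape are skipped.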


def collect_html_files(messages_dir: Path, user_chat_id: int | None) -> list[Path]:
    """
    Collect a list of message HTML files to process.
    If ``user_chat_id`` is provided, only files for that chat are returned.
    """
    c_paths: list[tuple[int, Path]]
    all_html_files: list[Path] = []
    if user_chat_id is not None:
        target_dir = messages_dir / str(user_chat_id)
        if target_dir.is_dir():
            c_paths = sorted(
                (
                    (int(i.name.removeprefix("messages").removesuffix(".html")), i)
                    for i in target_dir.glob("messages*.html")
                ),
                key=lambda x: x[0],
            )
            all_html_files.extend([chat_path for _, chat_path in c_paths])
        return all_html_files
    for chat_dir in messages_dir.iterdir():
        if not chat_dir.is_dir():
            continue
        c_paths = sorted(
            ((int(i.name.removeprefix("messages").removesuffix(".html")), i) for i in chat_dir.glob("messages*.html")),
            key=lambda x: x[0],
        )
        all_html_files.extend([chat_path for _, chat_path in c_paths])
    return all_html_files
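
# Each chat directory holds paginated files such as messages0.html,
# messages200.html, ... (the exact page step may vary); the numeric suffix is
# parsed out so pages are processed in on-disk order.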


def compute_max_workers() -> int:
    """
    Compute the number of worker threads, honoring ``PARSE_WORKERS``.
    """
    cpu = os.cpu_count() or 4
    env_workers = os.getenv("PARSE_WORKERS")
    default_workers = min(64, max(4, cpu * 4))
    return int(env_workers) if env_workers and env_workers.isdigit() else default_workers


def compute_inflight_limit(max_workers: int) -> int:
    """
    Compute the upper bound of in-flight tasks (queue depth).
    Honors the ``PARSE_PREFETCH`` environment variable.
    """
    prefetch_env = os.getenv("PARSE_PREFETCH")
    try:
        prefetch_multiplier = int(prefetch_env) if prefetch_env else DEFAULT_PREFETCH_MULTIPLIER
    except Exception:
        prefetch_multiplier = DEFAULT_PREFETCH_MULTIPLIER
    return max(max_workers, min(MAX_PREFETCH_LIMIT, max_workers * max(1, prefetch_multiplier)))
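
# Worked example: max_workers=32 with PARSE_PREFETCH unset gives
# max(32, min(8192, 32 * 4)) = 128 files queued or running at once.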


def process_html_file(chat_path: Path) -> tuple[int, list[ChatMessage]]:
    """
    Parse a single HTML file and return ``(chat_id, messages)``.
    """
    messages: list[ChatMessage] = []
    chat_id = int(chat_path.parent.name)  # the directory name is the numeric chat id
    try:
        tree = HTMLParser(
            chat_path.read_bytes(),
            detect_encoding=True,
            decode_errors="ignore",
        )
        for m in tree.css("div.message"):
            try:
                data_id_raw = m.attributes.get("data-id")
                if not data_id_raw:
                    continue
                header = m.css_first("div.message__header") or m.css_first("div") or m
                header_text = header.text(separator=" ", strip=True)
                if ", " in header_text:
                    user_name, dt_str = header_text.rsplit(", ", 1)
                else:
                    # Best-effort fallback for headers without ", ": peel the
                    # trailing tokens off as the date, keep the rest as the name.
                    parts = header_text.split(" ")
                    user_name = " ".join(parts[:-NAME_MIN_PARTS]) if len(parts) >= NAME_MIN_PARTS else header_text
                    dt_str = " ".join(parts[-DATE_PARTS:]) if len(parts) >= DATE_PARTS else ""
                dt, redacted = parse_datetime_ru(dt_str)
                link = header.css_first("a[href]")
                user_url = link.attributes.get("href") if link else None
                text_parts: list[str] = []
                child = m.child
                while child is not None:
                    if child.tag == "div":
                        cls = child.attributes.get("class") or ""
                        if "message__header" not in cls:
                            text = child.text(separator="\n", strip=True)
                            if text:
                                text_parts.append(text)
                    child = child.next
                message: ChatMessage = {
                    "id": int(data_id_raw),
                    "user_name": user_name,
                    "datetime": dt,
                    "redacted": redacted,
                    "message": "\n".join(text_parts).strip(),
                }
                if user_url:
                    message["user_url"] = user_url
                messages.append(message)
            except Exception as e:
                logger.warning("Failed to parse message in %s: %s", chat_path, e)
    except Exception as e:
        logger.warning("Failed to parse file %s: %s", chat_path, e)
    return chat_id, messages
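
# Roughly the per-message markup this parser assumes (a sketch, not VK's exact
# template):
#   <div class="message" data-id="123">
#     <div class="message__header"><a href="https://vk.com/id1">Name</a>, 8 сентября 2025 в 03:29:00</div>
#     <div>message text</div>
#   </div>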


def process_all_files(
    all_html_files: list[Path],
    chat_map: dict[int, str],
    max_workers: int,
    inflight_limit: int,
    progress_every: int,
) -> dict[int, Chat]:
    """
    Process all files with bounded concurrency and build the chats structure.
    """
    chats: dict[int, Chat] = {}
    processed_count = 0
    total = len(all_html_files)
    with (
        ThreadPoolExecutor(max_workers=max_workers) as executor,
        _timeit(interrupt_callback=lambda: executor.shutdown(wait=False, cancel_futures=True)),
    ):
        paths_iter = iter(all_html_files)
        futures: set = set()
        # Prime the window: submit up to inflight_limit tasks.
        for _ in range(min(inflight_limit, total)):
            try:
                p = next(paths_iter)
            except StopIteration:
                break
            futures.add(executor.submit(process_html_file, p))
        # Sliding window: for every finished task, submit one more path, so the
        # number of in-flight futures never exceeds inflight_limit.
        while futures:
            done, _ = wait(futures, return_when=FIRST_COMPLETED)
            for future in done:
                futures.remove(future)
                chat_id, messages = future.result()
                chat_name = chat_map.get(chat_id, str(chat_id))
                if chat_id not in chats:
                    chats[chat_id] = Chat(name=chat_name, type=get_chat_type(chat_id), messages=[])
                chats[chat_id]["messages"].extend(messages)
                processed_count += 1
                if processed_count % progress_every == 0 or processed_count == total:
                    _clean_print(f"Processed files [{processed_count / total * 100:5.1f}%]: {processed_count}/{total}")
                try:
                    p = next(paths_iter)
                    futures.add(executor.submit(process_html_file, p))
                except StopIteration:
                    pass
    print()
    return chats


def write_messages_json(archive_dir: Path, chats: dict[int, Chat]) -> int:
    """
    Sort messages in each chat, write ``messages.json``, and return the total message count.
    """
    with _timeit():
        for chat in chats.values():
            chat["messages"].sort(key=lambda m: m["datetime"])  # stable sort
        total_messages = sum(len(chat["messages"]) for chat in chats.values())
        print(f"Writing messages.json (~{total_messages} messages)...")
        (archive_dir / "messages.json").write_text(
            json.dumps(chats, ensure_ascii=False, default=str),
            encoding="utf-8",
            errors="ignore",
        )
    return total_messages
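
# Illustrative shape of messages.json (json.dumps turns the int chat ids into
# string keys, and ``default=str`` stringifies the datetimes):
#   {"123456": {"name": "...", "type": "private", "messages": [
#       {"id": 1, "user_name": "...", "message": "...",
#        "datetime": "2025-09-08 03:29:00+00:00", "redacted": false}]}}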


def write_markdown_files(archive_dir: Path, chats: dict[int, Chat]) -> int:
    """
    Create Markdown files for each chat under ``messages_md/<type>/<sanitized name>_<chat_id>.md``.
    Returns the number of generated files.
    """
    result_path = archive_dir / "messages_md"
    result_path.mkdir(exist_ok=True, parents=True)
    items = list(chats.items())
    total = len(items)
    total_str = len(str(total))
    written = 0
    for i, (chat_id, chat) in enumerate(items, 1):
        chat_name = chat["name"]
        chat_type = chat["type"]
        messages = chat["messages"]
        chat_path = result_path / f"{chat_type}/{sanitize_filename(chat_name)}_{chat_id}.md"
        chat_path.parent.mkdir(exist_ok=True, parents=True)
        content = (
            "# "
            + chat_name
            + "\n\n"
            + "\n\n".join(
                (
                    "## "
                    + (f"[{name}]({m['user_url']})" if m.get("user_url") else name)
                    + " "
                    + m["datetime"].strftime("%d.%m.%Y %H:%M:%S")
                    + (" (redacted)" if m["redacted"] else "")
                    + ":\n"
                    + m["message"].replace("\n", "\n\n")
                )
                for m in messages
                if (name := m.get("user_name"))
            )
        )
        chat_path.write_text(content, encoding="utf-8")
        _clean_print(f"Saved file [{i:{total_str}}/{total}]: {chat_path}")
        written += 1
    print()
    return written
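
# Illustrative output path (hypothetical chat name and id):
#   <archive_dir>/messages_md/private/Иван Иванов_123456.md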


def main(user_chat_id: int | None = None):
    """
    Entry point: collect files, process them in parallel, and write the JSON
    and Markdown results.
    """
    archive_dir = resolve_archive_dir()
    messages_dir = archive_dir / "messages"
    chat_map = build_chat_map(messages_dir)
    all_html_files = collect_html_files(messages_dir, user_chat_id)
    print(f"Found files to process: {len(all_html_files)}")
    max_workers = compute_max_workers()
    inflight_limit = compute_inflight_limit(max_workers)
    total = len(all_html_files)
    progress_every = max(50, total // 200) if total else 1
    with _timeit("All files processed"):
        chats = process_all_files(
            all_html_files=all_html_files,
            chat_map=chat_map,
            max_workers=max_workers,
            inflight_limit=inflight_limit,
            progress_every=progress_every,
        )
    total_messages = write_messages_json(archive_dir, chats)
    write_markdown_files(archive_dir, chats)
    print(f"Processed chats: {len(chats)}")
    print(f"Total messages: {total_messages}")


if __name__ == "__main__":
    main()
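
# The script takes no command-line arguments; to limit processing to a single
# chat, call main() with that chat's id instead, e.g. main(123456)
# (123456 is a made-up id here).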