Last active: September 8, 2025 03:29
-
-
Save jag-k/01b99ea0c5c4e07bc2b864a3ceffdaae to your computer and use it in GitHub Desktop.
Convert Messages from VK Archive to JSON and Markdown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env uv run --script | |
| # /// script | |
| # dependencies = [ | |
| # "selectolax", | |
| # "pathvalidate", | |
| # ] | |
| # /// | |
| import datetime | |
| import json | |
| import logging | |
| import os | |
| from collections.abc import Callable, Iterator | |
| from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait | |
| from contextlib import contextmanager | |
| from pathlib import Path | |
| from typing import Literal, NotRequired, TypedDict | |
| from pathvalidate import sanitize_filename | |
| from selectolax.parser import HTMLParser | |
# ANSI control sequence: carriage return plus "erase to end of line";
# used to redraw in-place progress lines.
CLEAR_LINE = "\r\033[K"
# Chat-id boundary used by get_chat_type: non-negative ids below this are
# private dialogs, ids at or above it are group chats.
GROUP_CHAT_ID_THRESHOLD = 2e9
# Hard upper bound on queued-but-unfinished parse tasks.
MAX_PREFETCH_LIMIT = 8192
# Per-worker queue-depth multiplier when PARSE_PREFETCH is not set.
DEFAULT_PREFETCH_MULTIPLIER = 4
# Token-count heuristics for splitting a header that has no ", " separator:
# the trailing NAME_MIN_PARTS tokens are assumed to be the timestamp and the
# last DATE_PARTS tokens form the date string (see process_html_file).
NAME_MIN_PARTS = 4
DATE_PARTS = 3

logger = logging.getLogger(__name__)

# Russian month name -> month number. Includes abbreviated, genitive and
# nominative spellings so every form found in the archive headers resolves.
_MONTHS_RU = {
    "янв": 1,
    "января": 1,
    "январь": 1,
    "фев": 2,
    "февраля": 2,
    "февраль": 2,
    "мар": 3,
    "марта": 3,
    "март": 3,
    "апр": 4,
    "апреля": 4,
    "апрель": 4,
    "май": 5,
    "мая": 5,
    "июн": 6,
    "июня": 6,
    "июнь": 6,
    "июл": 7,
    "июля": 7,
    "июль": 7,
    "авг": 8,
    "августа": 8,
    "август": 8,
    "сен": 9,
    "сентября": 9,
    "сентябрь": 9,
    "окт": 10,
    "октября": 10,
    "октябрь": 10,
    "ноя": 11,
    "ноября": 11,
    "ноябрь": 11,
    "дек": 12,
    "декабря": 12,
    "декабрь": 12,
}
class ChatMessage(TypedDict):
    """A single parsed chat message."""

    id: int  # value of the message node's data-id attribute
    user_url: NotRequired[str]  # profile link from the message header, when present
    user_name: str  # sender display name parsed from the header
    message: str  # message body text, blocks joined with newlines
    datetime: datetime.datetime  # timezone-aware timestamp (UTC)
    redacted: bool  # True when the timestamp carried the " (ред.)" edited marker
# Chat category, derived from the numeric chat id (see get_chat_type).
type ChatType = Literal["private", "channel", "group"]


class Chat(TypedDict):
    """A chat together with all of its parsed messages."""

    name: str  # display name from index-messages.html, or the id as a string
    type: ChatType
    messages: list[ChatMessage]
| def _now() -> datetime.datetime: | |
| """ | |
| Return the current time in UTC. | |
| """ | |
| return datetime.datetime.now(datetime.UTC) | |
def _clean_print(text: str, *args: str, **kwargs):
    """Redraw the current terminal line with *text* (no trailing newline)."""
    # Force an immediate, in-place redraw regardless of caller kwargs.
    kwargs.update(flush=True, end="")
    print(CLEAR_LINE + text, *args, **kwargs)
def _default_datetime_ru(s: str) -> datetime.datetime:
    """
    Fallback parser for ``DD <month> YYYY в HH:MM:SS`` strings; result is UTC.

    NOTE(review): ``%b`` matches *locale-dependent* abbreviated month names.
    Under the default C/English locale it will not match Russian month names
    and this raises ValueError — so this fallback presumably only succeeds
    when a Russian locale is active; confirm intended behavior.
    """
    return datetime.datetime.strptime(s, "%d %b %Y в %H:%M:%S").replace(tzinfo=datetime.UTC)
def parse_datetime_ru(raw: str) -> tuple[datetime.datetime, bool]:
    """
    Parse a Russian timestamp of the form ``DD <month> YYYY в HH:MM:SS``.

    Returns ``(dt, redacted)`` where ``dt`` is timezone-aware (UTC) and
    ``redacted`` is True when the string ended with the " (ред.)" marker.
    Falls back to :func:`_default_datetime_ru` when the month is unknown or
    any numeric field fails to convert.
    """
    text = raw.strip()
    edited = text.endswith(" (ред.)")
    if edited:
        text = text[: -len(" (ред.)")].rstrip()
    # Normalize the genitive "мая" so the 3-char prefix lookup also works.
    normalized = text.replace("мая", "май")
    date_part, time_part = normalized.split(" в ", 1)
    day_token, month_token, year_token = date_part.split()
    key = month_token.lower().strip(".")
    month_num = _MONTHS_RU.get(key) or _MONTHS_RU.get(key[:3])
    if month_num is None:
        return _default_datetime_ru(normalized), edited
    try:
        hour, minute, second = map(int, time_part.split(":"))
        parsed = datetime.datetime(
            int(year_token),
            month_num,
            int(day_token),
            hour,
            minute,
            second,
            tzinfo=datetime.UTC,
        )
    except Exception:
        parsed = _default_datetime_ru(normalized)
    return parsed, edited
@contextmanager
def _timeit(done_text: str = " Done", interrupt_callback: Callable[[], None] | None = None) -> Iterator[None]:
    """
    Time the enclosed body and report the elapsed time on exit.

    On Ctrl-C the optional *interrupt_callback* is invoked first, a
    cancellation message is printed, and the process exits with status 1.
    On normal completion *done_text* is printed with the duration.
    """
    started = _now()
    try:
        yield
    except KeyboardInterrupt:
        if interrupt_callback is not None:
            interrupt_callback()
        print(f"\nProcessing cancelled (processing time: {_now() - started})")
        raise SystemExit(1) from None
    else:
        print(f"{done_text} (processing time: {_now() - started})")
def get_chat_type(chat_id: int) -> ChatType:
    """
    Classify a numeric VK chat identifier.

    Negative ids are channels; non-negative ids below
    GROUP_CHAT_ID_THRESHOLD are private dialogs; the rest are group chats.
    """
    if chat_id >= 0:
        return "private" if chat_id < GROUP_CHAT_ID_THRESHOLD else "group"
    return "channel"
def resolve_archive_dir() -> Path:
    """
    Locate the VK archive directory.

    Preference order: a valid directory named by the ``ARCHIVE_DIR``
    environment variable, then ``~/Downloads/Archive``, and finally the
    directory containing this script.
    """
    candidate = Path("~/Downloads/Archive").expanduser()
    override = os.getenv("ARCHIVE_DIR")
    if override:
        expanded = Path(override).expanduser()
        if expanded.is_dir():
            candidate = expanded
    return candidate if candidate.is_dir() else Path(__file__).parent
def build_chat_map(messages_dir: Path) -> dict[int, str]:
    """
    Parse ``index-messages.html`` into a ``chat_id -> chat_name`` mapping.

    Only anchors whose href looks like ``<chat_id>/messages0.html`` are used.
    Any failure is logged and whatever was collected so far is returned.
    """
    mapping: dict[int, str] = {}
    try:
        index_html = (messages_dir / "index-messages.html").read_bytes()
        tree = HTMLParser(index_html, detect_encoding=True, decode_errors="ignore")
        for anchor in tree.css("a[href]"):
            href = anchor.attributes.get("href")
            if not isinstance(href, str) or not href.endswith("/messages0.html"):
                continue
            prefix = href.split("/", 1)[0]
            # The leading path segment must be a (possibly negative) integer id.
            if prefix.lstrip("-").isdigit():
                mapping[int(prefix)] = anchor.text(strip=True)
    except Exception as e:
        logger.warning("Failed to build chat_map: %s", e)
    return mapping
| def collect_html_files(messages_dir: Path, user_chat_id: int | None) -> list[Path]: | |
| """ | |
| Collect a list of message HTML files to process. | |
| If ``user_chat_id`` is provided, only files for that chat are returned. | |
| """ | |
| c_paths: list[tuple[int, Path]] | |
| all_html_files: list[Path] = [] | |
| if user_chat_id is not None: | |
| target_dir = messages_dir / str(user_chat_id) | |
| if target_dir.is_dir(): | |
| c_paths = sorted( | |
| ( | |
| (int(i.name.removeprefix("messages").removesuffix(".html")), i) | |
| for i in target_dir.glob("messages*.html") | |
| ), | |
| key=lambda x: x[0], | |
| ) | |
| all_html_files.extend([chat_path for _, chat_path in c_paths]) | |
| return all_html_files | |
| for chat_dir in messages_dir.iterdir(): | |
| if not chat_dir.is_dir(): | |
| continue | |
| c_paths = sorted( | |
| ((int(i.name.removeprefix("messages").removesuffix(".html")), i) for i in chat_dir.glob("messages*.html")), | |
| key=lambda x: x[0], | |
| ) | |
| all_html_files.extend([chat_path for _, chat_path in c_paths]) | |
| return all_html_files | |
def compute_max_workers() -> int:
    """
    Decide how many parser threads to run.

    A decimal ``PARSE_WORKERS`` environment value wins outright; otherwise
    the default is ``4 * cpu_count`` clamped to the range [4, 64].
    """
    override = os.getenv("PARSE_WORKERS")
    if override and override.isdigit():
        return int(override)
    cores = os.cpu_count() or 4
    return min(64, max(4, cores * 4))
def compute_inflight_limit(max_workers: int) -> int:
    """
    Upper bound on concurrently queued parse tasks (prefetch depth).

    ``PARSE_PREFETCH`` sets the per-worker multiplier (floored at 1,
    falling back to DEFAULT_PREFETCH_MULTIPLIER on bad values); the result
    is capped at MAX_PREFETCH_LIMIT and never drops below *max_workers*.
    """
    raw = os.getenv("PARSE_PREFETCH")
    try:
        multiplier = int(raw) if raw else DEFAULT_PREFETCH_MULTIPLIER
    except Exception:
        multiplier = DEFAULT_PREFETCH_MULTIPLIER
    depth = max_workers * max(1, multiplier)
    return max(max_workers, min(MAX_PREFETCH_LIMIT, depth))
def process_html_file(chat_path: Path) -> tuple[int, list[ChatMessage]]:
    """
    Parse one ``messages*.html`` page and return ``(chat_id, messages)``.

    The chat id is the parent directory's name. A message that fails to
    parse is logged and skipped; a file-level failure yields an empty list.
    """
    parsed: list[ChatMessage] = []
    chat_id = int(chat_path.parent.name)
    try:
        tree = HTMLParser(chat_path.read_bytes(), detect_encoding=True, decode_errors="ignore")
        for node in tree.css("div.message"):
            try:
                raw_id = node.attributes.get("data-id")
                if not raw_id:
                    continue
                header = node.css_first("div.message__header") or node.css_first("div") or node
                header_text = header.text(separator=" ", strip=True)
                if ", " in header_text:
                    user_name, dt_str = header_text.rsplit(", ", 1)
                else:
                    # No ", " separator: split on spaces and peel the
                    # trailing tokens off as the timestamp heuristically.
                    tokens = header_text.split(" ")
                    user_name = " ".join(tokens[:-NAME_MIN_PARTS]) if len(tokens) >= NAME_MIN_PARTS else header_text
                    dt_str = " ".join(tokens[-DATE_PARTS:]) if len(tokens) >= DATE_PARTS else ""
                dt, redacted = parse_datetime_ru(dt_str)
                link = header.css_first("a[href]")
                user_url = link.attributes.get("href") if link else None
                # Collect the body: every direct <div> child except the header.
                body_parts: list[str] = []
                child = node.child
                while child is not None:
                    if child.tag == "div" and "message__header" not in (child.attributes.get("class") or ""):
                        body = child.text(separator="\n", strip=True)
                        if body:
                            body_parts.append(body)
                    child = child.next
                entry: ChatMessage = {
                    "id": int(raw_id),
                    "user_name": user_name,
                    "datetime": dt,
                    "redacted": redacted,
                    "message": "\n".join(body_parts).strip(),
                }
                if user_url:
                    entry["user_url"] = user_url
                parsed.append(entry)
            except Exception as e:
                logger.warning("Failed to parse message in %s: %s", chat_path, e)
    except Exception as e:
        logger.warning("Failed to parse file %s: %s", chat_path, e)
    return chat_id, parsed
def process_all_files(
    all_html_files: list[Path],
    chat_map: dict[int, str],
    max_workers: int,
    inflight_limit: int,
    progress_every: int,
) -> dict[int, Chat]:
    """
    Parse every file on a bounded thread pool and aggregate per-chat results.

    At most *inflight_limit* tasks are queued at once (each completion
    submits one replacement). A progress line is printed every
    *progress_every* completions and once more at the very end. Ctrl-C
    cancels the pending tasks via the _timeit interrupt callback.
    """
    chats: dict[int, Chat] = {}
    done_count = 0
    total = len(all_html_files)
    pending = iter(all_html_files)
    with (
        ThreadPoolExecutor(max_workers=max_workers) as executor,
        _timeit(interrupt_callback=lambda: executor.shutdown(wait=False, cancel_futures=True)),
    ):
        in_flight: set = set()

        def submit_next() -> bool:
            # Queue the next file, if any remain; report whether one was queued.
            try:
                path = next(pending)
            except StopIteration:
                return False
            in_flight.add(executor.submit(process_html_file, path))
            return True

        # Prime the queue up to the in-flight limit.
        for _ in range(min(inflight_limit, total)):
            if not submit_next():
                break
        while in_flight:
            completed, _ = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in completed:
                in_flight.discard(future)
                chat_id, messages = future.result()
                if chat_id not in chats:
                    chats[chat_id] = Chat(
                        name=chat_map.get(chat_id, str(chat_id)),
                        type=get_chat_type(chat_id),
                        messages=[],
                    )
                chats[chat_id]["messages"].extend(messages)
                done_count += 1
                if done_count % progress_every == 0 or done_count == total:
                    _clean_print(f"Processed files [{done_count / total * 100:5.1f}%]: {done_count}/{total}")
                submit_next()
        # Move past the \r-rewritten progress line.
        print()
    return chats
def write_messages_json(archive_dir: Path, chats: dict[int, Chat]) -> int:
    """
    Sort each chat's messages chronologically, write ``messages.json`` into
    *archive_dir*, and return the total number of messages.
    """
    with _timeit():
        for chat in chats.values():
            # Stable sort: equal timestamps keep their original order.
            chat["messages"].sort(key=lambda m: m["datetime"])
        total_messages = sum(len(c["messages"]) for c in chats.values())
        print(f"Writing messages.json (~{total_messages} messages)...")
        # default=str stringifies the datetime objects for JSON.
        payload = json.dumps(chats, ensure_ascii=False, default=str)
        (archive_dir / "messages.json").write_text(payload, encoding="utf-8", errors="ignore")
    return total_messages
def write_markdown_files(archive_dir: Path, chats: dict[int, Chat]) -> int:
    """
    Render each chat to ``messages_md/{type}/<name>_<id>.md``.

    Messages without a user name are skipped in the rendering. Returns the
    number of Markdown files written.
    """
    result_path = archive_dir / "messages_md"
    result_path.mkdir(exist_ok=True, parents=True)
    items = list(chats.items())
    total = len(items)
    width = len(str(total))  # pad the progress counter to a fixed width
    written = 0
    # Fixed bug: the progress line printed the 0-based enumerate index, so
    # the final line showed e.g. [9/10] even after all 10 files were saved.
    for i, (chat_id, chat) in enumerate(items, start=1):
        chat_name = chat["name"]
        chat_type = chat["type"]
        messages = chat["messages"]
        chat_path = result_path / f"{chat_type}/{sanitize_filename(chat_name)}_{chat_id}.md"
        chat_path.parent.mkdir(exist_ok=True, parents=True)
        content = (
            "# "
            + chat_name
            + "\n\n"
            + "\n\n".join(
                (
                    "## "
                    + (f"[{name}]({m['user_url']})" if m.get("user_url") else name)
                    + " "
                    + m["datetime"].strftime("%d.%m.%Y %H:%M:%S")
                    + (" (redacted)" if m["redacted"] else "")
                    + ":\n"
                    # Single newlines become Markdown paragraph breaks.
                    + m["message"].replace("\n", "\n\n")
                )
                for m in messages
                if (name := m.get("user_name"))
            )
        )
        chat_path.write_text(content, encoding="utf-8")
        _clean_print(f"Saved file [{i:{width}}/{total}]: {chat_path}")
        written += 1
    print()
    return written
def main(user_chat_id: int | None = None):
    """
    Entry point: discover the archive, parse all message pages concurrently,
    then emit ``messages.json`` and per-chat Markdown files.
    """
    archive_dir = resolve_archive_dir()
    messages_dir = archive_dir / "messages"
    chat_map = build_chat_map(messages_dir)
    all_html_files = collect_html_files(messages_dir, user_chat_id)
    total = len(all_html_files)
    print(f"Found files to process: {total}")
    max_workers = compute_max_workers()
    inflight_limit = compute_inflight_limit(max_workers)
    # Progress cadence: roughly every 0.5% of files, but at least every 50.
    progress_every = max(50, total // 200) if total else 1
    with _timeit("All files processed"):
        chats = process_all_files(
            all_html_files=all_html_files,
            chat_map=chat_map,
            max_workers=max_workers,
            inflight_limit=inflight_limit,
            progress_every=progress_every,
        )
    total_messages = write_messages_json(archive_dir, chats)
    write_markdown_files(archive_dir, chats)
    print(f"Processed chats: {len(chats)}")
    print(f"Total messages: {total_messages}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.