Created
April 12, 2026 16:43
-
-
Save fairchild/cf75aa93323752531e2bfa4d80aa020c to your computer and use it in GitHub Desktop.
Efficient async reading and writing of YAML frontmatter with safe concurrent operation across directories
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "aiofiles>=24.1.0", | |
| # "PyYAML>=6.0.2", | |
| # ] | |
| # /// | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| import tempfile | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from typing import Any, AsyncIterator | |
| import aiofiles | |
| import yaml | |
# File extensions treated as markdown when scanning directories.
MARKDOWN_EXTENSIONS: set[str] = {".md", ".markdown", ".mdx"}
# Bytes read from the start of each file when searching for the frontmatter block.
DEFAULT_SCAN_BYTES = 64 * 1024
# Chunk size used when streaming the file body into the temp file.
DEFAULT_CHUNK_SIZE = 64 * 1024
# Default cap on the number of files processed concurrently.
DEFAULT_CONCURRENCY = 32
@dataclass(slots=True)
class ProcessResult:
    """Outcome of processing a single file."""

    path: str  # file path as passed to update_file, stringified
    changed: bool  # True only when the file was actually rewritten
    status: str  # one of "updated", "skipped", "error"
    message: str = ""  # human-readable detail for logs / --verbose output
class FrontmatterError(Exception):
    """Raised when a file's YAML frontmatter is malformed or cannot be parsed."""
| def _detect_newline(prefix: bytes) -> bytes: | |
| return b"\r\n" if b"\r\n" in prefix else b"\n" | |
| def _strip_utf8_bom(prefix: bytes) -> tuple[bytes, int]: | |
| if prefix.startswith(b"\xef\xbb\xbf"): | |
| return prefix[3:], 3 | |
| return prefix, 0 | |
def _extract_frontmatter(prefix: bytes) -> tuple[dict[str, Any], int, bytes] | None:
    """Parse a fenced YAML frontmatter block from the first bytes of a file.

    Args:
        prefix: Up to ``scan_bytes`` leading bytes of the file.

    Returns:
        ``(metadata, body_offset, newline)`` where ``body_offset`` is the
        absolute byte offset of the body in the original file (BOM length
        included) and ``newline`` is the detected line terminator, or ``None``
        when there is no opening ``---`` fence at byte 0.

    Raises:
        FrontmatterError: closing fence missing within the scan window, the
            YAML is invalid, or the document is not a mapping.
    """
    scanned, bom_len = _strip_utf8_bom(prefix)
    newline = _detect_newline(scanned)
    opening = b"---" + newline
    # No opening fence at the very start means the file has no frontmatter.
    if not scanned.startswith(opening):
        return None
    start = len(opening)
    close_marker = newline + b"---"
    end = scanned.find(close_marker, start)
    if end == -1:
        # The fence may lie beyond scan_bytes; fail loudly rather than
        # silently rewriting a truncated document.
        raise FrontmatterError("Closing frontmatter fence not found in scan window")
    after_close = end + len(close_marker)
    # Consume the newline following the closing fence when present (it may be
    # absent if the fence is the last line of the file).
    if scanned[after_close:after_close + len(newline)] == newline:
        body_offset = bom_len + after_close + len(newline)
    else:
        body_offset = bom_len + after_close
    raw_yaml = scanned[start:end]
    try:
        # safe_load of an empty block yields None; normalize to an empty dict.
        data = yaml.safe_load(raw_yaml.decode("utf-8")) or {}
    except Exception as e:
        raise FrontmatterError(f"Invalid YAML: {e}") from e
    if not isinstance(data, dict):
        raise FrontmatterError("Frontmatter must be a YAML mapping/object")
    return data, body_offset, newline
def _serialize_frontmatter(meta: dict[str, Any], newline: bytes) -> bytes:
    """Render *meta* as a fenced YAML frontmatter block using *newline* endings.

    Args:
        meta: Frontmatter mapping to dump (key order preserved).
        newline: Line terminator detected from the original file.

    Returns:
        ``---<nl>...yaml...---<nl>`` as bytes, uniformly using *newline*.
    """
    dumped = yaml.safe_dump(
        meta,
        sort_keys=False,
        default_flow_style=False,
        allow_unicode=True,
    ).encode("utf-8")
    # yaml.safe_dump always emits "\n" line endings; normalize them so a CRLF
    # document does not end up with mixed terminators after the rewrite.
    if newline != b"\n":
        dumped = dumped.replace(b"\n", newline)
    return b"---" + newline + dumped + b"---" + newline
async def _copy_remaining(src_path: Path, dst_path: Path, start_offset: int, chunk_size: int) -> None:
    """Append the bytes of *src_path* from *start_offset* onward to *dst_path*."""
    async with aiofiles.open(src_path, "rb") as reader, aiofiles.open(dst_path, "ab") as writer:
        await reader.seek(start_offset)
        # Stream in fixed-size chunks until EOF.
        while chunk := await reader.read(chunk_size):
            await writer.write(chunk)
| async def _replace_file_atomic(tmp_path: Path, dest_path: Path) -> None: | |
| await asyncio.to_thread(os.replace, tmp_path, dest_path) | |
async def update_file(
    path: str | Path,
    *,
    field: str = "status",
    value: Any = "read",
    scan_bytes: int = DEFAULT_SCAN_BYTES,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> ProcessResult:
    """Set ``field`` to ``value`` in one markdown file's YAML frontmatter.

    The rewrite is streamed through a temp file created in the same directory
    and swapped into place with ``os.replace``, so concurrent readers never
    observe a half-written file.

    Args:
        path: Markdown file to update.
        field: Frontmatter key to set.
        value: Value to assign to ``field``.
        scan_bytes: Leading bytes scanned for the frontmatter block.
        chunk_size: Chunk size for streaming the body copy.

    Returns:
        ProcessResult with status "updated", "skipped", or "error". Expected
        failures are reported via the result rather than raised.
    """
    file_path = Path(path)
    if not file_path.is_file():
        return ProcessResult(str(file_path), False, "error", "Not a file")
    try:
        # Only the prefix is read up front; the rest of the file is streamed.
        async with aiofiles.open(file_path, "rb") as f:
            prefix = await f.read(scan_bytes)
        parsed = _extract_frontmatter(prefix)
        if parsed is None:
            # No existing frontmatter: synthesize a minimal block; the whole
            # original content (offset 0) becomes the body.
            metadata: dict[str, Any] = {field: value}
            body_offset = 0
            newline = b"\n"
        else:
            metadata, body_offset, newline = parsed
            if metadata.get(field) == value:
                # Idempotent: nothing to rewrite when the value is already set.
                return ProcessResult(str(file_path), False, "skipped", f"{field} already set")
        metadata[field] = value
        frontmatter_bytes = _serialize_frontmatter(metadata, newline)
        # Create the temp file in the destination directory so the final
        # os.replace is an atomic same-filesystem rename.
        fd, tmp_name = tempfile.mkstemp(
            dir=str(file_path.parent),
            prefix=f".{file_path.name}.",
            suffix=".tmp",
        )
        os.close(fd)  # mkstemp's fd is synchronous; the file is reopened via aiofiles.
        tmp_path = Path(tmp_name)
        try:
            async with aiofiles.open(tmp_path, "wb") as tmp:
                await tmp.write(frontmatter_bytes)
            if body_offset == 0:
                # File had no frontmatter: copy the entire original content,
                # dropping a UTF-8 BOM and leading blank lines so the new
                # opening fence sits at byte 0.
                async with aiofiles.open(file_path, "rb") as src, aiofiles.open(tmp_path, "ab") as dst:
                    first = await src.read(chunk_size)
                    if first.startswith(b"\xef\xbb\xbf"):
                        first = first[3:]
                    first = first.lstrip(b"\r\n")
                    if first:
                        await dst.write(first)
                    while True:
                        chunk = await src.read(chunk_size)
                        if not chunk:
                            break
                        await dst.write(chunk)
            else:
                # Frontmatter replaced: stream the unchanged body after it.
                await _copy_remaining(file_path, tmp_path, body_offset, chunk_size)
            await _replace_file_atomic(tmp_path, file_path)
            return ProcessResult(str(file_path), True, "updated", f"set {field}={value!r}")
        except Exception:
            # Best-effort temp-file cleanup before re-raising; the original
            # file is still untouched at this point.
            try:
                await asyncio.to_thread(tmp_path.unlink)
            except Exception:
                pass
            raise
    except FrontmatterError as e:
        return ProcessResult(str(file_path), False, "error", str(e))
    except FileNotFoundError as e:
        return ProcessResult(str(file_path), False, "error", f"File not found: {e}")
    except PermissionError as e:
        return ProcessResult(str(file_path), False, "error", f"Permission denied: {e}")
    except UnicodeDecodeError as e:
        return ProcessResult(str(file_path), False, "error", f"UTF-8 decode error: {e}")
    except Exception as e:
        # Last-resort guard so one bad file cannot abort a directory run.
        return ProcessResult(str(file_path), False, "error", f"Unexpected error: {e}")
async def iter_markdown_files(path: str | Path) -> AsyncIterator[Path]:
    """Yield markdown files under *path*, or *path* itself when it is one."""
    root = Path(path)
    if root.is_file():
        if root.suffix.lower() in MARKDOWN_EXTENSIONS:
            yield root
        return
    if not root.exists():
        return
    # os.walk blocks on disk I/O; materialize the listing in a worker thread.
    walk_entries = await asyncio.to_thread(lambda: list(os.walk(root)))
    for dirpath, _dirnames, filenames in walk_entries:
        parent = Path(dirpath)
        for name in filenames:
            candidate = parent / name
            if candidate.suffix.lower() in MARKDOWN_EXTENSIONS:
                yield candidate
async def process_path(
    path: str | Path,
    *,
    field: str = "status",
    value: Any = "read",
    concurrency: int = DEFAULT_CONCURRENCY,
    scan_bytes: int = DEFAULT_SCAN_BYTES,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> list[ProcessResult]:
    """Update *field* in every markdown file under *path*, bounded by *concurrency*."""
    gate = asyncio.Semaphore(concurrency)

    async def bounded_update(target: Path) -> ProcessResult:
        # Hold one semaphore slot for the duration of a single file rewrite.
        async with gate:
            return await update_file(
                target,
                field=field,
                value=value,
                scan_bytes=scan_bytes,
                chunk_size=chunk_size,
            )

    pending: list[asyncio.Task[ProcessResult]] = []
    async for markdown_file in iter_markdown_files(path):
        pending.append(asyncio.create_task(bounded_update(markdown_file)))
    if not pending:
        return [ProcessResult(str(path), False, "error", "No markdown files found")]
    # Collect in completion order; callers only aggregate, so order is irrelevant.
    return [await finished for finished in asyncio.as_completed(pending)]
def summarize_results(results: list[ProcessResult]) -> dict[str, int]:
    """Count results per status, ignoring any status outside the known three."""
    counts = dict.fromkeys(("updated", "skipped", "error"), 0)
    for item in results:
        if item.status in counts:
            counts[item.status] += 1
    return counts
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser.

    Defaults come from the module-level DEFAULT_* constants so the CLI and
    the programmatic API (process_path) stay in sync. Previously
    --concurrency defaulted to a hard-coded 16 while DEFAULT_CONCURRENCY and
    process_path used 32, and the byte-size defaults duplicated the constants
    as literals.
    """
    parser = argparse.ArgumentParser(
        prog="frontmatter-status.py",
        description="Async streaming YAML frontmatter updater for markdown files.",
    )
    parser.add_argument("path", type=Path, help="Path to a markdown file or directory")
    parser.add_argument("--field", default="status", help="Frontmatter field name to set")
    parser.add_argument("--value", default="read", help="Frontmatter value to set")
    parser.add_argument(
        "--concurrency",
        type=int,
        default=DEFAULT_CONCURRENCY,
        help="Max concurrent file operations",
    )
    parser.add_argument(
        "--scan-bytes",
        type=int,
        default=DEFAULT_SCAN_BYTES,
        help="Prefix scan size in bytes",
    )
    parser.add_argument(
        "--chunk-size",
        type=int,
        default=DEFAULT_CHUNK_SIZE,
        help="Streaming copy chunk size in bytes",
    )
    parser.add_argument("--json", action="store_true", help="Emit JSON results")
    parser.add_argument("--verbose", action="store_true", help="Print per-file results")
    return parser
async def async_main() -> int:
    """CLI driver: parse arguments, process files, report, return exit code."""
    args = build_parser().parse_args()
    results = await process_path(
        args.path,
        field=args.field,
        value=args.value,
        concurrency=args.concurrency,
        scan_bytes=args.scan_bytes,
        chunk_size=args.chunk_size,
    )
    summary = summarize_results(results)
    if args.json:
        payload = {
            "summary": summary,
            "results": [asdict(r) for r in results],
        }
        print(json.dumps(payload, indent=2))
    else:
        if args.verbose:
            for item in results:
                print(f"{item.status:7} {item.path} {item.message}")
        print(
            f"updated={summary['updated']} "
            f"skipped={summary['skipped']} "
            f"errors={summary['error']}"
        )
    # Non-zero exit when any file failed, for shell/CI consumption.
    return 1 if summary["error"] > 0 else 0
def main() -> None:
    """Synchronous entry point; exits the process with async_main()'s code."""
    exit_code = asyncio.run(async_main())
    raise SystemExit(exit_code)
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment