Skip to content

Instantly share code, notes, and snippets.

@fairchild
Created April 12, 2026 16:43
Show Gist options
  • Select an option

  • Save fairchild/cf75aa93323752531e2bfa4d80aa020c to your computer and use it in GitHub Desktop.

Select an option

Save fairchild/cf75aa93323752531e2bfa4d80aa020c to your computer and use it in GitHub Desktop.
Efficient async reading and writing of YAML frontmatter with safe concurrent operation across directories
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "aiofiles>=24.1.0",
# "PyYAML>=6.0.2",
# ]
# ///
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, AsyncIterator
import aiofiles
import yaml
# File extensions (compared lowercase) treated as markdown during directory scans.
MARKDOWN_EXTENSIONS: set[str] = {".md", ".markdown", ".mdx"}
# How many leading bytes of a file are read when searching for a frontmatter block.
DEFAULT_SCAN_BYTES = 64 * 1024
# Chunk size used when streaming the untouched file body into the temp file.
DEFAULT_CHUNK_SIZE = 64 * 1024
# Default cap on the number of files processed concurrently.
DEFAULT_CONCURRENCY = 32
@dataclass(slots=True)
class ProcessResult:
    """Outcome of processing a single file."""

    path: str  # file that was examined
    changed: bool  # True when the file on disk was rewritten
    status: str  # one of "updated", "skipped", "error"
    message: str = ""  # human-readable detail for logs / JSON output
class FrontmatterError(Exception):
    """Raised when frontmatter is present but unterminated, invalid YAML, or not a mapping."""
    pass
def _detect_newline(prefix: bytes) -> bytes:
return b"\r\n" if b"\r\n" in prefix else b"\n"
def _strip_utf8_bom(prefix: bytes) -> tuple[bytes, int]:
if prefix.startswith(b"\xef\xbb\xbf"):
return prefix[3:], 3
return prefix, 0
def _extract_frontmatter(prefix: bytes) -> tuple[dict[str, Any], int, bytes] | None:
    """Parse a YAML frontmatter block out of the first bytes of a file.

    Returns ``(metadata, body_offset, newline)`` where *body_offset* is the
    absolute byte offset (including any BOM) at which the body content starts,
    or ``None`` when *prefix* does not open with a ``---`` fence.

    Raises:
        FrontmatterError: when the block is unterminated within the scan
            window, is not valid YAML/UTF-8, or does not deserialize to a
            mapping.
    """
    scanned, bom_len = _strip_utf8_bom(prefix)
    newline = _detect_newline(scanned)
    opening = b"---" + newline
    if not scanned.startswith(opening):
        return None  # no frontmatter at all
    start = len(opening)
    close_marker = newline + b"---"
    # The closing fence must be exactly "---" on its own line: the marker has
    # to be followed by a newline or sit at the very end of the scan window.
    # A bare find() would also match lines such as "----" or "--- draft".
    search_from = start
    while True:
        end = scanned.find(close_marker, search_from)
        if end == -1:
            raise FrontmatterError("Closing frontmatter fence not found in scan window")
        after_close = end + len(close_marker)
        at_window_end = after_close == len(scanned)
        if at_window_end or scanned[after_close:after_close + len(newline)] == newline:
            break
        search_from = end + 1  # fence line has trailing characters; keep looking
    if scanned[after_close:after_close + len(newline)] == newline:
        body_offset = bom_len + after_close + len(newline)
    else:
        body_offset = bom_len + after_close
    raw_yaml = scanned[start:end]
    try:
        # Covers both YAML parse errors and UTF-8 decode errors.
        data = yaml.safe_load(raw_yaml.decode("utf-8")) or {}
    except Exception as e:
        raise FrontmatterError(f"Invalid YAML: {e}") from e
    if not isinstance(data, dict):
        raise FrontmatterError("Frontmatter must be a YAML mapping/object")
    return data, body_offset, newline
def _serialize_frontmatter(meta: dict[str, Any], newline: bytes) -> bytes:
    """Render *meta* as a fenced YAML frontmatter block using *newline*.

    The newline convention is forwarded to the YAML emitter via ``line_break``;
    otherwise CRLF files would end up with LF inside the YAML block but CRLF
    fences.
    """
    dumped = yaml.safe_dump(
        meta,
        sort_keys=False,
        default_flow_style=False,
        allow_unicode=True,
        line_break=newline.decode("ascii"),  # keep emitter output consistent with fences
    ).encode("utf-8")
    return b"---" + newline + dumped + b"---" + newline
async def _copy_remaining(src_path: Path, dst_path: Path, start_offset: int, chunk_size: int) -> None:
    """Append the bytes of *src_path* from *start_offset* onward to *dst_path*."""
    async with aiofiles.open(src_path, "rb") as source:
        await source.seek(start_offset)
        async with aiofiles.open(dst_path, "ab") as sink:
            while chunk := await source.read(chunk_size):
                await sink.write(chunk)
async def _replace_file_atomic(tmp_path: Path, dest_path: Path) -> None:
    """Atomically move *tmp_path* over *dest_path* without blocking the event loop."""
    await asyncio.to_thread(os.replace, tmp_path, dest_path)
async def update_file(
    path: str | Path,
    *,
    field: str = "status",
    value: Any = "read",
    scan_bytes: int = DEFAULT_SCAN_BYTES,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> ProcessResult:
    """Set ``field: value`` in the YAML frontmatter of a single markdown file.

    Only the leading *scan_bytes* of the file are parsed for frontmatter; the
    remainder is streamed untouched into a sibling temp file which then
    atomically replaces the original, so concurrent readers never observe a
    partially written file. Files without frontmatter get a new block
    prepended.

    Returns a ProcessResult with status "updated", "skipped" (field already
    holds *value*), or "error".
    """
    file_path = Path(path)
    if not file_path.is_file():
        return ProcessResult(str(file_path), False, "error", "Not a file")
    try:
        async with aiofiles.open(file_path, "rb") as f:
            prefix = await f.read(scan_bytes)
        parsed = _extract_frontmatter(prefix)
        if parsed is None:
            # No existing frontmatter: synthesize a block, keep the whole body.
            metadata: dict[str, Any] = {field: value}
            body_offset = 0
            newline = b"\n"
        else:
            metadata, body_offset, newline = parsed
            if metadata.get(field) == value:
                return ProcessResult(str(file_path), False, "skipped", f"{field} already set")
        metadata[field] = value
        frontmatter_bytes = _serialize_frontmatter(metadata, newline)
        # Temp file lives in the same directory so os.replace stays atomic
        # (the rename never crosses a filesystem boundary).
        fd, tmp_name = tempfile.mkstemp(
            dir=str(file_path.parent),
            prefix=f".{file_path.name}.",
            suffix=".tmp",
        )
        os.close(fd)
        tmp_path = Path(tmp_name)
        try:
            async with aiofiles.open(tmp_path, "wb") as tmp:
                await tmp.write(frontmatter_bytes)
            if body_offset == 0:
                # File had no frontmatter: copy everything, dropping a BOM and
                # blank leading lines so the new fence starts the file.
                async with aiofiles.open(file_path, "rb") as src, aiofiles.open(tmp_path, "ab") as dst:
                    first = await src.read(chunk_size)
                    if first.startswith(b"\xef\xbb\xbf"):
                        first = first[3:]
                    first = first.lstrip(b"\r\n")
                    if first:
                        await dst.write(first)
                    while chunk := await src.read(chunk_size):
                        await dst.write(chunk)
            else:
                await _copy_remaining(file_path, tmp_path, body_offset, chunk_size)
            # mkstemp creates the temp file as 0600; carry the original mode
            # over so the replacement does not silently tighten permissions.
            source_stat = await asyncio.to_thread(os.stat, file_path)
            await asyncio.to_thread(os.chmod, tmp_path, source_stat.st_mode & 0o7777)
            await _replace_file_atomic(tmp_path, file_path)
            return ProcessResult(str(file_path), True, "updated", f"set {field}={value!r}")
        except Exception:
            # Best-effort temp-file cleanup; the original error matters more.
            try:
                await asyncio.to_thread(tmp_path.unlink)
            except Exception:
                pass
            raise
    except FrontmatterError as e:
        return ProcessResult(str(file_path), False, "error", str(e))
    except FileNotFoundError as e:
        return ProcessResult(str(file_path), False, "error", f"File not found: {e}")
    except PermissionError as e:
        return ProcessResult(str(file_path), False, "error", f"Permission denied: {e}")
    except UnicodeDecodeError as e:
        return ProcessResult(str(file_path), False, "error", f"UTF-8 decode error: {e}")
    except Exception as e:
        return ProcessResult(str(file_path), False, "error", f"Unexpected error: {e}")
async def iter_markdown_files(path: str | Path) -> AsyncIterator[Path]:
    """Yield every markdown file at or below *path*.

    A markdown file argument yields itself; a directory is walked recursively.
    Anything else (missing path, non-markdown file) yields nothing.
    """
    root = Path(path)
    if root.is_file():
        if root.suffix.lower() in MARKDOWN_EXTENSIONS:
            yield root
        return
    if not root.exists():
        return
    # Snapshot the directory walk in a thread so the event loop never blocks on disk.
    entries = await asyncio.to_thread(lambda: list(os.walk(root)))
    for dirpath, _dirnames, filenames in entries:
        parent = Path(dirpath)
        for name in filenames:
            candidate = parent / name
            if candidate.suffix.lower() in MARKDOWN_EXTENSIONS:
                yield candidate
async def process_path(
    path: str | Path,
    *,
    field: str = "status",
    value: Any = "read",
    concurrency: int = DEFAULT_CONCURRENCY,
    scan_bytes: int = DEFAULT_SCAN_BYTES,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> list[ProcessResult]:
    """Update frontmatter for every markdown file under *path*.

    At most *concurrency* updates are in flight at once; results come back in
    completion order. A path with no markdown files produces a single error
    result.
    """
    gate = asyncio.Semaphore(concurrency)

    async def bounded_update(target: Path) -> ProcessResult:
        # The semaphore caps concurrent file work, not task creation.
        async with gate:
            return await update_file(
                target,
                field=field,
                value=value,
                scan_bytes=scan_bytes,
                chunk_size=chunk_size,
            )

    pending: list[asyncio.Task[ProcessResult]] = [
        asyncio.create_task(bounded_update(target))
        async for target in iter_markdown_files(path)
    ]
    if not pending:
        return [ProcessResult(str(path), False, "error", "No markdown files found")]
    return [await finished for finished in asyncio.as_completed(pending)]
def summarize_results(results: list[ProcessResult]) -> dict[str, int]:
    """Count results per known status; unknown statuses are ignored."""
    return {
        status: sum(1 for result in results if result.status == status)
        for status in ("updated", "skipped", "error")
    }
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser.

    Numeric defaults come from the module-level DEFAULT_* constants so the CLI
    and the programmatic API (``process_path``/``update_file``) cannot drift
    apart; previously ``--concurrency`` hard-coded 16 while
    DEFAULT_CONCURRENCY was 32.
    """
    parser = argparse.ArgumentParser(
        prog="frontmatter-status.py",
        description="Async streaming YAML frontmatter updater for markdown files.",
    )
    parser.add_argument("path", type=Path, help="Path to a markdown file or directory")
    parser.add_argument("--field", default="status", help="Frontmatter field name to set")
    parser.add_argument("--value", default="read", help="Frontmatter value to set")
    parser.add_argument(
        "--concurrency",
        type=int,
        default=DEFAULT_CONCURRENCY,
        help="Max concurrent file operations",
    )
    parser.add_argument(
        "--scan-bytes",
        type=int,
        default=DEFAULT_SCAN_BYTES,
        help="Prefix scan size in bytes",
    )
    parser.add_argument(
        "--chunk-size",
        type=int,
        default=DEFAULT_CHUNK_SIZE,
        help="Streaming copy chunk size in bytes",
    )
    parser.add_argument("--json", action="store_true", help="Emit JSON results")
    parser.add_argument("--verbose", action="store_true", help="Print per-file results")
    return parser
async def async_main() -> int:
    """CLI entry point: run the update and report results.

    Returns 1 when any file errored, 0 otherwise.
    """
    args = build_parser().parse_args()
    results = await process_path(
        args.path,
        field=args.field,
        value=args.value,
        concurrency=args.concurrency,
        scan_bytes=args.scan_bytes,
        chunk_size=args.chunk_size,
    )
    summary = summarize_results(results)
    if args.json:
        payload = {"summary": summary, "results": [asdict(r) for r in results]}
        print(json.dumps(payload, indent=2))
    else:
        if args.verbose:
            for result in results:
                print(f"{result.status:7} {result.path} {result.message}")
        print(
            f"updated={summary['updated']} "
            f"skipped={summary['skipped']} "
            f"errors={summary['error']}"
        )
    return 1 if summary["error"] > 0 else 0
def main() -> None:
    """Synchronous wrapper: run the async CLI and exit with its status code."""
    raise SystemExit(asyncio.run(async_main()))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment