Skip to content

Instantly share code, notes, and snippets.

@szapp
Last active August 3, 2025 14:55
Show Gist options
  • Save szapp/b5d6f2bec37225efce2fcb44fb1a9149 to your computer and use it in GitHub Desktop.
Save szapp/b5d6f2bec37225efce2fcb44fb1a9149 to your computer and use it in GitHub Desktop.
Renames files copied from Windows FileHistory to match the original file names. Requires uv.
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "cyclopts",
# "pytest",
# ]
# ///
"""Usage from command line:
`./restore_from_history.py --help` or `uv run --script restore_from_history.py --help`
"""
import logging
import os
import re
import stat
import sys
from pathlib import Path
from typing import Annotated, Iterable, Literal, Protocol, Sequence
import cyclopts
import pytest
app = cyclopts.App()
logger = logging.getLogger(Path(__file__).stem)
PATH_DEFAULT = Path()
PathLike = str | os.PathLike
class IO(Protocol):
def read(self, path: PathLike) -> Iterable[PathLike]: ...
def rename(self, source: PathLike, dest: PathLike): ...
def delete(self, dest: PathLike): ...
class FileSystem:
def read(self, path: PathLike) -> Iterable[PathLike]:
return Path(path).rglob("*")
def rename(self, source: PathLike, dest: PathLike):
try:
os.rename(source, dest)
except Exception:
logger.exception(f"Failed to rename {source!s} to {dest!s}.")
def delete(self, dest: PathLike):
try:
os.chmod(dest, stat.S_IWRITE)
os.remove(dest)
except Exception:
logger.exception(f"Failed to delete {dest!s}.")
def parse_file_versions(files: Iterable[PathLike]) -> dict[Path, set[str]]:
"""Parses the a list of files and groups files by file stem into a dictionary with
different timestamps.
Args:
files: List of file paths.
Returns:
A dictionary with file paths as keys and timestamps as values.
"""
pattern = re.compile(r"^(.*)( \(\d{4}_\d{2}_\d{2} \d{2}_\d{2}_\d{2} UTC\))$")
records = dict()
for file in files:
file = Path(file)
grps = pattern.match(file.stem)
if not grps:
if file.is_file():
logger.warning(f"File not matching: {file!s}.")
continue
clean_path = file.with_stem(grps.group(1))
timestamp = grps.group(2)
records.setdefault(clean_path, set()).add(timestamp)
return records
@app.default()
def main(
*,
base_path: cyclopts.types.ExistingDirectory = Path(),
dry_run: bool = False,
io: Annotated[IO, cyclopts.Parameter(parse=False)] = FileSystem(),
) -> int:
"""Renames files copied from Windows FileHistory to match the original file names.
Args:
base_path: The base path where the files are located. Defaults to the current directory.
dry_run: If True, only logs the actions without executing them.
Returns:
Exit code, 0 for success.
"""
logger.info(f"Rename files in '{base_path!s}'.")
files = io.read(base_path)
file_versions = parse_file_versions(files)
for file, versions in file_versions.items():
ver = sorted(versions)
newest = ver.pop()
file_str = file.relative_to(base_path)
if ver:
logger.info(
f"Chosing version {newest.strip('() ')} for {file_str!s} from {len(versions)} versions."
)
if not dry_run:
io.rename(file.with_stem(file.stem + newest), file)
for older in ver:
logger.debug(f"Deleting older: {older.strip('() ')} for {file_str!s}.")
if not dry_run:
io.delete(file.with_stem(file.stem + older))
return 0
"""
Tests
"""
@app.command(name="test")
def _test() -> int:
"""Run tests."""
pytest.main(
[__file__, "-v", "-s", "--tb=short", "--no-header", "-p no:cacheprovider"]
)
return 0
def test_skips_non_matching():
"""Test that a non-matching file is skipped while another is considered."""
files = ["path/file (2024_01_01 12_00_00 UTC).txt", "path/skip.txt"]
expected = {Path("path/file.txt"): {" (2024_01_01 12_00_00 UTC)"}}
actual = parse_file_versions(files)
assert actual == expected
def test_groups_versions():
"""Test that several versions of the same file are grouped under the correct name."""
files = [
"path/file (2024_01_01 12_00_00 UTC).txt",
"path/file (2024_01_02 12_00_00 UTC).txt",
]
expected = {
Path("path/file.txt"): {
" (2024_01_01 12_00_00 UTC)",
" (2024_01_02 12_00_00 UTC)",
}
}
actual = parse_file_versions(files)
assert actual == expected
def test_consideres_extension():
"""Test that files with same name but different extension are separated."""
files = [
"path/file (2024_01_01 12_00_00 UTC).txt",
"path/file (2024_01_01 12_00_00 UTC).json",
]
expected = {
Path("path/file.txt"): {
" (2024_01_01 12_00_00 UTC)",
},
Path("path/file.json"): {
" (2024_01_01 12_00_00 UTC)",
},
}
actual = parse_file_versions(files)
assert actual == expected
class FakeIO:
def __init__(self, file_versions: Sequence[PathLike]):
self.file_versions = file_versions
self.actions = list()
def read(self, path: PathLike) -> Iterable[PathLike]:
return self.file_versions
def rename(self, source: PathLike, dest: PathLike):
self.actions.append(("RENAME", source, dest))
def delete(self, dest: PathLike):
self.actions.append(("DELETE", dest))
def test_dry_run_logs_actions_only(caplog: pytest.LogCaptureFixture):
"""Integration test for dry run mode, ensuring no actions are executed."""
files = [
"path/file (2024_01_01 12_00_00 UTC).txt",
"path/file (2024_01_02 12_00_00 UTC).txt",
"path/file (2024_01_03 12_00_00 UTC).txt",
"path/file2.txt",
]
fake_io = FakeIO(files)
expected = []
with caplog.at_level(logging.INFO, logger="restore_from_history"):
main(dry_run=True, io=fake_io)
assert fake_io.actions == expected
assert "2024_01_03" in caplog.text # Selected version
assert "2024_01_02" not in caplog.text # Not mentioned at INFO level
assert "2024_01_01" not in caplog.text
def test_e2e():
"""Integration test for renaming and deleting correctly."""
files = [
"path/file (2024_01_01 12_00_00 UTC).txt",
"path/file (2024_01_02 12_00_00 UTC).txt",
"path/file (2024_01_03 12_00_00 UTC).txt",
"path/file2.txt",
]
fake_io = FakeIO(files)
expected = [
(
"RENAME",
Path("path/file (2024_01_03 12_00_00 UTC).txt"),
Path("path/file.txt"),
),
("DELETE", Path("path/file (2024_01_01 12_00_00 UTC).txt")),
("DELETE", Path("path/file (2024_01_02 12_00_00 UTC).txt")),
]
main(io=fake_io)
assert fake_io.actions == expected
@app.meta.default
def meta_default(
*tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
verbose: Literal[0, 1, 2, 3] = 2,
) -> int:
"""CLI wrapper to control verbosity and logging.
Args:
verbose: Verbosity level: 0 (ERROR), 1 (WARNING), 2 (INFO), 3 (DEBUG).
"""
command_name = app.parse_commands()[0]
if command_name != ("test",):
logging.basicConfig(
level=40 - (verbose * 10),
format="%(asctime)s | %(name)s | %(levelname)-8s | %(message)s",
)
try:
return app(tokens)
except KeyboardInterrupt:
logger.info("Interrupted by user.")
return 42
if __name__ == "__main__":
# Setup logging only if not running tests
sys.exit(app.meta())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment