Last active
August 3, 2025 14:55
-
-
Save szapp/b5d6f2bec37225efce2fcb44fb1a9149 to your computer and use it in GitHub Desktop.
Renames files copied from Windows FileHistory to match the original file names. Requires uv.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv run --script | |
# /// script | |
# requires-python = ">=3.13" | |
# dependencies = [ | |
# "cyclopts", | |
# "pytest", | |
# ] | |
# /// | |
"""Usage from command line: | |
`./restore_from_history.py --help` or `uv run --script restore_from_history.py --help` | |
""" | |
import logging | |
import os | |
import re | |
import stat | |
import sys | |
from pathlib import Path | |
from typing import Annotated, Iterable, Literal, Protocol, Sequence | |
import cyclopts | |
import pytest | |
# Command-line application; the commands below register themselves on it.
app = cyclopts.App()

# Logger named after this script file (file name without its extension).
logger = logging.getLogger(Path(__file__).stem)

# An empty Path, i.e. the current working directory.
PATH_DEFAULT = Path()

# Any value the I/O helpers accept as a file system path.
PathLike = str | os.PathLike
class IO(Protocol):
    """Structural interface for the file operations this script performs.

    Any object providing these three methods can be passed as ``io`` to
    ``main``.
    """

    def read(self, path: PathLike) -> Iterable[PathLike]:
        """Return the candidate paths for ``path``."""
        ...

    def rename(self, source: PathLike, dest: PathLike):
        """Rename ``source`` to ``dest``."""
        ...

    def delete(self, dest: PathLike):
        """Delete ``dest``."""
        ...
class FileSystem:
    """``IO`` implementation backed by the real file system.

    ``rename`` and ``delete`` are best-effort: any exception is logged with
    its traceback and then swallowed, so one failing file does not abort the
    whole run.
    """

    def read(self, path: PathLike) -> Iterable[PathLike]:
        """Return a recursive iterator over everything below ``path``."""
        root = Path(path)
        return root.rglob("*")

    def rename(self, source: PathLike, dest: PathLike):
        """Rename ``source`` to ``dest``, logging (not raising) on failure."""
        try:
            os.rename(source, dest)
        except Exception:
            logger.exception(f"Failed to rename {source!s} to {dest!s}.")

    def delete(self, dest: PathLike):
        """Remove ``dest``, logging (not raising) on failure."""
        try:
            # Set the write permission bit first so that files marked
            # read-only can be removed.
            os.chmod(dest, stat.S_IWRITE)
            os.remove(dest)
        except Exception:
            logger.exception(f"Failed to delete {dest!s}.")
def parse_file_versions(files: Iterable[PathLike]) -> dict[Path, set[str]]:
    """Group timestamped file copies by their original file path.

    A path is recognised when its stem ends in a suffix of the form
    `` (YYYY_MM_DD HH_MM_SS UTC)`` that is preceded by at least one other
    character. Paths that do not match are skipped; a warning is logged for
    those that exist as regular files.

    Args:
        files: Paths to inspect.

    Returns:
        A dictionary mapping each original path (timestamp suffix removed from
        the stem, extension kept) to the set of timestamp suffixes found for
        it. Suffixes are stored verbatim, including the leading space and the
        parentheses.
    """
    # ``.+`` rather than ``.*``: a stem consisting of nothing but the timestamp
    # has no original stem to restore, and ``Path.with_stem("")`` raises
    # ValueError for it, which would abort the whole run. Such names are
    # handled like any other non-matching path instead.
    pattern = re.compile(r"^(.+)( \(\d{4}_\d{2}_\d{2} \d{2}_\d{2}_\d{2} UTC\))$")
    records: dict[Path, set[str]] = {}
    for file in files:
        file = Path(file)
        match = pattern.match(file.stem)
        if match is None:
            # Directories and non-existent paths are skipped silently.
            if file.is_file():
                logger.warning(f"File not matching: {file!s}.")
            continue
        original_stem, timestamp = match.groups()
        clean_path = file.with_stem(original_stem)
        records.setdefault(clean_path, set()).add(timestamp)
    return records
@app.default()
def main(
    *,
    base_path: cyclopts.types.ExistingDirectory = Path(),
    dry_run: bool = False,
    io: Annotated[IO, cyclopts.Parameter(parse=False)] = FileSystem(),
) -> int:
    """Renames files copied from Windows FileHistory to match the original file names.

    Args:
        base_path: The base path where the files are located. Defaults to the current directory.
        dry_run: If True, only logs the actions without executing them.

    Returns:
        Exit code, 0 for success.
    """
    logger.info(f"Rename files in '{base_path!s}'.")
    grouped = parse_file_versions(io.read(base_path))
    for target, timestamps in grouped.items():
        # The timestamp suffixes are fixed-width with the most significant
        # field first, so plain string order puts the most recent one last.
        *outdated, latest = sorted(timestamps)
        display_name = target.relative_to(base_path)
        if outdated:
            # Only announce a choice when there was more than one candidate.
            logger.info(
                f"Chosing version {latest.strip('() ')} for {display_name!s} from {len(timestamps)} versions."
            )
        if not dry_run:
            # Rebuild the timestamped name and give it the original name.
            io.rename(target.with_stem(target.stem + latest), target)
        for stale in outdated:
            logger.debug(f"Deleting older: {stale.strip('() ')} for {display_name!s}.")
            if dry_run:
                continue
            io.delete(target.with_stem(target.stem + stale))
    return 0
""" | |
Tests | |
""" | |
@app.command(name="test")
def _test() -> int:
    """Run tests."""
    # Run pytest against this very file. "-p" and its value are passed as two
    # separate arguments, the conventional command-line form.
    exit_code = pytest.main(
        [
            __file__,
            "-v",
            "-s",
            "--tb=short",
            "--no-header",
            "-p",
            "no:cacheprovider",
        ]
    )
    # Propagate pytest's exit status so that failing tests produce a non-zero
    # process exit code instead of always reporting success.
    return int(exit_code)
def test_skips_non_matching():
    """A path without a timestamp suffix is ignored while a matching one is kept."""
    versioned = "path/file (2024_01_01 12_00_00 UTC).txt"
    plain = "path/skip.txt"
    result = parse_file_versions([versioned, plain])
    assert result == {Path("path/file.txt"): {" (2024_01_01 12_00_00 UTC)"}}
def test_groups_versions():
    """Several versions of one file end up under a single original path."""
    first = " (2024_01_01 12_00_00 UTC)"
    second = " (2024_01_02 12_00_00 UTC)"
    inputs = [f"path/file{first}.txt", f"path/file{second}.txt"]
    result = parse_file_versions(inputs)
    assert result == {Path("path/file.txt"): {first, second}}
def test_consideres_extension():
    """Files sharing a name but not an extension are tracked separately."""
    stamp = " (2024_01_01 12_00_00 UTC)"
    inputs = [f"path/file{stamp}.txt", f"path/file{stamp}.json"]
    result = parse_file_versions(inputs)
    assert result == {
        Path("path/file.txt"): {stamp},
        Path("path/file.json"): {stamp},
    }
class FakeIO:
    """In-memory ``IO`` stand-in that records requested actions instead of performing them."""

    def __init__(self, file_versions: Sequence[PathLike]):
        # Paths handed back by ``read``.
        self.file_versions = file_versions
        # Log of ("RENAME", source, dest) and ("DELETE", dest) tuples, in call order.
        self.actions = []

    def read(self, path: PathLike) -> Iterable[PathLike]:
        """Return the preset paths; ``path`` itself is not consulted."""
        return self.file_versions

    def rename(self, source: PathLike, dest: PathLike):
        """Record a rename request."""
        entry = ("RENAME", source, dest)
        self.actions.append(entry)

    def delete(self, dest: PathLike):
        """Record a delete request."""
        entry = ("DELETE", dest)
        self.actions.append(entry)
def test_dry_run_logs_actions_only(caplog: pytest.LogCaptureFixture):
    """Dry run: the chosen version is logged, but no rename or delete is requested."""
    fake_io = FakeIO(
        [
            "path/file (2024_01_01 12_00_00 UTC).txt",
            "path/file (2024_01_02 12_00_00 UTC).txt",
            "path/file (2024_01_03 12_00_00 UTC).txt",
            "path/file2.txt",
        ]
    )
    with caplog.at_level(logging.INFO, logger="restore_from_history"):
        main(dry_run=True, io=fake_io)
    assert fake_io.actions == []
    captured = caplog.text
    # The selected (newest) version is announced at INFO level.
    assert "2024_01_03" in captured
    # Older versions are only mentioned at DEBUG level, which is not captured here.
    for older_day in ("2024_01_02", "2024_01_01"):
        assert older_day not in captured
def test_e2e():
    """The newest version is renamed to the original name and older ones are deleted."""

    def versioned(day: str) -> str:
        # Build the timestamped file name for the given day of January 2024.
        return f"path/file (2024_01_{day} 12_00_00 UTC).txt"

    fake_io = FakeIO(
        [versioned("01"), versioned("02"), versioned("03"), "path/file2.txt"]
    )
    main(io=fake_io)
    assert fake_io.actions == [
        ("RENAME", Path(versioned("03")), Path("path/file.txt")),
        ("DELETE", Path(versioned("01"))),
        ("DELETE", Path(versioned("02"))),
    ]
@app.meta.default
def meta_default(
    *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
    verbose: Literal[0, 1, 2, 3] = 2,
) -> int:
    """CLI wrapper to control verbosity and logging.

    Args:
        verbose: Verbosity level: 0 (ERROR), 1 (WARNING), 2 (INFO), 3 (DEBUG).
    """
    # Resolve the command from the tokens that are actually dispatched below.
    # Without an argument, parse_commands() falls back to the process command
    # line, which may start with this wrapper's own options (e.g.
    # "--verbose 3 test") and does not reflect programmatic invocations.
    command_name = app.parse_commands(tokens)[0]
    # Logging is only configured when not running the "test" command.
    if command_name != ("test",):
        logging.basicConfig(
            # 0 -> ERROR (40), 1 -> WARNING (30), 2 -> INFO (20), 3 -> DEBUG (10).
            level=logging.ERROR - (verbose * 10),
            format="%(asctime)s | %(name)s | %(levelname)-8s | %(message)s",
        )
    try:
        return app(tokens)
    except KeyboardInterrupt:
        logger.info("Interrupted by user.")
        # Distinct exit status for a user-initiated abort.
        return 42
if __name__ == "__main__":
    # Enter through the meta app, which configures logging (unless the test
    # command was requested) before dispatching to the actual command; its
    # result is used as the process exit status.
    exit_status = app.meta()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment