|
#!/usr/bin/env python3 |
|
from __future__ import annotations |
|
from pathlib import Path |
|
from dataclasses import dataclass |
|
from subprocess import run, PIPE, DEVNULL |
|
import re |
|
import shutil |
|
|
|
import click |
|
from click import ClickException |
|
|
|
# Matches backup file names such as ``sms-20200101.xml`` or ``calls-7.xml``.
# Group 1 is the backup kind (``sms`` or ``calls``); group 2 is the run of
# digits that follows the dash.
# A raw string is required so that ``\d`` and ``\.`` reach the regex engine
# instead of being treated as (invalid) string escape sequences.
INPUT_FILE_PATTERN = re.compile(r"^(sms|calls)-(\d+)\.xml$")
|
|
|
@dataclass
class BackupEntry:
    """A single backup file found in the input directory."""

    # Integer value of the digits in the file name (the part after the dash).
    # NOTE(review): presumably a date/timestamp, going by the field name.
    date: int

    # Prefix of the file name, i.e. the text before the dash.
    kind: str

    # Path of the backup file itself.
    location: Path
|
|
|
@dataclass
class GitStatus:
    """Parsed output of ``git status --porcelain=v1``.

    Every change is stored as a ``(kind, path)`` pair, where ``kind`` is the
    single status character found in the relevant column of the status line.
    A line whose two status columns are both non-blank (for example ``MM`` or
    ``??``) contributes one pair to *each* list.
    """

    # Pairs taken from the first status column of each line.
    staged_changes: list[tuple[str, Path]]
    # Pairs taken from the second status column of each line.
    unstaged_changes: list[tuple[str, Path]]

    def only_has_unstaged_deletions(self, *, give_msg: bool = False) -> bool:
        """Return True when the only changes are unstaged deletions.

        Args:
            give_msg: When true, print the reason to stdout before
                returning False.

        Returns:
            False if there is any staged change, or any unstaged change
            whose kind is not ``'D'``; True otherwise (including when there
            are no changes at all).
        """
        if self.staged_changes:
            if give_msg:
                print("Repo has staged changes")
            return False
        for kind, file_path in self.unstaged_changes:
            if kind != 'D':
                if give_msg:
                    print(f"File {str(file_path)!r} has changes of type {kind!r}")
                return False
        return True

    @staticmethod
    def parse(s: str) -> GitStatus:
        """Parse the text printed by ``git status --porcelain=v1``.

        Each line is expected to be two status characters, a space, then the
        file name.

        Args:
            s: The complete status output; may be empty.

        Returns:
            A ``GitStatus`` holding the staged and unstaged changes in the
            order in which they appear in ``s``.

        Raises:
            ValueError: If a line is too short or does not have a space in
                the third column.
        """
        staged_changes = []
        unstaged_changes = []
        for line in s.splitlines():
            # Validate explicitly rather than with `assert`: assertions are
            # stripped under `python -O`, and indexing a short line would
            # otherwise surface as an unhelpful IndexError.
            if len(line) < 3 or line[2] != " ":
                raise ValueError(f"Malformed git status line: {line!r}")
            staged_kind, unstaged_kind = line[0], line[1]
            file_path = Path(line[3:])
            if staged_kind != ' ':
                staged_changes.append((staged_kind, file_path))
            if unstaged_kind != ' ':
                unstaged_changes.append((unstaged_kind, file_path))
        return GitStatus(staged_changes=staged_changes, unstaged_changes=unstaged_changes)
|
def run_git_status(repo: Path) -> GitStatus:
    """Run ``git status --porcelain=v1`` inside *repo* and parse the result.

    Args:
        repo: Directory in which to run git; must already exist.

    Returns:
        The parsed status, as produced by ``GitStatus.parse``.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status
            (the command is run with ``check=True``).
    """
    assert repo.is_dir()
    command = ["git", "status", "--porcelain=v1"]
    # Capture stdout as UTF-8 text; stderr is left attached to the terminal.
    completed = run(command, cwd=str(repo), stdout=PIPE, encoding='utf8', check=True)
    return GitStatus.parse(completed.stdout)
|
|
|
@click.command()
@click.option('--input', '-i', type=click.Path(exists=True, file_okay=False),
              required=True, help="The location of the original input files")
@click.option('--repo', '-r', type=click.Path(exists=False),
              required=True, help="Where the git repository should be output")
@click.option('--verbose/--quiet', '-v/-q', default=True, is_flag=True, help="Whether or not to display progress/status updates")
@click.option('--allow-existing-entries', is_flag=True, help="Allow entries to already exist in the repo, completely unchanged")
@click.option('--gc/--no-gc', 'git_gc', default=True, is_flag=True, help="Whether to run git gc on completion of the backup process")
def sms_backup_repo(input: str, repo: str, verbose: bool, allow_existing_entries: bool, git_gc: bool):
    """Create a SMS Backup from a list of files as a Git Repository"""
    # NOTE: The docstring above doubles as the `--help` text, so the detailed
    # description lives in these comments instead.
    #
    # Steps:
    #   1. Match every file in the input directory against
    #      INPUT_FILE_PATTERN and group the files by their numeric date.
    #   2. `git init` the target repository.
    #   3. For each date in ascending order: copy that date's files into the
    #      repository, `git add` them and make one commit.
    #   4. Optionally run `git gc` at the end.
    #
    # Every failure is reported as a ClickException, except failing git
    # commands, which raise subprocess.CalledProcessError (check=True).
    input_dir = Path(input)
    repo_dir = Path(repo)

    entry_list = []
    for input_file in input_dir.iterdir():
        match = INPUT_FILE_PATTERN.match(input_file.name)
        if match is None:
            raise ClickException(f"Unexpected input file: {input_file.name}")
        entry_list.append(BackupEntry(
            date=int(match.group(2)),
            kind=match.group(1),
            location=input_file,
        ))
    entry_list.sort(key=lambda entry: entry.date)

    # Dicts preserve insertion order and entry_list is sorted, so iterating
    # `groups` visits the dates in ascending order.
    groups: dict[int, list[BackupEntry]] = {}
    for entry in entry_list:
        groups.setdefault(entry.date, []).append(entry)

    run(["git", "init", str(repo_dir)], check=True)
    if git_gc:
        if verbose:
            print("NOTE: Configuring git-pack to use only a single thread")
        # NOTE: This avoids OOM in git-gc
        run(["git", "config", "pack.threads", "1"], cwd=repo_dir, check=True)

    total_commits = 0
    # TODO: Click progressbar?
    for index, (date, group) in enumerate(groups.items()):
        if not run_git_status(repo_dir).only_has_unstaged_deletions(give_msg=True):
            raise ClickException("Unclean repository")
        for entry in group:
            shutil.copy(entry.location, repo_dir)
        run(["git", "add", "."], cwd=repo_dir, check=True)

        status = run_git_status(repo_dir)
        if status.unstaged_changes:
            # Reported through ClickException (message on stderr, exit code 1)
            # like every other failure in this command.
            details = "\n".join(
                f"  {kind} -> {str(f)!r}" for kind, f in status.unstaged_changes
            )
            raise ClickException(f"Unexpected unstaged changes:\n{details}")

        # Only freshly added files are acceptable in the index.
        added = set()
        for kind, f in status.staged_changes:
            if kind != "A":
                raise ClickException(f"Unexpected state {kind!r} for {f}")
            added.add(f)
        count = len(added)

        if count == len(group):
            pass  # Everything was added
        elif allow_existing_entries:
            # Some files were not added -> make sure they already exist
            for entry in group:
                rel = entry.location.relative_to(input_dir)
                if rel in added:
                    continue  # Newly added, so there is nothing to warn about
                if not Path(repo_dir, rel).exists():
                    raise ClickException(f"Missing entry {entry}: Not added & doesn't already exist")
                if verbose:
                    print(f"WARN: Ignoring {rel}: Already exists")
            if count == 0:
                continue  # Nothing to commit -> continue with the next date
        else:
            raise ClickException(
                f"Unexpected count {count} for {group} "
                "(pass --allow-existing-entries to accept entries that "
                "already exist unchanged)"
            )

        run(["git", "-c", "commit.gpgsign=false", "commit",
             "-m", f"Entries for {date}"], stdout=DEVNULL, cwd=str(repo_dir), check=True)
        if verbose:
            # Progress is measured in date groups (one commit each); `index`
            # is zero-based, hence the +1.
            ratio = (index + 1) / len(groups)
            print(f"Committed {count} entries for {date} ({ratio:.2%})")
        total_commits += 1

    print(f"Made {total_commits} commits for {len(entry_list)} entries")
    if git_gc:
        print(f"Running garbage collection on {repo_dir}:")
        run(["git", "gc"], cwd=repo_dir, check=True)
|
|
|
# Run the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    sms_backup_repo()