|
#!/usr/bin/env python3 |
|
"""Filter a 1PUX export to selected vault(s).""" |
|
|
|
from __future__ import annotations |
|
|
|
import argparse |
|
import json |
|
import re |
|
import shutil |
|
import sys |
|
import tempfile |
|
import zipfile |
|
from contextlib import contextmanager |
|
from dataclasses import dataclass |
|
from pathlib import Path |
|
from typing import Any, Iterator |
|
|
|
ONEPASSWORD_PREFIX = "1PasswordExport-" |
|
|
|
|
|
@dataclass(frozen=True)
class VaultEntry:
    """Immutable summary of one vault found inside a 1PUX export."""

    # 1-based position shown to the user for interactive selection.
    ordinal: int
    # Index of the owning account within export.data's "accounts" list.
    account_index: int
    # Index of this vault within the account's "vaults" list.
    vault_index: int
    # Display name; "(unnamed)" when the export carries no usable name.
    name: str
    # Number of items in the vault (0 when "items" is missing/malformed).
    item_count: int
    # Number of attachment files under files/ referenced by this vault.
    file_count: int
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define and evaluate the command-line interface for this tool."""
    cli = argparse.ArgumentParser(description="Filter a 1PUX export to selected vault(s).")
    cli.add_argument("input_path", type=Path, help="Path to .1pux file or unpacked 1PUX folder.")
    cli.add_argument(
        "--vault-names",
        help="Comma-separated vault names to keep. If omitted, choose interactively by ordinal.",
    )
    cli.add_argument(
        "--keep-output-folder",
        action="store_true",
        help="Keep unpacked output folder after compression.",
    )
    cli.add_argument(
        "--no-compress",
        action="store_true",
        help="Skip .1pux compression and keep unpacked output folder.",
    )
    cli.add_argument("--overwrite", action="store_true", help="Overwrite existing output.")
    return cli.parse_args()
|
|
|
|
|
def load_json(path: Path) -> Any:
    """Read *path* as UTF-8 text and return the parsed JSON document."""
    return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
def write_json(path: Path, data: Any) -> None:
    """Serialize *data* as pretty UTF-8 JSON at *path*, ending with a newline."""
    rendered = json.dumps(data, indent=2, ensure_ascii=False)
    path.write_text(rendered + "\n", encoding="utf-8")
|
|
|
|
|
def ensure_writable(path: Path, overwrite: bool) -> None:
    """Make sure *path* can be created: no-op if absent, removed if *overwrite*.

    Raises FileExistsError when the path exists and *overwrite* is False.
    """
    if path.exists():
        if not overwrite:
            raise FileExistsError(f"{path} already exists. Use --overwrite to replace it.")
        if path.is_dir():
            shutil.rmtree(path)
        else:
            path.unlink()
|
|
|
|
|
def resolve_under(base: Path, relative: str | Path) -> Path | None:
    """Resolve *relative* against *base*; return None if it escapes *base*.

    Used as a path-traversal guard (e.g. against zip-slip archive members).
    """
    target = (base / relative).resolve(strict=False)
    anchor = base.resolve()
    try:
        target.relative_to(anchor)
    except ValueError:
        return None
    return target
|
|
|
|
|
@contextmanager
def open_source_dir(input_path: Path) -> Iterator[Path]:
    """Yield a directory holding the 1PUX contents.

    A directory input is yielded as-is; a .1pux/zip file is safely extracted
    into a temporary directory that is removed when the context exits.
    """
    if input_path.is_dir():
        yield input_path
        return
    if not zipfile.is_zipfile(input_path):
        raise ValueError(f"Input file is not a valid zip/.1pux archive: {input_path}")
    with tempfile.TemporaryDirectory(prefix="onepux-extract-") as scratch:
        dest_root = Path(scratch) / "input"
        dest_root.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(input_path, mode="r") as archive:
            for entry in archive.infolist():
                # Zip-slip guard: refuse members that resolve outside dest_root.
                dest = resolve_under(dest_root, entry.filename)
                if dest is None:
                    raise ValueError(f"Archive member escapes extraction directory: {entry.filename}")
                if entry.is_dir():
                    dest.mkdir(parents=True, exist_ok=True)
                else:
                    dest.parent.mkdir(parents=True, exist_ok=True)
                    with archive.open(entry) as reader, dest.open("wb") as writer:
                        shutil.copyfileobj(reader, writer)
        yield dest_root
|
|
|
|
|
def validate_1pux_dir(path: Path) -> tuple[dict[str, Any], dict[str, Any]]:
    """Verify the 1PUX layout at *path* and return its (attributes, data) JSON.

    Raises ValueError when required entries are missing or the JSON shape
    is not the expected dict-with-"accounts"-list form.
    """
    attrs_file = path / "export.attributes"
    data_file = path / "export.data"
    if not (attrs_file.is_file() and data_file.is_file() and (path / "files").is_dir()):
        raise ValueError(
            f"Invalid 1PUX structure at {path}. Expected export.attributes, export.data, and files/."
        )
    attrs = json.loads(attrs_file.read_text(encoding="utf-8"))
    data = json.loads(data_file.read_text(encoding="utf-8"))
    shape_ok = (
        isinstance(attrs, dict)
        and isinstance(data, dict)
        and isinstance(data.get("accounts"), list)
    )
    if not shape_ok:
        raise ValueError("Invalid 1PUX JSON shape.")
    return attrs, data
|
|
|
|
|
def collect_file_refs(node: Any, refs: set[str], document_ids: set[str]) -> None:
    """Recursively harvest attachment references from a JSON-like *node*.

    Non-empty "documentId" strings go into *document_ids*; local-looking
    "path"/"avatar" strings (not http/https URLs) go into *refs*. Both
    accumulators are mutated in place.
    """
    if isinstance(node, list):
        for element in node:
            collect_file_refs(element, refs, document_ids)
        return
    if not isinstance(node, dict):
        return
    for key, value in node.items():
        nonempty_str = isinstance(value, str) and bool(value)
        if key == "documentId" and nonempty_str:
            document_ids.add(value)
        elif (
            key in ("path", "avatar")
            and nonempty_str
            and not value.lower().startswith(("http://", "https://"))
        ):
            refs.add(value)
        # Descend into every value; strings/scalars are ignored by the guards above.
        collect_file_refs(value, refs, document_ids)
|
|
|
|
|
def matching_file_paths(files_dir: Path, node: Any) -> set[str]:
    """Return POSIX-style paths, relative to *files_dir*, of attachment files used by *node*.

    Two sources are combined: explicit "path"/"avatar" references found in
    *node*, and any file whose name begins with "<documentId>___" for a
    documentId referenced by *node*.
    """
    refs: set[str] = set()
    doc_ids: set[str] = set()
    collect_file_refs(node, refs, doc_ids)
    matched: set[str] = set()
    files_root = files_dir.resolve()

    for ref in refs:
        p = resolve_under(files_root, ref)
        if p and p.is_file():
            matched.add(p.relative_to(files_root).as_posix())

    # Fix: scan the resolved root rather than the possibly-relative files_dir;
    # rglob over a relative path would yield relative results and make
    # relative_to(files_root) raise ValueError.
    if doc_ids and files_root.is_dir():
        # startswith accepts a tuple, so build all prefixes once outside the loop.
        prefixes = tuple(f"{doc_id}___" for doc_id in doc_ids)
        for p in files_root.rglob("*"):
            if p.is_file() and p.name.startswith(prefixes):
                matched.add(p.relative_to(files_root).as_posix())
    return matched
|
|
|
|
|
def discover_vaults(data: dict[str, Any], files_dir: Path) -> list[VaultEntry]:
    """Enumerate every vault in *data*, assigning 1-based ordinals in encounter order.

    Malformed (non-dict / non-list) accounts and vaults are skipped silently.
    """
    found: list[VaultEntry] = []
    for acct_idx, account in enumerate(data.get("accounts", [])):
        if not isinstance(account, dict):
            continue
        vault_list = account.get("vaults", [])
        if not isinstance(vault_list, list):
            continue
        for vault_idx, vault in enumerate(vault_list):
            if not isinstance(vault, dict):
                continue
            attrs = vault.get("attrs", {})
            vault_name = ""
            if isinstance(attrs, dict) and isinstance(attrs.get("name"), str):
                vault_name = attrs["name"]
            items = vault.get("items", [])
            found.append(
                VaultEntry(
                    # Ordinals are consecutive over appended entries only.
                    ordinal=len(found) + 1,
                    account_index=acct_idx,
                    vault_index=vault_idx,
                    name=vault_name or "(unnamed)",
                    item_count=len(items) if isinstance(items, list) else 0,
                    file_count=len(matching_file_paths(files_dir, vault)),
                )
            )
    return found
|
|
|
|
|
def parse_ordinal_csv(raw: str, max_ordinal: int) -> list[int]:
    """Parse a comma-separated ordinal list validated against 1..max_ordinal.

    Returns unique ordinals in ascending order; raises ValueError for empty
    input, non-digit tokens, or out-of-range values.
    """
    tokens = [token.strip() for token in raw.split(",") if token.strip()]
    if not tokens:
        raise ValueError("No ordinals provided.")
    chosen: set[int] = set()
    for token in tokens:
        if not token.isdigit():
            raise ValueError(f"Invalid ordinal '{token}'.")
        ordinal = int(token)
        if not 1 <= ordinal <= max_ordinal:
            raise ValueError(f"Ordinal out of range: {ordinal} (valid: 1..{max_ordinal}).")
        chosen.add(ordinal)
    return sorted(chosen)
|
|
|
|
|
def prompt_for_ordinals(entries: list[VaultEntry]) -> list[int]:
    """Print a table of vaults and loop until the user enters valid ordinals.

    Raises RuntimeError when stdin is not a TTY (no interactive fallback).
    """
    if not sys.stdin.isatty():
        raise RuntimeError("No --vault-names provided and interactive input is unavailable.")

    headers = ("ordinal", "vault name", "# items", "# files")
    # Each column is as wide as its header or its widest cell.
    w0 = max(len(headers[0]), max(len(str(entry.ordinal)) for entry in entries))
    w1 = max(len(headers[1]), max(len(entry.name) for entry in entries))
    w2 = max(len(headers[2]), max(len(str(entry.item_count)) for entry in entries))
    w3 = max(len(headers[3]), max(len(str(entry.file_count)) for entry in entries))

    print("Available vaults")
    print(f"{headers[0]:>{w0}} | {headers[1]:<{w1}} | {headers[2]:>{w2}} | {headers[3]:>{w3}}")
    print(f"{'-' * w0}-+-{'-' * w1}-+-{'-' * w2}-+-{'-' * w3}")
    for entry in entries:
        print(f"{entry.ordinal:>{w0}} | {entry.name:<{w1}} | {entry.item_count:>{w2}} | {entry.file_count:>{w3}}")
    print()

    while True:
        raw = input("Enter comma-separated ordinals to keep: ").strip()
        try:
            # entries[-1].ordinal is the highest ordinal (assigned sequentially).
            return parse_ordinal_csv(raw, entries[-1].ordinal)
        except ValueError as exc:
            print(f"Invalid selection: {exc}")
|
|
|
|
|
def select_vaults(entries: list[VaultEntry], vault_names_csv: str | None) -> list[VaultEntry]:
    """Choose vaults by --vault-names CSV, or interactively by ordinal.

    Raises RuntimeError when no vaults exist or no requested name matches,
    and ValueError when the CSV contains no usable names.
    """
    if not entries:
        raise RuntimeError("No vaults found in export.data.")

    if not vault_names_csv:
        wanted_ordinals = set(prompt_for_ordinals(entries))
        return [entry for entry in entries if entry.ordinal in wanted_ordinals]

    requested = [name.strip() for name in vault_names_csv.split(",") if name.strip()]
    if not requested:
        raise ValueError("No valid vault names provided in --vault-names.")
    wanted_names = set(requested)
    chosen = [entry for entry in entries if entry.name in wanted_names]
    if not chosen:
        available = ", ".join(sorted({entry.name for entry in entries}))
        raise RuntimeError(
            f"None of the requested vault names were found. Requested: {', '.join(requested)}. "
            f"Available: {available}"
        )
    return chosen
|
|
|
|
|
def sanitize_name_for_path(value: str) -> str:
    """Collapse *value* to filesystem-safe [A-Za-z0-9_] text; never empty."""
    collapsed = re.sub(r"[^A-Za-z0-9]+", "_", value)
    collapsed = collapsed.strip("_")
    return collapsed if collapsed else "Vault"
|
|
|
|
|
def build_output_tag(names: list[str]) -> str:
    """Join the distinct vault *names* (first-seen order), sanitized, with '+'."""
    # dict.fromkeys preserves first-seen order while dropping duplicates.
    distinct = list(dict.fromkeys(names))
    parts: list[str] = []
    for name in distinct:
        # Same cleanup rule as sanitize_name_for_path: keep alnum runs only.
        cleaned = re.sub(r"[^A-Za-z0-9]+", "_", name).strip("_")
        parts.append(cleaned or "Vault")
    return "+".join(parts)
|
|
|
|
|
def derive_output_paths(input_path: Path, output_tag: str) -> tuple[Path, Path]:
    """Compute (output folder, output .1pux archive) beside *input_path*.

    A "1PasswordExport-" prefix is preserved with the tag spliced in after it;
    otherwise the tag is appended to the base name.
    """
    folder = input_path.parent
    stem = input_path.name if input_path.is_dir() else input_path.stem
    if stem.startswith(ONEPASSWORD_PREFIX):
        remainder = stem[len(ONEPASSWORD_PREFIX):]
        out_base = f"{ONEPASSWORD_PREFIX}{output_tag}"
        if remainder:
            out_base = f"{out_base}-{remainder}"
    elif stem:
        out_base = f"{stem}-{output_tag}"
    else:
        out_base = output_tag
    out_dir = (folder / out_base).resolve()
    out_archive = (folder / f"{out_base}.1pux").resolve()
    return out_dir, out_archive
|
|
|
|
|
def filter_export_data(data: dict[str, Any], selected_keys: set[tuple[int, int]]) -> tuple[dict[str, Any], dict[str, int]]:
    """Return a shallow-copied *data* keeping only vaults in *selected_keys*.

    *selected_keys* holds (account_index, vault_index) pairs. Accounts left
    with no selected vaults are dropped. Also returns before/after counts.
    The input structure is not mutated.
    """
    stats = {
        "total_accounts": 0,
        "total_vaults": 0,
        "total_items": 0,
        "kept_accounts": 0,
        "kept_vaults": 0,
        "kept_items": 0,
    }
    surviving_accounts: list[dict[str, Any]] = []

    for acct_idx, account in enumerate(data.get("accounts", [])):
        if not isinstance(account, dict):
            continue
        stats["total_accounts"] += 1
        vaults = account.get("vaults", [])
        if not isinstance(vaults, list):
            continue
        stats["total_vaults"] += len(vaults)

        surviving_vaults: list[dict[str, Any]] = []
        for vault_idx, vault in enumerate(vaults):
            if not isinstance(vault, dict):
                continue
            items = vault.get("items", [])
            n_items = len(items) if isinstance(items, list) else 0
            stats["total_items"] += n_items
            if (acct_idx, vault_idx) in selected_keys:
                surviving_vaults.append(vault)
                stats["kept_items"] += n_items

        if surviving_vaults:
            # Shallow-copy the account so the original data stays untouched.
            kept_account = dict(account)
            kept_account["vaults"] = surviving_vaults
            surviving_accounts.append(kept_account)
            stats["kept_accounts"] += 1
            stats["kept_vaults"] += len(surviving_vaults)

    filtered = dict(data)
    filtered["accounts"] = surviving_accounts
    return filtered, stats
|
|
|
|
|
def write_filtered_export(
    source_dir: Path,
    attrs: dict[str, Any],
    filtered_data: dict[str, Any],
    output_dir: Path,
    overwrite: bool,
) -> int:
    """Materialize the filtered export under *output_dir*.

    Writes export.attributes / export.data, copies only the attachment files
    referenced by *filtered_data*, and returns how many files were copied.
    """
    ensure_writable(output_dir, overwrite)
    attachments_dir = output_dir / "files"
    output_dir.mkdir(parents=True, exist_ok=True)
    attachments_dir.mkdir(parents=True, exist_ok=True)

    write_json(output_dir / "export.attributes", attrs)
    write_json(output_dir / "export.data", filtered_data)

    source_files = source_dir / "files"
    copied = 0
    for rel_path in sorted(matching_file_paths(source_files, filtered_data)):
        origin = source_files / rel_path
        if not origin.is_file():
            continue
        destination = attachments_dir / rel_path
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(origin, destination)
        copied += 1
    return copied
|
|
|
|
|
def create_1pux_archive(source_dir: Path, output_archive: Path) -> None:
    """Zip every regular file under *source_dir* into *output_archive*.

    Members are added in sorted order with POSIX-style relative names, so
    the archive layout is deterministic.
    """
    members = [entry for entry in sorted(source_dir.rglob("*")) if entry.is_file()]
    with zipfile.ZipFile(output_archive, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        for member in members:
            archive.write(member, arcname=member.relative_to(source_dir).as_posix())
|
|
|
|
|
def main() -> int:
    """Entry point: filter the export to selected vaults and print a summary.

    Returns the process exit code (0 on success). Raises FileNotFoundError,
    ValueError, or RuntimeError on invalid input or unsafe output paths.
    """
    args = parse_args()
    input_path = args.input_path.resolve()
    if not input_path.exists():
        raise FileNotFoundError(f"Input path does not exist: {input_path}")

    # --no-compress implies keeping the folder, otherwise there is no output at all.
    keep_folder = args.keep_output_folder or args.no_compress
    compress = not args.no_compress

    with open_source_dir(input_path) as source_dir:
        attrs, data = validate_1pux_dir(source_dir)
        entries = discover_vaults(data, source_dir / "files")
        selected = select_vaults(entries, args.vault_names)

        selected_keys = {(e.account_index, e.vault_index) for e in selected}
        selected_ordinals = [e.ordinal for e in selected]
        selected_names = [e.name for e in selected]

        out_dir, out_archive = derive_output_paths(input_path, build_output_tag(selected_names))
        # Refuse output paths that would clobber the input itself.
        if input_path.is_dir() and out_dir == input_path:
            raise RuntimeError("Refusing to write output folder over input folder.")
        if input_path.is_file() and out_archive == input_path:
            raise RuntimeError("Refusing to overwrite input archive path.")
        if compress:
            # Fail fast on an existing archive before doing any filtering work.
            ensure_writable(out_archive, args.overwrite)

        filtered_data, stats = filter_export_data(data, selected_keys)
        copied_files = write_filtered_export(source_dir, attrs, filtered_data, out_dir, args.overwrite)

        compressed_path: Path | None = None
        if compress:
            create_1pux_archive(out_dir, out_archive)
            compressed_path = out_archive
            if not keep_folder:
                # The unpacked folder was only an intermediate; remove it.
                shutil.rmtree(out_dir)

        print("Done.\n")
        print("Filter Summary")
        print(f"- Selected ordinals: {', '.join(str(n) for n in selected_ordinals)}")
        print(f"- Selected vault names: {', '.join(selected_names)}")
        print(f"- Kept accounts: {stats['kept_accounts']}")
        print(f"- Kept vaults: {stats['kept_vaults']}")
        print(f"- Kept items: {stats['kept_items']}")
        print(f"- Copied attachment files: {copied_files}\n")
        print("Source Totals")
        print(f"- Accounts: {stats['total_accounts']}")
        print(f"- Vaults: {stats['total_vaults']}")
        print(f"- Items: {stats['total_items']}\n")
        print("Output")
        print(f"- Compressed: {compressed_path if compressed_path else 'skipped (--no-compress)'}")
        print(f"- Folder: {out_dir if keep_folder else 'removed after compression'}")
    return 0
|
|
|
|
|
if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("\nCancelled.")
        # 130 = 128 + SIGINT(2), the conventional exit status for Ctrl-C.
        raise SystemExit(130)