Skip to content

Instantly share code, notes, and snippets.

@deinspanjer
Last active March 28, 2026 19:03
Show Gist options
  • Select an option

  • Save deinspanjer/3756e44f1e997d22ea899e87149a42ee to your computer and use it in GitHub Desktop.

Select an option

Save deinspanjer/3756e44f1e997d22ea899e87149a42ee to your computer and use it in GitHub Desktop.
Filter a 1PUX export to selected vaults

1PUX Vault Filter

Filter a .1pux export down to one or more selected vaults and write a new .1pux containing only those vaults.

Quickstart:

python3 filter_1pux_vault.py /path/to/export.1pux

To choose vaults without the interactive prompt:

python3 filter_1pux_vault.py /path/to/export.1pux --vault-names "TeachingStrategies,Team Vault"

Useful flags:

  • --no-compress: skip creating the .1pux archive and keep only the unpacked output folder
  • --keep-output-folder: keep the unpacked folder after creating the new .1pux
  • --overwrite: replace existing output
#!/usr/bin/env python3
"""Filter a 1PUX export to selected vault(s)."""
from __future__ import annotations
import argparse
import json
import re
import shutil
import sys
import tempfile
import zipfile
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterator
# Filename prefix 1Password puts on export bundles; reused when deriving output names.
ONEPASSWORD_PREFIX = "1PasswordExport-"
@dataclass(frozen=True)
class VaultEntry:
    """Summary of one vault found in the export's account list."""

    # 1-based position across all accounts, in encounter order; drives
    # interactive selection.
    ordinal: int
    # Index of the owning account within data["accounts"].
    account_index: int
    # Index of the vault within that account's "vaults" list.
    vault_index: int
    # Vault display name; "(unnamed)" when missing or empty.
    name: str
    # Number of entries in the vault's "items" list (0 if not a list).
    item_count: int
    # Number of attachment files under files/ referenced by this vault.
    file_count: int
def parse_args() -> argparse.Namespace:
    """Build and evaluate the command-line interface for the filter tool."""
    parser = argparse.ArgumentParser(description="Filter a 1PUX export to selected vault(s).")
    parser.add_argument("input_path", type=Path, help="Path to .1pux file or unpacked 1PUX folder.")
    parser.add_argument(
        "--vault-names",
        help="Comma-separated vault names to keep. If omitted, choose interactively by ordinal.",
    )
    # Boolean switches share the same shape; declare them in one place.
    boolean_flags = (
        ("--keep-output-folder", "Keep unpacked output folder after compression."),
        ("--no-compress", "Skip .1pux compression and keep unpacked output folder."),
        ("--overwrite", "Overwrite existing output."),
    )
    for flag, help_text in boolean_flags:
        parser.add_argument(flag, action="store_true", help=help_text)
    return parser.parse_args()
def load_json(path: Path) -> Any:
    """Read and deserialize the UTF-8 JSON document at *path*."""
    text = path.read_text(encoding="utf-8")
    return json.loads(text)
def write_json(path: Path, data: Any) -> None:
    """Serialize *data* as indented UTF-8 JSON (non-ASCII kept literal) with a trailing newline."""
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    path.write_text(serialized + "\n", encoding="utf-8")
def ensure_writable(path: Path, overwrite: bool) -> None:
    """Clear the way for writing at *path*.

    No-op when *path* is absent. When it exists: delete it (file or tree)
    if *overwrite* is True, otherwise raise FileExistsError.
    """
    if not path.exists():
        return
    if overwrite:
        remover = shutil.rmtree if path.is_dir() else Path.unlink
        remover(path)
        return
    raise FileExistsError(f"{path} already exists. Use --overwrite to replace it.")
def resolve_under(base: Path, relative: str | Path) -> Path | None:
    """Resolve *relative* against *base*, refusing paths that escape *base*.

    Returns the resolved absolute path, or None when the result lies outside
    *base* (e.g. via ".." components or an absolute *relative*) — used as a
    zip-slip / traversal guard elsewhere in this script.

    Note: symlinks are followed by resolve(), so a link inside *base* that
    points outside it is rejected as an escape.
    """
    # Resolve the containment root once instead of inside the comparison.
    root = base.resolve()
    candidate = (base / relative).resolve(strict=False)
    # Path.is_relative_to (3.9+) replaces the try/except ValueError idiom.
    return candidate if candidate.is_relative_to(root) else None
@contextmanager
def open_source_dir(input_path: Path) -> Iterator[Path]:
    """Yield a directory holding the export contents.

    A directory input is yielded as-is; a .1pux (zip) input is unpacked
    into a temporary directory that is removed when the context exits.

    Raises ValueError for non-zip files and for archive members that would
    resolve outside the extraction directory (zip-slip).
    """
    if input_path.is_dir():
        yield input_path
        return
    if not zipfile.is_zipfile(input_path):
        raise ValueError(f"Input file is not a valid zip/.1pux archive: {input_path}")
    with tempfile.TemporaryDirectory(prefix="onepux-extract-") as scratch:
        dest_root = Path(scratch) / "input"
        dest_root.mkdir(parents=True, exist_ok=True)
        confined = dest_root.resolve()
        with zipfile.ZipFile(input_path, mode="r") as archive:
            for entry in archive.infolist():
                # Confine every member under dest_root before touching disk.
                resolved = (dest_root / entry.filename).resolve(strict=False)
                escaped = False
                try:
                    resolved.relative_to(confined)
                except ValueError:
                    escaped = True
                if escaped:
                    raise ValueError(f"Archive member escapes extraction directory: {entry.filename}")
                if entry.is_dir():
                    resolved.mkdir(parents=True, exist_ok=True)
                else:
                    resolved.parent.mkdir(parents=True, exist_ok=True)
                    with archive.open(entry) as src, resolved.open("wb") as sink:
                        shutil.copyfileobj(src, sink)
        yield dest_root
def validate_1pux_dir(path: Path) -> tuple[dict[str, Any], dict[str, Any]]:
    """Check the 1PUX layout at *path* and return its (attributes, data) JSON.

    Raises ValueError when export.attributes, export.data, or files/ is
    missing, or when the parsed JSON lacks the expected top-level shape.
    """
    attrs_file = path / "export.attributes"
    data_file = path / "export.data"
    files_dir = path / "files"
    layout_ok = attrs_file.is_file() and data_file.is_file() and files_dir.is_dir()
    if not layout_ok:
        raise ValueError(
            f"Invalid 1PUX structure at {path}. Expected export.attributes, export.data, and files/."
        )

    def _read(p: Path) -> Any:
        # Small inline reader; both documents are UTF-8 JSON.
        with p.open("r", encoding="utf-8") as fh:
            return json.load(fh)

    attrs = _read(attrs_file)
    data = _read(data_file)
    shape_ok = (
        isinstance(attrs, dict)
        and isinstance(data, dict)
        and isinstance(data.get("accounts"), list)
    )
    if not shape_ok:
        raise ValueError("Invalid 1PUX JSON shape.")
    return attrs, data
def collect_file_refs(node: Any, refs: set[str], document_ids: set[str]) -> None:
    """Walk *node* (parsed JSON) collecting attachment references in place.

    Non-empty "documentId" string values go into *document_ids*; non-empty
    "path"/"avatar" string values go into *refs* unless they are http(s)
    URLs (remote avatars have no local file). Both sets are mutated.
    """
    pending = [node]
    while pending:
        current = pending.pop()
        if isinstance(current, list):
            pending.extend(current)
            continue
        if not isinstance(current, dict):
            continue
        for key, value in current.items():
            if isinstance(value, str) and value:
                if key == "documentId":
                    document_ids.add(value)
                elif key in ("path", "avatar") and not value.lower().startswith(("http://", "https://")):
                    refs.add(value)
            pending.append(value)
def matching_file_paths(files_dir: Path, node: Any) -> set[str]:
    """Return POSIX-style paths under *files_dir* that *node* references.

    Direct "path"/"avatar" references are resolved — and confined — under
    *files_dir*; documentId references match files named "<id>___*"
    anywhere in the tree.
    """
    path_refs: set[str] = set()
    doc_ids: set[str] = set()
    collect_file_refs(node, path_refs, doc_ids)
    root = files_dir.resolve()
    found = {
        candidate.relative_to(root).as_posix()
        for candidate in (resolve_under(root, ref) for ref in path_refs)
        if candidate and candidate.is_file()
    }
    if doc_ids and files_dir.is_dir():
        prefixes = tuple(f"{doc_id}___" for doc_id in doc_ids)
        for entry in files_dir.rglob("*"):
            if entry.is_file() and entry.name.startswith(prefixes):
                found.add(entry.relative_to(root).as_posix())
    return found
def discover_vaults(data: dict[str, Any], files_dir: Path) -> list[VaultEntry]:
    """Enumerate every vault across all accounts as VaultEntry rows.

    Ordinals are assigned 1..N in encounter order (they drive interactive
    selection); non-dict accounts/vaults and non-list vault collections
    are skipped silently.
    """
    found: list[VaultEntry] = []
    for account_idx, account in enumerate(data.get("accounts", [])):
        if not isinstance(account, dict):
            continue
        vault_list = account.get("vaults", [])
        if not isinstance(vault_list, list):
            continue
        for vault_idx, vault in enumerate(vault_list):
            if not isinstance(vault, dict):
                continue
            vault_attrs = vault.get("attrs", {})
            raw_name = vault_attrs.get("name") if isinstance(vault_attrs, dict) else None
            vault_items = vault.get("items", [])
            found.append(
                VaultEntry(
                    ordinal=len(found) + 1,
                    account_index=account_idx,
                    vault_index=vault_idx,
                    name=raw_name if isinstance(raw_name, str) and raw_name else "(unnamed)",
                    item_count=len(vault_items) if isinstance(vault_items, list) else 0,
                    file_count=len(matching_file_paths(files_dir, vault)),
                )
            )
    return found
def parse_ordinal_csv(raw: str, max_ordinal: int) -> list[int]:
    """Parse a comma-separated ordinal list into sorted, de-duplicated ints.

    Raises ValueError on empty input, non-numeric tokens, or ordinals
    outside 1..max_ordinal.
    """
    tokens = [t for t in (piece.strip() for piece in raw.split(",")) if t]
    if not tokens:
        raise ValueError("No ordinals provided.")
    chosen: set[int] = set()
    for token in tokens:
        if not token.isdigit():
            raise ValueError(f"Invalid ordinal '{token}'.")
        number = int(token)
        if not 1 <= number <= max_ordinal:
            raise ValueError(f"Ordinal out of range: {number} (valid: 1..{max_ordinal}).")
        chosen.add(number)
    return sorted(chosen)
def prompt_for_ordinals(entries: list[VaultEntry]) -> list[int]:
    """Print a vault table, then prompt until a valid ordinal CSV is entered.

    Raises RuntimeError when stdin is not a TTY (nothing to prompt).
    """
    if not sys.stdin.isatty():
        raise RuntimeError("No --vault-names provided and interactive input is unavailable.")
    headers = ("ordinal", "vault name", "# items", "# files")
    rows = [(str(e.ordinal), e.name, str(e.item_count), str(e.file_count)) for e in entries]
    # Column widths fit both header and the widest cell.
    ow, nw, iw, fw = (
        max(len(header), *(len(row[col]) for row in rows))
        for col, header in enumerate(headers)
    )
    print("Available vaults")
    print(f"{headers[0]:>{ow}} | {headers[1]:<{nw}} | {headers[2]:>{iw}} | {headers[3]:>{fw}}")
    print(f"{'-' * ow}-+-{'-' * nw}-+-{'-' * iw}-+-{'-' * fw}")
    for ordinal_s, name_s, items_s, files_s in rows:
        print(f"{ordinal_s:>{ow}} | {name_s:<{nw}} | {items_s:>{iw}} | {files_s:>{fw}}")
    print()
    highest = entries[-1].ordinal
    while True:
        answer = input("Enter comma-separated ordinals to keep: ").strip()
        try:
            return parse_ordinal_csv(answer, highest)
        except ValueError as exc:
            print(f"Invalid selection: {exc}")
def select_vaults(entries: list[VaultEntry], vault_names_csv: str | None) -> list[VaultEntry]:
    """Pick vaults to keep — by name CSV when given, otherwise interactively.

    Raises RuntimeError when no vaults exist or no requested name matches;
    ValueError when --vault-names contains no usable names.
    """
    if not entries:
        raise RuntimeError("No vaults found in export.data.")
    if not vault_names_csv:
        wanted = set(prompt_for_ordinals(entries))
        return [entry for entry in entries if entry.ordinal in wanted]
    names = [piece.strip() for piece in vault_names_csv.split(",") if piece.strip()]
    if not names:
        raise ValueError("No valid vault names provided in --vault-names.")
    wanted_names = set(names)
    keep = [entry for entry in entries if entry.name in wanted_names]
    if keep:
        return keep
    known = ", ".join(sorted({entry.name for entry in entries}))
    raise RuntimeError(
        f"None of the requested vault names were found. Requested: {', '.join(names)}. "
        f"Available: {known}"
    )
def sanitize_name_for_path(value: str) -> str:
    """Collapse runs of non-alphanumerics to "_", trim, and fall back to "Vault"."""
    safe = re.sub(r"[^A-Za-z0-9]+", "_", value)
    safe = safe.strip("_")
    if safe:
        return safe
    return "Vault"
def build_output_tag(names: list[str]) -> str:
    """Join sanitized vault names with "+", de-duplicated in first-seen order."""
    # dict.fromkeys preserves insertion order while dropping duplicates.
    deduped = dict.fromkeys(names)
    return "+".join(
        # Inline sanitization: non-alphanumeric runs -> "_", empty -> "Vault".
        re.sub(r"[^A-Za-z0-9]+", "_", name).strip("_") or "Vault"
        for name in deduped
    )
def derive_output_paths(input_path: Path, output_tag: str) -> tuple[Path, Path]:
    """Compute (output_folder, output_archive) paths beside the input.

    A "1PasswordExport-<suffix>" base becomes "1PasswordExport-<tag>-<suffix>";
    any other base gets "-<tag>" appended (or just <tag> when base is empty).
    """
    base = input_path.name if input_path.is_dir() else input_path.stem
    if base.startswith(ONEPASSWORD_PREFIX):
        remainder = base.removeprefix(ONEPASSWORD_PREFIX)
        pieces = [ONEPASSWORD_PREFIX + output_tag]
        if remainder:
            pieces.append(remainder)
        out_base = "-".join(pieces)
    elif base:
        out_base = f"{base}-{output_tag}"
    else:
        out_base = output_tag
    folder = (input_path.parent / out_base).resolve()
    archive = (input_path.parent / f"{out_base}.1pux").resolve()
    return folder, archive
def filter_export_data(data: dict[str, Any], selected_keys: set[tuple[int, int]]) -> tuple[dict[str, Any], dict[str, int]]:
    """Return (filtered copy of *data*, counters).

    *selected_keys* holds (account_index, vault_index) pairs to keep, with
    indices matching enumerate() over the raw lists. Accounts left with no
    kept vaults are dropped; every top-level key other than "accounts" is
    carried over untouched. Counter keys: total_/kept_ x accounts/vaults/items.
    """
    counters = {
        "total_accounts": 0,
        "total_vaults": 0,
        "total_items": 0,
        "kept_accounts": 0,
        "kept_vaults": 0,
        "kept_items": 0,
    }
    surviving_accounts: list[dict[str, Any]] = []
    for account_idx, account in enumerate(data.get("accounts", [])):
        if not isinstance(account, dict):
            continue
        counters["total_accounts"] += 1
        vaults = account.get("vaults", [])
        if not isinstance(vaults, list):
            continue
        counters["total_vaults"] += len(vaults)
        surviving_vaults: list[dict[str, Any]] = []
        for vault_idx, vault in enumerate(vaults):
            if not isinstance(vault, dict):
                continue
            items = vault.get("items", [])
            n_items = len(items) if isinstance(items, list) else 0
            counters["total_items"] += n_items
            if (account_idx, vault_idx) in selected_keys:
                surviving_vaults.append(vault)
                counters["kept_items"] += n_items
        if surviving_vaults:
            # Shallow-copy the account so the input structure stays untouched.
            surviving_accounts.append({**account, "vaults": surviving_vaults})
            counters["kept_accounts"] += 1
            counters["kept_vaults"] += len(surviving_vaults)
    filtered = {**data, "accounts": surviving_accounts}
    return filtered, counters
def write_filtered_export(
    source_dir: Path,
    attrs: dict[str, Any],
    filtered_data: dict[str, Any],
    output_dir: Path,
    overwrite: bool,
) -> int:
    """Write an unpacked 1PUX folder at *output_dir*; return the count of copied files.

    Replaces an existing output only when *overwrite* is True (otherwise
    ensure_writable raises FileExistsError). Only attachment files still
    referenced by *filtered_data* are copied out of source_dir/files.
    """
    ensure_writable(output_dir, overwrite)
    files_out = output_dir / "files"
    for directory in (output_dir, files_out):
        directory.mkdir(parents=True, exist_ok=True)
    write_json(output_dir / "export.attributes", attrs)
    write_json(output_dir / "export.data", filtered_data)
    files_src = source_dir / "files"
    copied_count = 0
    for rel_path in sorted(matching_file_paths(files_src, filtered_data)):
        origin = files_src / rel_path
        if origin.is_file():
            destination = files_out / rel_path
            destination.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(origin, destination)
            copied_count += 1
    return copied_count
def create_1pux_archive(source_dir: Path, output_archive: Path) -> None:
    """Zip the files under *source_dir* into *output_archive*.

    Uses deflate compression, sorted traversal, and forward-slash relative
    member names (empty directories are not recorded).
    """
    members = [entry for entry in sorted(source_dir.rglob("*")) if entry.is_file()]
    with zipfile.ZipFile(output_archive, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        for member in members:
            archive.write(member, arcname=member.relative_to(source_dir).as_posix())
def main() -> int:
    """CLI entry point: filter the export, write outputs, print a summary.

    Returns 0 on success; raises on missing input, invalid structure,
    refused overwrites, or bad selections.
    """
    args = parse_args()
    input_path = args.input_path.resolve()
    if not input_path.exists():
        raise FileNotFoundError(f"Input path does not exist: {input_path}")
    # --no-compress implies keeping the folder (it is the only output then).
    keep_folder = args.keep_output_folder or args.no_compress
    compress = not args.no_compress
    with open_source_dir(input_path) as source_dir:
        attrs, data = validate_1pux_dir(source_dir)
        entries = discover_vaults(data, source_dir / "files")
        selected = select_vaults(entries, args.vault_names)
        # (account_index, vault_index) pairs drive the filtering step below.
        selected_keys = {(e.account_index, e.vault_index) for e in selected}
        selected_ordinals = [e.ordinal for e in selected]
        selected_names = [e.name for e in selected]
        out_dir, out_archive = derive_output_paths(input_path, build_output_tag(selected_names))
        # Guard against clobbering the input with its own filtered output.
        if input_path.is_dir() and out_dir == input_path:
            raise RuntimeError("Refusing to write output folder over input folder.")
        if input_path.is_file() and out_archive == input_path:
            raise RuntimeError("Refusing to overwrite input archive path.")
        # Fail early on an existing archive, before any filtering work is done.
        if compress:
            ensure_writable(out_archive, args.overwrite)
        filtered_data, stats = filter_export_data(data, selected_keys)
        copied_files = write_filtered_export(source_dir, attrs, filtered_data, out_dir, args.overwrite)
    compressed_path: Path | None = None
    if compress:
        create_1pux_archive(out_dir, out_archive)
        compressed_path = out_archive
        # The unpacked folder is an intermediate unless explicitly kept.
        if not keep_folder:
            shutil.rmtree(out_dir)
    print("Done.\n")
    print("Filter Summary")
    print(f"- Selected ordinals: {', '.join(str(n) for n in selected_ordinals)}")
    print(f"- Selected vault names: {', '.join(selected_names)}")
    print(f"- Kept accounts: {stats['kept_accounts']}")
    print(f"- Kept vaults: {stats['kept_vaults']}")
    print(f"- Kept items: {stats['kept_items']}")
    print(f"- Copied attachment files: {copied_files}\n")
    print("Source Totals")
    print(f"- Accounts: {stats['total_accounts']}")
    print(f"- Vaults: {stats['total_vaults']}")
    print(f"- Items: {stats['total_items']}\n")
    print("Output")
    print(f"- Compressed: {compressed_path if compressed_path else 'skipped (--no-compress)'}")
    print(f"- Folder: {out_dir if keep_folder else 'removed after compression'}")
    return 0
if __name__ == "__main__":
    # Exit with main()'s status; Ctrl-C maps to the conventional 128+SIGINT code.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        print("\nCancelled.")
        exit_code = 130
    raise SystemExit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment