Aqueduct file management scripts

Scripts to help with managing Aqueduct files.

# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "py7zr",
#     "fsspec",
#     "aiohttp",
#     "requests",
# ]
# ///
import sys
import argparse
from pathlib import Path
from tempfile import TemporaryDirectory
import shutil
import fsspec
import py7zr
from datetime import datetime


def human(n):
    # simple humanize, base-1024
    for unit in ("", "K", "M", "G", "T"):
        if n < 1024:
            return f"{n:.0f}{unit}"
        n /= 1024
    return f"{n:.0f}P"


def iso(dt):
    if not dt:
        return ""
    # ensure consistent display
    if isinstance(dt, datetime):
        return dt.isoformat(sep=" ", timespec="seconds")
    return str(dt)


def file_mtime(fi):
    """Best-effort to get a modification time from a py7zr file info object.

    Different py7zr versions/archives may expose different fields.
    """
    for attr in (
        "modified_time",  # common
        "writetime",
        "write_time",
        "mtime",
        "datetime",
        "creationtime",
        "create_time",
    ):
        v = getattr(fi, attr, None)
        if v:
            # Convert numeric epoch to datetime if encountered
            if isinstance(v, (int, float)):
                try:
                    return datetime.fromtimestamp(v)
                except Exception:
                    pass
            return v
    return None


parser = argparse.ArgumentParser(
    description="List a .7z archive (remote/local) or extract a single file.",
    usage="%(prog)s <url> [--get <file>]",
)
parser.add_argument("url", metavar="<url>", help="URL or path to a .7z archive")
parser.add_argument(
    "--get",
    dest="file",
    metavar="<file>",
    help="Extract a single file to the current directory (preserving its path)",
)
args = parser.parse_args()
url = args.url

# Single-file extraction path (avoid listing)
if args.file:
    target = args.file
    with fsspec.open(url, mode="rb") as fh:
        with py7zr.SevenZipFile(fh, mode="r") as z:
            # Validate presence (exact match preferred)
            entries = [fi for fi in z.list() if not getattr(fi, "is_directory", False)]
            names = [getattr(fi, "filename", "") for fi in entries]
            if target not in names:
                # Relax to unique basename match
                candidates = [n for n in names if Path(n).name == Path(target).name]
                if len(candidates) == 1:
                    target = candidates[0]
                elif len(candidates) > 1:
                    print("Multiple entries match by basename; please specify full path:")
                    for n in candidates:
                        print(f"  {n}")
                    sys.exit(2)
                else:
                    print(f"File not found in archive: {args.file}")
                    sys.exit(2)
            out_path = Path(target)
            out_path.parent.mkdir(parents=True, exist_ok=True)
            with TemporaryDirectory() as td:
                try:
                    z.extract(path=td, targets=[target])
                except TypeError:
                    # Older py7zr signature may require positional path
                    z.extract(td, targets=[target])
                tmp_src = Path(td) / target
                if not tmp_src.exists():
                    # Fallback: search by basename under temp tree
                    matches = list(Path(td).rglob(Path(target).name))
                    if not matches:
                        print(f"Extraction failed: {target} not found in temp dir")
                        sys.exit(2)
                    tmp_src = matches[0]
                # Stream copy to output
                with open(tmp_src, "rb") as rf, open(out_path, "wb") as wf:
                    shutil.copyfileobj(rf, wf, length=1024 * 1024)
    print(f"Saved: {out_path} ({human(out_path.stat().st_size)})")
    sys.exit(0)

# Header
print(f'Listing archive: {url}\n')
print(f'{"Modified":19} {"Size":>12} {"Packed":>12} Name')
print('-' * 64)

total_size = 0
total_packed = 0
count = 0
with fsspec.open(url, mode="rb") as fh:
    with py7zr.SevenZipFile(fh, mode="r") as z:
        # py7zr.list() yields file info objects with attributes such as
        # filename, uncompressed, compressed, modification time, is_directory, etc.
        for fi in z.list():
            if getattr(fi, "is_directory", False):
                continue
            name = getattr(fi, "filename", "")
            size = int(getattr(fi, "uncompressed", 0) or 0)
            packed = int(getattr(fi, "compressed", 0) or 0)
            mtime = file_mtime(fi)
            print(f'{iso(mtime):19} {human(size):>12} {human(packed):>12} {name}')
            total_size += size
            total_packed += packed
            count += 1
print('-' * 64)
print(f'Files: {count}  Total size: {round(total_size / 1024**3, 3)} GB  Total packed: {round(total_packed / 1024**3, 3)} GB')
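
Example invocations, assuming the script above is saved as list_7z.py (a placeholder name; the gist's file names are not shown here) and run with a PEP 723-aware runner such as uv, which installs the declared dependencies automatically; the URL and archive path are placeholders too:

    uv run list_7z.py https://example.com/data.7z
    uv run list_7z.py https://example.com/data.7z --get some/path/inside.tif

The first form prints the listing table; the second extracts a single entry into the current directory, preserving its path within the archive, and falls back to a unique basename match when the exact path is not given.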
# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "requests",
# ]
# ///
# PEP 604 unions in annotations (e.g. "tuple[str, str] | None") need this on Python 3.9
from __future__ import annotations

import sys
import time
import argparse
import urllib.parse as urlparse
import xml.etree.ElementTree as ET
from collections import deque

import requests

DAV_NS = "{DAV:}"
PROPFIND_BODY = (
    "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
    "<d:propfind xmlns:d=\"DAV:\">"
    "<d:prop><d:resourcetype/><d:getcontentlength/><d:getlastmodified/><d:etag/></d:prop>"
    "</d:propfind>"
)


def normalize_dir(url: str) -> str:
    return url if url.endswith("/") else url + "/"


def same_tree(url: str, base: str) -> bool:
    u = urlparse.urlsplit(url)
    b = urlparse.urlsplit(base)
    if (u.scheme, u.netloc) != (b.scheme, b.netloc):
        return False
    # Compare using unquoted (decoded) paths to ignore %-encoding differences
    u_path = normalize_dir(urlparse.unquote(u.path))
    b_path = normalize_dir(urlparse.unquote(b.path))
    return u_path.startswith(b_path)


def human_bytes(b: float) -> str:
    b = float(b or 0)
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    i = 0
    while b >= 1024 and i < len(units) - 1:
        b /= 1024
        i += 1
    return f"{b:.1f}{units[i]}" if i else f"{int(b)}{units[i]}"


def propfind(url: str, auth: tuple[str, str] | None = None) -> ET.Element:
    headers = {
        "Depth": "1",
        "Accept": "application/xml",
        "Content-Type": 'application/xml; charset="utf-8"',
    }
    r = requests.request("PROPFIND", url, headers=headers, data=PROPFIND_BODY, auth=auth)
    r.raise_for_status()
    # Some servers return bytes; ensure text for ET
    text = r.text if isinstance(r.text, str) else r.content.decode("utf-8", "replace")
    return ET.fromstring(text)


def crawl(base_url: str, auth: tuple[str, str] | None, delay: float = 0.0, max_pages: int | None = None):
    base_url = normalize_dir(base_url)
    q = deque([base_url])
    seen = set()
    pages = 0
    while q:
        cur = q.popleft()
        if cur in seen:
            continue
        seen.add(cur)
        try:
            root = propfind(cur, auth=auth)
        except Exception as e:
            print(f"WARN: PROPFIND failed for {cur}: {e}", file=sys.stderr)
            continue
        for resp in root.findall(f".//{DAV_NS}response"):
            href_el = resp.find(f"{DAV_NS}href")
            if href_el is None or not href_el.text:
                continue
            abs_url = urlparse.urljoin(cur, href_el.text)
            # stay in tree
            if not same_tree(abs_url, base_url):
                continue
            rt = resp.find(f".//{DAV_NS}resourcetype")
            is_dir = rt is not None and rt.find(f"{DAV_NS}collection") is not None
            # Many servers echo the current directory as an entry; skip it.
            # Compare on decoded paths to avoid %-encoding mismatches.
            cur_path = normalize_dir(urlparse.unquote(urlparse.urlsplit(cur).path)).rstrip("/")
            abs_path = normalize_dir(urlparse.unquote(urlparse.urlsplit(abs_url).path)).rstrip("/")
            if abs_path == cur_path:
                continue
            if is_dir:
                q.append(normalize_dir(abs_url))
            else:
                size_el = resp.find(f".//{DAV_NS}getcontentlength")
                mod_el = resp.find(f".//{DAV_NS}getlastmodified")
                etag_el = resp.find(f".//{DAV_NS}etag")
                yield {
                    "url": abs_url,
                    "size": (
                        int(size_el.text)
                        if size_el is not None and size_el.text and size_el.text.isdigit()
                        else size_el.text if size_el is not None else None
                    ),
                    "modified": mod_el.text if mod_el is not None else None,
                    "etag": etag_el.text if etag_el is not None else None,
                }
        pages += 1
        if max_pages is not None and pages >= max_pages:
            break
        if delay:
            time.sleep(delay)


def main(argv: list[str]) -> int:
    ap = argparse.ArgumentParser(
        description="Recursively list files from a WebDAV directory using PROPFIND Depth: 1."
    )
    ap.add_argument("url", help="Base WebDAV directory URL to start from")
    ap.add_argument("--user", default=None, help="Username (if required)")
    ap.add_argument("--password", default=None, help="Password (if required)")
    ap.add_argument("--output", choices=["text", "json"], default="text", help="Output format")
    ap.add_argument("--delay", type=float, default=0.0, help="Polite delay between requests (seconds)")
    ap.add_argument("--max-pages", type=int, default=None, help="Limit number of directory pages crawled")
    args = ap.parse_args(argv)

    base = args.url
    if not base.startswith(("http://", "https://")):
        print("ERROR: url must start with http:// or https://", file=sys.stderr)
        return 2

    auth = None
    if args.user is not None or args.password is not None:
        auth = (args.user or "", args.password or "")

    items = list(crawl(base, auth=auth, delay=args.delay, max_pages=args.max_pages))
    if args.output == "json":
        import json

        print(json.dumps({
            "base": normalize_dir(base),
            "count": len(items),
            "files": items,
        }, indent=2))
    else:
        for it in items:
            size = it.get("size")
            human = (
                human_bytes(size)
                if isinstance(size, (int, float)) or (isinstance(size, str) and size.isdigit())
                else str(size)
            )
            print(f"{human:>8}\t{it.get('url')}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
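
Example invocations for the WebDAV lister, again with placeholder names (webdav_list.py, example.com, the credentials) standing in for whatever you actually use:

    uv run webdav_list.py https://example.com/dav/data/
    uv run webdav_list.py https://example.com/dav/data/ --output json --delay 0.5
    uv run webdav_list.py https://example.com/dav/data/ --user alice --password secret

A (user, password) tuple is sent by requests as HTTP Basic auth, and --delay inserts a pause between successive PROPFIND requests so the crawl stays polite to the server.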