Scripts to help with managing Aqueduct files
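The two Python scripts below declare their dependencies with PEP 723 inline script metadata (the `# /// script` header blocks), so a PEP 723-aware runner such as `uv run` can execute them directly without a separate install step. The first lists or extracts from a local or remote .7z archive; the second recursively lists the files in a WebDAV directory tree.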
/data
# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "py7zr",
#     "fsspec",
#     "aiohttp",
#     "requests",
# ]
# ///
import sys
import argparse
from pathlib import Path
from tempfile import TemporaryDirectory
import shutil
import fsspec
import py7zr
from datetime import datetime


def human(n):
    # simple humanize, base-1024
    for unit in ("", "K", "M", "G", "T"):
        if n < 1024:
            return f"{n:.0f}{unit}"
        n /= 1024
    return f"{n:.0f}P"


def iso(dt):
    if not dt:
        return ""
    # ensure consistent display
    if isinstance(dt, datetime):
        return dt.isoformat(sep=" ", timespec="seconds")
    return str(dt)


def file_mtime(fi):
    """Best-effort to get a modification time from a py7zr file info object.

    Different py7zr versions/archives may expose different fields.
    """
    for attr in (
        "modified_time",  # common
        "writetime",
        "write_time",
        "mtime",
        "datetime",
        "creationtime",
        "create_time",
    ):
        v = getattr(fi, attr, None)
        if v:
            # Convert numeric epoch to datetime if encountered
            if isinstance(v, (int, float)):
                try:
                    return datetime.fromtimestamp(v)
                except Exception:
                    pass
            return v
    return None


parser = argparse.ArgumentParser(
    description="List a .7z archive (remote/local) or extract a single file.",
    usage="%(prog)s <url> [--get <file>]",
)
parser.add_argument("url", metavar="<url>", help="URL or path to a .7z archive")
parser.add_argument(
    "--get",
    dest="file",
    metavar="<file>",
    help="Extract a single file to the current directory (preserving its path)",
)
args = parser.parse_args()
url = args.url

# Single-file extraction path (avoid listing)
if args.file:
    target = args.file
    with fsspec.open(url, mode="rb") as fh:
        with py7zr.SevenZipFile(fh, mode="r") as z:
            # Validate presence (exact match preferred)
            entries = [fi for fi in z.list() if not getattr(fi, "is_directory", False)]
            names = [getattr(fi, "filename", "") for fi in entries]
            if target not in names:
                # Relax to unique basename match
                candidates = [n for n in names if Path(n).name == Path(target).name]
                if len(candidates) == 1:
                    target = candidates[0]
                elif len(candidates) > 1:
                    print("Multiple entries match by basename; please specify full path:")
                    for n in candidates:
                        print(f"  {n}")
                    sys.exit(2)
                else:
                    print(f"File not found in archive: {args.file}")
                    sys.exit(2)
            out_path = Path(target)
            out_path.parent.mkdir(parents=True, exist_ok=True)
            with TemporaryDirectory() as td:
                try:
                    z.extract(path=td, targets=[target])
                except TypeError:
                    # Older py7zr signature may require positional path
                    z.extract(td, targets=[target])
                tmp_src = Path(td) / target
                if not tmp_src.exists():
                    # Fallback: search by basename under temp tree
                    matches = list(Path(td).rglob(Path(target).name))
                    if not matches:
                        print(f"Extraction failed: {target} not found in temp dir")
                        sys.exit(2)
                    tmp_src = matches[0]
                # Stream copy to output
                with open(tmp_src, "rb") as rf, open(out_path, "wb") as wf:
                    shutil.copyfileobj(rf, wf, length=1024 * 1024)
    print(f"Saved: {out_path} ({human(out_path.stat().st_size)})")
    sys.exit(0)

# Header
print(f'Listing archive: {url}\n')
print(f'{"Modified":19} {"Size":>12} {"Packed":>12} Name')
print('-' * 64)

total_size = 0
total_packed = 0
count = 0

with fsspec.open(url, mode="rb") as fh:
    with py7zr.SevenZipFile(fh, mode="r") as z:
        # py7zr.list() yields file info objects with attributes:
        # filename, uncompressed, compressed, modified_time, is_directory, etc.
        for fi in z.list():
            if getattr(fi, "is_directory", False):
                continue
            name = getattr(fi, "filename", "")
            size = int(getattr(fi, "uncompressed", 0) or 0)
            packed = int(getattr(fi, "compressed", 0) or 0)
            mtime = file_mtime(fi)
            print(f'{iso(mtime):19} {human(size):>12} {human(packed):>12} {name}')
            total_size += size
            total_packed += packed
            count += 1

print('-' * 64)
print(f'Files: {count}  Total size: {round(total_size / 1024**3, 3)} GB  Total packed: {round(total_packed / 1024**3, 3)} GB')
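A note on the approach: py7zr needs a seekable file object, and fsspec's HTTP filesystem (which is why aiohttp appears in the dependencies) typically provides one by issuing range requests, so the archive index can be read without downloading the whole file first. A minimal sketch of that pattern, using a placeholder URL and assuming the server supports range requests:

# Minimal sketch of the remote-listing pattern used above.
# The URL is a placeholder, not part of the original gist.
import fsspec
import py7zr

with fsspec.open("https://example.com/archive.7z", mode="rb") as fh:
    with py7zr.SevenZipFile(fh, mode="r") as z:
        for info in z.list():
            # FileInfo entries expose filename, uncompressed, compressed, is_directory, ...
            if not info.is_directory:
                print(info.uncompressed, info.filename)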
# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "requests",
# ]
# ///
from __future__ import annotations  # allow "X | None" annotations on Python 3.9

import sys
import time
import argparse
import urllib.parse as urlparse
import xml.etree.ElementTree as ET
from collections import deque

import requests

DAV_NS = "{DAV:}"
PROPFIND_BODY = (
    "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
    "<d:propfind xmlns:d=\"DAV:\">"
    "<d:prop><d:resourcetype/><d:getcontentlength/><d:getlastmodified/><d:etag/></d:prop>"
    "</d:propfind>"
)


def normalize_dir(url: str) -> str:
    return url if url.endswith("/") else url + "/"


def same_tree(url: str, base: str) -> bool:
    u = urlparse.urlsplit(url)
    b = urlparse.urlsplit(base)
    if (u.scheme, u.netloc) != (b.scheme, b.netloc):
        return False
    # Compare using unquoted (decoded) paths to ignore % encoding differences
    u_path = normalize_dir(urlparse.unquote(u.path))
    b_path = normalize_dir(urlparse.unquote(b.path))
    return u_path.startswith(b_path)


def human_bytes(b: float) -> str:
    b = float(b or 0)
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    i = 0
    while b >= 1024 and i < len(units) - 1:
        b /= 1024
        i += 1
    return f"{b:.1f}{units[i]}" if i else f"{int(b)}{units[i]}"


def propfind(url: str, auth: tuple[str, str] | None = None) -> ET.Element:
    headers = {
        "Depth": "1",
        "Accept": "application/xml",
        "Content-Type": 'application/xml; charset="utf-8"',
    }
    r = requests.request("PROPFIND", url, headers=headers, data=PROPFIND_BODY, auth=auth)
    r.raise_for_status()
    # Some servers return bytes; ensure text for ET
    text = r.text if isinstance(r.text, str) else r.content.decode("utf-8", "replace")
    return ET.fromstring(text)


def crawl(base_url: str, auth: tuple[str, str] | None, delay: float = 0.0, max_pages: int | None = None):
    base_url = normalize_dir(base_url)
    q = deque([base_url])
    seen = set()
    pages = 0
    while q:
        cur = q.popleft()
        if cur in seen:
            continue
        seen.add(cur)
        try:
            root = propfind(cur, auth=auth)
        except Exception as e:
            print(f"WARN: PROPFIND failed for {cur}: {e}", file=sys.stderr)
            continue
        for resp in root.findall(f".//{DAV_NS}response"):
            href_el = resp.find(f"{DAV_NS}href")
            if href_el is None or not href_el.text:
                continue
            abs_url = urlparse.urljoin(cur, href_el.text)
            # stay in tree
            if not same_tree(abs_url, base_url):
                continue
            rt = resp.find(f".//{DAV_NS}resourcetype")
            is_dir = rt is not None and rt.find(f"{DAV_NS}collection") is not None
            # Many servers echo the current directory as an entry; skip it.
            # Compare on decoded paths to avoid % encoding mismatches.
            cur_path = normalize_dir(urlparse.unquote(urlparse.urlsplit(cur).path)).rstrip("/")
            abs_path = normalize_dir(urlparse.unquote(urlparse.urlsplit(abs_url).path)).rstrip("/")
            if abs_path == cur_path:
                continue
            if is_dir:
                q.append(normalize_dir(abs_url))
            else:
                size_el = resp.find(f".//{DAV_NS}getcontentlength")
                mod_el = resp.find(f".//{DAV_NS}getlastmodified")
                etag_el = resp.find(f".//{DAV_NS}etag")
                if size_el is not None and size_el.text and size_el.text.isdigit():
                    size = int(size_el.text)
                elif size_el is not None:
                    size = size_el.text
                else:
                    size = None
                yield {
                    "url": abs_url,
                    "size": size,
                    "modified": mod_el.text if mod_el is not None else None,
                    "etag": etag_el.text if etag_el is not None else None,
                }
        pages += 1
        if max_pages is not None and pages >= max_pages:
            break
        if delay:
            time.sleep(delay)


def main(argv: list[str]) -> int:
    ap = argparse.ArgumentParser(description="Recursively list files from a WebDAV directory using PROPFIND Depth:1.")
    ap.add_argument("url", help="Base WebDAV directory URL to start from")
    ap.add_argument("--user", default=None, help="Username (if required)")
    ap.add_argument("--password", default=None, help="Password (if required)")
    ap.add_argument("--output", choices=["text", "json"], default="text", help="Output format")
    ap.add_argument("--delay", type=float, default=0.0, help="Polite delay between requests")
    ap.add_argument("--max-pages", type=int, default=None, help="Limit number of directory pages crawled")
    args = ap.parse_args(argv)

    base = args.url
    if not base.startswith(("http://", "https://")):
        print("ERROR: url must start with http:// or https://", file=sys.stderr)
        return 2

    auth = None
    if args.user is not None or args.password is not None:
        auth = (args.user or "", args.password or "")

    items = list(crawl(base, auth=auth, delay=args.delay, max_pages=args.max_pages))

    if args.output == "json":
        import json
        print(json.dumps({
            "base": normalize_dir(base),
            "count": len(items),
            "files": items,
        }, indent=2))
    else:
        for it in items:
            size = it.get("size")
            human = human_bytes(size) if isinstance(size, (int, float)) or (isinstance(size, str) and size.isdigit()) else str(size)
            print(f"{human:>8}\t{it.get('url')}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
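Because the crawler's logic lives in functions behind an `if __name__ == "__main__"` guard, it can also be imported and driven programmatically. A minimal sketch, assuming the script has been saved as webdav_list.py (the gist does not show its file name) and using a hypothetical base URL:

# Hypothetical library-style usage; the module name and URL are placeholders.
from webdav_list import crawl, human_bytes

base = "https://dav.example.org/aqueduct/"
for item in crawl(base, auth=None, delay=0.5):
    # Each yielded item is a dict with "url", "size", "modified", and "etag" keys.
    size = item["size"]
    label = human_bytes(size) if isinstance(size, int) else str(size)
    print(f"{label:>8}  {item['url']}")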