Created
February 23, 2026 18:27
-
-
Save mrexodia/eab11a65425915bbc875dbad2c7d758e to your computer and use it in GitHub Desktop.
Gitlab Pipeline Utilities
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Fetch CI pipeline job logs and artifacts locally. | |
| Usage: | |
| # Default: latest pipeline for current branch | |
| # Logs for all jobs, artifacts for failed jobs only | |
| python3 tools/ci/fetch_pipeline.py | |
| # Specific pipeline | |
| python3 tools/ci/fetch_pipeline.py -p 37748 | |
| # Specific branch | |
| python3 tools/ci/fetch_pipeline.py -b main | |
| # Download only logs for failed jobs (skip artifact zips) | |
| python3 tools/ci/fetch_pipeline.py --logs-only | |
| # Download logs + artifacts for all jobs (not just failed) | |
| python3 tools/ci/fetch_pipeline.py --all-artifacts | |
| # List jobs without downloading | |
| python3 tools/ci/fetch_pipeline.py --list | |
| # Custom output directory | |
| python3 tools/ci/fetch_pipeline.py -o /tmp/my-pipeline | |
| Files are saved to: | |
| ci-artifacts/ # (or --output path) | |
| pipeline.json # full pipeline + job metadata | |
| {job_slug}/ | |
| job.log # job trace log (cleaned) | |
| artifacts/ # extracted artifact contents | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| import zipfile | |
| from pathlib import Path | |
# True when stderr is a terminal: gates progress bars and cursor control.
INTERACTIVE = sys.stderr.isatty()
CHUNK_SIZE = 256 * 1024  # 256KB read chunks
# ANSI escape sequences to hide/show the terminal cursor while drawing progress.
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"
def fmt_size(nbytes):
    """Render a byte count as a compact human-readable string (B/KB/MB/GB)."""
    kb = 1024
    mb = 1024 * 1024
    gb = 1024 * 1024 * 1024
    if nbytes >= gb:
        return f"{nbytes/1024/1024/1024:.2f}GB"
    if nbytes >= mb:
        return f"{nbytes/1024/1024:.1f}MB"
    if nbytes >= kb:
        return f"{nbytes/1024:.1f}KB"
    return f"{nbytes}B"
def fmt_speed(bytes_per_sec):
    """Render a transfer rate as '<size>/s'."""
    return "{}/s".format(fmt_size(bytes_per_sec))
def fmt_eta(seconds):
    """Render a (non-negative) duration in seconds as a short ETA string."""
    if seconds >= 3600:
        hours, rem = divmod(int(seconds), 3600)
        return f"{hours}h{rem // 60:02d}m"
    if seconds >= 60:
        mins, secs = divmod(int(seconds), 60)
        return f"{mins}m{secs:02d}s"
    return f"{int(seconds)}s"
def progress_bar(received, total, speed, width=30):
    """Build a one-line textual progress string for stderr display.

    With a known total, shows a filled bar, percentage, counts, speed and
    ETA; with an unknown total (0), just bytes received and speed.
    """
    if total <= 0:
        return f" {fmt_size(received)} {fmt_speed(speed)}"
    frac = min(received / total, 1.0)
    filled = int(width * frac)
    bar = "█" * filled + "░" * (width - filled)
    pct = f"{frac*100:5.1f}%"
    eta = fmt_eta((total - received) / speed) if speed > 0 else "?"
    return f" [{bar}] {pct} {fmt_size(received)}/{fmt_size(total)} {fmt_speed(speed)} ETA {eta}"
def stream_to_file(cmd, dest, expected_size=0):
    """Run *cmd*, streaming its stdout to the file *dest* with a progress bar.

    Returns the number of bytes written, or None if the command failed.

    stderr is captured to an unnamed temp file rather than a pipe: the
    original code only read stderr after stdout hit EOF, so a command that
    wrote more stderr than the OS pipe buffer holds would block forever.
    """
    with tempfile.TemporaryFile() as errf:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=errf)
        received = 0
        t_start = time.monotonic()
        last_update = 0
        if INTERACTIVE:
            print(HIDE_CURSOR, end="", file=sys.stderr, flush=True)
        try:
            with open(dest, 'wb') as f:
                while True:
                    chunk = proc.stdout.read(CHUNK_SIZE)
                    if not chunk:
                        break
                    f.write(chunk)
                    received += len(chunk)
                    now = time.monotonic()
                    # Throttle progress redraws to ~6 per second.
                    if INTERACTIVE and now - last_update >= 0.15:
                        elapsed = now - t_start
                        speed = received / elapsed if elapsed > 0 else 0
                        line = progress_bar(received, expected_size, speed)
                        print(f"\r{line}", end="", file=sys.stderr, flush=True)
                        last_update = now
            proc.wait()
        except KeyboardInterrupt:
            proc.kill()
            proc.wait()
            raise
        finally:
            # Clear the progress line and restore the cursor.
            if INTERACTIVE:
                print(f"\r{' '*100}\r{SHOW_CURSOR}", end="", file=sys.stderr, flush=True)
        if proc.returncode != 0:
            errf.seek(0)
            err = errf.read().decode()
            print(f" ERROR: {' '.join(cmd)}: {err.strip()}", file=sys.stderr)
            return None
        return received
def stream_to_bytes(cmd):
    """Run *cmd* and return its stdout as bytes, or None on failure.

    Uses communicate() so both pipes are drained concurrently: the original
    read stdout to EOF before touching stderr, which deadlocks when the
    command fills the stderr pipe buffer first.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    data, err_bytes = proc.communicate()
    if proc.returncode != 0:
        err = err_bytes.decode()
        print(f" ERROR: {' '.join(cmd)}: {err.strip()}", file=sys.stderr)
        return None
    return data
def run(cmd):
    """Run *cmd* and return its stdout as a string, or None on failure."""
    completed = subprocess.run(cmd, capture_output=True, text=True)
    if completed.returncode == 0:
        return completed.stdout
    print(f" ERROR: {' '.join(cmd)}: {completed.stderr.strip()}", file=sys.stderr)
    return None
def get_pipeline(pipeline_id=None, branch=None):
    """Fetch pipeline metadata (with job details) via `glab ci get`.

    Exits the process if the glab call fails; returns the parsed JSON dict.
    """
    cmd = ["glab", "ci", "get", "--with-job-details", "--output", "json"]
    if pipeline_id:
        cmd.extend(["--pipeline-id", str(pipeline_id)])
    elif branch:
        cmd.extend(["--branch", branch])
    output = run(cmd)
    if output is None:
        sys.exit(1)
    return json.loads(output)
def get_merge_request_url(branch):
    """Return the web URL of an open MR whose source is *branch*, or None."""
    raw = run(["glab", "api",
               f"projects/:id/merge_requests?source_branch={branch}&state=opened"])
    if raw is None:
        return None
    merge_requests = json.loads(raw)
    return merge_requests[0].get('web_url') if merge_requests else None
def slugify(name):
    """Convert a job name into a filesystem-safe slug, capped at 80 chars."""
    slug = re.sub(r'_+', '_', re.sub(r'[^a-zA-Z0-9_\-]', '_', name))
    return slug.strip('_')[:80]
def archive_size_for(job):
    """Return the size in bytes of the job's 'archive' artifact, or 0."""
    return next((a.get('size', 0)
                 for a in job.get('artifacts', [])
                 if a.get('file_type') == 'archive'), 0)
def print_jobs(jobs, log_job_ids, artifact_job_ids):
    """Print a summary table: id, status, artifact size, planned download, name."""
    print(f"\n{'ID':>8} {'Status':<10} {'Artifacts':>12} {'Download':>10} Name")
    print(f"{'─'*8} {'─'*10} {'─'*12} {'─'*10} {'─'*50}")
    for job in jobs:
        arch_size = archive_size_for(job)
        size_str = fmt_size(arch_size) if arch_size else "none"
        action = ("log+arts" if job['id'] in artifact_job_ids
                  else "log" if job['id'] in log_job_ids
                  else "skip")
        print(f"{job['id']:>8} {job['status']:<10} {size_str:>12} {action:>10} {job['name']}")
def download_log(job, outdir):
    """Fetch the job trace, strip ANSI colors and CI timestamps, save job.log.

    Returns True on success (or when the log already exists), False on error.
    """
    log_path = outdir / "job.log"
    if log_path.exists():
        print(f" log: already exists, skipping")
        return True
    data = stream_to_bytes(["glab", "api", f"projects/:id/jobs/{job['id']}/trace"])
    if data is None:
        return False
    cleaned = data.decode('utf-8', errors='replace')
    # Drop ANSI SGR color sequences for readability.
    cleaned = re.sub(r'\x1b\[[0-9;]*m', '', cleaned)
    # Drop leading CI timestamp prefixes (e.g. "2026-02-18T14:03:16.852092Z 01O ").
    cleaned = re.sub(r'^\d{4}-\d{2}-\d{2}T[\d:.]+Z \d+[OE] ?', '', cleaned,
                     flags=re.MULTILINE)
    log_path.write_text(cleaned)
    print(f" log: {fmt_size(len(data))}")
    return True
def download_artifacts(job, outdir):
    """Fetch the job's artifact archive and unpack it under outdir/artifacts.

    A ".artifacts_done" marker file makes the operation resumable/idempotent.
    Returns True on success or nothing-to-do, False on download/zip errors.
    """
    arts_dir = outdir / "artifacts"
    marker = outdir / ".artifacts_done"
    if marker.exists():
        print(f" artifacts: already extracted, skipping")
        return True
    size = archive_size_for(job)
    if size == 0:
        print(f" artifacts: none")
        return True
    # Stream the archive into a temp zip next to the destination, extract, delete.
    fd, tmp_zip = tempfile.mkstemp(suffix=".zip", dir=outdir)
    os.close(fd)
    try:
        print(f" artifacts: downloading {fmt_size(size)}...", flush=True)
        t_start = time.monotonic()
        received = stream_to_file(
            ["glab", "api", f"projects/:id/jobs/{job['id']}/artifacts"],
            tmp_zip,
            expected_size=size)
        if received is None:
            return False
        elapsed = time.monotonic() - t_start
        rate = received / elapsed if elapsed > 0 else 0
        print(f" artifacts: downloaded {fmt_size(received)} in {elapsed:.1f}s ({fmt_speed(rate)})")
        print(f" artifacts: extracting...", flush=True)
        arts_dir.mkdir(parents=True, exist_ok=True)
        try:
            with zipfile.ZipFile(tmp_zip) as zf:
                zf.extractall(arts_dir)
                count = len(zf.namelist())
            # Only written after a full successful extraction.
            marker.write_text("")
            print(f" artifacts: {count} files extracted")
        except zipfile.BadZipFile:
            print(f" artifacts: ERROR: bad zip file", file=sys.stderr)
            return False
    finally:
        if os.path.exists(tmp_zip):
            os.unlink(tmp_zip)
    return True
def main():
    """Entry point: resolve the pipeline, choose jobs, download logs/artifacts."""
    parser = argparse.ArgumentParser(
        description="Fetch CI pipeline logs and artifacts locally",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__)
    parser.add_argument("-p", "--pipeline-id", type=int, default=None,
                        help="Pipeline ID (default: latest for current branch)")
    parser.add_argument("-b", "--branch", type=str, default=None,
                        help="Branch name (default: current branch)")
    parser.add_argument("-o", "--output", type=str, default=None,
                        help="Output directory (default: ci-artifacts/)")
    parser.add_argument("--all-artifacts", action="store_true",
                        help="Download artifacts for all jobs, not just failed")
    parser.add_argument("--logs-only", action="store_true",
                        help="Download only job logs, skip artifact archives")
    parser.add_argument("--list", action="store_true",
                        help="List jobs and exit without downloading")
    args = parser.parse_args()
    # Check prerequisites: glab must be installed and authenticated.
    if not shutil.which("glab"):
        print("Error: glab CLI not found.\n"
              "Install it from: https://gitlab.com/gitlab-org/cli/#installation",
              file=sys.stderr)
        sys.exit(1)
    result = subprocess.run(["glab", "auth", "status"],
                            capture_output=True, text=True)
    if result.returncode != 0:
        print("Error: glab is not authenticated.\n",
              file=sys.stderr)
        # glab auth status prints details to stderr
        print(result.stderr, file=sys.stderr)
        sys.exit(1)
    # Fetch pipeline (single call gets pipeline metadata + all jobs)
    what = f"pipeline {args.pipeline_id}" if args.pipeline_id \
        else f"branch {args.branch}" if args.branch \
        else "current branch"
    print(f"Fetching latest pipeline for {what}...")
    pipeline = get_pipeline(args.pipeline_id, args.branch)
    jobs = pipeline.get('jobs', [])
    print(f"Pipeline {pipeline['id']} ref={pipeline['ref']} "
          f"status={pipeline['status']} {len(jobs)} jobs\n"
          f"URL: {pipeline.get('web_url', '')}")
    # Show the MR link too, if the pipeline's ref has an open MR.
    mr_url = get_merge_request_url(pipeline['ref'])
    if mr_url:
        print(f"MR: {mr_url}")
    if not jobs:
        print("No jobs found.")
        sys.exit(0)
    # Determine which jobs to download.
    failed_ids = {j['id'] for j in jobs if j['status'] == 'failed'}
    if args.logs_only:
        # --logs-only: logs for failed jobs, no artifact archives at all.
        log_job_ids = failed_ids
        artifact_job_ids = set()
    elif args.all_artifacts:
        # --all-artifacts: logs + artifacts for every job.
        log_job_ids = {j['id'] for j in jobs}
        artifact_job_ids = {j['id'] for j in jobs}
    else:
        # Default: logs + artifacts for failed jobs only
        log_job_ids = failed_ids
        artifact_job_ids = failed_ids
    print_jobs(jobs, log_job_ids, artifact_job_ids)
    if args.list:
        sys.exit(0)
    # Setup output directory
    base = Path(args.output) if args.output else Path("ci-artifacts")
    base.mkdir(parents=True, exist_ok=True)
    # Check for existing pipeline data from a different pipeline
    meta_path = base / "pipeline.json"
    if meta_path.exists():
        existing = json.loads(meta_path.read_text())
        existing_id = existing.get('id')
        if existing_id != pipeline['id']:
            if INTERACTIVE:
                # Interactive: ask before deleting another pipeline's downloads.
                print(f"\n{base}/ already contains data for pipeline "
                      f"{existing_id} (ref={existing.get('ref', '?')}).")
                answer = input(f"Delete and replace with pipeline {pipeline['id']}? [Y/n] ")
                if answer.strip().lower() in ('', 'y', 'yes'):
                    shutil.rmtree(base)
                    base.mkdir(parents=True, exist_ok=True)
                else:
                    print("Aborted.")
                    sys.exit(0)
            else:
                # Non-interactive: refuse rather than silently delete data.
                print(f"Error: {base}/ already contains data for pipeline "
                      f"{existing_id} (ref={existing.get('ref', '?')}), "
                      f"but requested pipeline {pipeline['id']}.\n"
                      f"Delete the directory or use -o to specify a different path.",
                      file=sys.stderr)
                sys.exit(1)
    # Ensure .gitignore exists so artifacts aren't committed
    gitignore = base / ".gitignore"
    if not gitignore.exists():
        gitignore.write_text("*\n")
    # Save full pipeline + job metadata
    meta_path.write_text(json.dumps(pipeline, indent=2))
    # Download each job (logs always for selected jobs; artifacts when selected).
    total_arts = sum(archive_size_for(j) for j in jobs if j['id'] in artifact_job_ids)
    if total_arts:
        print(f"\nTotal artifact download: ~{fmt_size(total_arts)}")
    print(f"Downloading to {base}/\n")
    for j in jobs:
        if j['id'] not in log_job_ids:
            continue
        outdir = base / slugify(j['name'])
        outdir.mkdir(parents=True, exist_ok=True)
        print(f" [{j['status']}] {j['name']} (job {j['id']})")
        download_log(j, outdir)
        if j['id'] in artifact_job_ids:
            download_artifacts(j, outdir)
        print()
    print(f"Done. Results in {base}/")
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # A download may have hidden the cursor; restore it before exiting.
        if INTERACTIVE:
            print(SHOW_CURSOR, end="", file=sys.stderr, flush=True)
        print("\nInterrupted.", file=sys.stderr)
        # 130 = conventional exit status for SIGINT (128 + signal 2).
        sys.exit(130)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| """ | |
| RemoteZip — stdlib-only fork. | |
| Original: https://github.com/gtsystem/python-remotezip | |
| Refactored to replace `requests` with `urllib.request` so the only | |
| dependency is the Python standard library. | |
| """ | |
| import io | |
| import time | |
| import zipfile | |
| from datetime import datetime | |
| from itertools import tee | |
| import urllib.request as urllib_request | |
| from urllib.parse import urlparse, parse_qs | |
| from urllib.request import Request, urlopen | |
| from urllib.error import HTTPError | |
| __all__ = ["RemoteIOError", "RemoteZip"] | |
| # ── Exceptions ──────────────────────────────────────────────────────────────── | |
class RemoteZipError(Exception):
    """Base class for all errors raised by this module."""
    pass
class OutOfBound(RemoteZipError):
    """A seek/read landed outside the currently buffered byte range."""
    pass
class RemoteIOError(RemoteZipError):
    """Network/HTTP failure while talking to the remote server."""
    pass
class RangeNotSupported(RemoteZipError):
    """The server did not honor HTTP Range requests."""
    pass
| # ── PartialBuffer ───────────────────────────────────────────────────────────── | |
class PartialBuffer:
    """Buffer-like wrapper around a byte range of a remote file.

    Positions are absolute offsets into the remote file; `_offset` is the
    absolute position of this buffer's first byte.  In stream mode the
    underlying response is consumed lazily and only forward seeks work;
    otherwise the whole range is copied into an in-memory BytesIO up front.
    """
    def __init__(self, buffer, offset, size, stream):
        # Non-stream mode: materialize the body so backward seeks are possible.
        self.buffer = buffer if stream else io.BytesIO(buffer.read())
        self._offset = offset      # absolute file offset of the first byte
        self._size = size          # number of bytes covered by this buffer
        self._position = offset    # current absolute position
        self._stream = stream
    def __len__(self):
        return self._size
    def __repr__(self):
        return f"<PartialBuffer off={self._offset} size={self._size} stream={self._stream}>"
    def read(self, size=0):
        # size == 0 means "read to the end of this buffered range".
        if size == 0:
            size = self._offset + self._size - self._position
        content = self.buffer.read(size)
        self._position = self._offset + self.buffer.tell()
        return content
    def close(self):
        if not self.buffer.closed:
            self.buffer.close()
    def tell(self):
        return self._position
    def seek(self, offset, whence):
        # Translate the request into an absolute file position.
        if whence == 2:        # relative to the end of this range
            self._position = self._size + self._offset + offset
        elif whence == 0:      # absolute
            self._position = offset
        else:                  # whence == 1: relative to current position
            self._position += offset
        relative_position = self._position - self._offset
        if relative_position < 0 or relative_position >= self._size:
            # Target falls outside this range; the caller must fetch a new one.
            raise OutOfBound("Position out of buffer bound")
        if self._stream:
            # Streaming responses can only be advanced, never rewound.
            buff_pos = self.buffer.tell()
            if relative_position < buff_pos:
                raise OutOfBound("Negative seek not supported")
            skip_bytes = relative_position - buff_pos
            if skip_bytes == 0:
                return self._position
            # Discard the intervening bytes to advance the stream.
            self.buffer.read(skip_bytes)
        else:
            self.buffer.seek(relative_position)
        return self._position
| # ── RemoteIO ────────────────────────────────────────────────────────────────── | |
class RemoteIO(io.IOBase):
    """File-like façade that fetches byte ranges on demand.

    zipfile drives this object: it first seeks to the end to read the
    central directory, then seeks to each member's header offset.  A seek
    that leaves the current PartialBuffer is deferred; the following read()
    fetches exactly the range it needs.
    """
    def __init__(self, fetch_fun, initial_buffer_size=64 * 1024):
        self._fetch_fun = fetch_fun                      # (range, stream) -> PartialBuffer
        self._initial_buffer_size = initial_buffer_size  # size of the initial tail fetch
        self.buffer = None                               # current PartialBuffer
        self._file_size = None                           # learned on the first seek-to-end
        self._seek_succeeded = False                     # did the last seek stay in-buffer?
        self._member_position_to_size = None             # member header offset -> span size
        self._last_member_pos = None                     # offset of member being streamed
    def set_position_to_size(self, position_to_size):
        # Installed by RemoteZip once the central directory is parsed; lets
        # read() stream exactly one member's bytes per fetch.
        self._member_position_to_size = position_to_size
    def read(self, size=0):
        position = self.tell()
        if size == 0:
            # Read through to end of file.
            size = self._file_size - position
        if not self._seek_succeeded:
            # The previous seek left the current buffer: fetch a new range.
            if self._member_position_to_size is None:
                # Central directory not parsed yet: fetch exactly `size` bytes.
                fetch_size = size
                stream = False
            else:
                try:
                    fetch_size = self._member_position_to_size[position]
                    self._last_member_pos = position
                except KeyError:
                    # Not at a member boundary; allow resuming within the
                    # member we were last streaming.
                    if self._last_member_pos and self._last_member_pos < position:
                        fetch_size = self._member_position_to_size[self._last_member_pos]
                        fetch_size -= position - self._last_member_pos
                    else:
                        raise OutOfBound(
                            "Attempt to seek outside boundary of current zip member"
                        )
                stream = True
            self._seek_succeeded = True
            self.buffer.close()
            self.buffer = self._fetch_fun(
                (position, position + fetch_size - 1), stream=stream
            )
        return self.buffer.read(size)
    def seekable(self):
        return True
    def seek(self, offset, whence=0):
        if whence == 2 and self._file_size is None:
            # First seek-to-end: fetch the file's tail (where the zip central
            # directory lives) and derive the total size from the response.
            size = self._initial_buffer_size
            self.buffer = self._fetch_fun((-size, None), stream=False)
            self._file_size = len(self.buffer) + self.buffer.tell()
        try:
            pos = self.buffer.seek(offset, whence)
            self._seek_succeeded = True
            return pos
        except OutOfBound:
            # Defer the fetch; the next read() requests the right range.
            self._seek_succeeded = False
            return self.tell()
    def tell(self):
        return self.buffer.tell()
    def close(self):
        if self.buffer:
            self.buffer.close()
            self.buffer = None
| # ── RemoteFetcher (stdlib) ──────────────────────────────────────────────────── | |
class _NoRedirectHandler(urllib_request.HTTPRedirectHandler):
    """Intercept redirects so we can capture the final URL without following."""
    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Abort the redirect chain; the caller catches _RedirectCaptured
        # and records `newurl` as the resolved target.
        raise _RedirectCaptured(newurl)
class _RedirectCaptured(Exception):
    """Carries the redirect target URL.

    Raised by _NoRedirectHandler as a control-flow signal, not an error.
    """
    def __init__(self, url):
        self.url = url
class RemoteFetcher:
    """Fetches byte ranges of a remote file using urllib.request.
    Handles the common pattern where the initial URL (e.g. GitLab artifacts)
    returns a 302 redirect to a signed object-storage URL (S3 / GCS / MinIO).
    The redirect is resolved once; all subsequent range requests go directly
    to the final URL *without* the original auth headers (the signature is
    already embedded in the query string).
    """
    # Headers that should NOT be forwarded to the storage backend.
    _AUTH_HEADERS = frozenset(
        h.lower()
        for h in (
            "PRIVATE-TOKEN",
            "Authorization",
            "JOB-TOKEN",
            "DEPLOY-TOKEN",
        )
    )
    def __init__(
        self, url, headers=None, support_suffix_range=True, follow_redirects=True,
        signed_url_margin=120, **kwargs
    ):
        self._initial_url = url
        self._headers = headers or {}
        # Some servers reject "bytes=-N" suffix ranges; see fetch().
        self._support_suffix_range = support_suffix_range
        self._follow_redirects = follow_redirects
        self._signed_url_margin = signed_url_margin # seconds before expiry to re-resolve
        self._kwargs = kwargs  # extra options; only "timeout" is read below
        # Resolved after the first request
        self._resolved_url = None
        self._resolved_at = None   # time.monotonic() at resolution
        self._ttl = None # seconds until the signed URL expires
        self._is_redirected = False
    # ---- range header helpers ------------------------------------------------
    @staticmethod
    def parse_range_header(content_range_header):
        """Parse 'bytes A-B/total' into (A, B); suffix form '-N' gives (-N, None)."""
        # [6:] strips the "bytes " prefix; split("/") drops the "/total" part.
        range_part = content_range_header[6:].split("/")[0]
        if range_part.startswith("-"):
            return int(range_part), None
        range_min, range_max = range_part.split("-")
        return int(range_min), int(range_max) if range_max else None
    @staticmethod
    def build_range_header(range_min, range_max):
        """Build a 'Range' header value; negative min with no max => suffix range."""
        if range_max is None:
            return "bytes=%s%s" % (range_min, "" if range_min < 0 else "-")
        return "bytes=%s-%s" % (range_min, range_max)
    # ---- redirect resolution -------------------------------------------------
    @staticmethod
    def _parse_signed_url_ttl(url):
        """Extract the TTL (in seconds) from a signed S3/GCS URL.
        Looks for X-Amz-Expires (AWS) or X-Goog-Expires (GCS) query params.
        Returns None if not a signed URL.
        """
        qs = parse_qs(urlparse(url).query)
        for key in ("X-Amz-Expires", "X-Goog-Expires"):
            vals = qs.get(key)
            if vals:
                try:
                    return int(vals[0])
                except (ValueError, IndexError):
                    pass
        return None
    def _is_expired(self):
        """Check whether the resolved signed URL is about to expire."""
        # NOTE(review): when _ttl is None (URL was never signed) this always
        # returns True, so non-redirected URLs re-issue the HEAD resolution on
        # every request — confirm whether that is intentional.
        if self._resolved_at is None or self._ttl is None:
            return True
        elapsed = time.monotonic() - self._resolved_at
        return elapsed >= (self._ttl - self._signed_url_margin)
    def _resolve_redirect(self):
        """Follow the initial URL to discover the final (signed) URL.
        Re-resolves automatically when the signed URL is about to expire
        (based on X-Amz-Expires minus a safety margin). After resolution,
        all subsequent requests hit the final URL directly with auth headers
        stripped (they'd cause S3 to return 400).
        """
        if self._resolved_url is not None and not self._is_expired():
            return # still fresh
        if not self._follow_redirects:
            self._resolved_url = self._initial_url
            self._is_redirected = False
            return
        # Use an opener whose redirect handler raises instead of following.
        opener = urllib_request.build_opener(_NoRedirectHandler)
        req = Request(self._initial_url)
        for k, v in self._headers.items():
            req.add_header(k, v)
        # HEAD is enough to trigger the redirect without downloading the body.
        req.get_method = lambda: "HEAD"
        timeout = self._kwargs.get("timeout", 30)
        try:
            resp = opener.open(req, timeout=timeout)
            # No redirect — use the original URL as-is
            self._resolved_url = self._initial_url
            self._is_redirected = False
            self._ttl = None
            resp.close()
        except _RedirectCaptured as rc:
            self._resolved_url = rc.url
            self._is_redirected = True
            self._ttl = self._parse_signed_url_ttl(rc.url)
        except HTTPError as e:
            # Some servers return 302 as an HTTPError; check for Location header
            location = e.headers.get("Location")
            if location and e.code in (301, 302, 303, 307, 308):
                self._resolved_url = location
                self._is_redirected = True
                self._ttl = self._parse_signed_url_ttl(location)
            else:
                raise RemoteIOError(str(e))
        self._resolved_at = time.monotonic()
    # ---- internal request helpers -------------------------------------------
    def _build_request(self, extra_headers=None):
        """Build a Request for the resolved URL, stripping auth when redirected."""
        self._resolve_redirect()
        req = Request(self._resolved_url)
        # Only attach auth headers if we're still hitting the original host
        for k, v in self._headers.items():
            if self._is_redirected and k.lower() in self._AUTH_HEADERS:
                continue
            req.add_header(k, v)
        if extra_headers:
            for k, v in extra_headers.items():
                req.add_header(k, v)
        return req
    def _open(self, req):
        """urlopen with the configured timeout, mapping HTTPError to RemoteIOError."""
        timeout = self._kwargs.get("timeout", 30)
        try:
            return urlopen(req, timeout=timeout)
        except HTTPError as e:
            raise RemoteIOError(str(e))
    # ---- public API ----------------------------------------------------------
    def get_file_size(self):
        """Return the remote file size from a HEAD request's Content-Length."""
        req = self._build_request()
        req.get_method = lambda: "HEAD"
        try:
            resp = self._open(req)
            length = resp.headers.get("Content-Length")
            if length is None:
                raise RemoteZipError(
                    "Cannot get file size: Content-Length header missing"
                )
            return int(length)
        except IOError as e:
            raise RemoteIOError(str(e))
    def fetch(self, data_range, stream=False):
        """Fetch a byte range of the remote file."""
        # Handle servers that don't support suffix ranges: convert "-N" into
        # an explicit (size-N, size-1) range via an extra HEAD request.
        if (
            data_range[0] < 0
            and data_range[1] is None
            and not self._support_suffix_range
        ):
            size = self.get_file_size()
            data_range = (max(0, size + data_range[0]), size - 1)
        range_header = self.build_range_header(*data_range)
        req = self._build_request(extra_headers={"Range": range_header})
        try:
            resp = self._open(req)
            cr = resp.headers.get("Content-Range")
            if cr is None:
                # A 200 without Content-Range means the server ignored Range.
                raise RangeNotSupported(
                    "The server doesn't support range requests"
                )
            range_min, range_max = self.parse_range_header(cr)
            return PartialBuffer(resp, range_min, range_max - range_min + 1, stream)
        except IOError as e:
            raise RemoteIOError(str(e))
| # ── helpers ─────────────────────────────────────────────────────────────────── | |
def pairwise(iterable):
    """Yield consecutive overlapping pairs: (x0, x1), (x1, x2), ..."""
    it = iter(iterable)
    prev = next(it, None)
    for cur in it:
        yield prev, cur
        prev = cur
| # ── RemoteZip ───────────────────────────────────────────────────────────────── | |
class RemoteZip(zipfile.ZipFile):
    """A ZipFile subclass that reads from a remote URL via HTTP range requests.
    Usage:
        with RemoteZip("https://example.com/archive.zip") as z:
            print(z.namelist())
            data = z.read("some/file.txt")
    Pass extra headers (e.g. auth tokens) as keyword arguments:
        RemoteZip(url, headers={"Authorization": "Bearer ..."})
    """
    def __init__(
        self,
        url,
        initial_buffer_size=64 * 1024,
        fetcher=RemoteFetcher,
        support_suffix_range=True,
        **kwargs,
    ):
        # RemoteIO pulls byte ranges on demand through the fetcher's fetch().
        rio = RemoteIO(
            fetcher(url, support_suffix_range=support_suffix_range, **kwargs).fetch,
            initial_buffer_size,
        )
        # ZipFile.__init__ reads the central directory through rio.
        super().__init__(rio)
        # With member offsets known, read() can fetch exactly one member's span.
        rio.set_position_to_size(self._get_position_to_size())
    def _get_position_to_size(self):
        """Map each member's header offset to the byte span up to the next member."""
        ilist = [info.header_offset for info in self.infolist()]
        if not ilist:
            return {}
        ilist.sort()
        # The last member extends to the start of the central directory.
        ilist.append(self.start_dir)
        return {a: b - a for a, b in pairwise(ilist)}
    def size(self):
        """Total size of the remote file in bytes (0 once closed)."""
        return self.fp._file_size if self.fp else 0
| # ── CLI ─────────────────────────────────────────────────────────────────────── | |
def _list_files(url, support_suffix_range, filenames):
    """Print a table of member sizes, timestamps and names for the archive."""
    with RemoteZip(
        url,
        headers={"User-Agent": "remotezip-stdlib"},
        support_suffix_range=support_suffix_range,
    ) as zf:
        members = filenames if filenames else zf.namelist()
        rows = []
        for member in members:
            info = zf.getinfo(member)
            stamp = datetime(*info.date_time).strftime("%Y-%m-%d %H:%M:%S")
            rows.append((info.file_size, stamp, info.filename))
        _print_table(rows, ("Length", "DateTime", "Name"), "><<")
| def _print_table(data, header, align): | |
| col_w = [len(col) for col in header] | |
| for row in data: | |
| col_w = [max(w, len(str(x))) for w, x in zip(col_w, row)] | |
| fmt = " ".join( | |
| "{{:{}{}}}".format(a, w) for w, a in zip(col_w, align + "<" * 99) | |
| ) | |
| print(fmt.format(*header).rstrip()) | |
| print(fmt.format(*["-" * w for w in col_w])) | |
| for row in data: | |
| print(fmt.format(*row).rstrip()) | |
| print() | |
def _extract_files(url, support_suffix_range, filenames, path):
    """Extract the given members (or everything) from the remote archive."""
    with RemoteZip(url, support_suffix_range=support_suffix_range) as zf:
        targets = filenames if filenames else zf.namelist()
        for name in targets:
            print(f"Extracting {name}...")
            zf.extract(name, path=path)
def main():
    """CLI entry point: list (-l) or extract members of a remote zip URL."""
    import argparse
    import os
    ap = argparse.ArgumentParser(description="Unzip remote files")
    ap.add_argument("url", help="URL of the zip archive")
    ap.add_argument("filename", nargs="*", help="File to extract")
    ap.add_argument(
        "-l", "--list", action="store_true", help="List files in the archive"
    )
    ap.add_argument(
        "-d", "--dir", default=os.getcwd(), help="Extract directory (default: cwd)"
    )
    ap.add_argument(
        "--disable-suffix-range-support",
        action="store_true",
        help="Use when server doesn't support suffix range (negative offset)",
    )
    opts = ap.parse_args()
    suffix_ok = not opts.disable_suffix_range_support
    if opts.list:
        _list_files(opts.url, suffix_ok, opts.filename)
    else:
        _extract_files(opts.url, suffix_ok, opts.filename, opts.dir)
# Allow running this module directly as a small CLI tool.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment