Skip to content

Instantly share code, notes, and snippets.

@mrexodia
Created February 23, 2026 18:27
Show Gist options
  • Select an option

  • Save mrexodia/eab11a65425915bbc875dbad2c7d758e to your computer and use it in GitHub Desktop.

Select an option

Save mrexodia/eab11a65425915bbc875dbad2c7d758e to your computer and use it in GitHub Desktop.
Gitlab Pipeline Utilities
#!/usr/bin/env python3
"""
Fetch CI pipeline job logs and artifacts locally.
Usage:
# Default: latest pipeline for current branch
# Logs for all jobs, artifacts for failed jobs only
python3 tools/ci/fetch_pipeline.py
# Specific pipeline
python3 tools/ci/fetch_pipeline.py -p 37748
# Specific branch
python3 tools/ci/fetch_pipeline.py -b main
# Download only logs for failed jobs (skip artifact zips)
python3 tools/ci/fetch_pipeline.py --logs-only
# Download logs + artifacts for all jobs (not just failed)
python3 tools/ci/fetch_pipeline.py --all-artifacts
# List jobs without downloading
python3 tools/ci/fetch_pipeline.py --list
# Custom output directory
python3 tools/ci/fetch_pipeline.py -o /tmp/my-pipeline
Files are saved to:
ci-artifacts/ # (or --output path)
pipeline.json # full pipeline + job metadata
{job_slug}/
job.log # job trace log (cleaned)
artifacts/ # extracted artifact contents
"""
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import zipfile
from pathlib import Path
# True when stderr is a terminal — gates progress-bar rendering and cursor control.
INTERACTIVE = sys.stderr.isatty()
CHUNK_SIZE = 256 * 1024 # 256KB read chunks
# ANSI escape sequences to hide/show the terminal cursor during progress output.
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"
def fmt_size(nbytes):
    """Render a byte count as a short human-readable string (B/KB/MB/GB)."""
    kb = 1024
    if nbytes < kb:
        return f"{nbytes}B"
    if nbytes < kb ** 2:
        return f"{nbytes / kb:.1f}KB"
    if nbytes < kb ** 3:
        return f"{nbytes / kb ** 2:.1f}MB"
    return f"{nbytes / kb ** 3:.2f}GB"
def fmt_speed(bytes_per_sec):
    """Render a transfer rate, e.g. '1.2MB/s'."""
    return f"{fmt_size(bytes_per_sec)}/s"
def fmt_eta(seconds):
    """Render a remaining-time estimate: '45s', '2m05s', or '1h01m'."""
    secs = int(seconds)
    if secs < 60:
        return f"{secs}s"
    if secs < 3600:
        return f"{secs // 60}m{secs % 60:02d}s"
    return f"{secs // 3600}h{(secs % 3600) // 60:02d}m"
def progress_bar(received, total, speed, width=30):
    """Build a one-line progress string.

    With a known total: a bar, percentage, received/total sizes, speed and ETA.
    With total <= 0 (unknown size): just the received count and speed.
    """
    if total <= 0:
        return f" {fmt_size(received)} {fmt_speed(speed)}"
    frac = min(received / total, 1.0)
    filled = int(width * frac)
    bar = "█" * filled + "░" * (width - filled)
    pct = f"{frac*100:5.1f}%"
    eta = fmt_eta((total - received) / speed) if speed > 0 else "?"
    return f" [{bar}] {pct} {fmt_size(received)}/{fmt_size(total)} {fmt_speed(speed)} ETA {eta}"
def stream_to_file(cmd, dest, expected_size=0):
    """Run *cmd*, streaming its stdout to the file *dest* with a progress bar.

    Returns the number of bytes written, or None if the command failed
    (the command's stderr is printed in that case). Progress is rendered on
    stderr only when attached to a terminal, throttled to ~6 updates/sec.

    NOTE(review): stderr is only drained after the process exits; a command
    that emits very large stderr output could block on a full pipe — confirm
    glab's error output stays small.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    received = 0
    t_start = time.monotonic()
    last_update = 0
    if INTERACTIVE:
        print(HIDE_CURSOR, end="", file=sys.stderr, flush=True)
    try:
        with open(dest, 'wb') as out:
            while True:
                chunk = proc.stdout.read(CHUNK_SIZE)
                if not chunk:
                    break
                out.write(chunk)
                received += len(chunk)
                now = time.monotonic()
                # Throttle terminal redraws.
                if INTERACTIVE and now - last_update >= 0.15:
                    elapsed = now - t_start
                    speed = received / elapsed if elapsed > 0 else 0
                    line = progress_bar(received, expected_size, speed)
                    print(f"\r{line}", end="", file=sys.stderr, flush=True)
                    last_update = now
        proc.wait()
    except KeyboardInterrupt:
        proc.kill()
        proc.wait()
        raise
    finally:
        if INTERACTIVE:
            # Wipe the progress line and restore the cursor.
            print(f"\r{' '*100}\r{SHOW_CURSOR}", end="", file=sys.stderr, flush=True)
    if proc.returncode != 0:
        err = proc.stderr.read().decode()
        print(f" ERROR: {' '.join(cmd)}: {err.strip()}", file=sys.stderr)
        return None
    return received
def stream_to_bytes(cmd):
    """Run *cmd* and return its stdout as bytes, or None on failure.

    Intended for small responses (e.g. job trace logs); the whole output is
    buffered in memory. On non-zero exit the command's stderr is printed.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out = proc.stdout.read()
    proc.wait()
    if proc.returncode == 0:
        return out
    err = proc.stderr.read().decode()
    print(f" ERROR: {' '.join(cmd)}: {err.strip()}", file=sys.stderr)
    return None
def run(cmd):
    """Run *cmd*, returning its captured stdout as text, or None on failure.

    On non-zero exit the command's stderr is printed to our stderr.
    """
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
        return result.stdout
    print(f" ERROR: {' '.join(cmd)}: {result.stderr.strip()}", file=sys.stderr)
    return None
def get_pipeline(pipeline_id=None, branch=None):
    """Fetch a pipeline (metadata plus job details) via `glab ci get`.

    With neither argument, glab resolves the latest pipeline for the current
    branch. Exits the process with status 1 if the command fails.
    """
    cmd = ["glab", "ci", "get", "--with-job-details", "--output", "json"]
    if pipeline_id:
        cmd.extend(["--pipeline-id", str(pipeline_id)])
    elif branch:
        cmd.extend(["--branch", branch])
    output = run(cmd)
    if output is None:
        sys.exit(1)
    return json.loads(output)
def get_merge_request_url(branch):
    """Return the web URL of an open MR whose source branch is *branch*, else None."""
    raw = run(["glab", "api",
               f"projects/:id/merge_requests?source_branch={branch}&state=opened"])
    if raw is None:
        return None
    matches = json.loads(raw)
    # The API returns a list; take the first open MR, if any.
    return matches[0].get('web_url') if matches else None
def slugify(name):
    """Map a job name to a filesystem-safe slug (collapsed underscores, max 80 chars)."""
    slug = re.sub(r'[^a-zA-Z0-9_\-]', '_', name)
    slug = re.sub(r'_+', '_', slug).strip('_')
    return slug[:80]
def archive_size_for(job):
    """Return the size in bytes of the job's 'archive' artifact, or 0 when absent."""
    return next(
        (a.get('size', 0) for a in job.get('artifacts', [])
         if a.get('file_type') == 'archive'),
        0,
    )
def print_jobs(jobs, log_job_ids, artifact_job_ids):
    """Print a summary table of jobs and what will be downloaded for each.

    log_job_ids / artifact_job_ids are sets of job ids selected for log and
    artifact download respectively; membership decides the Download column.
    """
    print(f"\n{'ID':>8} {'Status':<10} {'Artifacts':>12} {'Download':>10} Name")
    print(f"{'─'*8} {'─'*10} {'─'*12} {'─'*10} {'─'*50}")
    for job in jobs:
        size = archive_size_for(job)
        size_str = fmt_size(size) if size else "none"
        if job['id'] in artifact_job_ids:
            action = "log+arts"
        elif job['id'] in log_job_ids:
            action = "log"
        else:
            action = "skip"
        print(f"{job['id']:>8} {job['status']:<10} {size_str:>12} {action:>10} {job['name']}")
def download_log(job, outdir):
    """Fetch the job trace log, clean it, and save it as outdir/job.log.

    Skips the download when the file already exists. Returns True on success
    (or skip), False when the fetch failed.
    """
    log_path = outdir / "job.log"
    if log_path.exists():
        print(f" log: already exists, skipping")
        return True
    data = stream_to_bytes(["glab", "api", f"projects/:id/jobs/{job['id']}/trace"])
    if data is None:
        return False
    text = data.decode('utf-8', errors='replace')
    # Strip ANSI color codes for readability
    text = re.sub(r'\x1b\[[0-9;]*m', '', text)
    # Strip per-line CI timestamp prefixes (e.g. "2026-02-18T14:03:16.852092Z 01O ")
    text = re.sub(r'^\d{4}-\d{2}-\d{2}T[\d:.]+Z \d+[OE] ?', '', text, flags=re.MULTILINE)
    log_path.write_text(text)
    print(f" log: {fmt_size(len(data))}")
    return True
def download_artifacts(job, outdir):
    """Download the job's artifact archive and extract it under outdir/artifacts/.

    A `.artifacts_done` marker file makes the operation idempotent across
    reruns. Returns True on success or when there is nothing to do, False on
    download/extraction failure.
    """
    arts_dir = outdir / "artifacts"
    marker = outdir / ".artifacts_done"
    if marker.exists():
        print(f" artifacts: already extracted, skipping")
        return True
    size = archive_size_for(job)
    if size == 0:
        print(f" artifacts: none")
        return True
    # Stream the zip into a temp file beside the destination, then extract.
    fd, tmp_zip = tempfile.mkstemp(suffix=".zip", dir=outdir)
    os.close(fd)
    try:
        print(f" artifacts: downloading {fmt_size(size)}...", flush=True)
        t0 = time.monotonic()
        received = stream_to_file(
            ["glab", "api", f"projects/:id/jobs/{job['id']}/artifacts"],
            tmp_zip,
            expected_size=size)
        if received is None:
            return False
        elapsed = time.monotonic() - t0
        rate = received / elapsed if elapsed > 0 else 0
        print(f" artifacts: downloaded {fmt_size(received)} in {elapsed:.1f}s ({fmt_speed(rate)})")
        print(f" artifacts: extracting...", flush=True)
        arts_dir.mkdir(parents=True, exist_ok=True)
        try:
            with zipfile.ZipFile(tmp_zip) as zf:
                zf.extractall(arts_dir)
                count = len(zf.namelist())
            # Only mark done after a clean extraction.
            marker.write_text("")
            print(f" artifacts: {count} files extracted")
        except zipfile.BadZipFile:
            print(f" artifacts: ERROR: bad zip file", file=sys.stderr)
            return False
    finally:
        if os.path.exists(tmp_zip):
            os.unlink(tmp_zip)
    return True
def main():
    """CLI entry point: resolve the pipeline, select jobs, download logs/artifacts."""
    parser = argparse.ArgumentParser(
        description="Fetch CI pipeline logs and artifacts locally",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__)
    parser.add_argument("-p", "--pipeline-id", type=int, default=None,
                        help="Pipeline ID (default: latest for current branch)")
    parser.add_argument("-b", "--branch", type=str, default=None,
                        help="Branch name (default: current branch)")
    parser.add_argument("-o", "--output", type=str, default=None,
                        help="Output directory (default: ci-artifacts/)")
    parser.add_argument("--all-artifacts", action="store_true",
                        help="Download artifacts for all jobs, not just failed")
    parser.add_argument("--logs-only", action="store_true",
                        help="Download only job logs, skip artifact archives")
    parser.add_argument("--list", action="store_true",
                        help="List jobs and exit without downloading")
    args = parser.parse_args()

    # Prerequisites: glab must be installed and authenticated.
    if not shutil.which("glab"):
        print("Error: glab CLI not found.\n"
              "Install it from: https://gitlab.com/gitlab-org/cli/#installation",
              file=sys.stderr)
        sys.exit(1)
    auth = subprocess.run(["glab", "auth", "status"],
                          capture_output=True, text=True)
    if auth.returncode != 0:
        print("Error: glab is not authenticated.\n",
              file=sys.stderr)
        # glab auth status prints details to stderr
        print(auth.stderr, file=sys.stderr)
        sys.exit(1)

    # One API call fetches pipeline metadata plus every job.
    if args.pipeline_id:
        what = f"pipeline {args.pipeline_id}"
    elif args.branch:
        what = f"branch {args.branch}"
    else:
        what = "current branch"
    print(f"Fetching latest pipeline for {what}...")
    pipeline = get_pipeline(args.pipeline_id, args.branch)
    jobs = pipeline.get('jobs', [])
    print(f"Pipeline {pipeline['id']} ref={pipeline['ref']} "
          f"status={pipeline['status']} {len(jobs)} jobs\n"
          f"URL: {pipeline.get('web_url', '')}")
    mr_url = get_merge_request_url(pipeline['ref'])
    if mr_url:
        print(f"MR: {mr_url}")
    if not jobs:
        print("No jobs found.")
        sys.exit(0)

    # Decide per-job what gets downloaded.
    failed_ids = {j['id'] for j in jobs if j['status'] == 'failed'}
    all_ids = {j['id'] for j in jobs}
    if args.logs_only:
        log_job_ids, artifact_job_ids = failed_ids, set()
    elif args.all_artifacts:
        log_job_ids, artifact_job_ids = all_ids, all_ids
    else:
        # Default: logs + artifacts for failed jobs only
        log_job_ids, artifact_job_ids = failed_ids, failed_ids
    print_jobs(jobs, log_job_ids, artifact_job_ids)
    if args.list:
        sys.exit(0)

    # Prepare the output directory, refusing to mix two different pipelines.
    base = Path(args.output) if args.output else Path("ci-artifacts")
    base.mkdir(parents=True, exist_ok=True)
    meta_path = base / "pipeline.json"
    if meta_path.exists():
        existing = json.loads(meta_path.read_text())
        existing_id = existing.get('id')
        if existing_id != pipeline['id']:
            if INTERACTIVE:
                print(f"\n{base}/ already contains data for pipeline "
                      f"{existing_id} (ref={existing.get('ref', '?')}).")
                answer = input(f"Delete and replace with pipeline {pipeline['id']}? [Y/n] ")
                if answer.strip().lower() in ('', 'y', 'yes'):
                    shutil.rmtree(base)
                    base.mkdir(parents=True, exist_ok=True)
                else:
                    print("Aborted.")
                    sys.exit(0)
            else:
                print(f"Error: {base}/ already contains data for pipeline "
                      f"{existing_id} (ref={existing.get('ref', '?')}), "
                      f"but requested pipeline {pipeline['id']}.\n"
                      f"Delete the directory or use -o to specify a different path.",
                      file=sys.stderr)
                sys.exit(1)

    # Keep downloaded artifacts out of version control.
    gitignore = base / ".gitignore"
    if not gitignore.exists():
        gitignore.write_text("*\n")
    # Persist the full pipeline + job metadata for later inspection.
    meta_path.write_text(json.dumps(pipeline, indent=2))

    total_arts = sum(archive_size_for(j) for j in jobs if j['id'] in artifact_job_ids)
    if total_arts:
        print(f"\nTotal artifact download: ~{fmt_size(total_arts)}")
    print(f"Downloading to {base}/\n")
    for job in jobs:
        if job['id'] not in log_job_ids:
            continue
        outdir = base / slugify(job['name'])
        outdir.mkdir(parents=True, exist_ok=True)
        print(f" [{job['status']}] {job['name']} (job {job['id']})")
        download_log(job, outdir)
        if job['id'] in artifact_job_ids:
            download_artifacts(job, outdir)
        print()
    print(f"Done. Results in {base}/")
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Restore the cursor in case a progress bar hid it, then exit with
        # the conventional SIGINT status code (128 + 2).
        if INTERACTIVE:
            print(SHOW_CURSOR, end="", file=sys.stderr, flush=True)
        print("\nInterrupted.", file=sys.stderr)
        sys.exit(130)
#!/usr/bin/env python
"""
RemoteZip — stdlib-only fork.
Original: https://github.com/gtsystem/python-remotezip
Refactored to replace `requests` with `urllib.request` so the only
dependency is the Python standard library.
"""
import io
import time
import zipfile
from datetime import datetime
from itertools import tee
import urllib.request as urllib_request
from urllib.parse import urlparse, parse_qs
from urllib.request import Request, urlopen
from urllib.error import HTTPError
__all__ = ["RemoteIOError", "RemoteZip"]
# ── Exceptions ────────────────────────────────────────────────────────────────
class RemoteZipError(Exception):
    """Base class for all errors raised by this module."""


class OutOfBound(RemoteZipError):
    """A seek or read landed outside the currently buffered byte range."""


class RemoteIOError(RemoteZipError):
    """A network or HTTP failure while fetching remote data."""


class RangeNotSupported(RemoteZipError):
    """The server does not honour HTTP Range requests."""
# ── PartialBuffer ─────────────────────────────────────────────────────────────
class PartialBuffer:
    """File-like view over one byte range of a remote file.

    *offset* is the absolute position of the range within the remote file and
    *size* its length. In non-stream mode the response body is slurped into an
    in-memory BytesIO so random seeks work; in stream mode only forward
    movement (read-and-discard) is possible.
    """

    def __init__(self, buffer, offset, size, stream):
        # Non-streaming buffers are fully materialised so seek() can go backwards.
        self.buffer = buffer if stream else io.BytesIO(buffer.read())
        self._offset = offset
        self._size = size
        self._position = offset
        self._stream = stream

    def __len__(self):
        return self._size

    def __repr__(self):
        return f"<PartialBuffer off={self._offset} size={self._size} stream={self._stream}>"

    def read(self, size=0):
        # size == 0 means "everything up to the end of this range".
        if size == 0:
            size = self._offset + self._size - self._position
        content = self.buffer.read(size)
        self._position = self._offset + self.buffer.tell()
        return content

    def close(self):
        if not self.buffer.closed:
            self.buffer.close()

    def tell(self):
        return self._position

    def seek(self, offset, whence):
        # Translate end-relative (2), absolute (0) or current-relative moves.
        if whence == 2:
            self._position = self._size + self._offset + offset
        elif whence == 0:
            self._position = offset
        else:
            self._position += offset
        relative = self._position - self._offset
        if relative < 0 or relative >= self._size:
            raise OutOfBound("Position out of buffer bound")
        if not self._stream:
            self.buffer.seek(relative)
            return self._position
        # Streaming: can only move forward, by reading and discarding bytes.
        current = self.buffer.tell()
        if relative < current:
            raise OutOfBound("Negative seek not supported")
        if relative > current:
            self.buffer.read(relative - current)
        return self._position
# ── RemoteIO ──────────────────────────────────────────────────────────────────
class RemoteIO(io.IOBase):
    """File-like object backed by on-demand HTTP range fetches.

    ZipFile drives this object: it seeks to the central directory, then into
    individual members. A seek that lands outside the current buffer sets
    _seek_succeeded = False so the next read() fetches a fresh range.
    """

    def __init__(self, fetch_fun, initial_buffer_size=64 * 1024):
        self._fetch_fun = fetch_fun
        self._initial_buffer_size = initial_buffer_size
        self.buffer = None
        self._file_size = None
        self._seek_succeeded = False
        # Member header offset -> distance to the next member; installed via
        # set_position_to_size() once the central directory is parsed.
        self._member_position_to_size = None
        self._last_member_pos = None

    def set_position_to_size(self, position_to_size):
        self._member_position_to_size = position_to_size

    def read(self, size=0):
        position = self.tell()
        if size == 0:
            size = self._file_size - position
        if not self._seek_succeeded:
            # The previous seek left the buffered range; fetch a new one.
            if self._member_position_to_size is None:
                fetch_size = size
                stream = False
            else:
                try:
                    fetch_size = self._member_position_to_size[position]
                    self._last_member_pos = position
                except KeyError:
                    # Reading from inside the most recently opened member.
                    if self._last_member_pos and self._last_member_pos < position:
                        fetch_size = self._member_position_to_size[self._last_member_pos]
                        fetch_size -= position - self._last_member_pos
                    else:
                        raise OutOfBound(
                            "Attempt to seek outside boundary of current zip member"
                        )
                stream = True
            self._seek_succeeded = True
            self.buffer.close()
            self.buffer = self._fetch_fun(
                (position, position + fetch_size - 1), stream=stream
            )
        return self.buffer.read(size)

    def seekable(self):
        return True

    def seek(self, offset, whence=0):
        if whence == 2 and self._file_size is None:
            # First end-relative seek: grab the file tail via a suffix range
            # and derive the total remote size from the returned buffer.
            size = self._initial_buffer_size
            self.buffer = self._fetch_fun((-size, None), stream=False)
            self._file_size = len(self.buffer) + self.buffer.tell()
        try:
            pos = self.buffer.seek(offset, whence)
            self._seek_succeeded = True
            return pos
        except OutOfBound:
            # Remember the failure; read() will fetch the right range lazily.
            self._seek_succeeded = False
            return self.tell()

    def tell(self):
        return self.buffer.tell()

    def close(self):
        if self.buffer:
            self.buffer.close()
            self.buffer = None
# ── RemoteFetcher (stdlib) ────────────────────────────────────────────────────
class _NoRedirectHandler(urllib_request.HTTPRedirectHandler):
"""Intercept redirects so we can capture the final URL without following."""
def redirect_request(self, req, fp, code, msg, headers, newurl):
raise _RedirectCaptured(newurl)
class _RedirectCaptured(Exception):
"""Carries the redirect target URL."""
def __init__(self, url):
self.url = url
class RemoteFetcher:
    """Fetches byte ranges of a remote file using urllib.request.

    Handles the common pattern where the initial URL (e.g. GitLab artifacts)
    returns a 302 redirect to a signed object-storage URL (S3 / GCS / MinIO).
    The redirect is resolved once; all subsequent range requests go directly
    to the final URL *without* the original auth headers (the signature is
    already embedded in the query string).
    """

    # Headers that should NOT be forwarded to the storage backend.
    _AUTH_HEADERS = frozenset(
        h.lower()
        for h in (
            "PRIVATE-TOKEN",
            "Authorization",
            "JOB-TOKEN",
            "DEPLOY-TOKEN",
        )
    )

    def __init__(
        self, url, headers=None, support_suffix_range=True, follow_redirects=True,
        signed_url_margin=120, **kwargs
    ):
        """Create a fetcher for *url*.

        headers: extra HTTP headers (auth tokens etc.) sent to the original host.
        support_suffix_range: whether the server accepts "bytes=-N" ranges.
        follow_redirects: resolve an initial redirect to its target URL.
        signed_url_margin: seconds before expiry at which to re-resolve.
        kwargs: may carry "timeout" (seconds, default 30).
        """
        self._initial_url = url
        self._headers = headers or {}
        self._support_suffix_range = support_suffix_range
        self._follow_redirects = follow_redirects
        self._signed_url_margin = signed_url_margin  # seconds before expiry to re-resolve
        self._kwargs = kwargs
        # Resolved after the first request
        self._resolved_url = None
        self._resolved_at = None
        self._ttl = None  # seconds until the signed URL expires
        self._is_redirected = False

    # ---- range header helpers ------------------------------------------------
    @staticmethod
    def parse_range_header(content_range_header):
        """Parse a Content-Range value ('bytes X-Y/Total' or suffix 'bytes -N/...')
        into an (min, max) tuple; max is None for the suffix form."""
        range_part = content_range_header[6:].split("/")[0]
        if range_part.startswith("-"):
            return int(range_part), None
        range_min, range_max = range_part.split("-")
        return int(range_min), int(range_max) if range_max else None

    @staticmethod
    def build_range_header(range_min, range_max):
        """Build a Range header value; a negative range_min with range_max=None
        produces a suffix range ('bytes=-N')."""
        if range_max is None:
            return "bytes=%s%s" % (range_min, "" if range_min < 0 else "-")
        return "bytes=%s-%s" % (range_min, range_max)

    # ---- redirect resolution -------------------------------------------------
    @staticmethod
    def _parse_signed_url_ttl(url):
        """Extract the TTL (in seconds) from a signed S3/GCS URL.

        Looks for X-Amz-Expires (AWS) or X-Goog-Expires (GCS) query params.
        Returns None if not a signed URL.
        """
        qs = parse_qs(urlparse(url).query)
        for key in ("X-Amz-Expires", "X-Goog-Expires"):
            vals = qs.get(key)
            if vals:
                try:
                    return int(vals[0])
                except (ValueError, IndexError):
                    pass
        return None

    def _is_expired(self):
        """Check whether the resolved URL needs (re-)resolution.

        True when nothing has been resolved yet, or when a known TTL is within
        `signed_url_margin` seconds of expiry. With no known TTL: a redirected
        URL is conservatively re-resolved every time (it may be signed with an
        expiry we could not parse), while a direct, non-redirected URL never
        expires. (Previously a TTL of None always meant "expired", which made
        every range request against a non-redirecting server pay an extra
        HEAD round-trip.)
        """
        if self._resolved_at is None:
            return True
        if self._ttl is None:
            return self._is_redirected
        elapsed = time.monotonic() - self._resolved_at
        return elapsed >= (self._ttl - self._signed_url_margin)

    def _resolve_redirect(self):
        """Follow the initial URL to discover the final (signed) URL.

        Re-resolves automatically when the signed URL is about to expire
        (based on X-Amz-Expires minus a safety margin). After resolution,
        all subsequent requests hit the final URL directly with auth headers
        stripped (they'd cause S3 to return 400).
        """
        if self._resolved_url is not None and not self._is_expired():
            return  # still fresh
        if not self._follow_redirects:
            self._resolved_url = self._initial_url
            self._is_redirected = False
            return
        opener = urllib_request.build_opener(_NoRedirectHandler)
        req = Request(self._initial_url)
        for k, v in self._headers.items():
            req.add_header(k, v)
        req.get_method = lambda: "HEAD"
        timeout = self._kwargs.get("timeout", 30)
        try:
            resp = opener.open(req, timeout=timeout)
            # No redirect — use the original URL as-is
            self._resolved_url = self._initial_url
            self._is_redirected = False
            self._ttl = None
            resp.close()
        except _RedirectCaptured as rc:
            self._resolved_url = rc.url
            self._is_redirected = True
            self._ttl = self._parse_signed_url_ttl(rc.url)
        except HTTPError as e:
            # Some servers return 302 as an HTTPError; check for Location header
            location = e.headers.get("Location")
            if location and e.code in (301, 302, 303, 307, 308):
                self._resolved_url = location
                self._is_redirected = True
                self._ttl = self._parse_signed_url_ttl(location)
            else:
                raise RemoteIOError(str(e))
        self._resolved_at = time.monotonic()

    # ---- internal request helpers -------------------------------------------
    def _build_request(self, extra_headers=None):
        """Build a Request for the resolved URL; auth headers are dropped when
        the request targets a redirect destination (signed storage URL)."""
        self._resolve_redirect()
        req = Request(self._resolved_url)
        # Only attach auth headers if we're still hitting the original host
        for k, v in self._headers.items():
            if self._is_redirected and k.lower() in self._AUTH_HEADERS:
                continue
            req.add_header(k, v)
        if extra_headers:
            for k, v in extra_headers.items():
                req.add_header(k, v)
        return req

    def _open(self, req):
        """urlopen with the configured timeout, mapping HTTPError to RemoteIOError."""
        timeout = self._kwargs.get("timeout", 30)
        try:
            return urlopen(req, timeout=timeout)
        except HTTPError as e:
            raise RemoteIOError(str(e))

    # ---- public API ----------------------------------------------------------
    def get_file_size(self):
        """Return the remote file size in bytes via a HEAD request.

        Raises RemoteZipError when the server sends no Content-Length, and
        RemoteIOError on network failure.
        """
        req = self._build_request()
        req.get_method = lambda: "HEAD"
        try:
            resp = self._open(req)
            length = resp.headers.get("Content-Length")
            if length is None:
                raise RemoteZipError(
                    "Cannot get file size: Content-Length header missing"
                )
            return int(length)
        except IOError as e:
            raise RemoteIOError(str(e))

    def fetch(self, data_range, stream=False):
        """Fetch a byte range of the remote file, returning a PartialBuffer.

        data_range is (min, max); (negative, None) requests a suffix range.
        Raises RangeNotSupported when the server ignores the Range header.
        """
        # Handle servers that don't support suffix ranges
        if (
            data_range[0] < 0
            and data_range[1] is None
            and not self._support_suffix_range
        ):
            size = self.get_file_size()
            data_range = (max(0, size + data_range[0]), size - 1)
        range_header = self.build_range_header(*data_range)
        req = self._build_request(extra_headers={"Range": range_header})
        try:
            resp = self._open(req)
            cr = resp.headers.get("Content-Range")
            if cr is None:
                raise RangeNotSupported(
                    "The server doesn't support range requests"
                )
            range_min, range_max = self.parse_range_header(cr)
            return PartialBuffer(resp, range_min, range_max - range_min + 1, stream)
        except IOError as e:
            raise RemoteIOError(str(e))
# ── helpers ───────────────────────────────────────────────────────────────────
def pairwise(iterable):
    """Yield overlapping (a, b) pairs of consecutive items from *iterable*."""
    it = iter(iterable)
    prev = next(it, None)
    for item in it:
        yield prev, item
        prev = item
# ── RemoteZip ─────────────────────────────────────────────────────────────────
class RemoteZip(zipfile.ZipFile):
    """A ZipFile subclass that reads from a remote URL via HTTP range requests.

    Usage:
        with RemoteZip("https://example.com/archive.zip") as z:
            print(z.namelist())
            data = z.read("some/file.txt")

    Pass extra headers (e.g. auth tokens) as keyword arguments:
        RemoteZip(url, headers={"Authorization": "Bearer ..."})
    """

    def __init__(
        self,
        url,
        initial_buffer_size=64 * 1024,
        fetcher=RemoteFetcher,
        support_suffix_range=True,
        **kwargs,
    ):
        remote = fetcher(url, support_suffix_range=support_suffix_range, **kwargs)
        rio = RemoteIO(remote.fetch, initial_buffer_size)
        super().__init__(rio)
        # After ZipFile has parsed the central directory, tell the IO layer
        # where each member starts so it can fetch one member per range.
        rio.set_position_to_size(self._get_position_to_size())

    def _get_position_to_size(self):
        # Map each member's header offset to the distance to the next member;
        # the last member runs up to the start of the central directory.
        offsets = sorted(info.header_offset for info in self.infolist())
        if not offsets:
            return {}
        offsets.append(self.start_dir)
        return {a: b - a for a, b in pairwise(offsets)}

    def size(self):
        """Total size of the remote file in bytes (0 when closed)."""
        return self.fp._file_size if self.fp else 0
# ── CLI ───────────────────────────────────────────────────────────────────────
def _list_files(url, support_suffix_range, filenames):
    """Print a (size, mtime, name) table for the given members, or all of them."""
    with RemoteZip(
        url,
        headers={"User-Agent": "remotezip-stdlib"},
        support_suffix_range=support_suffix_range,
    ) as z:
        rows = []
        for fname in filenames or z.namelist():
            zinfo = z.getinfo(fname)
            stamp = datetime(*zinfo.date_time).strftime("%Y-%m-%d %H:%M:%S")
            rows.append((zinfo.file_size, stamp, zinfo.filename))
        _print_table(rows, ("Length", "DateTime", "Name"), "><<")
def _print_table(data, header, align):
    """Print rows as a padded table; *align* gives a '<'/'>' flag per column."""
    widths = [len(col) for col in header]
    for row in data:
        widths = [max(w, len(str(cell))) for w, cell in zip(widths, row)]
    # Pad the align spec so any extra columns default to left-aligned.
    fmt = " ".join(
        "{{:{}{}}}".format(a, w) for w, a in zip(widths, align + "<" * 99)
    )
    print(fmt.format(*header).rstrip())
    print(fmt.format(*["-" * w for w in widths]))
    for row in data:
        print(fmt.format(*row).rstrip())
    print()
def _extract_files(url, support_suffix_range, filenames, path):
    """Extract the given members (or everything) from the remote zip into *path*."""
    with RemoteZip(url, support_suffix_range=support_suffix_range) as z:
        for fname in filenames or z.namelist():
            print(f"Extracting {fname}...")
            z.extract(fname, path=path)
def main():
    """CLI: list (-l) or extract members of a remote zip archive."""
    import argparse
    import os

    parser = argparse.ArgumentParser(description="Unzip remote files")
    parser.add_argument("url", help="URL of the zip archive")
    parser.add_argument("filename", nargs="*", help="File to extract")
    parser.add_argument(
        "-l", "--list", action="store_true", help="List files in the archive"
    )
    parser.add_argument(
        "-d", "--dir", default=os.getcwd(), help="Extract directory (default: cwd)"
    )
    parser.add_argument(
        "--disable-suffix-range-support",
        action="store_true",
        help="Use when server doesn't support suffix range (negative offset)",
    )
    args = parser.parse_args()
    suffix_ok = not args.disable_suffix_range_support
    if args.list:
        _list_files(args.url, suffix_ok, args.filename)
    else:
        _extract_files(args.url, suffix_ok, args.filename, args.dir)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment