Partial/streaming zip downloader
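This script reads individual members out of a zip file over HTTP without downloading the whole archive. It first checks (via a HEAD request) that the server supports byte ranges, then uses Range requests to fetch only what it needs: the central directory near the end of the file, which Python's zipfile parses to enumerate members, and the data of a requested member, which is streamed in large chunks. Nested zip files that are STORED (uncompressed) can be opened in place by slicing a byte window out of the outer file, so members of an inner archive are read with the same ranged requests. A usage sketch (with placeholder names) follows the code.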
#!/usr/bin/env python3
# SPDX-License-Identifier: 0BSD or CC0-1.0 or MIT-0 or Unlicense
# Copyright (c) 2023, Ryan Castellucci, No Rights Reserved

import io, sys
import datetime
import argparse
import requests
import operator
import struct
from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA
from fnmatch import fnmatch
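
# Local file header struct format and field indices; these mirror the private
# constants of the same names inside CPython's zipfile module.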
structFileHeader = "<4s2B4HL2L2H"
sizeFileHeader = struct.calcsize(structFileHeader)
_FH_FILENAME_LENGTH = 10
_FH_EXTRA_FIELD_LENGTH = 11

BLOCK_SIZE = 1<<18  # 256 KiB streaming chunk size

__debug = False
def debug(*args, **kwarg):
    if __debug:
        if 'file' not in kwarg: kwarg['file'] = sys.stderr
        print(*args, **kwarg)

class HttpError(IOError):
    def __init__(self, response, message=None):
        self.response = response
        if message is None: self.message = f'http status {response.status_code}'
        else: self.message = message
        super().__init__(self.message)

# not a full implementation, just enough to use with BufferedReader
class _HttpIO(io.RawIOBase):
    def __init__(self, url, *, session=None, parent=None):
        if not session:
            session = parent._session if parent else requests.Session()
        self._session = session
        self._parent = parent
        self._url, self._off = url, 0
        self._pos, self._total = 0, 0
        self._gets, self._heads = 0, 0
        self._response, self._iter, self._next = None, None, None
        if not parent:
            # We assume the zip file doesn't change while this tool is in use
            r = self._head()
            # default to '' so a missing Accept-Ranges header isn't a TypeError
            if 'bytes' not in r.headers.get('Accept-Ranges', ''):
                raise HttpError(r, 'byte ranges not supported by server')
            self._len = int(r.headers.get('Content-Length'))

    def _head(self):
        self._heads += 1
        if self._parent: self._parent._heads += 1
        r = self._session.head(self._url, allow_redirects=True)
        if r.status_code != 200: raise HttpError(r)
        # Update URL if there was a redirect
        if r.url != self._url: self._url = r.url
        return r

    def _get(self, headers=None, *, stream=False):
        self._gets += 1
        if self._parent: self._parent._gets += 1
        r = self._session.get(self._url, headers=headers, stream=stream)
        if r.status_code not in (200, 206): raise HttpError(r)
        return r

    def _advance(self, n):
        self._pos += n
        self._total += n
        if self._parent:
            self._parent._total += n
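
    # create a child _HttpIO viewing a byte window [offset, offset+length) of
    # the same URL; the child shares the parent's session and rolls its
    # transfer statistics up into the parent's counters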
    def slice(self, offset, length=None):
        hio = self.__class__(self._url, session=self._session, parent=self)
        hio._off = offset
        hio._len = length if length is not None else self._len - self._off
        hio.seek(0)
        return hio

    def _stream_close(self):
        debug('_stream_close')
        if self._response: self._response.close()
        self._response, self._iter, self._next = None, None, None

    def _stream_read(self, size=-1):
        debug(f'_stream_read avail={len(self._next) if self._next else 0}, requested={size}')
        assert self._iter is not None
        if self._next is None:
            # this implies that the response iterator was exhausted on its first
            # call, which should not happen, but better safe than sorry
            self._stream_close()
            size = BLOCK_SIZE if size < 0 else min(size, BLOCK_SIZE)
            return self.read(size)
        elif size >= 0 and size < len(self._next):
            # reading less than the available amount of data isn't fully handled
            chunk = self._next[:size]
            self._stream_close()
        else:
            chunk = self._next
            # the response iterator only gets marked as exhausted when it is asked
            # to generate a chunk but has no data available - read ahead one chunk
            # since the reader may stop after the last one it expects
            self._next = next(self._iter, None)
            if self._next is None:
                self._stream_close()
        self._advance(len(chunk))
        return chunk

    def read(self, size=-1):
        return self._read(size)

    def _read(self, size=-1, *, stream=False, chunk_size=BLOCK_SIZE):
        if self._iter:
            return self._stream_read(size)
        elif size == 0:
            # any empty byte string will do
            return b''
        elif self._off == 0 and self._pos == 0 and (size < 0 or size >= self._len):
            # entire file from the beginning, no range header needed
            end = None
        elif size < 0:
            # rest of the window (the Range end is inclusive, hence the -1)
            end = self._off + self._len - 1
        else:
            # requested range or rest of the file, whichever is less
            end = min(self._off + self._len, self._pos + size) - 1
        debug('read', size, self._pos - self._off,
              None if end is None else end - self._off, self._len)
        headers = {}
        if end is not None:
            headers['Range'] = f'bytes={self._pos}-{end}'
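            # a closed range like 'bytes=1024-2047' asks the server for exactly
            # that span; both endpoints are inclusive (RFC 9110)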
        # reading multiple sequential chunks without streaming would require a request
        # to the server for each chunk, involving potentially slow network round trips
        actually_stream = bool(stream and (size < 0 or size >= chunk_size))
        r = self._get(headers, stream=actually_stream)
        if actually_stream:
            # set up response streaming
            debug(f'_stream_init chunk_size={chunk_size}')
            self._response = r
            self._iter = r.iter_content(chunk_size)
            self._next = next(self._iter, None)
            # return the first chunk
            return self._stream_read(size)
        n = int(r.headers.get('Content-Length'))
        self._advance(n)
        return r.content

    def readall(self):
        return self.read(-1)

    def readinto(self, b):
        b = (memoryview(b) if not isinstance(b, memoryview) else b).cast('B')
        n = len(b)
        data = self.read(n)
        n = len(data)
        b[:n] = data
        return n

    def seek(self, pos, whence=0):
        if whence != 0 or pos != self._pos - self._off:
            debug('seek', pos, whence, self._pos - self._off)
        if whence == 0: newpos = self._off + pos
        elif whence == 1: newpos = self._pos + pos
        elif whence == 2: newpos = self._off + self._len + pos
        else: raise ValueError('invalid whence')
        if self._pos != newpos and self._iter:
            self._stream_close()
        self._pos = newpos
        # report the position relative to the start of the window, like tell()
        return newpos - self._off

    def __len__(self): return self._len
    def tell(self): return self._pos - self._off
    def writable(self): return False
    def seekable(self): return True
    def readable(self): return True

    def __getattr__(self, name):
        if name in ('truncate', 'fileno', 'write'):
            raise OSError(f'{name} not supported')
        return None

    closed = property(lambda self: False)
    url = property(operator.attrgetter('_url'))

# helper class that automatically bypasses the buffer for bulk reads, and tries to
# cache the end of central directory record from the end of a zip file
class _BufferedHttpIO(io.BufferedReader):
    def __init__(self, httpio, buffer_size=1024, block_size=BLOCK_SIZE):
        assert isinstance(httpio, _HttpIO)
        super().__init__(httpio, buffer_size)
        self._buffer_size = buffer_size
        self._block_size = block_size
        self._unbuffered = False
        self._tail = None

    def read(self, size=-1):
        if self.raw._len >= self._buffer_size:
            pos = self.tell()
            from_end = self.raw._len - pos
            if from_end <= self._buffer_size:
                # ZipFile does several small reads and seeks near the end of the file,
                # so it's useful to cache the last buffer worth of data, since seek()
                # resets the read buffer
                if not self._tail:
                    debug('caching tail')
                    self.seek(-self._buffer_size, 2)
                    self._tail = super().read(self._buffer_size)
                    n = len(self._tail)
                    self.raw._advance(n)
                else:
                    n = len(self._tail)
                start = n - from_end
                end = n if size < 0 else min(n, start+size)
                chunk = self._tail[start:end]
                debug('cached tail read', len(chunk))
                self.seek(pos + len(chunk))
                return chunk
        # for larger reads, io.BufferedReader just gets in the way, so bypass it
        if size < 0 or size > self._buffer_size:
            if self._unbuffered:
                return self.raw._read(size, stream=True, chunk_size=self._block_size)
                #return self.raw.read(min(size, self._block_size))
            else:
                self._unbuffered = True
                debug('enter unbuffered mode')
                if size < 0: size = self._buffer_size
                # return the buffer contents
                return self.read1(size)
        elif self._unbuffered:
            debug('exit unbuffered mode')
            # synchronize the file position
            self.seek(self.raw.tell())
            self._unbuffered = False
        # normal buffered read
        return super().read(size)

class ZipHttp(ZipFile):
    def __init__(self, httpio, *args, **kwargs):
        assert isinstance(httpio, _HttpIO)
        self._args, self._kwargs = args, kwargs
        bkw = {}
        if 'buffer_size' in kwargs: bkw['buffer_size'] = kwargs.pop('buffer_size')
        if 'block_size' in kwargs: bkw['block_size'] = kwargs.pop('block_size')
        reader = _BufferedHttpIO(httpio, **bkw)
        super().__init__(reader, *args, **kwargs)
        self._httpio, self._reader = httpio, reader

    def sub(self, zi):
        assert zi.compress_type == ZIP_STORED
        saved_position = self._reader.tell()
        # get the zip member header
        self._reader.seek(zi.header_offset)
        fheader = self._reader.read(sizeFileHeader)
        fheader = struct.unpack(structFileHeader, fheader)
        self._reader.seek(saved_position)
        # find the start of the actual data
        skip = fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH]
        data_offset = zi.header_offset + sizeFileHeader + skip
        httpio = self.httpio.slice(data_offset, zi.compress_size)
        return self.__class__(httpio, *self._args, **self._kwargs)
    def infolist_nested(self, max_depth=1, parents=None, *, pattern=None):
        parents = parents or ()
        for zi in self.infolist():
            yield self, zi, parents
            # recursion limit
            if max_depth < 0: continue
            # only try to recurse into STORED .zip files
            if zi.compress_type != ZIP_STORED: continue
            if zi.filename[-4:].lower() != '.zip': continue
            # if a pattern was provided, don't recurse unless it matches
            if pattern is not None and not fnmatch(zi.filename, pattern): continue
            with self.sub(zi) as z2:
                yield from z2.infolist_nested(max_depth - 1, parents + (zi.filename,))
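
    # nested members are addressed with ':'-separated paths; for example, the
    # (hypothetical) name 'outer.zip:inner.txt' refers to inner.txt inside the
    # STORED member outer.zip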
    def open_nested(self, name):
        # try to glob the filename
        if any(c in name for c in '[]?*'):
            # if there's no colon in the name, pat1 and sep will be empty strings
            pat1, sep, pat2 = name.rpartition(':')
            for z2, zi, parents in self.infolist_nested(pattern=pat1):
                if not parents:
                    if sep or not fnmatch(zi.filename, pat2): continue
                else:
                    if not pat1 or not fnmatch(':'.join(parents), pat1): continue
                    if not fnmatch(zi.filename, pat2): continue
                return z2.open(zi.filename)
        try:
            zi = self.getinfo(name)
            return self.open(name)
        except KeyError as e:
            oname, sep, name = name.rpartition(':')
            if sep and oname[-4:].lower() != '.zip':
                raise e
            try: zi = self.getinfo(oname)
            except KeyError: raise e
            if zi.compress_type != ZIP_STORED:
                raise ValueError('Nested zip file uses a method other than STORE!')
            z2 = self.sub(zi)
            return z2.open(name)

    httpio = property(operator.attrgetter('_httpio'))

def list_zipinfo(zi, parents=None):
    parents = parents or ()
    ts = datetime.datetime(*zi.date_time).strftime('%Y-%m-%d %H:%M:%S')
    name = ':'.join(parents + (zi.filename,))
    method_id = zi.compress_type
    if method_id == ZIP_STORED: method = 'S'
    elif method_id == ZIP_DEFLATED: method = 'D'
    elif method_id == ZIP_BZIP2: method = 'B'
    elif method_id == ZIP_LZMA: method = 'L'
    else: method = '?'
    print(f'{ts} {zi.file_size:13d} {zi.compress_size:13d} {method} {name}')
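
# example listing line (hypothetical values):
#   2023-01-02 03:04:05       1048576        524288 D outer.zip:inner.txt
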
parser = argparse.ArgumentParser(description='Operate on zip files over HTTP.')
parser.add_argument('url', metavar='URL', type=str, help='URL of a zip file')
parser.add_argument('filename', metavar='FILENAME', type=str, nargs='?',
                    help='filename within the zip file')
args = parser.parse_args()

z = ZipHttp(_HttpIO(args.url))
if args.filename:
    f = z.open_nested(args.filename)
    while True:
        chunk = f.read1(-1)
        if len(chunk) == 0: break
        sys.stdout.buffer.write(chunk)
    sys.stdout.buffer.flush()
else:
    # if no filename given, list the contents
    for _, zi, parents in z.infolist_nested():
        list_zipinfo(zi, parents)

print(f'heads={z.httpio._heads} gets={z.httpio._gets} bytes_read={z.httpio._total}', file=sys.stderr)
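
Usage sketch (assuming the script is saved as ziphttp.py; the URLs and member names are placeholders):

    # list the contents, including members of nested STORED zip files
    python3 ziphttp.py https://example.com/archive.zip

    # extract a single member to stdout
    python3 ziphttp.py https://example.com/archive.zip docs/readme.txt > readme.txt

    # extract a member of a nested zip; ':' separates archive levels, and globs work
    python3 ziphttp.py https://example.com/archive.zip 'inner.zip:*.csv' > data.csv

Request statistics (HEAD/GET counts and total bytes transferred) are printed to stderr after each run.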