Partial/streaming zip downloader
#!/usr/bin/env python3
# SPDX-License-Identifier: 0BSD or CC0-1.0 or MIT-0 or Unlicense
# Copyright (c) 2023, Ryan Castellucci, No Rights Reserved
import io, sys
import datetime
import argparse
import requests
import operator
import struct
from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA
from fnmatch import fnmatch
structFileHeader = "<4s2B4HL2L2H"
sizeFileHeader = struct.calcsize(structFileHeader)
_FH_FILENAME_LENGTH = 10
_FH_EXTRA_FIELD_LENGTH = 11
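# these mirror CPython's zipfile internals: in the unpacked local file header,
# index 10 is the filename length and index 11 the extra field length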
BLOCK_SIZE = 1<<18
__debug = False
def debug(*args, **kwarg):
    if __debug:
        if 'file' not in kwarg: kwarg['file'] = sys.stderr
        print(*args, **kwarg)
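# set __debug = True above to trace HTTP requests and buffering on stderr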
class HttpError(IOError):
    def __init__(self, response, message=None):
        self.response = response
        if message is None: self.message = f'http status {response.status_code}'
        else: self.message = message
        super().__init__(self.message)
# not a full implementation, just enough to use with BufferedReader
class _HttpIO(io.RawIOBase):
    def __init__(self, url, *, session=None, parent=None):
        if not session:
            session = parent._session if parent else requests.Session()
        self._session = session
        self._parent = parent
        self._url, self._off = url, 0
        self._pos, self._total = 0, 0
        self._gets, self._heads = 0, 0
        self._response, self._iter, self._next = None, None, None
        if not parent:
            # We assume the zip file doesn't change while this tool is in use
            r = self._head()
            if 'bytes' not in r.headers.get('Accept-Ranges', ''):
                raise HttpError(r, 'byte ranges not supported by server')
            self._len = int(r.headers.get('Content-Length'))
    def _head(self):
        self._heads += 1
        if self._parent: self._parent._heads += 1
        r = self._session.head(self._url, allow_redirects=True)
        if r.status_code != 200: raise HttpError(r)
        # Update URL if there was a redirect
        if r.url != self._url: self._url = r.url
        return r
    def _get(self, headers=None, *, stream=False):
        self._gets += 1
        if self._parent: self._parent._gets += 1
        r = self._session.get(self._url, headers=headers, stream=stream)
        if r.status_code not in (200, 206): raise HttpError(r)
        return r
    def _advance(self, n):
        self._pos += n
        self._total += n
        if self._parent:
            self._parent._total += n
    def slice(self, offset, length=None):
        hio = self.__class__(self._url, session=self._session, parent=self)
        hio._off = offset
        hio._len = length if length is not None else self._len - self._off
        hio.seek(0)
        return hio
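    # Worked example (hypothetical numbers): on a 1000-byte file, slice(200, 100)
    # returns a child _HttpIO whose tell() == 0 maps to absolute offset 200 and
    # whose len() is 100, so reads translate to "Range: bytes=200-299"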
    def _stream_close(self):
        debug('_stream_close')
        if self._response: self._response.close()
        self._response, self._iter, self._next = None, None, None
    def _stream_read(self, size=-1):
        debug(f'_stream_read avail={len(self._next) if self._next else 0}, requested={size}')
        assert self._iter is not None
        if self._next is None:
            # this implies that the response iterator was exhausted on its first
            # call, which should not happen, but better safe than sorry
            self._stream_close()
            size = BLOCK_SIZE if size < 0 else min(size, BLOCK_SIZE)
            return self.read(size)
        elif size >= 0 and size < len(self._next):
            # reading less than the available amount of data isn't fully handled
            chunk = self._next[:size]
            self._stream_close()
        else:
            chunk = self._next
            # the response iterator only gets marked as exhausted when it is asked
            # to generate a chunk but has no data available - read ahead one chunk
            # since the reader may stop after the last one it expects
            self._next = next(self._iter, None)
            if self._next is None:
                self._stream_close()
        self._advance(len(chunk))
        return chunk
    def read(self, size=-1):
        return self._read(size)
    def _read(self, size=-1, *, stream=False, chunk_size=BLOCK_SIZE):
        if self._iter:
            return self._stream_read(size)
        elif size == 0:
            # any empty byte string will do
            return b''
        elif self._off == 0 and self._pos == 0 and (size < 0 or size >= self._len):
            # entire file from the beginning, no range header needed
            end = None
        elif size < 0:
            # rest of the file
            end = self._off + self._len - 1
        else:
            # requested range or rest of the file, whichever is less
            end = min(self._off + self._len, self._pos + size) - 1
        debug('read', size, self._pos - self._off, end - self._off if end is not None else None, self._len)
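        # HTTP byte ranges are inclusive of both endpoints (RFC 9110), which is
        # why 1 is subtracted when computing `end` above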
        headers = {}
        if end is not None:
            headers['Range'] = f'bytes={self._pos}-{end}'
        # reading multiple sequential chunks without streaming would require a
        # request to the server for each chunk, involving potentially slow
        # network round trips
        actually_stream = bool(stream and (size < 0 or size >= chunk_size))
        r = self._get(headers, stream=actually_stream)
        if actually_stream:
            # set up response streaming
            debug(f'_stream_init chunk_size={chunk_size}')
            self._response = r
            self._iter = r.iter_content(chunk_size)
            self._next = next(self._iter, None)
            # return the first chunk
            return self._stream_read(size)
        # count the bytes actually received rather than trusting Content-Length,
        # which may be absent (e.g. with chunked transfer encoding)
        n = len(r.content)
        self._advance(n)
        return r.content
    def readall(self):
        return self.read(-1)
    def readinto(self, b):
        b = (memoryview(b) if not isinstance(b, memoryview) else b).cast('B')
        n = len(b)
        data = self.read(n)
        n = len(data)
        b[:n] = data
        return n
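    # io.BufferedReader fills its internal buffer through readinto(), so the
    # buffered wrapper below works without calling read() on the raw object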
    def seek(self, pos, whence=0):
        if whence != 0 or pos != self._pos - self._off:
            debug('seek', pos, whence, self._pos - self._off)
        if whence == 0: newpos = self._off + pos
        elif whence == 1: newpos = self._pos + pos
        elif whence == 2: newpos = self._off + self._len + pos
        else: raise ValueError('invalid whence')
        if self._pos != newpos and self._iter:
            self._stream_close()
        self._pos = newpos
        # report the position relative to the slice offset, consistent with tell()
        return newpos - self._off
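    # seeks are lazy: nothing is requested from the server until the next read,
    # so ZipFile's seek-heavy directory parsing doesn't cost extra round trips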
    def __len__(self): return self._len
    def tell(self): return self._pos - self._off
    def writable(self): return False
    def seekable(self): return True
    def readable(self): return True
    def __getattr__(self, name):
        if name in ('truncate', 'fileno', 'write'):
            raise OSError(f'{name} not supported')
        return None
    closed = property(lambda self: False)
    url = property(operator.attrgetter('_url'))
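# A minimal standalone sketch of _HttpIO (hypothetical URL, not executed here):
#
#   hio = _HttpIO('https://example.com/archive.zip')
#   hio.seek(-22, 2)     # position at a zip end-of-central-directory record
#   eocd = hio.read(22)  # fetched with a single HTTP Range request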
# helper class that automatically bypasses the buffer for bulk reads, and tries to
# cache the end of central directory record from the end of a zip file
class _BufferedHttpIO(io.BufferedReader):
    def __init__(self, httpio, buffer_size=1024, block_size=BLOCK_SIZE):
        assert isinstance(httpio, _HttpIO)
        super().__init__(httpio, buffer_size)
        self._buffer_size = buffer_size
        self._block_size = block_size
        self._unbuffered = False
        self._tail = None
    def read(self, size=-1):
        if self.raw._len >= self._buffer_size:
            pos = self.tell()
            from_end = self.raw._len - pos
            if from_end <= self._buffer_size:
                # ZipFile does several small reads and seeks near the end of the
                # file, so it's useful to cache the last buffer worth of data,
                # since seek() resets the read buffer
                if not self._tail:
                    debug('caching tail')
                    self.seek(-self._buffer_size, 2)
                    self._tail = super().read(self._buffer_size)
                    n = len(self._tail)
                    self.raw._advance(n)
                else:
                    n = len(self._tail)
                start = n - from_end
                end = n if size < 0 else min(n, start+size)
                chunk = self._tail[start:end]
                debug('cached tail read', len(chunk))
                self.seek(pos + len(chunk))
                return chunk
        # for larger reads, io.BufferedReader just gets in the way, so bypass it
        if size < 0 or size > self._buffer_size:
            if self._unbuffered:
                return self.raw._read(size, stream=True, chunk_size=self._block_size)
                #return self.raw.read(min(size, self._block_size))
            else:
                self._unbuffered = True
                debug('enter unbuffered mode')
                if size < 0: size = self._buffer_size
                # return the buffer contents
                return self.read1(size)
        elif self._unbuffered:
            debug('exit unbuffered mode')
            # synchronize the file position
            self.seek(self.raw.tell())
            self._unbuffered = False
        # normal buffered read
        return super().read(size)
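    # note: ZipFile's end-of-central-directory scan starts with the last 22
    # bytes of the file, so the default 1024-byte tail cache covers archives
    # with short or absent archive comments in a single request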
class ZipHttp(ZipFile):
    def __init__(self, httpio, *args, **kwargs):
        assert isinstance(httpio, _HttpIO)
        self._args, self._kwargs = args, kwargs
        bkw = {}
        if 'buffer_size' in kwargs: bkw['buffer_size'] = kwargs.pop('buffer_size')
        if 'block_size' in kwargs: bkw['block_size'] = kwargs.pop('block_size')
        reader = _BufferedHttpIO(httpio, **bkw)
        super().__init__(reader, *args, **kwargs)
        self._httpio, self._reader = httpio, reader
    def sub(self, zi):
        assert zi.compress_type == ZIP_STORED
        saved_position = self._reader.tell()
        # get the zip member header
        self._reader.seek(zi.header_offset)
        fheader = self._reader.read(sizeFileHeader)
        fheader = struct.unpack(structFileHeader, fheader)
        self._reader.seek(saved_position)
        # find the start of the actual data
        skip = fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH]
        data_offset = zi.header_offset + sizeFileHeader + skip
        httpio = self.httpio.slice(data_offset, zi.compress_size)
        return self.__class__(httpio, *self._args, **self._kwargs)
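    # Worked example (hypothetical numbers): a member whose local header sits at
    # offset 1000 with a 9-byte filename and no extra field has its data at
    # 1000 + 30 + 9 + 0 = 1039, since sizeFileHeader is 30 bytes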
    def infolist_nested(self, max_depth=1, parents=None, *, pattern=None):
        parents = parents or ()
        for zi in self.infolist():
            yield self, zi, parents
            # recursion limit
            if max_depth < 0: continue
            # only try to recurse into STORED .zip files
            if zi.compress_type != ZIP_STORED: continue
            if zi.filename[-4:].lower() != '.zip': continue
            # if a pattern was provided, don't recurse unless it matches
            if pattern is not None and not fnmatch(zi.filename, pattern): continue
            with self.sub(zi) as z2:
                yield from z2.infolist_nested(max_depth - 1, parents + (zi.filename,))
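    # e.g. a member "a.txt" inside a stored "inner.zip" is yielded as
    # (<ZipHttp for inner.zip>, <ZipInfo a.txt>, ('inner.zip',))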
    def open_nested(self, name):
        # try to glob the filename
        for c in '[]?*':
            if c not in name:
                continue
            # if there's no colon in the name, pat1 and sep will be empty strings
            pat1, sep, pat2 = name.rpartition(':')
            for z2, zi, parents in self.infolist_nested(pattern=pat1):
                if not parents:
                    if sep or not fnmatch(zi.filename, pat2): continue
                else:
                    if not pat1 or not fnmatch(':'.join(parents), pat1): continue
                    if not fnmatch(zi.filename, pat2): continue
                return z2.open(zi.filename)
        try:
            zi = self.getinfo(name)
            return self.open(name)
        except KeyError as e:
            oname, sep, name = name.rpartition(':')
            if sep and oname[-4:].lower() != '.zip':
                raise e
            try: zi = self.getinfo(oname)
            except KeyError: raise e
            if zi.compress_type != ZIP_STORED:
                raise ValueError('Nested zip file uses a method other than STORE!')
            z2 = self.sub(zi)
            return z2.open(name)
    httpio = property(operator.attrgetter('_httpio'))
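# Usage sketch (hypothetical URL and member names):
#
#   z = ZipHttp(_HttpIO('https://example.com/outer.zip'))
#   f = z.open_nested('docs/readme.txt')            # plain member
#   f = z.open_nested('inner.zip:docs/readme.txt')  # member of a nested zip
#   f = z.open_nested('*.zip:*.txt')                # glob form, first match wins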
def list_zipinfo(zi, parents=None):
    parents = parents or ()
    ts = datetime.datetime(*zi.date_time).strftime('%Y-%m-%d %H:%M:%S')
    name = ':'.join(parents + (zi.filename,))
    method_id = zi.compress_type
    if method_id == ZIP_STORED: method = 'S'
    elif method_id == ZIP_DEFLATED: method = 'D'
    elif method_id == ZIP_BZIP2: method = 'B'
    elif method_id == ZIP_LZMA: method = 'L'
    else: method = '?'
    print(f'{ts} {zi.file_size:13d} {zi.compress_size:13d} {method} {name}')
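# listing columns: timestamp, uncompressed size, compressed size, method letter
# (S=stored, D=deflated, B=bzip2, L=lzma, ?=other), then the member name with
# nested members rendered as outer.zip:inner/path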
parser = argparse.ArgumentParser(description='Operate on zip files over HTTP.')
parser.add_argument('url', metavar='URL', type=str, help='URL of a zip file')
parser.add_argument('filename', metavar='FILENAME', type=str, nargs='?',
                    help='filename within the zip file')
args = parser.parse_args()
z = ZipHttp(_HttpIO(args.url))
if args.filename:
    f = z.open_nested(args.filename)
    while True:
        chunk = f.read1(-1)
        if len(chunk) == 0: break
        sys.stdout.buffer.write(chunk)
        sys.stdout.buffer.flush()
else:
    # if no filename given, list the contents
    for _, zi, parents in z.infolist_nested():
        list_zipinfo(zi, parents)
print(f'heads={z.httpio._heads} gets={z.httpio._gets} bytes_read={z.httpio._total}', file=sys.stderr)
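# Example invocations (hypothetical URL):
#   ./wzip.py https://example.com/archive.zip            # list the contents
#   ./wzip.py https://example.com/archive.zip some.txt   # stream one member to stdout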