# Part of Odoo. See LICENSE file for full copyright and licensing details.
"""
Utilities to work with partial zip files.
The zipfile library found in the python standard library only works
with full zip files, i.e. the entire archive must be created/loaded in
memory. There are situations where we don't want to load the entire
thing in memory, e.g. to craft a zipfile out of many large files and
send it over the network.
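Example: stream a local archive and list its members (a minimal
sketch; ``archive.zip`` stands for any zip file on disk, see
:func:`extract` and :func:`helper` below)::
    with open('archive.zip', 'rb') as file:
        for header, data in helper(extract(file)):
            print(header.filename, len(data))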
"""
# https://users.cs.jmu.edu/buchhofp/forensics/formats/pkzip.html
# https://pkwaredownloads.blob.core.windows.net/pkware-general/Documentation/APPNOTE-6.3.9.TXT
# https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-dosdatetimetofiletime
#
# The structure of a zip file is as follows:
#
# Local File Header 1
# File data 1
# (Optional) Data Descriptor 1
# Local File Header 2
# File data 2
# (Optional) Data Descriptor 2
# ...
# Local File Header n
# File data n
# (Optional) Data Descriptor n
#
# Central Directory file 1
# Central Directory file 2
# ...
# Central Directory file n
#
# (if zip64) End Of Central Directory Zip64
# (if zip64) End Of Central Directory Locator
# EndOfCentralDirectory
#
# Each Local File Header contains the metadata for the file that
# follows (e.g. the filename). It is possible that the crc32 or
# compressed file size or uncompressed file size are not yet known when
# producing the local file header and will only be known once the file
# data has been loaded. In those cases the DATA_DESCRIPTOR flag can be
# set and
# the actual three values will be written after the data.
#
# Each file's data can be compressed using an algorithm whose
# identifier is written in the local file header. Files can also be
# encrypted but this library does not support encryption.
#
# The central directory is found at the end of the archive, after all
# files. It contains a copy of every Local File Header enhanced with
# the data descriptor values and an additional pointer (a numeric
# offset from the beginning of the archive) to the local file header.
#
# After the central directory come up to three structures that contain
# pointers (numeric offsets from the beginning of the archive) to the
# start of the Central Directory and, for zip64 archives, to the start
# of the Zip64 Central Directory.
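#
# In terms of the classes defined below, a streamed archive is thus a
# sequence of LocalFileHeader + file data (+ DataDescriptor) records,
# followed by one CentralDirectoryFileHeader per file, and a final
# EndOfCentralDirectory (with its Zip64 companions when needed).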
import dataclasses
import datetime
import enum
import io
import itertools
import struct
import zlib
from collections.abc import Generator, Iterable, Mapping
from functools import partial
from pathlib import Path
try:
from .mimetypes import is_mimetype_textual
except ImportError:
def is_mimetype_textual(mimetype):
return mimetype.startswith('text/')
try:
import bz2
except ImportError:
bz2 = None
try:
import lzma
except ImportError:
lzma = None
MAX_INT32 = 0xFF_FF_FF_FF # 4GiB - 1
def serialize_time_date(dt: datetime.datetime) -> tuple[int, int]:
"""
Serialize a python datetime into the MS-DOS format used by ZIP.
ZIP datetimes are naive: there is no timezone associated with the
value. This function uses the datetime as-is, be it naive or aware,
UTC or not.
The MS-DOS format covers dates between 1980 and 2107 (inclusive)
and has a precision of 2 seconds (odd seconds cannot be stored).
This function rejects dates before that range but still serializes
dates after it, even though the result will not fit in a 16-bit
unsigned integer.
:param dt: A python datetime to be serialized.
:returns: A 2-value tuple (time, date), to be packed with
``struct.pack('<HH', ...)``.
:raises ValueError: When the given datetime is before 1980.
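A doctest-style example of the encoding described above:
>>> serialize_time_date(datetime.datetime(2025, 12, 19, 11, 55, 30))
(24303, 23443)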
"""
# https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-dosdatetimetofiletime
if dt.year < 1980:
raise ValueError(
f"cannot serialize a date before 1970: {dt}")
ziptime = (dt.hour << 11) + (dt.minute << 5) + dt.second // 2
zipdate = ((dt.year - 1980) << 9) + (dt.month << 5) + dt.day
return (ziptime, zipdate)
def deserialize_time_date(ziptime: int, zipdate: int) -> datetime.datetime:
"""
Deserialize a time and date pair in the MS-DOS format used by ZIP
into a python datetime.
ZIP datetimes are naive: there is no timezone associated with the
value. This function likewise makes no attempt to localize the date
and just returns the deserialized naive python datetime.
The MS-DOS format covers dates between 1980 and 2107 (inclusive)
and has a precision of 2 seconds. This function makes no attempt to
support dates outside that range.
:param ziptime: An MS-DOS time as a 16-bit unsigned integer.
:param zipdate: An MS-DOS date as a 16-bit unsigned integer.
:returns: A naive python datetime, between 1/1/1980-00:00:00 and
31/12/2107-23:59:58 (included).
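A doctest-style example, the inverse of the one in serialize_time_date:
>>> deserialize_time_date(24303, 23443)
datetime.datetime(2025, 12, 19, 11, 55, 30)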
"""
# ruff: noqa: E221
# https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-dosdatetimetofiletime
second = (ziptime & 0b0000000000011111)
minute = (ziptime & 0b0000011111100000) >> 5
hour = (ziptime & 0b1111100000000000) >> 11
day = (zipdate & 0b0000000000011111)
month = (zipdate & 0b0000000111100000) >> 5
year = (zipdate & 0b1111111000000000) >> 9
try:
return datetime.datetime(1980 + year, month, day, hour, minute, second * 2)
except ValueError as exc:
exc.add_note(str((1980 + year, month, day, hour, minute, second * 2)))
raise
class OS(enum.IntEnum):
""" The operating system where a zipfile was created. """
MSDOS = 0
AMIGA = 1
OPENVMS = 2
UNIX = 3
VM_CMS = 4
ATARI_ST = 5
OS_2_HPFS = 6
MACINTOSH = 7
Z_SYSTEM = 8
CP_M = 9
NTFS = 10
MVS = 11
VSE = 12
ACORN_RISC = 13
VFAT = 14
ALTERNATE_MVS = 15
BEOS = 16
TANDEM = 17
OS_400 = 18
DARWIN = 19
class Version(enum.IntEnum):
""" The version of zip needed to work with a specific file. """
DEFAULT = 20 # 2.0
ZIP64 = 45 # 4.5
UNICODE_FILENAME = 63 # 6.3
class InternalAttribute(enum.IntFlag, boundary=enum.FlagBoundary.KEEP):
""" Some flags found in the central directory. """
TEXT = 1 << 0
CONTROL_FIELD_RECORDS_PRECEDE_LOGICAL_RECORDS = 1 << 2
class CompressionMethod(enum.IntEnum):
"""
The compression algorithm used to compress a file.
All algorithms are listed in this enumeration but only deflated,
bzip2 and lzma are supported.
"""
NO_COMPRESSION = 0
SHRUNK = 1
REDUCED_WITH_COMPRESSION_FACTOR_1 = 2
REDUCED_WITH_COMPRESSION_FACTOR_2 = 3
REDUCED_WITH_COMPRESSION_FACTOR_3 = 4
REDUCED_WITH_COMPRESSION_FACTOR_4 = 5
IMPLODED = 6
DEFLATED = 8
ENHANCED_DEFLATED = 9
PK_WARE_DCL_IMPLODED = 10
BZIP2 = 12
LZMA = 14
IBM_TERSE = 18
IBM_LZ77_Z = 19
PPMD = 98
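# Attach (de)compressor factories to the members this module actually
# supports; members without these attributes can still be identified
# when parsing but their data cannot be (de)compressed here.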
CompressionMethod.DEFLATED.compressor = partial(zlib.compressobj, wbits=-15)
CompressionMethod.DEFLATED.decompressor = partial(zlib.decompressobj, wbits=-15)
if bz2:
CompressionMethod.BZIP2.compressor = bz2.BZ2Compressor
CompressionMethod.BZIP2.decompressor = bz2.BZ2Decompressor
if lzma:
CompressionMethod.LZMA.compressor = lzma.LZMACompressor
CompressionMethod.LZMA.decompressor = lzma.LZMADecompressor
class Flags(enum.IntFlag, boundary=enum.FlagBoundary.KEEP):
"""
Some flags found in the Local File Header.
All specified flags are listed in this enumeration but only
data descriptor and language encoding are supported.
"""
ENCRYPTED_FILE = 1 << 0
COMPRESSION_OPTION1 = 1 << 1
COMPRESSION_OPTION2 = 1 << 2
DATA_DESCRIPTOR = 1 << 3
ENHANCED_DEFLATION = 1 << 4
COMPRESSED_PATCHED_DATA = 1 << 5
STRONG_ENCRYPTION = 1 << 6
LANGUAGE_ENCODING = 1 << 11 # filename and comment use UTF-8
MASK_HEADER_VALUES = 1 << 13
class ExtraFieldId(enum.IntEnum):
""" The numeric identifier of every extra field. """
ZIP64 = 0x0001
class _ExtraField:
"""
Abstract class and registry for concrete extra fields.
All subclasses must implement ``extra_field_id``, ``_struct`` and
``__init__``. All subclasses are registered in the ``_registry``
using ``extra_field_id`` as entry key.
Two methods, :meth:`unpack` and :meth:`pack`, are provided to parse
and serialize an extra field using ``_struct``.
The generic :meth:`pack` works by introspecting the dataclass fields
of the concrete ExtraField class. It only works when ``_struct``
maps 1-1 to the dataclass fields. In other cases, please override
:meth:`pack`.
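A round-trip through the :class:`Zip64` field defined below gives a
minimal illustration of the mechanism:
>>> Zip64.unpack(Zip64(10, 5, 0, 0).pack())
Zip64(original_size=10, compressed_size=5, offset=0, disk_no=0)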
"""
_registry: 'dict[ExtraFieldId, type[_ExtraField]]' = {}
""" A registry of extra fields. Populated by :meth:`__init_subclass__`. """
extra_field_id: ExtraFieldId
""" The unique numeric identifier of this extra field. """
_struct: str
"""
The structure (without the ``<HH`` prefix) used to parse and
serialize the extra field.
"""
@classmethod
def unpack(cls, data):
# Don't override this method. Implement your logic inside __init__.
# support unpacking with and without header
if len(data) == struct.calcsize(cls._struct) + 4:
# header present, unpack it
extra_field_id, extra_field_size = struct.unpack('<HH', data[:4])
if extra_field_id != cls.extra_field_id:
raise ValueError(
f"invalid header, expected {cls.extra_field_id!r}, got {extra_field_id}")
if extra_field_size != len(data) - 4:
raise ValueError(
f"invalid header, {extra_field_size=} but {len(data) - 4=}")
data = data[4:]
return cls(*struct.unpack(cls._struct, data))
def pack(self):
# It is ok to override this method.
# always pack the extra field with its header
return struct.pack('<HH' + self._struct.removeprefix('<'),
self.extra_field_id,
struct.calcsize(self._struct),
*(
getattr(self, field.name)
for field in dataclasses.fields(self)
),
)
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
other_cls = cls._registry.setdefault(cls.extra_field_id, cls)
assert other_cls is cls, \
f"conflicting classes for {cls.extra_field_id!r}: {other_cls} vs {cls}"
def _parse_extra_fields(data: bytes) -> dict[ExtraFieldId | int, _ExtraField | bytes]:
# A field is a header with type (2 bytes), length (2 bytes) and the
# actual data ({length} bytes). Parsing the extra field is delegated
# to the concrete _ExtraField class for {type} (or left as bytes if
# no concrete class exists).
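# For example, b'\x99\x99\x04\x00\xaa\xbb\xcc\xdd' (an illustrative,
# unregistered id 0x9999) parses to {0x9999: b'\xaa\xbb\xcc\xdd'}.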
buffer = io.BytesIO(data)
extra_fields = {}
while buffer.tell() < len(data):
id_, size = struct.unpack('<HH', buffer.read(4))
if buffer.tell() + size > len(data):
raise ValueError(
f"invalid header, expected {size} bytes, but only {len(data) - size} bytes remaining")
extra_field = buffer.read(size)
try:
id_ = ExtraFieldId(id_)
ExtraField = _ExtraField._registry[id_]
except (KeyError, ValueError):
extra_fields[id_] = extra_field
else:
extra_fields[id_] = ExtraField.unpack(extra_field)
return extra_fields
def _serialize_extra_fields(
dico: dict[ExtraFieldId | int, _ExtraField | bytes],
) -> bytes:
out = []
for id_, item in dico.items():
match item:
case _ExtraField():
data = item.pack()
case bytes():
data = item
case e:
raise TypeError(e)
out.append(struct.pack('<HH', id_, len(data)))
out.append(data)
return b''.join(out)
@dataclasses.dataclass
class Zip64(_ExtraField):
extra_field_id = ExtraFieldId.ZIP64 # noqa: RUF045
_struct = '<QQQI'
original_size: int
compressed_size: int
offset: int
disk_no: int
@dataclasses.dataclass
class DataDescriptor:
_struct = '<4sIII'
_struct_64 = '<4sIQQ'
signature = b'PK\7\x08' # noqa: RUF045
crc32: int
compressed_size: int
uncompressed_size: int
def pack(self, zip64=None):
if zip64 is None:
zip64 = self.uncompressed_size > 0xFF_FF_FF_FF
return struct.pack(self._struct_64 if zip64 else self._struct,
self.signature,
self.crc32,
self.compressed_size,
self.uncompressed_size,
)
@classmethod
def unpack(cls, data, zip64=False):
dd_struct = cls._struct_64 if zip64 else cls._struct
if len(data) == struct.calcsize(dd_struct) - 4:
data = cls.signature + data
sign, crc32, csize, usize = struct.unpack(dd_struct, data)
if sign != cls.signature:
raise ValueError(
f"invalid data descriptor, exptected {cls.signature!r}, got {sign}")
return cls(crc32, csize, usize)
@dataclasses.dataclass
class LocalFileHeader:
_struct = '<4sHHHHHIIIHH'
_length = struct.calcsize(_struct)
signature = b'PK\3\4' # noqa: RUF045
version: int
flags: Flags
compression: CompressionMethod
modification: datetime.datetime
crc32: int
compressed_size: int
uncompressed_size: int
filename: str
extra_fields: dict[ExtraFieldId | int, _ExtraField | bytes]
def pack(self, *, encoding='ascii'):
filename = self.filename.encode(
'utf-8' if self.flags & Flags.UNICODE_FILENAME else encoding)
extra_fields = _serialize_extra_fields(self.extra_fields)
return struct.pack(self._struct,
self.signature,
self.version,
self.flags,
self.compression,
*serialize_time_date(self.modification),
self.crc32,
self.compressed_size,
self.uncompressed_size,
len(filename),
len(extra_fields),
) + filename + extra_fields
@classmethod
def unpack(cls, file_header, *, encoding='ascii'):
buffer = io.BytesIO(file_header)
(
signature,
version,
flags,
compression,
modtime,
moddate,
crc32,
compressed_size,
uncompressed_size,
filename_len,
extra_fields_len,
) = struct.unpack(cls._struct, buffer.read(cls._length))
if signature != cls.signature:
raise ValueError(
f"invalid header, expected {cls.signature}, got {signature}")
if len(file_header) != (cls._length + filename_len + extra_fields_len):
raise ValueError(
f"invalid header, expected {cls._length+filename_len+extra_fields_len=} bytes where cls._length=struct.calcsize({cls._struct})={cls._length} {filename_len=} {extra_fields_len=}, got {len(file_header)=} bytes") # noqa: E226
flags = Flags(flags)
return cls(
version=version,
flags=flags,
compression=CompressionMethod(compression),
modification=deserialize_time_date(modtime, moddate),
crc32=crc32,
compressed_size=compressed_size,
uncompressed_size=uncompressed_size,
filename=buffer.read(filename_len).decode(
'utf-8' if flags & Flags.UNICODE_FILENAME else encoding),
extra_fields=_parse_extra_fields(buffer.read(extra_fields_len)),
)
@dataclasses.dataclass
class CentralDirectoryFileHeader:
_struct = '<4sBBHHHHHIIIHHHHHII'
_length = struct.calcsize(_struct)
signature = b'PK\1\2' # noqa: RUF045
version_os: OS
version_zip: Version
version_needed: Version
flags: Flags
compression: CompressionMethod
modification: datetime.datetime
crc32: int
compressed_size: int
uncompressed_size: int
disk_no: int
internal_attribute: int
external_attribute: int
local_header_offset: int
filename: str
extra_fields: dict[ExtraFieldId | int, _ExtraField | bytes]
comment: str
def pack(self, *, encoding='ascii'):
filename = self.filename.encode(
'utf-8' if self.flags & Flags.UNICODE_FILENAME else encoding)
comment = self.comment.encode(
'utf-8' if self.flags & Flags.UNICODE_FILENAME else encoding)
extra_fields = _serialize_extra_fields(self.extra_fields)
return struct.pack(self._struct,
self.signature,
self.version_os,
self.version_zip,
self.version_needed,
self.flags,
self.compression,
*serialize_time_date(self.modification),
self.crc32,
self.compressed_size,
self.uncompressed_size,
len(filename),
len(extra_fields),
len(comment),
self.disk_no,
self.internal_attribute,
self.external_attribute,
self.local_header_offset,
) + filename + extra_fields + comment
@classmethod
def unpack(cls, cd_file_header, *, encoding='ascii'):
buffer = io.BytesIO(cd_file_header)
(
signature,
version_os,
version_zip,
version_needed,
flags,
compression,
modtime,
moddate,
crc32,
compressed_size,
uncompressed_size,
filename_len,
extra_fields_len,
comment_len,
disk_no,
internal_attribute,
external_attribute,
local_header_offset,
) = struct.unpack(cls._struct, buffer.read(cls._length))
if signature != cls.signature:
raise ValueError(
f"invalid header, expected {cls.signature}, got {signature}")
if len(cd_file_header) != (cls._length + filename_len + extra_fields_len + comment_len):
raise ValueError(
f"invalid header, expected {cls._length+filename_len+extra_fields_len+comment_len=} bytes where cls._length=struct.calcsize({cls._struct})={cls._length} {filename_len=} {extra_fields_len=} {comment_len=}, got {len(cd_file_header)=} bytes") # noqa: E226
flags = Flags(flags)
return cls(
version_os=OS(version_os),
version_zip=Version(version_zip),
version_needed=Version(version_needed),
flags=flags,
compression=CompressionMethod(compression),
modification=deserialize_time_date(modtime, moddate),
crc32=crc32,
compressed_size=compressed_size,
uncompressed_size=uncompressed_size,
disk_no=disk_no,
internal_attribute=internal_attribute,
external_attribute=external_attribute,
local_header_offset=local_header_offset,
filename=buffer.read(filename_len).decode(
'utf-8' if flags & Flags.UNICODE_FILENAME else encoding),
extra_fields=_parse_extra_fields(buffer.read(extra_fields_len)),
comment=buffer.read(comment_len).decode(
'utf-8' if flags & Flags.UNICODE_FILENAME else encoding),
)
@dataclasses.dataclass
class EndOfCentralDirectory64:
_struct = '<4sQBBHIIQQQQ'
_length = struct.calcsize(_struct)
_length_header = struct.calcsize('<4sQ')
signature = b'PK\6\6' # noqa: RUF045
version_os: OS
version_zip: Version
version_needed: Version
disk_no: int
central_directory_disk_no: int
central_directory_disk_entries_count: int
central_directory_total_entries_count: int
central_directory_size: int
central_directory_offset: int
comment: str
@classmethod
def unpack(cls, eocd, encoding='utf-8'):
signature, size, *fields = struct.unpack(cls._struct, eocd[:cls._length])
if signature != cls.signature:
raise ValueError(
f"invalid header, expected {cls.signature}, got {signature}")
if len(eocd) != size + cls._length_header:
raise ValueError(
f"invalid header, expected {size} bytes, got {len(eocd)=} bytes")
comment = eocd[cls._length:].decode(encoding)
return cls(*fields, comment)
def pack(self, encoding='utf-8'):
comment = self.comment.encode(encoding)
return struct.pack(self._struct,
self.signature,
self._length + len(comment) - self._length_header,
self.version_os,
self.version_zip,
self.version_needed,
self.disk_no,
self.central_directory_disk_no,
self.central_directory_disk_entries_count,
self.central_directory_total_entries_count,
self.central_directory_size,
self.central_directory_offset,
) + comment
@dataclasses.dataclass
class EndOfCentralDirectoryLocator:
_struct = '<4sIQI'
_length = struct.calcsize(_struct)
signature = b'PK\6\7' # noqa: RUF045
central_directory64_disk_no: int
central_directory64_offset: int
total_disk_count: int
@classmethod
def unpack(cls, eocdl):
signature, *fields = struct.unpack(cls._struct, eocdl)
if signature != cls.signature:
raise ValueError(
f"invalid header, expected {cls.signature}, got {signature}")
return cls(*fields)
def pack(self):
return struct.pack(self._struct,
self.signature,
self.central_directory64_disk_no,
self.central_directory64_offset,
self.total_disk_count,
)
@dataclasses.dataclass
class EndOfCentralDirectory:
_struct = '<4sHHHHIIH'
_length = struct.calcsize(_struct)
signature = b'PK\5\6' # noqa: RUF045
disk_no: int
central_directory_disk_no: int
central_directory_disk_entries_count: int
central_directory_total_entries_count: int
central_directory_size: int
central_directory_offset: int
comment: bytes
@classmethod
def unpack(cls, eocd):
signature, *fields, comment_len = struct.unpack(cls._struct, eocd[:cls._length])
if signature != cls.signature:
raise ValueError(
f"invalid header, expected {cls.signature}, got {signature}")
if len(eocd) != (cls._length + comment_len):
raise ValueError(
f"invalid header, expected {cls._length+comment_len=} bytes where cls._length=struct.calcsize({cls._struct})={cls._length} {comment_len=}, got {len(eocd)=} bytes") # noqa: E226
return cls(*fields, eocd[cls._length:])
def pack(self):
return struct.pack(self._struct,
self.signature,
self.disk_no,
self.central_directory_disk_no,
self.central_directory_disk_entries_count,
self.central_directory_total_entries_count,
self.central_directory_size,
self.central_directory_offset,
len(self.comment),
) + self.comment
class _DirectoryItem:
def __init__(self, attachment, offset):
self.attachment = attachment
self.offset = offset
self.local_file_header = None
self.data_descriptor = None
def __iter__(self):
return iter((
self.attachment,
self.offset,
self.local_file_header,
self.data_descriptor,
))
class _Flat(Mapping):
def __contains__(self, item):
return True
def __getitem__(self, item):
return ''
def __iter__(self):
raise NotImplementedError
def __len__(self):
raise NotImplementedError
def _read_file(local_file_header, _read_buffer):
zip64 = local_file_header.extra_fields.get(ExtraFieldId.ZIP64)
csize = (zip64 or local_file_header).compressed_size
expected_usize = (zip64 or local_file_header).uncompressed_size
if local_file_header.compression:
decompressor = local_file_header.compression.decompressor()
crc32 = 0
usize = 0
d, m = divmod(csize, io.DEFAULT_BUFFER_SIZE)
for i in range(d + bool(m)):
chunk_size = m if i == d else io.DEFAULT_BUFFER_SIZE
csize += chunk_size
chunk = _read_buffer(chunk_size)
if len(chunk) != chunk_size:
raise ValueError( # noqa: TRY301
"unexpected end of file") # noqa: EM101
if local_file_header.compression:
chunk = decompressor.decompress(chunk)
crc32 = zlib.crc32(chunk, crc32)
usize += len(chunk)
yield chunk
if local_file_header.compression:
# flush the decompressor
chunk = decompressor.decompress(b'')
if hasattr(decompressor, 'flush'):
chunk += decompressor.flush()
if not decompressor.eof:
raise ValueError( # noqa: TRY301
f"expected end of file, but found {len(decompressor.unused_data)} remaining bytes: {decompressor.unused_data}")
if chunk:
crc32 = zlib.crc32(chunk, crc32)
usize += len(chunk)
yield chunk
if usize != expected_usize:
raise ValueError( # noqa: TRY301
f"invalid uncompressed size: {usize=} != {expected_usize=}")
if crc32 != local_file_header.crc32:
raise ValueError( # noqa: TRY301
f"invalid crc32: {crc32=} != {local_file_header.crc32=}")
yield b'' # empty byte to signal end of file
def _read_until_next_file(local_file_header, _read_buffer, _read_into): # noqa: RET503
zip64 = ExtraFieldId.ZIP64 in local_file_header.extra_fields
dd_sign = DataDescriptor.signature
dd_struct = '<IQQ' if zip64 else '<III'
dd_length = struct.calcsize(dd_struct)
crc32 = csize = usize = 0
dsor = None
if local_file_header.compression:
dsor = local_file_header.compression.decompressor()
buffer = bytearray(_read_buffer(io.DEFAULT_BUFFER_SIZE))
def flush(length):
nonlocal buffer, crc32, csize, usize
if dsor:
with memoryview(buffer)[:length] as mv:
udata = dsor.decompress(mv)
else:
udata = buffer[:length]
if length:
buffer[:-length] = buffer[length:]
csize += length
usize += len(udata)
crc32 = zlib.crc32(udata, crc32)
if udata:
yield udata
bytes_read = 0
while bytes_read < length % len(buffer):
with memoryview(buffer)[-length + bytes_read:] as mv:
bytes_read_ = _read_into(mv)
if not bytes_read_:
buffer = buffer[:-length + bytes_read]
break
bytes_read += bytes_read_
while True:
foundpk = buffer.find(b'PK')
if foundpk == -1:
assert len(buffer) > dd_length + len(dd_sign)
yield from flush(len(buffer) - dd_length - len(dd_sign))
continue
for signature in (
LocalFileHeader.signature,
CentralDirectoryFileHeader.signature,
):
found = buffer.find(signature, foundpk)
if found != -1:
break
else:
assert len(buffer) > dd_length + len(dd_sign)
yield from flush(len(buffer) - dd_length - len(dd_sign))
continue
expected_crc32, expected_csize, expected_usize = (
struct.unpack(dd_struct, buffer[found - dd_length:found]))
if (length := found - dd_length) == expected_csize - csize:
# dd signature absent
yield from flush(length)
if usize != expected_usize or crc32 != expected_crc32:
yield from flush(dd_length + 4) # len(b'PK\3\4')
continue
buffer = memoryview(buffer)[dd_length:]
elif (
(length := found - len(dd_sign) - dd_length) == expected_csize - csize
and buffer.startswith(dd_sign, length)
):
# dd signature present
yield from flush(length)
if usize != expected_usize or crc32 != expected_crc32:
yield from flush(len(dd_sign) + dd_length + 4) # len(b'PK\3\4')
continue
buffer = memoryview(buffer)[len(dd_sign) + dd_length:]
else:
yield from flush(found + 4) # len(b'PK\3\4')
continue
if dsor:
yield from flush(0)
local_file_header.crc32 = crc32
local_file_header.compressed_size = csize
local_file_header.uncompressed_size = usize
yield b'' # signal end of file
return buffer # leftover to reinject
def extract(zipfile: io.FileIO) -> Generator[LocalFileHeader | bytes]:
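"""
Stream a zip archive opened in binary mode without loading it whole.
For every member, this generator yields its :class:`LocalFileHeader`,
then the (decompressed) file data in chunks, then an empty ``b''``
chunk to signal the end of that file; :func:`helper` below shows one
way to consume this stream.
"""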
buffer = None
fileno = 0
header = bytearray(LocalFileHeader._length)
def _read_buffer(n):
nonlocal buffer
if buffer is None:
return zipfile.read(n)
chunk = buffer.read(n)
if len(chunk) < n:
buffer = None
chunk += zipfile.read(n - len(chunk))
return chunk
def _read_into(buff):
nonlocal buffer
if buffer is None:
return zipfile.readinto(buff)
bytes_read = buffer.readinto(buff)
if bytes_read < len(buff):
buffer = None
bytes_read += zipfile.readinto(memoryview(buff)[bytes_read:])
return bytes_read
def reinject(data):
nonlocal buffer
if buffer is None:
buffer = io.BytesIO(data)
else:
buffer = io.BytesIO(buffer.read() + data)
try:
while True:
fileno += 1
local_file_header = None
if (_read_into(header) != LocalFileHeader._length
or not header.startswith(LocalFileHeader.signature)):
break
*_, filename_len, extra_fields_len = struct.unpack(LocalFileHeader._struct, header)
local_file_header = LocalFileHeader.unpack(
header + _read_buffer(filename_len + extra_fields_len))
yield local_file_header
if local_file_header.flags & Flags.DATA_DESCRIPTOR:
leftover = yield from _read_until_next_file(local_file_header, _read_buffer, _read_into)
reinject(leftover)
else:
yield from _read_file(local_file_header, _read_buffer)
except Exception as exc:
e = f"while reading file #{fileno} close to offset {zipfile.tell()}, "
if local_file_header:
e += f"file header was: {local_file_header}"
else:
e += "couldn't read file header"
exc.add_note(e)
raise
def helper(zstream):
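"""
Regroup the stream produced by :func:`extract` into (header, data)
pairs, joining each member's chunks into a single bytes object (which
loads that member entirely in memory).
"""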
while True:
try:
local_file = next(zstream)
if local_file == b'':
print("there's a b'' too much!")
continue
except StopIteration:
break
data = b''.join(itertools.takewhile(b''.__ne__, zstream))
yield local_file, data
def main():
# ruff: noqa: PLC0415, T201
import sys
import time
if len(sys.argv) != 3 or '-h' in sys.argv or '--help' in sys.argv:
sys.exit(f"usage: {sys.argv[0]} <Compress|eXtract> <file>")
_, mode, filename = sys.argv
if mode.casefold() in ('c', 'compress'):
sys.exit("not supported")
elif mode.casefold() in ('x', 'extract'):
with open(filename, 'rb') as file:
zstream = extract(file)
while True:
start = time.time()
try:
file = next(zstream)
except StopIteration:
break
datalen = sum(len(chunk) for chunk in itertools.takewhile(b''.__ne__, zstream))
stop = time.time()
print(file)
print(datalen, "bytes", round(stop - start, 6), "seconds")
else:
sys.exit(f"usage: {sys.argv[0]} <Compress|eXtract> <file>")
if __name__ == '__main__':
# ruff: noqa: PLC0415, T201
import linecache
import tracemalloc
def display_top(snapshot, key_type='lineno', limit=10):
snapshot = snapshot.filter_traces((
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, "<unknown>"),
))
top_stats = snapshot.statistics(key_type, cumulative=True)
print("\nMemory Summary -- Top %s lines" % limit)
for index, stat in enumerate(top_stats[:limit], 1):
frame = stat.traceback[0]
print("#%s: %s:%s: %.1f KiB"
% (index, frame.filename, frame.lineno, stat.size / 1024))
line = linecache.getline(frame.filename, frame.lineno).strip()
if line:
print(' %s' % line)
other = top_stats[limit:]
if other:
size = sum(stat.size for stat in other)
print("%s other: %.1f KiB" % (len(other), size / 1024))
total = sum(stat.size for stat in top_stats)
print("Total allocated size: %.1f KiB" % (total / 1024))
tracemalloc.start()
main()
snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()
display_top(snapshot)