| # Part of Odoo. See LICENSE file for full copyright and licensing details. | |
| """ | |
| Utilities to work with partial zip files. | |
| The zipfile library found in the Python standard library only works with | |
| full zipfiles, i.e. the entire zipfile must be created/loaded in memory. | |
| There are situations where we don't want to load the entire thing in | |
| memory, e.g. to craft a zipfile out of many large files and to send it | |
| over the network. | |
| """ | |
| # https://users.cs.jmu.edu/buchhofp/forensics/formats/pkzip.html | |
| # https://pkwaredownloads.blob.core.windows.net/pkware-general/Documentation/APPNOTE-6.3.9.TXT | |
| # https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-dosdatetimetofiletime | |
| # | |
| # The structure of a zip file is as follows: | |
| # | |
| # Local File Header 1 | |
| # File data 1 | |
| # (Optional) Data Descriptor 1 | |
| # Local File Header 2 | |
| # File data 2 | |
| # (Optional) Data Descriptor 2 | |
| # ... | |
| # Local File Header n | |
| # File data n | |
| # (Optional) Data Descriptor n | |
| # | |
| # Central Directory file 1 | |
| # Central Directory file 2 | |
| # ... | |
| # Central Directory file n | |
| # | |
| # (if zip64) End Of Central Directory Zip64 | |
| # (if zip64) End Of Central Directory Locator | |
| # EndOfCentralDirectory | |
| # | |
| # Each Local File Header contains the metadata for the file that | |
| # follows (e.g. the filename). It is possible that the crc32, the | |
| # compressed file size or the uncompressed file size are not yet known | |
| # when producing the local file header and will only be known once the | |
| # file data has been loaded. In those cases the DATA_DESCRIPTOR flag | |
| # can be set and the three actual values will be written after the data. | |
| # | |
| # Each File data can be compressed using an algorithm whose identifier | |
| # was written in the local file header. They can also be encrypted but | |
| # this library does not support encryption. | |
| # | |
| # The central directory is found at the end of the archive, after all | |
| # files. It contains a copy of every Local File Header enhanced with | |
| # the data descriptor values and an additional pointer (numeric offset | |
| # from the beginning of the archive) to where the local file header is | |
| # located. | |
| # | |
| # After the central directory come up to three structures that contain | |
| # pointers (numeric offsets from the beginning of the archive) to the | |
| # start of the Central Directory and, for zip64 archives, the start of | |
| # the Zip64 Central Directory. | |
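| # | |
| # Typical streaming consumption (a sketch; ``helper`` and ``main`` below | |
| # are the real drivers): | |
| # | |
| #     with open('archive.zip', 'rb') as fp: | |
| #         for local_file_header, data in helper(extract(fp)): | |
| #             print(local_file_header.filename, len(data)) | |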
| import dataclasses | |
| import datetime | |
| import enum | |
| import io | |
| import itertools | |
| import struct | |
| import zlib | |
| from collections.abc import Generator, Iterable, Mapping | |
| from functools import partial | |
| from pathlib import Path | |
| try: | |
| from .mimetypes import is_mimetype_textual | |
| except ImportError: | |
| def is_mimetype_textual(mimetype): | |
| return mimetype.startswith('text/') | |
| try: | |
| import bz2 | |
| except ImportError: | |
| bz2 = None | |
| try: | |
| import lzma | |
| except ImportError: | |
| lzma = None | |
| MAX_INT32 = 0xFF_FF_FF_FF # 4GiB - 1 | |
| def serialize_time_date(dt: datetime.datetime) -> tuple[int, int]: | |
| """ | |
| Serialize a python datetime into the MS-DOS format used by ZIP. | |
| ZIP datetimes are naive: there is no timezone associated with the | |
| value. This function uses the datetime as-is, be it naive or aware, | |
| UTC or not. | |
| The MS-DOS format works for dates between 1980 and 2107 (included) | |
| and has a precision down to 2 seconds (odd seconds don't exist). | |
| This function rejects dates before that range but still serializes | |
| dates after it, even though the result will not fit in a 16-bit | |
| unsigned integer. | |
| :param dt: A python datetime to be serialized. | |
| :returns: A 2-value tuple (time, date), to be packed with ``struct.pack('<HH', ...)``. | |
| :raises ValueError: When the given datetime is before 1980. | |
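| Example (the values follow directly from the bit layout used below): | |
| >>> serialize_time_date(datetime.datetime(2024, 5, 17, 14, 30, 10)) | |
| (29637, 22705) | |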
| """ | |
| # https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-dosdatetimetofiletime | |
| if dt.year < 1980: | |
| raise ValueError( | |
| f"cannot serialize a date before 1970: {dt}") | |
| ziptime = (dt.hour << 11) + (dt.minute << 5) + dt.second // 2 | |
| zipdate = ((dt.year - 1980) << 9) + (dt.month << 5) + dt.day | |
| return (ziptime, zipdate) | |
| def deserialize_time_date(ziptime: int, zipdate: int) -> datetime.datetime: | |
| """ | |
| Deserialize a time and date pair in the MS-DOS format used by ZIP | |
| into a python datetime. | |
| ZIP datetimes are naive: there is no timezone associated with the | |
| value. This function likewise makes no attempt to localize the date | |
| and just returns the deserialized naive python datetime. | |
| The MS-DOS format works for dates between 1980 and 2107 (included) | |
| and has a precision down to 2 seconds. This function makes no | |
| attempt to support dates outside that range. | |
| :param ziptime: A MS-DOS time as a 16-bit unsigned integer. | |
| :param zipdate: A MS-DOS date as a 16-bit unsigned integer. | |
| :returns: A naive python datetime, between 1/1/1980-00:00:00 and | |
| 31/12/2107-23:59:58 (included). | |
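| Example (the inverse of :func:`serialize_time_date`): | |
| >>> deserialize_time_date(29637, 22705) | |
| datetime.datetime(2024, 5, 17, 14, 30, 10) | |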
| """ | |
| # ruff: noqa: E221 | |
| # https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-dosdatetimetofiletime | |
| second = (ziptime & 0b0000000000011111) | |
| minute = (ziptime & 0b0000011111100000) >> 5 | |
| hour = (ziptime & 0b1111100000000000) >> 11 | |
| day = (zipdate & 0b0000000000011111) | |
| month = (zipdate & 0b0000000111100000) >> 5 | |
| year = (zipdate & 0b1111111000000000) >> 9 | |
| try: | |
| return datetime.datetime(1980 + year, month, day, hour, minute, second * 2) | |
| except ValueError as exc: | |
| exc.add_note(str((1980 + year, month, day, hour, minute, second * 2))) | |
| raise | |
| class OS(enum.IntEnum): | |
| """ The operating system where a zipfile was created. """ | |
| MSDOS = 0 | |
| AMIGA = 1 | |
| OPENVMS = 2 | |
| UNIX = 3 | |
| VM_CMS = 4 | |
| ATARI_ST = 5 | |
| OS_2_HPFS = 6 | |
| MACINTOSH = 7 | |
| Z_SYSTEM = 8 | |
| CP_M = 9 | |
| NTFS = 10 | |
| MVS = 11 | |
| VSE = 12 | |
| ACORN_RISC = 13 | |
| VFAT = 14 | |
| ALTERNATE_MVS = 15 | |
| BEOS = 16 | |
| TANDEM = 17 | |
| OS_400 = 18 | |
| DARWIN = 19 | |
| class Version(enum.IntEnum): | |
| """ The version of zip needed to work with a specific file. """ | |
| DEFAULT = 20 # 2.0 | |
| ZIP64 = 45 # 4.5 | |
| UNICODE_FILENAME = 63 # 6.3 | |
| class InternalAttribute(enum.IntFlag, boundary=enum.FlagBoundary.KEEP): | |
| """ Some flags found in the central directory. """ | |
| TEXT = 1 << 0 | |
| CONTROL_FIELD_RECORDS_PRECEDE_LOGICAL_RECORDS = 1 << 2 | |
| class CompressionMethod(enum.IntEnum): | |
| """ | |
| The compression algorithm used to compress a file. | |
| All algorithms are listed in this enumeration but only deflated, | |
| bzip2 and lzma are supported. | |
| """ | |
| NO_COMPRESSION = 0 | |
| SHRUNK = 1 | |
| REDUCED_WITH_COMPRESSION_FACTOR_1 = 2 | |
| REDUCED_WITH_COMPRESSION_FACTOR_2 = 3 | |
| REDUCED_WITH_COMPRESSION_FACTOR_3 = 4 | |
| REDUCED_WITH_COMPRESSION_FACTOR_4 = 5 | |
| IMPLODED = 6 | |
| DEFLATED = 8 | |
| ENHANCED_DEFLATED = 9 | |
| PK_WARE_DCL_IMPLODED = 10 | |
| BZIP2 = 12 | |
| LZMA = 14 | |
| IBM_TERSE = 18 | |
| IBM_LZ77_Z = 19 | |
| PPMD = 98 | |
| CompressionMethod.DEFLATED.compressor = partial(zlib.compressobj, wbits=-15) | |
| CompressionMethod.DEFLATED.decompressor = partial(zlib.decompressobj, wbits=-15) | |
| if bz2: | |
| CompressionMethod.BZIP2.compressor = bz2.BZ2Compressor | |
| CompressionMethod.BZIP2.decompressor = bz2.BZ2Decompressor | |
| if lzma: | |
| CompressionMethod.LZMA.compressor = lzma.LZMACompressor | |
| CompressionMethod.LZMA.decompressor = lzma.LZMADecompressor | |
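| # Round-trip sketch for the codecs attached above: zip stores raw deflate | |
| # streams (hence wbits=-15), so compressing then decompressing through | |
| # these helpers is symmetric: | |
| # | |
| #     co = CompressionMethod.DEFLATED.compressor() | |
| #     blob = co.compress(b'hello world') + co.flush() | |
| #     do = CompressionMethod.DEFLATED.decompressor() | |
| #     assert do.decompress(blob) == b'hello world' | |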
| class Flags(enum.IntFlag, boundary=enum.FlagBoundary.KEEP): | |
| """ | |
| Some flags found in the Local File Header. | |
| All specified flags are listed in this enumeration but only | |
| data descriptor and language encoding are supported. | |
| """ | |
| ENCRYPTED_FILE = 1 << 0 | |
| COMPRESSION_OPTION1 = 1 << 1 | |
| COMPRESSION_OPTION2 = 1 << 2 | |
| DATA_DESCRIPTOR = 1 << 3 | |
| ENHANCED_DEFLATION = 1 << 4 | |
| COMPRESSED_PATCHED_DATA = 1 << 5 | |
| STRONG_ENCRYPTION = 1 << 6 | |
| LANGUAGE_ENCODING = 1 << 11 # filename and comment use UTF-8 | |
| MASK_HEADER_VALUES = 1 << 13 | |
| class ExtraFieldId(enum.IntEnum): | |
| """ The numeric identifier of every extra field. """ | |
| ZIP64 = 0x0001 | |
| class _ExtraField: | |
| """ | |
| Abstract class and registry for concrete extra fields. | |
| All subclasses must implement ``extra_field_id``, ``_struct`` and | |
| ``__init__``. All subclasses are registered in the ``_registry`` | |
| using ``extra_field_id`` as entry key. | |
| Two functions: :meth:`unpack` and :meth:`pack` are provided to | |
| serialize and parse an extra field using ``_struct``. | |
| The generic :meth:`pack` works by introspecting the dataclass fields | |
| of the concrete ExtraField class. It only works when ``_struct`` | |
| maps 1-1 to the dataclass fields. In other cases, please override | |
| :meth:`pack`. | |
| """ | |
| _registry: 'Mapping[ExtraFieldId, type[_ExtraField]]' = {} | |
| """ A registry of extra fields. Populated by :meth:`__init_subclass__`. """ | |
| extra_field_id: ExtraFieldId | |
| """ The unique numeric identifier of this extra field. """ | |
| _struct: str | |
| """ | |
| The structure (without the ``<HH`` prefix) used to parse and | |
| serialize the extra field. | |
| """ | |
| @classmethod | |
| def unpack(cls, data): | |
| # Don't override this method. Implement your logic inside __init__. | |
| # support unpacking with and without header | |
| if len(data) == struct.calcsize(cls._struct) + 4: | |
| # header present, unpack it | |
| extra_field_id, extra_field_size = struct.unpack('<HH', data[:4]) | |
| if extra_field_id != cls.extra_field_id: | |
| raise ValueError( | |
| f"invalid header, expected {cls.extra_field_id!r}, got {extra_field_id}") | |
| if extra_field_size != len(data) - 4: | |
| raise ValueError( | |
| f"invalid header, {extra_field_size=} but {len(data)=}") | |
| data = data[4:] | |
| return cls(*struct.unpack(cls._struct, data)) | |
| def pack(self): | |
| # It is ok to override this method. | |
| # always pack the extra field with its header | |
| return struct.pack('<HH' + self._struct.removeprefix('<'), | |
| self.extra_field_id, | |
| struct.calcsize(self._struct), | |
| *( | |
| getattr(self, field.name) | |
| for field in dataclasses.fields(self) | |
| ), | |
| ) | |
| def __init_subclass__(cls, **kwargs): | |
| super().__init_subclass__(**kwargs) | |
| other_cls = cls._registry.setdefault(cls.extra_field_id, cls) | |
| assert other_cls is cls, \ | |
| f"conflicting classes for {cls.extra_field_id!r}: {other_cls} vs {cls}" | |
| def _parse_extra_fields(data: bytes) -> dict[ExtraFieldId | int, _ExtraField | bytes]: | |
| # A field is a header with type (2 bytes), length (2 bytes) and the | |
| # actual data ({length} bytes). Parsing the extra field is delegated | |
| # to the concrete _ExtraField class for {type} (or left as bytes if | |
| # no concrete class exists). | |
| buffer = io.BytesIO(data) | |
| extra_fields = {} | |
| while buffer.tell() < len(data): | |
| id_, size = struct.unpack('<HH', buffer.read(4)) | |
| if buffer.tell() + size > len(data): | |
| raise ValueError( | |
| f"invalid header, expected {size} bytes, but only {len(data) - size} bytes remaining") | |
| extra_field = buffer.read(size) | |
| try: | |
| id_ = ExtraFieldId(id_) | |
| ExtraField = _ExtraField._registry[id_] | |
| except (KeyError, ValueError): | |
| extra_fields[id_] = extra_field | |
| else: | |
| extra_fields[id_] = ExtraField.unpack(extra_field) | |
| return extra_fields | |
| def _serialize_extra_fields( | |
| dico: dict[ExtraFieldId | int, _ExtraField | bytes], | |
| ) -> bytes: | |
| out = [] | |
| for id_, item in dico.items(): | |
| match item: | |
| case _ExtraField(): | |
| data = item.pack() | |
| case bytes(): | |
| data = item | |
| case e: | |
| raise TypeError(e) | |
| out.append(struct.pack('<HH', id_, len(data))) | |
| out.append(data) | |
| return b''.join(out) | |
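| # Example (sketch): extra field ids without a registered class survive a | |
| # serialize/parse round trip untouched, as raw bytes keyed by the bare id | |
| # (0x7075 here is an arbitrary, unregistered id): | |
| # | |
| #     raw = _serialize_extra_fields({0x7075: b'\x01abc'}) | |
| #     assert _parse_extra_fields(raw) == {0x7075: b'\x01abc'} | |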
| @dataclasses.dataclass | |
| class Zip64(_ExtraField): | |
| extra_field_id = ExtraFieldId.ZIP64 # noqa: RUF045 | |
| _struct = '<QQQI' | |
| original_size: int | |
| compressed_size: int | |
| offset: int | |
| disk_no: int | |
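| # Example (sketch): a Zip64 extra field survives a pack()/unpack() round | |
| # trip; unpack() accepts the payload with or without its 4-byte header: | |
| # | |
| #     field = Zip64(original_size=10, compressed_size=4, offset=0, disk_no=0) | |
| #     assert Zip64.unpack(field.pack()) == field | |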
| @dataclasses.dataclass | |
| class DataDescriptor: | |
| _struct = '<4sIII' | |
| _struct_64 = '<4sIQQ' | |
| signature = b'PK\7\x08' # noqa: RUF045 | |
| crc32: int | |
| compressed_size: int | |
| uncompressed_size: int | |
| def pack(self, zip64=None): | |
| if zip64 is None: | |
| zip64 = self.uncompressed_size > 0xFF_FF_FF_FF | |
| return struct.pack(self._struct_64 if zip64 else self._struct, | |
| self.signature, | |
| self.crc32, | |
| self.compressed_size, | |
| self.uncompressed_size, | |
| ) | |
| @classmethod | |
| def unpack(cls, data, zip64=False): | |
| dd_struct = cls._struct_64 if zip64 else cls._struct | |
| if len(data) == struct.calcsize(dd_struct) - 4: | |
| data = cls.signature + data | |
| sign, crc32, csize, usize = struct.unpack(dd_struct, data) | |
| if sign != cls.signature: | |
| raise ValueError( | |
| f"invalid data descriptor, exptected {cls.signature!r}, got {sign}") | |
| return cls(crc32, csize, usize) | |
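| # Example (sketch): a plain (non zip64) data descriptor also survives a | |
| # pack()/unpack() round trip; unpack() accepts the signature-less 12-byte | |
| # form as well: | |
| # | |
| #     dd = DataDescriptor(crc32=0x12345678, compressed_size=5, uncompressed_size=5) | |
| #     assert DataDescriptor.unpack(dd.pack()) == dd | |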
| @dataclasses.dataclass | |
| class LocalFileHeader: | |
| _struct = '<4sHHHHHIIIHH' | |
| _length = struct.calcsize(_struct) | |
| signature = b'PK\3\4' # noqa: RUF045 | |
| version: int | |
| flags: Flags | |
| compression: CompressionMethod | |
| modification: datetime.datetime | |
| crc32: int | |
| compressed_size: int | |
| uncompressed_size: int | |
| filename: str | |
| extra_fields: dict[ExtraFieldId | int, _ExtraField | bytes] | |
| def pack(self, *, encoding='ascii'): | |
| filename = self.filename.encode( | |
| 'utf-8' if self.flags & Flags.UNICODE_FILENAME else encoding) | |
| extra_fields = _serialize_extra_fields(self.extra_fields) | |
| return struct.pack(self._struct, | |
| self.signature, | |
| self.version, | |
| self.flags, | |
| self.compression, | |
| *serialize_time_date(self.modification), | |
| self.crc32, | |
| self.compressed_size, | |
| self.uncompressed_size, | |
| len(filename), | |
| len(extra_fields), | |
| ) + filename + extra_fields | |
| @classmethod | |
| def unpack(cls, file_header, *, encoding='ascii'): | |
| buffer = io.BytesIO(file_header) | |
| ( | |
| signature, | |
| version, | |
| flags, | |
| compression, | |
| modtime, | |
| moddate, | |
| crc32, | |
| compressed_size, | |
| uncompressed_size, | |
| filename_len, | |
| extra_fields_len, | |
| ) = struct.unpack(cls._struct, buffer.read(cls._length)) | |
| if signature != cls.signature: | |
| raise ValueError( | |
| f"invalid header, expected {cls.signature}, got {signature}") | |
| if len(file_header) != (cls._length + filename_len + extra_fields_len): | |
| raise ValueError( | |
| f"invalid header, expected {cls._length+filename_len+extra_fields_len=} bytes where cls._length=struct.calcsize({cls._struct})={cls._length} {filename_len=} {extra_fields_len=}, got {len(file_header)=} bytes") # noqa: E226 | |
| flags = Flags(flags) | |
| return cls( | |
| version=version, | |
| flags=flags, | |
| compression=CompressionMethod(compression), | |
| modification=deserialize_time_date(modtime, moddate), | |
| crc32=crc32, | |
| compressed_size=compressed_size, | |
| uncompressed_size=uncompressed_size, | |
| filename=buffer.read(filename_len).decode( | |
| 'utf-8' if flags & Flags.UNICODE_FILENAME else encoding), | |
| extra_fields=_parse_extra_fields(buffer.read(extra_fields_len)), | |
| ) | |
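| # Example (sketch): a minimal local file header, stored uncompressed with | |
| # no extra field, survives a pack()/unpack() round trip: | |
| # | |
| #     lfh = LocalFileHeader( | |
| #         version=Version.DEFAULT, flags=Flags(0), | |
| #         compression=CompressionMethod.NO_COMPRESSION, | |
| #         modification=datetime.datetime(1980, 1, 1), | |
| #         crc32=0, compressed_size=0, uncompressed_size=0, | |
| #         filename='hello.txt', extra_fields={}, | |
| #     ) | |
| #     assert LocalFileHeader.unpack(lfh.pack()) == lfh | |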
| @dataclasses.dataclass | |
| class CentralDirectoryFileHeader: | |
| _struct = '<4sBBHHHHHIIIHHHHHII' | |
| _length = struct.calcsize(_struct) | |
| signature = b'PK\1\2' # noqa: RUF045 | |
| version_os: OS | |
| version_zip: Version | |
| version_needed: Version | |
| flags: Flags | |
| compression: CompressionMethod | |
| modification: datetime.datetime | |
| crc32: int | |
| compressed_size: int | |
| uncompressed_size: int | |
| disk_no: int | |
| internal_attribute: int | |
| external_attribute: int | |
| local_header_offset: int | |
| filename: str | |
| extra_fields: dict[ExtraFieldId | int, _ExtraField | bytes] | |
| comment: str | |
| def pack(self, *, encoding='ascii'): | |
| filename = self.filename.encode( | |
| 'utf-8' if self.flags & Flags.UNICODE_FILENAME else encoding) | |
| comment = self.comment.encode( | |
| 'utf-8' if self.flags & Flags.UNICODE_FILENAME else encoding) | |
| extra_fields = _serialize_extra_fields(self.extra_fields) | |
| return struct.pack(self._struct, | |
| self.signature, | |
| self.version_os, | |
| self.version_zip, | |
| self.version_needed, | |
| self.flags, | |
| self.compression, | |
| *serialize_time_date(self.modification), | |
| self.crc32, | |
| self.compressed_size, | |
| self.uncompressed_size, | |
| len(filename), | |
| len(extra_fields), | |
| len(comment), | |
| self.disk_no, | |
| self.internal_attribute, | |
| self.external_attribute, | |
| self.local_header_offset, | |
| ) + filename + extra_fields + comment | |
| @classmethod | |
| def unpack(cls, cd_file_header, *, encoding='ascii'): | |
| buffer = io.BytesIO(cd_file_header) | |
| ( | |
| signature, | |
| version_os, | |
| version_zip, | |
| version_needed, | |
| flags, | |
| compression, | |
| modtime, | |
| moddate, | |
| crc32, | |
| compressed_size, | |
| uncompressed_size, | |
| filename_len, | |
| extra_fields_len, | |
| comment_len, | |
| disk_no, | |
| internal_attribute, | |
| external_attribute, | |
| local_header_offset, | |
| ) = struct.unpack(cls._struct, buffer.read(cls._length)) | |
| if signature != cls.signature: | |
| raise ValueError( | |
| f"invalid header, expected {cls.signature}, got {signature}") | |
| if len(cd_file_header) != (cls._length + filename_len + extra_fields_len + comment_len): | |
| raise ValueError( | |
| f"invalid header, expected {cls._length+filename_len+extra_fields_len+comment_len=} bytes where cls._length=struct.calcsize({cls._struct})={cls._length} {filename_len=} {extra_fields_len=} {comment_len=}, got {len(cd_file_header)=} bytes") # noqa: E226 | |
| flags = Flags(flags) | |
| return cls( | |
| version_os=OS(version_os), | |
| version_zip=Version(version_zip), | |
| version_needed=Version(version_needed), | |
| flags=flags, | |
| compression=CompressionMethod(compression), | |
| modification=deserialize_time_date(modtime, moddate), | |
| crc32=crc32, | |
| compressed_size=compressed_size, | |
| uncompressed_size=uncompressed_size, | |
| disk_no=disk_no, | |
| internal_attribute=internal_attribute, | |
| external_attribute=external_attribute, | |
| local_header_offset=local_header_offset, | |
| filename=buffer.read(filename_len).decode( | |
| 'utf-8' if flags & Flags.UNICODE_FILENAME else encoding), | |
| extra_fields=_parse_extra_fields(buffer.read(extra_fields_len)), | |
| comment=buffer.read(comment_len).decode( | |
| 'utf-8' if flags & Flags.UNICODE_FILENAME else encoding), | |
| ) | |
| @dataclasses.dataclass | |
| class EndOfCentralDirectory64: | |
| _struct = '<4sQBBHIIQQQQ' | |
| _length = struct.calcsize(_struct) | |
| _length_header = struct.calcsize('<4sQ') | |
| signature = b'PK\6\6' # noqa: RUF045 | |
| version_os: OS | |
| version_zip: Version | |
| version_needed: Version | |
| disk_no: int | |
| central_directory_disk_no: int | |
| central_directory_disk_entries_count: int | |
| central_directory_total_entries_count: int | |
| central_directory_size: int | |
| central_directory_offset: int | |
| comment: str | |
| @classmethod | |
| def unpack(cls, eocd, encoding='utf-8'): | |
| signature, size, *fields = struct.unpack(cls._struct, eocd[:cls._length]) | |
| if signature != cls.signature: | |
| raise ValueError( | |
| f"invalid header, expected {cls.signature}, got {signature}") | |
| if len(eocd) != size + cls._length_header: | |
| raise ValueError( | |
| f"invalid header, expected {size} bytes, got {len(eocd)=} bytes") | |
| comment = eocd[cls._length:].decode(encoding) | |
| return cls(*fields, comment) | |
| def pack(self, encoding='utf-8'): | |
| return struct.pack(self._struct, | |
| self.signature, | |
| self._length + len(self.comment) - self._length_header, | |
| self.version_os, | |
| self.version_zip, | |
| self.version_needed, | |
| self.disk_no, | |
| self.central_directory_disk_no, | |
| self.central_directory_disk_entries_count, | |
| self.central_directory_total_entries_count, | |
| self.central_directory_size, | |
| self.central_directory_offset, | |
| ) + self.comment.encode(encoding) | |
| @dataclasses.dataclass | |
| class EndOfCentralDirectoryLocator: | |
| _struct = '<4sIQI' | |
| _length = struct.calcsize(_struct) | |
| signature = b'PK\6\7' # noqa: RUF045 | |
| central_directory64_disk_no: int | |
| central_directory64_offset: int | |
| total_disk_count: int | |
| @classmethod | |
| def unpack(cls, eocdl): | |
| signature, *fields = struct.unpack(cls._struct, eocdl) | |
| if signature != cls.signature: | |
| raise ValueError( | |
| f"invalid header, expected {cls.signature}, got {signature}") | |
| return cls(*fields) | |
| def pack(self): | |
| return struct.pack(self._struct, | |
| self.signature, | |
| self.central_directory64_disk_no, | |
| self.central_directory64_offset, | |
| self.total_disk_count, | |
| ) | |
| @dataclasses.dataclass | |
| class EndOfCentralDirectory: | |
| _struct = '<4sHHHHIIH' | |
| _length = struct.calcsize(_struct) | |
| signature = b'PK\5\6' # noqa: RUF045 | |
| disk_no: int | |
| central_directory_disk_no: int | |
| central_directory_disk_entries_count: int | |
| central_directory_total_entries_count: int | |
| central_directory_size: int | |
| central_directory_offset: int | |
| comment: bytes | |
| @classmethod | |
| def unpack(cls, eocd): | |
| signature, *fields, comment_len = struct.unpack(cls._struct, eocd[:cls._length]) | |
| if signature != cls.signature: | |
| raise ValueError( | |
| f"invalid header, expected {cls.signature}, got {signature}") | |
| if len(eocd) != (cls._length + comment_len): | |
| raise ValueError( | |
| f"invalid header, expected {cls._length+comment_len=} bytes where cls._length=struct.calcsize({cls._struct})={cls._length} {comment_len=}, got {len(eocd)=} bytes") # noqa: E226 | |
| return cls(*fields, eocd[cls._length:]) | |
| def pack(self): | |
| return struct.pack(self._struct, | |
| self.signature, | |
| self.disk_no, | |
| self.central_directory_disk_no, | |
| self.central_directory_disk_entries_count, | |
| self.central_directory_total_entries_count, | |
| self.central_directory_size, | |
| self.central_directory_offset, | |
| len(self.comment), | |
| ) + self.comment | |
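| # Example (sketch): the end-of-central-directory record of an archive with | |
| # a single entry whose central directory starts at offset 100 and spans | |
| # 46 bytes, with no archive comment: | |
| # | |
| #     eocd = EndOfCentralDirectory( | |
| #         disk_no=0, central_directory_disk_no=0, | |
| #         central_directory_disk_entries_count=1, | |
| #         central_directory_total_entries_count=1, | |
| #         central_directory_size=46, central_directory_offset=100, | |
| #         comment=b'', | |
| #     ) | |
| #     assert EndOfCentralDirectory.unpack(eocd.pack()) == eocd | |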
| class _DirectoryItem: | |
| def __init__(self, attachment, offset): | |
| self.attachment = attachment | |
| self.offset = offset | |
| self.local_file_header = None | |
| self.data_descriptor = None | |
| def __iter__(self): | |
| return iter(( | |
| self.attachment, | |
| self.offset, | |
| self.local_file_header, | |
| self.data_descriptor, | |
| )) | |
| class _Flat(Mapping): | |
| def __contains__(self, item): | |
| return True | |
| def __getitem__(self, item): | |
| return '' | |
| def __iter__(self): | |
| raise NotImplementedError | |
| def __len__(self): | |
| raise NotImplementedError | |
| def _read_file(local_file_header, _read_buffer): | |
| zip64 = local_file_header.extra_fields.get(ExtraFieldId.ZIP64) | |
| csize = (zip64 or local_file_header).compressed_size | |
| expected_usize = (zip64 or local_file_header).uncompressed_size | |
| if local_file_header.compression: | |
| decompressor = local_file_header.compression.decompressor() | |
| crc32 = 0 | |
| usize = 0 | |
| d, m = divmod(csize, io.DEFAULT_BUFFER_SIZE) | |
| for i in range(d + bool(m)): | |
| chunk_size = m if i == d else io.DEFAULT_BUFFER_SIZE | |
| csize += chunk_size | |
| chunk = _read_buffer(chunk_size) | |
| if len(chunk) != chunk_size: | |
| raise ValueError( # noqa: TRY301 | |
| "unexpected end of file") # noqa: EM101 | |
| if local_file_header.compression: | |
| chunk = decompressor.decompress(chunk) | |
| crc32 = zlib.crc32(chunk, crc32) | |
| usize += len(chunk) | |
| yield chunk | |
| if local_file_header.compression: | |
| # flush the decompressor | |
| chunk = decompressor.decompress(b'') | |
| if hasattr(decompressor, 'flush'): | |
| chunk += decompressor.flush() | |
| if not decompressor.eof: | |
| raise ValueError( # noqa: TRY301 | |
| f"expected end of file, but found {len(decompressor.unused_data)} remaining bytes: {decompressor.unused_data}") | |
| if chunk: | |
| crc32 = zlib.crc32(chunk, crc32) | |
| usize += len(chunk) | |
| yield chunk | |
| if usize != expected_usize: | |
| raise ValueError( # noqa: TRY301 | |
| f"invalid uncompressed size: {usize=} != {expected_usize=}") | |
| if crc32 != local_file_header.crc32: | |
| raise ValueError( # noqa: TRY301 | |
| f"invalid crc32: {crc32=} != {local_file_header.crc32=}") | |
| yield b'' # empty byte to signal end of file | |
| def _read_until_next_file(local_file_header, _read_buffer, _read_into): # noqa: RET503 | |
| zip64 = ExtraFieldId.ZIP64 in local_file_header.extra_fields | |
| dd_sign = DataDescriptor.signature | |
| dd_struct = '<IQQ' if zip64 else '<III' | |
| dd_length = struct.calcsize(dd_struct) | |
| crc32 = csize = usize = 0 | |
| if local_file_header.compression: | |
| dsor = local_file_header.compression.decompressor() | |
| buffer = bytearray(_read_buffer(io.DEFAULT_BUFFER_SIZE)) | |
| def flush(length): | |
| nonlocal buffer, crc32, csize, usize | |
| if dsor: | |
| with memoryview(buffer)[:length] as mv: | |
| udata = dsor.decompress(mv) | |
| else: | |
| udata = buffer[:length] | |
| if length: | |
| buffer[:-length] = buffer[length:] | |
| csize += length | |
| usize += len(udata) | |
| crc32 = zlib.crc32(udata, crc32) | |
| if udata: | |
| yield udata | |
| bytes_read = 0 | |
| while bytes_read < length % len(buffer): | |
| with memoryview(buffer)[-length + bytes_read:] as mv: | |
| bytes_read_ = _read_into(mv) | |
| if not bytes_read_: | |
| buffer = buffer[:-length + bytes_read] | |
| break | |
| bytes_read += bytes_read_ | |
| while True: | |
| foundpk = buffer.find(b'PK') | |
| if foundpk == -1: | |
| assert len(buffer) > dd_length + len(dd_sign) | |
| yield from flush(len(buffer) - dd_length - len(dd_sign)) | |
| continue | |
| for signature in ( | |
| LocalFileHeader.signature, | |
| CentralDirectoryFileHeader.signature, | |
| ): | |
| found = buffer.find(signature, foundpk) | |
| if found != -1: | |
| break | |
| else: | |
| assert len(buffer) > dd_length + len(dd_sign) | |
| yield from flush(len(buffer) - dd_length - len(dd_sign)) | |
| continue | |
| expected_crc32, expected_csize, expected_usize = ( | |
| struct.unpack(dd_struct, buffer[found - dd_length:found])) | |
| if (length := found - dd_length) == expected_csize - csize: | |
| # dd signature absent | |
| yield from flush(length) | |
| if usize != expected_usize or crc32 != expected_crc32: | |
| yield from flush(dd_length + 4) # len(b'PK\3\4') | |
| continue | |
| buffer = memoryview(buffer)[dd_length:] | |
| elif ( | |
| (length := found - len(dd_sign) - dd_length) == expected_csize - csize | |
| and buffer.startswith(dd_sign, length) | |
| ): | |
| # dd signature present | |
| yield from flush(length) | |
| if usize != expected_usize or crc32 != expected_crc32: | |
| yield from flush(len(dd_sign) + dd_length + 4) # len(b'PK\3\4') | |
| continue | |
| buffer = memoryview(buffer)[len(dd_sign) + dd_length:] | |
| else: | |
| yield from flush(found + 4) # len(b'PK\3\4') | |
| continue | |
| if dsor: | |
| yield from flush(0) | |
| local_file_header.crc32 = crc32 | |
| local_file_header.compressed_size = csize | |
| local_file_header.uncompressed_size = usize | |
| yield b'' # signal end of file | |
| return buffer # leftover to reinject | |
| def extract(zipfile: io.FileIO) -> Generator[LocalFileHeader | bytes]: | |
| buffer = None | |
| fileno = 0 | |
| header = bytearray(LocalFileHeader._length) | |
| def _read_buffer(n): | |
| nonlocal buffer | |
| if buffer is None: | |
| return zipfile.read(n) | |
| chunk = buffer.read(n) | |
| if len(chunk) < n: | |
| buffer = None | |
| chunk += zipfile.read(n - len(chunk)) | |
| return chunk | |
| def _read_into(buff): | |
| nonlocal buffer | |
| if buffer is None: | |
| return zipfile.readinto(buff) | |
| bytes_read = buffer.readinto(buff) | |
| if bytes_read < len(buff): | |
| buffer = None | |
| bytes_read += zipfile.readinto(memoryview(buff)[bytes_read:]) | |
| return bytes_read | |
| def reinject(data): | |
| nonlocal buffer | |
| data = bytes(data)  # the leftover may be a memoryview or a bytearray | |
| if buffer is None: | |
| buffer = io.BytesIO(data) | |
| else: | |
| buffer = io.BytesIO(buffer.read() + data) | |
| try: | |
| while True: | |
| fileno += 1 | |
| local_file_header = None | |
| if (_read_into(header) != LocalFileHeader._length | |
| or not header.startswith(LocalFileHeader.signature)): | |
| break | |
| *_, filename_len, extra_fields_len = struct.unpack(LocalFileHeader._struct, header) | |
| local_file_header = LocalFileHeader.unpack( | |
| header + _read_buffer(filename_len + extra_fields_len)) | |
| yield local_file_header | |
| if local_file_header.flags & Flags.DATA_DESCRIPTOR: | |
| leftover = yield from _read_until_next_file(local_file_header, _read_buffer, _read_into) | |
| reinject(leftover) | |
| else: | |
| yield from _read_file(local_file_header, _read_buffer) | |
| except Exception as exc: | |
| e = f"while reading file #{fileno} close to offset {zipfile.tell()}, " | |
| if local_file_header: | |
| e += f"file header was: {local_file_header}" | |
| else: | |
| e += "couldn't read file header" | |
| exc.add_note(e) | |
| raise | |
| def helper(zstream): | |
| while True: | |
| try: | |
| local_file = next(zstream) | |
| if local_file == b'': | |
| print("there's a b'' too much!") | |
| continue | |
| except StopIteration: | |
| break | |
| data = b''.join(itertools.takewhile(b''.__ne__, zstream)) | |
| yield local_file, data | |
| def main(): | |
| # ruff: noqa: PLC0415, T201 | |
| import sys | |
| import time | |
| if len(sys.argv) != 3 or '-h' in sys.argv or '--help' in sys.argv: | |
| sys.exit(f"usage: {sys.argv[0]} <Compress|eXtract> <file>") | |
| _, mode, filename = sys.argv | |
| if mode.casefold() in ('c', 'compress'): | |
| sys.exit("not supported") | |
| elif mode.casefold() in ('x', 'extract'): | |
| with open(filename, 'rb') as file: | |
| zstream = extract(file) | |
| while True: | |
| start = time.time() | |
| try: | |
| file = next(zstream) | |
| except StopIteration: | |
| break | |
| datalen = sum(len(chunk) for chunk in itertools.takewhile(b''.__ne__, zstream)) | |
| stop = time.time() | |
| print(file) | |
| print(datalen, "bytes", round(stop - start, 6), "seconds") | |
| else: | |
| sys.exit(f"usage: {sys.argv[0]} <Compress|eXtract> <file>") | |
| if __name__ == '__main__': | |
| # ruff: noqa: PLC0415, T201 | |
| import linecache | |
| import tracemalloc | |
| def display_top(snapshot, key_type='lineno', limit=10): | |
| snapshot = snapshot.filter_traces(( | |
| tracemalloc.Filter(False, "<frozen importlib._bootstrap>"), | |
| tracemalloc.Filter(False, "<unknown>"), | |
| )) | |
| top_stats = snapshot.statistics(key_type, cumulative=True) | |
| print("\nMemory Summary -- Top %s lines" % limit) | |
| for index, stat in enumerate(top_stats[:limit], 1): | |
| frame = stat.traceback[0] | |
| print("#%s: %s:%s: %.1f KiB" | |
| % (index, frame.filename, frame.lineno, stat.size / 1024)) | |
| line = linecache.getline(frame.filename, frame.lineno).strip() | |
| if line: | |
| print(' %s' % line) | |
| other = top_stats[limit:] | |
| if other: | |
| size = sum(stat.size for stat in other) | |
| print("%s other: %.1f KiB" % (len(other), size / 1024)) | |
| total = sum(stat.size for stat in top_stats) | |
| print("Total allocated size: %.1f KiB" % (total / 1024)) | |
| tracemalloc.start() | |
| main() | |
| snapshot = tracemalloc.take_snapshot() | |
| tracemalloc.stop() | |
| display_top(snapshot) |