Gzip-compressed binary stream
from __future__ import annotations

import gzip
import io
import typing as ty

if ty.TYPE_CHECKING:
    from collections.abc import Iterable, Iterator

DEFAULT_BLOCK_SIZE = 512 * 1024  # 512 KiB


class GzipCompressedStream(io.RawIOBase):
    """A stream that compresses iterables of text/binary data on the fly.

    This class wraps `gzip.GzipFile` to compress iterables of text/binary data
    on the fly. It is useful when you want to compress data that is too large
    to fit in memory.

    Example:
        >>> from django.http import FileResponse
        >>> def large_view(request):
        ...     large_queryset = Model.objects.values_list("source_data", flat=True)
        ...     compressed_stream = GzipCompressedStream(large_queryset.iterator())
        ...     return FileResponse(compressed_stream, as_attachment=True, filename="data.gz")
    """

    @ty.overload
    def __init__(
        self,
        iterable: Iterable[ty.ByteString],
        mode: ty.Literal["binary"],
        compresslevel: int = ...,
        block_size: int = ...,
    ):
        ...

    @ty.overload
    def __init__(
        self,
        iterable: Iterable[str],
        mode: ty.Literal["text"] = ...,
        compresslevel: int = ...,
        block_size: int = ...,
        **kwargs,
    ):
        ...

    def __init__(self, iterable, mode="text", compresslevel=9, block_size=DEFAULT_BLOCK_SIZE, **kwargs):
        """Initialize a GzipCompressedStream.

        :param iterable: An iterable of text/binary data to compress.
        :param mode: The mode in which the stream is opened. Either "text" or "binary".
        :param compresslevel: The compression level to use. Must be an integer in the range 0-9.
        :param block_size: The block size to use for buffering, in bytes. Must be a positive integer.
        :param kwargs: Additional keyword arguments passed to `io.TextIOWrapper` if `mode` is "text".
        """
        self._iterable: Iterator = iter(iterable)
        self._block_size: int = block_size
        self._bytes_read: int = 0
        self._buffer = io.BytesIO()
        self._compressor = gzip.GzipFile(None, mode="wb", compresslevel=compresslevel, fileobj=self._buffer)
        # For efficiency, buffer writes to the compressor in blocks of `block_size` bytes.
        self._buffered_compressor: io.RawIOBase = io.BufferedWriter(self._compressor, block_size)  # type: ignore
        if mode == "text":
            kwargs.update(write_through=True)
            self._buffered_compressor: io.TextIOBase = io.TextIOWrapper(self._buffered_compressor, **kwargs)

    def read(self, size: int = -1) -> bytes:
        if self._compressor.closed and not self._buffer.closed:
            return self._read_from_internal_buffer(size)
        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        if self._buffer.tell() < size:
            self._fill_internal_buffer(size)
        return self._read_from_internal_buffer(size)

    def readinto(self, buffer: bytearray) -> int:
        if chunk := self.read(len(buffer)):
            buffer[: len(chunk)] = chunk
        return len(chunk)

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def write(self, b: bytes) -> int:
        raise io.UnsupportedOperation("write")

    def seekable(self) -> bool:
        return False

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """Seek is not supported, since the internal buffer is flushed on every read."""
        raise io.UnsupportedOperation("seek")

    def tell(self) -> int:
        return self._bytes_read

    def close(self) -> None:
        self._buffered_compressor.close()
        self._buffer.close()

    def flush(self) -> None:
        # First, flush the buffered writer to the compressor.
        self._buffered_compressor.flush()
        # Then, flush the compressor to the buffer.
        self._compressor.flush()

    @property
    def closed(self) -> bool:
        return self._compressor.closed and self._buffer.closed

    def _fill_internal_buffer(self, size: int):
        # We want to buffer at least one block of data before compressing it,
        # so round the requested size up to the next multiple of `block_size`.
        target_block_size = max(size, self._block_size) - 1
        target_block_size = (target_block_size // self._block_size + 1) * self._block_size

        iterable = self._iterable
        compressor = self._buffered_compressor

        raw_bytes_written = 0
        while raw_bytes_written < target_block_size:
            try:
                line = next(iterable)
                # Writing through the buffered compressor ensures the data is
                # accumulated to at least one block before being compressed.
                raw_bytes_written += compressor.write(line)
            except StopIteration:
                # Closing the buffered compressor also closes `self._compressor`
                # and flushes it to the buffer.
                compressor.close()
                break
        else:
            self.flush()  # Flush the compressor to the buffer.

    def _read_from_internal_buffer(self, size: int) -> bytes:
        """Reads `size` bytes from the internal buffer and returns them."""
        buffer = self._buffer
        write_pos = buffer.tell()

        buffer.seek(0)
        value = buffer.read(size)
        read_pos = buffer.tell()

        if read_pos > 0:
            # Move the remaining data to the beginning of the buffer.
            with buffer.getbuffer() as buf:
                remaining_size = write_pos - read_pos
                buf[:remaining_size] = buf[read_pos:write_pos]
            buffer.seek(remaining_size)
            buffer.truncate()

        self._bytes_read += read_pos
        return value
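
For reference, here is a minimal round-trip sketch (not part of the gist itself; names like `lines` are illustrative): it feeds an iterable of strings through `GzipCompressedStream` in the default "text" mode, drains the stream with `read()`, and checks that `gzip.decompress()` recovers the original payload.

# Usage sketch (not part of the original gist): text-mode round trip.
if __name__ == "__main__":
    lines = (f"row {i}\n" for i in range(10_000))
    stream = GzipCompressedStream(lines)

    # Drain the stream in fixed-size chunks, as a consumer such as a file
    # response or a socket writer would.
    compressed = bytearray()
    while chunk := stream.read(64 * 1024):
        compressed += chunk
    stream.close()

    # Verify the compressed output decodes back to the original text.
    restored = gzip.decompress(bytes(compressed)).decode()
    assert restored == "".join(f"row {i}\n" for i in range(10_000))
    print(f"ok: {len(restored)} characters -> {len(compressed)} compressed bytes")

In binary mode (`mode="binary"`), the iterable should yield bytes objects instead of strings, per the overloads above.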