Skip to content

Instantly share code, notes, and snippets.

@surenkov
Last active October 31, 2023 21:22
Show Gist options
  • Save surenkov/bc66d3049b0cda340ed487832051fad6 to your computer and use it in GitHub Desktop.
Gzip-compressed binary stream
from __future__ import annotations
import gzip
import io
import typing as ty
if ty.TYPE_CHECKING:
from collections.abc import Iterable, Iterator
DEFAULT_BLOCK_SIZE = 512 * 1024  # 512 KiB


class GzipCompressedStream(io.RawIOBase):
    """A read-only stream that gzip-compresses an iterable of text/binary data on the fly.

    This class wraps `gzip.GzipFile` so that chunks are pulled from the iterable,
    compressed, and buffered lazily as the stream is read. It is useful when you
    want to compress data that is too large to fit in memory.

    Example:
    >>> from django.http import FileResponse
    >>> def large_view(request):
    ...     large_queryset = Model.objects.values_list("source_data", flat=True)
    ...     compressed_stream = GzipCompressedStream(large_queryset.iterator())
    ...     return FileResponse(compressed_stream, as_attachment=True, filename="data.gz")
    """

    @ty.overload
    def __init__(
        self,
        iterable: Iterable[bytes | bytearray | memoryview],
        mode: ty.Literal["binary"],
        compresslevel: int = ...,
        block_size: int = ...,
    ):
        ...

    @ty.overload
    def __init__(
        self,
        iterable: Iterable[str],
        mode: ty.Literal["text"] = ...,
        compresslevel: int = ...,
        block_size: int = ...,
        **kwargs,
    ):
        ...

    def __init__(self, iterable, mode="text", compresslevel=9, block_size=DEFAULT_BLOCK_SIZE, **kwargs):
        """Initialize a GzipCompressedStream.

        :param iterable: An iterable of text/binary data to compress.
        :param mode: The mode in which the stream is opened. Can be either "text" or "binary".
        :param compresslevel: The compression level to use. Must be an integer in the range 0-9.
        :param block_size: The block size to use for buffering, in bytes. Must be a positive integer.
        :param kwargs: Additional keyword arguments to pass to `io.TextIOWrapper`, if `mode` is "text".
        :raises ValueError: If `mode` is neither "text" nor "binary".
        """
        if mode not in ("text", "binary"):
            raise ValueError(f"mode must be 'text' or 'binary', got {mode!r}")
        self._iterable: Iterator = iter(iterable)
        self._block_size: int = block_size
        # Number of compressed bytes handed out so far (reported by `tell()`).
        self._bytes_read: int = 0
        self._buffer = io.BytesIO()
        # GzipFile writes its header to `self._buffer` immediately on construction.
        self._compressor = gzip.GzipFile(None, mode="wb", compresslevel=compresslevel, fileobj=self._buffer)
        # For efficiency, we want to buffer the compressor writes in `block_size` bytes.
        self._buffered_compressor: ty.IO = io.BufferedWriter(self._compressor, block_size)  # type: ignore
        if mode == "text":
            # write_through=True makes TextIOWrapper forward every write immediately,
            # so `self.flush()` reliably pushes pending data down to the compressor.
            kwargs.update(write_through=True)
            self._buffered_compressor = io.TextIOWrapper(self._buffered_compressor, **kwargs)

    def read(self, size: int = -1) -> bytes:
        """Read up to `size` compressed bytes; a negative `size` drains the stream to EOF."""
        if self._compressor.closed and not self._buffer.closed:
            # Source iterable exhausted: serve whatever is left in the buffer.
            return self._read_from_internal_buffer(size)
        if size < 0:
            # Per the io.RawIOBase contract, read(-1) must read until EOF
            # (the original capped this at io.DEFAULT_BUFFER_SIZE).
            while not self._compressor.closed:
                self._fill_internal_buffer(self._block_size)
            return self._read_from_internal_buffer(-1)
        if self._buffer.tell() < size:
            self._fill_internal_buffer(size)
        return self._read_from_internal_buffer(size)

    def readinto(self, buffer: bytearray) -> int:
        chunk = self.read(len(buffer))
        buffer[: len(chunk)] = chunk
        # Always return an int: io.RawIOBase.readinto must return 0 (not None) at EOF,
        # otherwise buffered wrappers misinterpret the result as "no data yet".
        return len(chunk)

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def write(self, b: bytes) -> int:
        raise io.UnsupportedOperation("write")

    def seekable(self) -> bool:
        return False

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """Seek is not supported, since the internal buffer is flushed on every read."""
        raise io.UnsupportedOperation("seek")

    def tell(self) -> int:
        """Return the number of compressed bytes read from the stream so far."""
        return self._bytes_read

    def close(self) -> None:
        # Closing the outermost writer cascades down to the GzipFile.
        self._buffered_compressor.close()
        self._buffer.close()

    def flush(self) -> None:
        # First, flush the buffered writer to the compressor.
        self._buffered_compressor.flush()
        # Then, flush the compressor to the buffer.
        self._compressor.flush()

    @property
    def closed(self) -> bool:
        return self._compressor.closed and self._buffer.closed

    def _fill_internal_buffer(self, size: int):
        """Pull from the iterable until at least `size` raw bytes were compressed.

        The target is rounded up to a multiple of `block_size` so the compressor
        always receives reasonably large writes.
        """
        # Round max(size, block_size) up to the next multiple of block_size.
        target_block_size = max(size, self._block_size) - 1
        target_block_size = (target_block_size // self._block_size + 1) * self._block_size
        iterable = self._iterable
        compressor = self._buffered_compressor
        raw_bytes_written = 0
        while raw_bytes_written < target_block_size:
            try:
                line = next(iterable)
                # Writing to buffered compressor ensures we're first buffering the data
                # to at least one block before compressing it.
                raw_bytes_written += compressor.write(line)
            except StopIteration:
                # Also closes `self._compressor`, flushing the gzip trailer to the buffer.
                compressor.close()
                break
        else:
            self.flush()  # Flush the compressor to the buffer.

    def _read_from_internal_buffer(self, size: int) -> bytes:
        """Read `size` bytes from the internal buffer and compact the remainder."""
        buffer = self._buffer
        write_pos = buffer.tell()
        buffer.seek(0)
        value = buffer.read(size)
        read_pos = buffer.tell()
        if read_pos > 0:
            # Move the remaining data to the beginning of the buffer.
            with buffer.getbuffer() as buf:
                remaining_size = write_pos - read_pos
                buf[:remaining_size] = buf[read_pos:write_pos]
            buffer.seek(remaining_size)
            buffer.truncate()
        self._bytes_read += read_pos
        return value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment