Created
December 2, 2022 05:03
-
-
Save egor83/4c818d002f5f12c8740d83271eca51a4 to your computer and use it in GitHub Desktop.
Trying to write to a compressed zip file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import RawIOBase | |
from zipfile import ZipFile, ZipInfo, ZIP_DEFLATED | |
class UnseekableStream(RawIOBase):
    """Write-only, in-memory byte sink that can be drained incrementally.

    Written bytes accumulate in an internal buffer until ``get()`` is
    called, which returns everything buffered so far and empties the
    buffer. Only ``writable()`` and ``write()`` are overridden; seeking and
    reading are not implemented here, so those operations fall back to the
    ``RawIOBase`` defaults.
    """

    def __init__(self):
        super().__init__()
        # A bytearray grows in place; repeatedly extending an immutable
        # bytes object would re-copy all pending data on every write().
        self._buffer = bytearray()

    def writable(self):
        """Report that this stream accepts ``write()`` calls."""
        return True

    def write(self, b):
        """Append the bytes-like object *b* to the pending buffer.

        Returns:
            The length of *b*.

        Raises:
            ValueError: If the stream has already been closed.
        """
        if self.closed:
            raise ValueError('Stream was closed!')
        self._buffer += b
        return len(b)

    def get(self):
        """Return all bytes written since the previous call and reset.

        Returns:
            A ``bytes`` object; empty when nothing is pending.
        """
        chunk = bytes(self._buffer)
        self._buffer.clear()
        return chunk
def zipfile_generator(path, stream):
    """Yield the bytes of a DEFLATE-compressed, single-entry zip of *path*.

    The archive is written to *stream*; after every 16 KiB read from the
    source file, whatever the zip writer has emitted so far is drained with
    ``stream.get()`` and yielded. A final chunk is yielded after the
    ``ZipFile`` has been closed.

    Args:
        path: Path of the file to add to the archive. It is also used to
            derive the entry's name and metadata via ``ZipInfo.from_file``.
        stream: Writable file-like object handed to ``ZipFile``; it must
            also provide ``get()``, returning and clearing the bytes
            written to it since the previous call.

    Yields:
        ``bytes`` chunks (possibly empty) which, concatenated in order,
        form the complete zip file.
    """
    with ZipFile(stream, mode='w', compression=ZIP_DEFLATED) as zf:
        z_info = ZipInfo.from_file(path)
        # from_file() leaves compress_type at ZIP_STORED, and ZipFile.open()
        # uses the ZipInfo's own setting (not the archive-level one) when it
        # is handed a ZipInfo. Set it explicitly, otherwise the entry is
        # written uncompressed.
        z_info.compress_type = ZIP_DEFLATED
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(16384), b''):
                dest.write(chunk)
                # Hand over whatever the zip writer has produced so far.
                yield stream.get()
    # The entry and the ZipFile are closed now; drain what was written
    # to the stream while they were closing.
    yield stream.get()
# Demo driver: zip "small.csv" into "test.zip" by streaming the archive
# through an in-memory UnseekableStream.
path = "small.csv"
# Manage the stream with `with` so it is closed even if generating or
# writing the archive raises; the output file is closed first, then the
# stream.
with UnseekableStream() as stream, open("test.zip", "wb") as f:
    # writelines() writes each yielded chunk in order.
    f.writelines(zipfile_generator(path, stream))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment