@kylemanna
Created May 16, 2020 00:08
Falcon + zipfile streaming proof of concept v2
#!/usr/bin/env python3
#
# Falcon + zipfile streaming content using os.pipe() to minimize RAM usage to a
# pipe's worth of data.
#
# This test program generated a nearly 500 MB zip file with various compression
# arguments supported by the python3 standard library.
#
# Usage for debug webserver:
# $ ./app.py
#
# Usage for client:
# $ curl -OJ http://localhost:8000/diagnostics/dump
#
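# The resource also accepts a "compression" query parameter
# (none, bzip2, deflated, or lzma), e.g.:
# $ curl -OJ 'http://localhost:8000/diagnostics/dump?compression=lzma'
#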
import hashlib
import os
import subprocess
import threading
import time
import zipfile

import falcon


# Simulate slow input by sleeping
def feed_slow(fd):
    with open(fd, 'wb') as fp:
        for i in range(2):
            fp.write(f"i is currently {i}\n".encode())
            fp.flush()
            time.sleep(1)


# Simulate monstrous input by writing random data repeatedly
def feed_large(fd, byte_len=1000 * 1000 * 512):
    chunk_sz = 4096
    chunk = os.urandom(chunk_sz)
    rem = byte_len
    with open(fd, 'wb') as fp:
        m = hashlib.sha1()
        while rem > chunk_sz:
            rem = rem - fp.write(chunk)
            m.update(chunk)
        if rem > 0:
            fp.write(chunk[:rem])
            m.update(chunk[:rem])
    print(f'Final hash of big file: {m.hexdigest()}')


# Dump an sqlite3 database as SQL text into the write end of a pipe
def dump_sqlite3_run(filename, write_fd):
    timeout = 10
    cmd = '.timeout {}\n.dump\n'.format(timeout * 1000).encode()
    with open(write_fd, 'wb') as fp:
        subprocess.run(['sqlite3', filename], input=cmd, stdout=fp, timeout=timeout + 1)


# Build the zip: start one producer thread per pipe, then stream everything
# into a single archive written to out_stream.
def zip_things(file_paths, out_stream, compression=zipfile.ZIP_LZMA):
    (read1, write1) = os.pipe()
    threading.Thread(target=feed_slow, args=(write1,)).start()

    (read2, write2) = os.pipe()
    threading.Thread(target=feed_large, args=(write2,)).start()

    (read3, write3) = os.pipe()
    threading.Thread(target=dump_sqlite3_run, args=('example.db', write3)).start()

    with zipfile.ZipFile(out_stream, 'w', compression) as myzip:
        # Zip simple files
        for f in file_paths:
            myzip.write(f)

        # Note: we can pass an operating system file descriptor here and it
        # will be opened instead of a filename.
        myzip.write(read1, arcname='somedir/slow.txt')
        myzip.write(read2, arcname='oink-oink-fat.bin')
        myzip.write(read3, arcname='example_run.db')
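
# Note: each ZipFile.write() on a pipe's read end blocks until the matching
# producer thread closes its write end, so the archive members are added one
# after another as their data arrives; memory should stay bounded at roughly
# a pipe buffer per producer, per the intent described at the top of this file.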


class DebugResource:
    def on_get(self, req, resp):
        file_paths = ['/etc/lsb-release', '/etc/services']

        qs = falcon.uri.parse_query_string(req.query_string)
        comp_qs = qs.get('compression', 'none')
        comp_qs_lut = {
            'bzip2': zipfile.ZIP_BZIP2,
            'deflated': zipfile.ZIP_DEFLATED,
            'lzma': zipfile.ZIP_LZMA,
        }
        compression = comp_qs_lut.get(comp_qs, zipfile.ZIP_STORED)

        rfd, wfd = os.pipe()
        zip_out = os.fdopen(rfd, 'rb')
        zip_in = os.fdopen(wfd, 'wb')

        # TODO confirm the threads are cleaned up when they exit
        args = (file_paths, zip_in, compression)
        threading.Thread(target=zip_things, args=args).start()

        # Falcon (or a subprocess) will take the file-like object from here
        resp.stream = zip_out
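        # The client starts receiving zip data as soon as the worker thread
        # produces it; the final size isn't known up front, so no
        # Content-Length is set and memory use stays near the pipe buffer.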
        zip_filename = f'test_{comp_qs}.zip'
        resp.content_type = 'application/zip'
        resp.set_headers([
            ('Content-Disposition', f'attachment; filename="{zip_filename}"'),
        ])


api = falcon.API()
api.add_route('/diagnostics/dump', DebugResource())


#
# Standalone test interface
#
if __name__ == '__main__':
    use_werkzeug = False
    try:
        import werkzeug.serving
        use_werkzeug = True
    except ModuleNotFoundError:
        import wsgiref.simple_server

    host = os.getenv('LISTEN_HOST', 'localhost')
    port = int(os.getenv('LISTEN_PORT', 8000))

    # The servers are started here rather than inside the try/except above so
    # that exceptions raised while serving aren't swallowed by the import
    # fallback.
    if use_werkzeug:
        werkzeug.serving.run_simple(host, port, api, use_reloader=True)
    else:
        wsgiref.simple_server.make_server(host, port, api).serve_forever()
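
# For something closer to production (assuming this file is saved as app.py,
# per the usage note at the top), the same `api` WSGI callable could instead
# be served by a standalone WSGI server such as gunicorn:
# $ gunicorn app:api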