Created
May 18, 2014 20:56
-
-
Save basilfx/e8b9797906962e764f3e to your computer and use it in GitHub Desktop.
Gevent download, cache and stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shutil | |
import gevent | |
def download_and_cache(remote_fd, target_file, on_error=None, on_completed=None,
                       chunk_size=8192):
    """
    Return a generator function that copies ``remote_fd`` to ``target_file``
    while yielding the data it reads.

    Data is first written to ``"<target_file>.temp"``. The temp file is moved
    to ``target_file`` only after ``remote_fd`` has been read to the end, so a
    partial download never appears under the target name.

    Calling the returned function yields ``(start_index, stop_index, data)``
    tuples, where ``data`` covers bytes ``[start_index, stop_index)`` of the
    stream. If the consumer closes the generator early, the rest of
    ``remote_fd`` is still read and written, so the cache is completed.

    :param remote_fd: File-like object with a ``read(size)`` method that
                      returns an empty value at end of stream.
    :param target_file: Path of the cached file.
    :param on_error: Optional callable, invoked with ``target_file`` when an
                     exception occurs. The exception is re-raised afterwards.
    :param on_completed: Optional callable, invoked with ``target_file`` after
                         the file has been moved into place.
    :param chunk_size: Number of bytes requested per ``read`` call.
    :return: Generator function. Call it to start downloading.
    """
    temp_file = "%s.temp" % target_file

    def _copy_rest(local_fd):
        # Read whatever is left on ``remote_fd`` and append it to the cache.
        while True:
            data = remote_fd.read(chunk_size)
            if not data:
                return
            local_fd.write(data)

    def _inner():
        bytes_read = 0

        try:
            with open(temp_file, "wb") as local_fd:
                try:
                    while True:
                        data = remote_fd.read(chunk_size)
                        if not data:
                            break
                        local_fd.write(data)
                        bytes_read += len(data)
                        # Yield in the form of (start_index, stop_index, data)
                        yield bytes_read - len(data), bytes_read, data
                except GeneratorExit:
                    # The consumer closed the generator early. Finish the
                    # download so the cache is complete. Returning normally
                    # after GeneratorExit is allowed as long as nothing else
                    # is yielded.
                    _copy_rest(local_fd)

            # The download is complete, so move the temp file into place.
            shutil.move(temp_file, target_file)

            # Invoke completed callback
            if on_completed:
                on_completed(target_file)
        except BaseException:
            # Do not leave a partial download behind. The temp file may not
            # exist (open failed, or the file was already moved), so removal
            # is best effort.
            try:
                os.remove(temp_file)
            except OSError:
                pass

            # Invoke error callback
            if on_error:
                on_error(target_file)

            # Re-raise exception
            raise

    return _inner
def stream_and_download(remote_fd, target_file, start=None, stop=None):
    """
    Stream the byte range ``[start, stop)`` of ``remote_fd`` while caching the
    complete stream to ``target_file``.

    A separate greenlet runs ``download_and_cache`` and pushes every chunk that
    overlaps the requested range onto a queue. The returned generator function
    takes chunks from that queue and yields the part of each chunk that lies
    inside the range. It keeps consuming until the download has finished, so
    the cache is completed even when ``stop`` is reached early.

    :param remote_fd: File-like object with a ``read(size)`` method.
    :param target_file: Path the complete download is cached to.
    :param start: First byte offset to stream (inclusive). ``None`` or ``0``
                  streams from the beginning.
    :param stop: Byte offset at which streaming ends (exclusive). ``None``
                 streams to the end of the data.
    :return: Generator function that yields ``data`` slices when called. If
             the download fails, the original exception is raised from the
             generator after the chunks received so far have been yielded.
    """
    # ``import gevent`` alone does not necessarily load the queue submodule.
    import gevent.queue

    data_buffer = gevent.queue.Queue()
    iterator = download_and_cache(remote_fd, target_file)
    # Compare against None explicitly so that start=0 is honoured.
    range_start = 0 if start is None else start
    errors = []

    def _overlaps(data_start, data_stop):
        # True when chunk [data_start, data_stop) intersects [range_start, stop).
        if data_stop <= range_start:
            return False
        return stop is None or data_start < stop

    def _downloader():
        try:
            for data_start, data_stop, data in iterator():
                if _overlaps(data_start, data_stop):
                    data_buffer.put((data_start, data_stop, data))
        except Exception as e:
            # Hand the error to the streamer. Otherwise it would die silently
            # in this greenlet and the consumer would never see it.
            errors.append(e)
        finally:
            # Always wake the streamer: iterating a gevent Queue stops on this
            # sentinel. Without it, a failed download would block forever.
            data_buffer.put(StopIteration)

    def _streamer():
        # Spawn the download greenlet
        greenlet = gevent.spawn(_downloader)

        try:
            for data_start, data_stop, data in data_buffer:
                # Bounds of the requested range, relative to this chunk.
                first = max(range_start - data_start, 0)
                if stop is None:
                    last = len(data)
                else:
                    last = min(stop, data_stop) - data_start
                yield data[first:last]

            if errors:
                raise errors[0]
        finally:
            # Make sure the greenlet gets killed when this iterator is closed
            greenlet.kill()

    return _streamer
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment