Skip to content

Instantly share code, notes, and snippets.

@basilfx
Created May 18, 2014 20:56
Show Gist options
  • Save basilfx/e8b9797906962e764f3e to your computer and use it in GitHub Desktop.
Save basilfx/e8b9797906962e764f3e to your computer and use it in GitHub Desktop.
Gevent download, cache and stream
import os
import shutil
import gevent
def download_and_cache(remote_fd, target_file, on_error=None, on_completed=None):
"""
"""
temp_file = "%s.temp" % target_file
def _inner():
exhausted = False
bytes_read = 0
try:
with open(temp_file, "wb") as local_fd:
try:
while True:
data = remote_fd.read(8192)
if not data:
exhausted = True
break
local_fd.write(data)
bytes_read += len(data)
# Yield in the form of (start_index, stop_index, data)
yield bytes_read - len(data), bytes_read, data
finally:
# Make sure the remaining bytes are read from fremote and
# written to disk.
if not exhausted:
while True:
data = remote_fd.read(8192)
if not data:
break
local_fd.write(data)
# Move the temp file to the target file.
shutil.move(temp_file, target_file)
# Invoke completed callback
if on_completed:
on_completed(target_file)
except:
# Invoke error callback
if on_error:
on_error(target_file)
# Re-raise exception
raise
return _inner
def stream_and_download(remote_fd, target_file, start=None, stop=None):
"""
"""
data_buffer = gevent.queue.Queue()
iterator = download_and_cache(remote_fd, target_file)
def _downloader():
for data_start, data_stop, data in iterator():
put = False
if start and (start >= data_start and start < data_stop):
put = True
elif stop and (stop >= data_start and stop < data_stop):
put = True
elif not start and not stop:
put = True
if put:
data_buffer.put((data_start, data_stop, data))
# Make sure the streamer stops
data_buffer.put(StopIteration)
def _streamer():
# Spawn the download greenlet
greenlet = gevent.spawn(_downloader)
try:
for data_start, data_stop, data in data_buffer:
if start and (start >= data_start and start < data_stop):
if stop and (stop >= data_start and stop < data_stop):
yield data[start - data_start:stop - data_stop]
else:
yield data[start - data_start:]
elif stop and (stop >= data_start and stop < data_stop):
if start and (start >= data_start and start < data_stop):
yield data[start - data_start:stop - data_stop]
else:
yield data[:stop - data_stop]
else:
yield data
finally:
# Make sure the greenlet gets killed when this iterator is closed
greenlet.kill()
return _streamer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment