Last active
April 10, 2020 04:58
-
-
Save chiragjn/bfcf36b0f2599dfeb97d12afdf000f7f to your computer and use it in GitHub Desktop.
Experiment: download a requested asset on a separate thread, but return the already-loaded older asset immediately if one is available.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Locks are tricky; maybe rewrite this with queues?
import concurrent.futures | |
import contextlib | |
import datetime | |
import threading | |
import time | |
# Shared, module-level state for the experiment.

# In-flight background downloads: asset key -> Future returned by
# download_executor.submit(). Accessed under scheduled_lock.
scheduled = dict()

# Most recently published value per asset key. Accessed under load_lock.
loaded = dict()

# Pool that runs downloads off the calling thread; worker thread names carry
# the 'download' prefix so they are recognisable in print2() output.
download_executor = concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix='download',
    max_workers=4,
)

# Re-entrant locks, one per shared dict.
scheduled_lock = threading.RLock()
load_lock = threading.RLock()
def print2(event):
    """Print ``event`` prefixed with the name of the calling thread.

    Args:
        event: Any object; it is formatted into the output line with an
            f-string, so its ``str()`` form is what gets printed.
    """
    # current_thread().name replaces the camelCase currentThread()/getName()
    # aliases, which emit DeprecationWarning since Python 3.10.
    print(f"{threading.current_thread().name} {event}")
def download(key, t=20, schedule_mode=False):
    """Simulate a slow download of ``key`` and publish the result in ``loaded``.

    The published value is the ISO-format timestamp taken when the call
    starts. The "download" itself is ``t`` progress steps of one second each.

    Args:
        key: Asset key the value is stored under.
        t: Number of one-second progress steps to simulate.
        schedule_mode: When true, the entry for ``key`` is dropped from
            ``scheduled`` (if present) before the value is published.

    Returns:
        The value read back from ``loaded[key]`` after publishing.
    """
    stamp = datetime.datetime.now().isoformat()

    # Original note: the work must release the GIL somewhere; here the
    # one-second sleep stands in for the real download.
    for step in range(t):
        print2(f'Downloading {key}. Progress: {step} / {t}')
        time.sleep(1)

    with load_lock:
        if schedule_mode:
            with scheduled_lock:
                print2("Removing reference to future")
                # Drop the in-flight marker; a missing key is not an error.
                scheduled.pop(key, None)
        print2("Load finished, writing to key!")
        loaded[key] = stamp

    return loaded[key]
def _download_and_load(key, schedule_mode=False):
    """Download ``key`` inline, or make sure a background download is scheduled.

    Args:
        key: Asset key to download.
        schedule_mode: When false, ``download`` runs on the calling thread.
            When true, the download is submitted to ``download_executor``
            unless one for the same key is already recorded in ``scheduled``.

    Returns:
        The downloaded value when ``schedule_mode`` is false; otherwise the
        Future recorded in ``scheduled`` for ``key``.
    """
    # Synchronous path: block the caller until the download is done.
    if not schedule_mode:
        return download(key, schedule_mode=schedule_mode)

    # Scheduled path: the check-and-submit is done under the lock so that two
    # callers asking for the same key share a single future.
    with scheduled_lock:
        if key not in scheduled:
            print2("scheduling and setting reference to future")
            scheduled[key] = download_executor.submit(
                download,
                key=key,
                schedule_mode=schedule_mode,
            )
        print2("returning reference to scheduled future")
        pending = scheduled[key]
        return pending
def _wait_on_load(key, future: concurrent.futures.Future):
    """Return the value for ``key``, blocking on ``future`` if it is still running.

    Args:
        key: Asset key to look up in ``loaded``.
        future: Future of the scheduled download for ``key``; ``None`` is
            accepted and means there is nothing to wait on.

    Returns:
        ``future.result()`` when the future had not finished yet; otherwise
        the value currently stored in ``loaded[key]``.

    Raises:
        KeyError: If there is no future and ``key`` was never loaded.
        Exception: Whatever ``future.result()`` raises, i.e. the error the
            download raised (or a cancellation error).
    """
    if future is not None and not future.done():
        print2("Waiting on scheduled to get finished")
        return future.result()

    print2("No longer scheduled, trying loaded")
    with load_lock:
        if key in loaded:
            return loaded[key]

    if future is not None:
        # The future is done but nothing was published for this key, so the
        # download did not complete normally. Ask the future for its result
        # so the original error surfaces instead of an unrelated KeyError.
        return future.result()
    raise KeyError(key)
def _already_loaded(key):
    """Return the value stored in ``loaded`` for ``key``, or ``None`` if absent.

    The lookup is done while holding ``load_lock``.
    """
    with load_lock:
        # dict.get yields None for a missing key, same as the explicit check.
        return loaded.get(key)
def main_load(key, schedule_mode=False):
    """Load the asset ``key``, optionally refreshing it in the background.

    Args:
        key: Asset key to load.
        schedule_mode: When false, the asset is downloaded on the calling
            thread and the fresh value is returned. When true, a background
            download is ensured via ``_download_and_load``; if a value is
            already loaded it is returned right away, otherwise the call
            waits for the scheduled download.

    Returns:
        The loaded value for ``key``.
    """
    print2("Main load started")

    if not schedule_mode:
        return _download_and_load(key, schedule_mode=False)

    # Always schedule (or pick up) the refresh first, then see whether an
    # older value can be served without waiting.
    future = _download_and_load(key, schedule_mode=True)
    al = _already_loaded(key)
    # _already_loaded signals "nothing loaded" with None. Compare against None
    # explicitly so a falsy cached value ("" or 0, say) is still served from
    # the cache instead of forcing a wait on the refresh.
    if al is None:
        al = _wait_on_load(key, future)
    print2(al)
    return al
if __name__ == '__main__':
    # Demo run: three concurrent scheduled loads (two for 'a', one for 'b'),
    # then two more loads from the main thread once values exist.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3, thread_name_prefix='main_child') as e:
        fut1, fut2, fut3 = [
            e.submit(main_load, key=name, schedule_mode=True)
            for name in ('a', 'a', 'b')
        ]
        # Results are reported in the order fut2, fut1, fut3.
        for pending in (fut2, fut1, fut3):
            print2(pending.result())

    print2("older version exists now, so should return that")
    for name in ('a', 'b'):
        print2(main_load(key=name, schedule_mode=True))
    print("End of main")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment