Skip to content

Instantly share code, notes, and snippets.

@chiragjn
Last active April 10, 2020 04:58
Show Gist options
  • Save chiragjn/bfcf36b0f2599dfeb97d12afdf000f7f to your computer and use it in GitHub Desktop.
Save chiragjn/bfcf36b0f2599dfeb97d12afdf000f7f to your computer and use it in GitHub Desktop.
Experiment: download a requested asset on a separate thread, but return an already-loaded older version of the asset if one is available.
# NOTE: Getting the locks right here is tricky; consider rewriting this with queues.
import concurrent.futures
import contextlib
import datetime
import threading
import time
# In-flight downloads: asset key -> Future returned by download_executor.
# Always accessed while holding ``scheduled_lock``.
scheduled = {}
scheduled_lock = threading.RLock()

# Completed downloads: asset key -> most recently written value.
# Always accessed while holding ``load_lock``.
loaded = {}
load_lock = threading.RLock()

# Background worker pool that ``download`` calls are submitted to.
download_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=4,
    thread_name_prefix='download',
)
def print2(event):
    """Print *event* prefixed with the name of the thread that emits it.

    Used throughout the experiment to show which thread performs each step.
    """
    # ``threading.currentThread()`` and ``Thread.getName()`` are deprecated
    # aliases (DeprecationWarning since Python 3.10); use the modern API.
    print(f"{threading.current_thread().name} {event}")
def download(key, t=20, schedule_mode=False):  # Must release GIL somewhere
    """Simulate downloading asset *key* and store the result in ``loaded``.

    Runs entirely on the calling thread. When scheduled via
    ``_download_and_load`` that is a ``download_executor`` worker. The
    "downloaded" value is the ISO timestamp taken when the download started.
    The transfer itself is faked by sleeping one second per progress step;
    ``time.sleep`` releases the GIL, so other threads keep running meanwhile.

    Args:
        key: Asset identifier, used as the key in ``loaded`` and ``scheduled``.
        t: Number of one-second progress steps to simulate.
        schedule_mode: True when this call was registered in ``scheduled``.
            The matching future reference is then removed when the call ends,
            whether it succeeds or fails.

    Returns:
        The value written to ``loaded[key]``.
    """
    value = datetime.datetime.now().isoformat()
    try:
        for i in range(t):
            print2(f'Downloading {key}. Progress: {i} / {t}')
            time.sleep(1)
    except BaseException:
        # A failed download must not leave its future parked in ``scheduled``.
        # Otherwise every later request for *key* would be handed the same
        # failed future, and the download would never be retried.
        if schedule_mode:
            with scheduled_lock:
                scheduled.pop(key, None)
        raise
    # Unschedule and publish inside one ``load_lock`` critical section.
    # A thread that sees the entry gone from ``scheduled`` and then reads
    # ``loaded`` under ``load_lock`` therefore always finds the new value.
    with load_lock:
        if schedule_mode:
            with scheduled_lock:
                print2("Removing reference to future")
                scheduled.pop(key, None)
        print2("Load finished, writing to key!")
        loaded[key] = value
        return loaded[key]
def _download_and_load(key, schedule_mode=False):
    """Start, or reuse, a download of *key*.

    Without *schedule_mode*, the download runs synchronously on the calling
    thread and its value is returned. With *schedule_mode*, the download is
    submitted to ``download_executor`` unless one for *key* is already in
    ``scheduled``, and the shared ``Future`` is returned instead of a value.
    """
    if not schedule_mode:
        return download(key, schedule_mode=schedule_mode)
    # Check-and-insert under one lock so concurrent callers share one future.
    with scheduled_lock:
        already_running = key in scheduled
        if not already_running:
            print2("scheduling and setting reference to future")
            scheduled[key] = download_executor.submit(
                download, key=key, schedule_mode=schedule_mode
            )
        print2("returning reference to scheduled future")
        return scheduled[key]
def _wait_on_load(key, future: concurrent.futures.Future):
    """Return the value for *key*, blocking on *future* if it is still running.

    Args:
        key: Asset identifier to look up in ``loaded``.
        future: The download future for *key*, or ``None``.

    Returns:
        The download result when *future* was still pending. Otherwise, the
        current ``loaded[key]``.

    Raises:
        The exception of a *future* that finished without populating
        ``loaded`` (a failed or cancelled download). ``KeyError`` if *key* was
        never loaded and there is no future to consult.
    """
    if future is not None and not future.done():
        print2("Waiting on scheduled to get finished")
        return future.result()
    print2("No longer scheduled, trying loaded")
    with load_lock:
        if key in loaded:
            return loaded[key]
    if future is not None:
        # ``download`` only returns successfully after writing ``loaded[key]``,
        # and nothing removes keys from ``loaded``. Reaching this point means
        # the download failed or was cancelled, so surface that real error
        # instead of a misleading KeyError.
        return future.result()
    raise KeyError(key)
def _already_loaded(key):
    """Return the previously downloaded value for *key*, or ``None`` if absent."""
    with load_lock:
        return loaded.get(key)
def main_load(key, schedule_mode=False):
    """Load asset *key*, preferring an already-loaded older value.

    Without *schedule_mode*, download synchronously and return the fresh
    value.

    With *schedule_mode*, first make sure a background download of *key* is
    scheduled, which refreshes it. Then return the previously loaded value
    immediately if there is one. Only when nothing has been loaded yet does
    this block on the download.
    """
    print2("Main load started")
    if not schedule_mode:
        return _download_and_load(key, schedule_mode=False)
    future = _download_and_load(key, schedule_mode=True)
    al = _already_loaded(key)
    # ``_already_loaded`` signals "missing" with None. Test for that
    # explicitly, so a legitimately falsy asset (empty string/bytes, 0) is
    # returned rather than triggering a needless wait.
    if al is None:
        al = _wait_on_load(key, future)
    print2(al)
    return al
if __name__ == '__main__':
    # Two concurrent requests for 'a' should share one scheduled download;
    # 'b' gets its own.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=3, thread_name_prefix='main_child'
    ) as pool:
        fut1 = pool.submit(main_load, key='a', schedule_mode=True)
        fut2 = pool.submit(main_load, key='a', schedule_mode=True)
        fut3 = pool.submit(main_load, key='b', schedule_mode=True)
        for fut in (fut2, fut1, fut3):
            print2(fut.result())
    # Second round: a value exists now, so it is returned immediately while
    # a refresh is scheduled in the background.
    print2("older version exists now, so should return that")
    for asset_key in ('a', 'b'):
        print2(main_load(key=asset_key, schedule_mode=True))
    print("End of main")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment