Skip to content

Instantly share code, notes, and snippets.

@niccokunzmann
Last active June 16, 2021 02:08
Show Gist options
  • Save niccokunzmann/9170072 to your computer and use it in GitHub Desktop.
Save niccokunzmann/9170072 to your computer and use it in GitHub Desktop.
The RecursiveThreadPoolExecutor enables the use of futures inside the functions that are executed in parallel.
'''
This was created as an answer to the Stack Overflow question:
http://stackoverflow.com/questions/21960514/can-i-use-a-processpoolexecutor-from-within-a-future
The updated code can be found here:
https://gist.github.com/niccokunzmann/9170072
The RecursiveThreadPoolExecutor allows the usage of futures in the
submitted functions.
It uses the yield and yield from statements to suspend the execution of the
function and free the worker thread to execute the recursive calls.
To turn the RecursiveThreadPoolExecutor
into a RecursiveProcessPoolExecutor,
generator_start and generator_next have to become references according to
multiprocessing.
'''
import sys
import traceback
from concurrent.futures.thread import *
from concurrent.futures.process import *
from concurrent.futures import *
from concurrent.futures._base import *
##import hanging_threads
class RecursiveThreadPoolExecutor(ThreadPoolExecutor):
    """Thread pool whose tasks can wait on futures without blocking a worker.

    A submitted callable may be a generator function.  Every future it
    yields suspends the generator until that future is done, which frees the
    worker thread to run other (possibly recursively submitted) tasks.  The
    generator's ``return`` value becomes the result of the future handed out
    by :meth:`submit`.  Callables that simply return a value are supported
    as well.
    """

    def _submit(self, fn, *args, **kwargs):
        """Schedule ``fn`` on the plain thread pool, bypassing :meth:`submit`."""
        return ThreadPoolExecutor.submit(self, fn, *args, **kwargs)

    def submit(self, fn, *args, **kwargs):
        """Submit a callable to be executed with the given arguments.

        Schedules ``fn(*args, **kwargs)`` on a worker thread.  If the call
        returns an iterator (typically because ``fn`` is a generator
        function), it is driven step by step:

        * a yielded future suspends it until that future is done,
        * a yielded ``None`` re-queues it on the pool,
        * its ``return`` value becomes the result of the returned future.

        Any other return value is used as the result directly.

        Returns:
            A Future representing the given call.  An exception raised by
            ``fn`` or by the generator is stored on that future.
        """
        real_future = Future()

        def generator_next(generator):
            """Advance ``generator`` until it has to wait or is exhausted."""
            try:
                while True:
                    try:
                        future = next(generator)
                    except StopIteration as stop:
                        real_future.set_result(stop.value)
                        return
                    if future is None:
                        # Bare ``yield``: re-queue so other tasks get a turn.
                        self._submit(generator_next, generator)
                        return
                    if not future.done():
                        # Resume from whichever thread completes the future.
                        future.add_done_callback(
                            lambda _done: generator_next(generator))
                        return
                    # Already finished: loop instead of recursing through an
                    # immediately invoked done-callback, which would exhaust
                    # the stack for long runs of completed futures.
            except BaseException as error:
                # Deliberately broad (mirrors the original bare ``except``):
                # every failure must reach the caller through the future.
                real_future.set_exception(error)

        def generator_start():
            """Call ``fn`` in a worker and start driving its generator."""
            # Executor protocol: skip the work if the caller cancelled the
            # future before a worker picked it up.
            if not real_future.set_running_or_notify_cancel():
                return
            try:
                outcome = fn(*args, **kwargs)
                if hasattr(outcome, '__next__'):
                    self._submit(generator_next, outcome)
                else:
                    # Plain callable: behave like ThreadPoolExecutor.submit.
                    real_future.set_result(outcome)
            except BaseException as error:
                real_future.set_exception(error)

        self._submit(generator_start)
        return real_future

    def recursive_map(self, fn, *iterables, timeout=None):
        """Submit ``fn`` for every argument tuple and return a waitable generator.

        Intended to be used from inside a submitted generator function as
        ``futures = yield from executor.recursive_map(fn, ...)``.

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: Accepted for signature compatibility with
                ``Executor.map``; it is currently not enforced.

        Returns:
            A generator that yields each submitted future in submission
            order and whose return value (the value of the ``yield from``
            expression) is the list of all those futures.
        """
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            yield from fs
            return fs

        return result_iterator()
def f(args):
    """Recursively sum a nested list of ints using the given executor.

    ``args`` is an ``(executor, tasks)`` pair.  An int is returned as is; any
    other value is iterated and every element is handed back to ``f`` through
    ``executor.recursive_map``.  Because this is a generator function, the
    sum is delivered as the generator's return value.
    """
    pool, work = args
    print('tasks:', work)
    if type(work) is not int:
        # waiting for all futures without blocking the thread
        subtasks = [(pool, item) for item in work]
        child_futures = yield from pool.recursive_map(f, subtasks)
        values = []
        for child in child_futures:
            values.append(child.result())
        return sum(values)
    ## if work == 3: raise NotImplementedError() # try this out
    return work
if __name__ == '__main__':
    # Demo: sum a nested task list on a single worker thread.
    import time

    with RecursiveThreadPoolExecutor(max_workers=1) as executor:
        nested_tasks = [[1, [2, [3, 3, 3], 2], 1], 0, 0]
        results = executor.map(f, [(executor, nested_tasks)])
        # NOTE(review): presumably lets the worker print its 'tasks:' lines
        # before the results are consumed - confirm the pause is needed.
        time.sleep(0.1)
        for value in results:
            print('v: {}'.format(value))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment