Last active
June 16, 2021 02:08
-
-
Save niccokunzmann/9170072 to your computer and use it in GitHub Desktop.
The RecursiveThreadpoolExecutor enables the use of futures in the functions that are parallely executed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
This is crated as an answer to the stackoverflow qustion: | |
http://stackoverflow.com/questions/21960514/can-i-use-a-processpoolexecutor-from-within-a-future | |
The updated code can be found here: | |
https://gist.github.com/niccokunzmann/9170072 | |
The RecursiveThreadPoolExecutor allows the usage of futures in the | |
submitted functions. | |
It uses the yield and yield from statements to suspend the execution of the | |
function and free the worker thread to execute the recursive calls. | |
To make the RecursiveThreadPoolExecutor | |
to a RecursiveProcessPoolExecutor | |
The the generator_start and generator_next have to become references according to | |
multiprocessing. | |
''' | |
import sys | |
import traceback | |
from concurrent.futures.thread import * | |
from concurrent.futures.process import * | |
from concurrent.futures import * | |
from concurrent.futures._base import * | |
##import hanging_threads | |
class RecursiveThreadPoolExecutor(ThreadPoolExecutor): | |
def _submit(self, fn, *args, **kwargs): | |
return ThreadPoolExecutor.submit(self, fn, *args, **kwargs) | |
def submit(self, fn, *args, **kwargs): | |
"""Submits a callable to be executed with the given arguments. | |
Schedules the callable to be executed as fn(*args, **kwargs) and returns | |
a Future instance representing the execution of the callable. | |
Returns: | |
A Future representing the given call. | |
""" | |
real_future = Future() | |
def exception(): | |
ty, err, tb = sys.exc_info() | |
real_future.set_exception(err) | |
## traceback.print_exception(ty, err, tb) | |
def generator_start(): | |
try: | |
generator = fn(*args, **kwargs) | |
def generator_next(): | |
try: | |
try: | |
future = next(generator) | |
except StopIteration as stop: | |
real_future.set_result(stop.value) | |
else: | |
if future is None: | |
self._submit(generator_next) | |
else: | |
future.add_done_callback(lambda future: generator_next()) | |
except: | |
exception() | |
self._submit(generator_next) | |
except: | |
exception() | |
self._submit(generator_start) | |
return real_future | |
def recursive_map(self, fn, *iterables, timeout=None): | |
"""Returns a iterator equivalent to map(fn, iter). | |
Args: | |
fn: A callable that will take as many arguments as there are | |
passed iterables. | |
timeout: The maximum number of seconds to wait. If None, then there | |
is no limit on the wait time. | |
Returns: | |
An iterator equivalent to: map(func, *iterables) but the calls may | |
be evaluated out-of-order. | |
Raises: | |
TimeoutError: If the entire result iterator could not be generated | |
before the given timeout. | |
Exception: If fn(*args) raises for any values. | |
""" | |
if timeout is not None: | |
end_time = timeout + time.time() | |
fs = [self.submit(fn, *args) for args in zip(*iterables)] | |
# Yield must be hidden in closure so that the futures are submitted | |
# before the first iterator value is required. | |
def result_iterator(): | |
yield from fs | |
return fs | |
return result_iterator() | |
def f(args): | |
executor, tasks = args | |
print ('tasks:', tasks) | |
if type(tasks) == int: | |
## if tasks == 3: raise NotImplementedError() # try this out | |
return tasks | |
# waiting for all futures without blocking the thread | |
futures = yield from executor.recursive_map(f, [(executor, task) for task in tasks]) | |
return sum([future.result() for future in futures]) | |
if __name__ == '__main__': | |
with RecursiveThreadPoolExecutor(max_workers = 1) as executor: | |
r = executor.map(f, [(executor, [[1,[2,[3,3,3],2],1],0,0],)] * 1) | |
import time | |
time.sleep(0.1) | |
for v in r: | |
print('v: {}'.format(v)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment