Created
June 12, 2019 20:01
-
-
Save bsamuel-ui/d5a4f8f2b9a7e36eff96803a6de7df04 to your computer and use it in GitHub Desktop.
An implementation of Executor.map that reads its inputs only on demand.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import partial | |
from queue import SimpleQueue | |
def imap(exc, func, src, size=None, return_exc=False):
    """Map ``func`` over ``src`` on executor ``exc``, reading inputs on demand.

    An item is pulled from ``src`` only when a slot is free, so at most
    ``size`` submitted calls are outstanding at any time.  Results are
    yielded in the order the calls complete, not in input order.

    Arguments are validated when ``imap`` is called, not when the returned
    iterator is first advanced, so a bad ``size`` fails at the call site.

    Args:
        exc: Executor providing ``submit`` and returning futures that
            support ``add_done_callback``, ``result`` and ``cancel``.
        func: Callable invoked as ``func(item)`` for each item of ``src``.
        src: Iterable of inputs; it is consumed lazily.
        size: Maximum number of outstanding calls.  When ``None`` it is
            taken from ``exc._max_workers``.
        return_exc: If true, an exception raised by a call is yielded in
            place of its result; otherwise it is re-raised to the consumer.

    Returns:
        A generator yielding one output per input item.

    Raises:
        ValueError: If ``size`` is less than one.
    """
    if size is None:
        # NOTE: ``_max_workers`` is a private attribute; executors that do
        # not define it need an explicit ``size``.
        size = exc._max_workers
    if size < 1:
        raise ValueError("imap hang if size is less than one.")

    # Completed calls are handed from the done-callbacks to the consumer
    # through this queue as (slot, output, error) triples.
    results = SimpleQueue()
    # slot number -> most recent future submitted for that slot.
    futures = {}

    def complete(slot, fut):
        """Done-callback: publish the outcome of ``fut`` for ``slot``."""
        try:
            result = slot, fut.result(), None
        except BaseException as err:
            # BaseException so that every failure (including cancellation)
            # still produces a queue entry and the consumer never blocks
            # waiting for a result that will not arrive.
            result = slot, None, err
        results.put(result)

    def results_get():
        """Block for the next finished call and return ``(slot, output)``."""
        slot, out, err = results.get()
        if err is not None:
            if return_exc:
                out = err
            else:
                raise err
        return slot, out

    def generate():
        try:
            src_iter = iter(src)
            free_slot = 0
            while True:
                if free_slot < size:
                    # Start-up phase: hand out fresh slots until ``size``
                    # calls have been submitted.
                    slot = free_slot
                    free_slot += 1
                else:
                    # Steady state: wait for a call to finish, emit its
                    # output, then reuse its slot for the next input.
                    slot, out = results_get()
                    yield out
                try:
                    data = next(src_iter)
                except StopIteration:
                    # No more inputs; the slot in hand has nothing pending.
                    futures.pop(slot, None)
                    break
                else:
                    futures[slot] = fut = exc.submit(func, data)
                    fut.add_done_callback(partial(complete, slot))
            # Drain the calls that are still outstanding.
            while futures:
                slot, out = results_get()
                futures.pop(slot, None)
                yield out
        finally:
            # On early exit (error, or the consumer closing the generator)
            # ask every tracked future to cancel.
            for fut in futures.values():
                fut.cancel()

    return generate()
# Testing code
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
import os | |
from time import sleep | |
from threading import Lock | |
def printl(*args, _lock=Lock()):
    """Print ``args`` while holding a lock.

    The default ``_lock`` is created once, when the function is defined, so
    every call that does not pass its own lock shares the same one.
    """
    _lock.acquire()
    try:
        print(*args)
    finally:
        _lock.release()
def add_one(x):
    """Log the call, sleep one second when ``x`` is even, return ``x + 1``."""
    printl(f'run: {x} pid={os.getpid()}')
    is_even = x % 2 == 0
    if is_even:
        # Even inputs are made slow on purpose so completion order differs
        # from submission order.
        sleep(1)
    return x + 1
def yields(n=25):
    """Yield ``0 .. n-1``, logging each value just before it is produced."""

    def announce(x):
        # Log at the moment the consumer pulls the value.
        printl(f'get: {x} pid={os.getpid()}')
        return x

    yield from map(announce, range(n))
# Demo run: push 25 inputs through a 7-thread pool and collect the outputs.
outs = set()
with ThreadPoolExecutor(max_workers=7) as tpe:
    for o in imap(tpe, add_one, yields(n=25), return_exc=True):
        printl(f'got: {o} pid={os.getpid()}')
        outs.add(o)
# Outputs arrive in completion order, so compare them sorted.
printl(f"FIN: {sorted(outs) == list(range(1, 26))}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment