Skip to content

Instantly share code, notes, and snippets.

@tomoemon
Last active March 25, 2022 01:55
Show Gist options
  • Save tomoemon/8a3c6c45130fe95d2df3960f583bf9ef to your computer and use it in GitHub Desktop.
Save tomoemon/8a3c6c45130fe95d2df3960f583bf9ef to your computer and use it in GitHub Desktop.
submit できるキューに制限をかけながら実行する
import concurrent.futures
from threading import Semaphore
# Replacing ThreadPoolExecutor's _work_queue is another possible approach, but when tried it behaved unexpectedly (control never returned even after the last task finished).
# https://stackoverflow.com/questions/48263704/threadpoolexecutor-how-to-limit-the-queue-maxsize
class ThreadPoolExecutorWithLimitedWorkQueue(concurrent.futures.ThreadPoolExecutor):
    """Thread pool whose ``submit`` blocks while too many tasks are outstanding.

    One semaphore slot is taken for every submitted task and handed back from
    the task's done-callback, so at most ``max_queue_size`` tasks are queued
    *or running* at any moment.  When a task fails, the pool stops accepting
    new work.
    """

    def __init__(self, max_queue_size, *args, **kwargs):
        """Create the pool.

        Args:
            max_queue_size: Maximum number of submitted-but-unfinished tasks.
                Must be at least 1, otherwise the first ``submit`` could
                never obtain a slot.
            *args: Forwarded to ``ThreadPoolExecutor``.
            **kwargs: Forwarded to ``ThreadPoolExecutor``.

        Raises:
            ValueError: If ``max_queue_size`` is smaller than 1.
        """
        if max_queue_size < 1:
            raise ValueError("max_queue_size must be at least 1")
        super().__init__(*args, **kwargs)
        self.semaphore = Semaphore(max_queue_size)

    def submit(self, *args, **kwargs):
        """Schedule a callable, blocking until a slot is available.

        Arguments are forwarded unchanged to ``ThreadPoolExecutor.submit``.

        Returns:
            The future representing the scheduled call.

        Raises:
            RuntimeError: Propagated from the base class when the pool has
                already been shut down.
        """
        self.semaphore.acquire()
        try:
            future = super().submit(*args, **kwargs)
        except BaseException:
            # No future was created, so no done-callback will ever return the
            # slot; give it back here to keep the capacity intact.
            self.semaphore.release()
            raise
        future.add_done_callback(self._release_semaphore)
        return future

    def _release_semaphore(self, future):
        """Done-callback: free the task's slot and react to a task failure."""
        self.semaphore.release()
        # Done-callbacks also fire for cancelled futures, and calling
        # ``exception()`` on those raises CancelledError.
        if future.cancelled():
            return
        if (exc := future.exception()) is not None:
            # Stop accepting new work after the first failure.
            self.shutdown(wait=False)
            # Per the concurrent.futures documentation, an Exception raised
            # by a done-callback is logged and ignored, so this surfaces the
            # failure in the log rather than propagating it to a caller.
            raise exc
if __name__ == "__main__":
    # Demo: submit ten one-second tasks to a two-worker pool that allows at
    # most two outstanding tasks, printing when each is submitted and started.
    import time

    def task1(payload):
        """Announce the start, sleep for one second, then return the payload."""
        print(payload, "started")
        time.sleep(1)
        # To try the failure path, raise here for one payload, e.g.
        # ``if payload == 9: raise Exception(f"{payload} error: something failed")``.
        return payload

    futures = []
    tasks = range(10)
    # The context manager shuts the pool down even if a submit raises.
    with ThreadPoolExecutorWithLimitedWorkQueue(
        max_queue_size=2, max_workers=2
    ) as executor:
        for t in tasks:
            future = executor.submit(task1, t)
            futures.append(future)
            print(t, "submitted")
        concurrent.futures.wait(futures)
    print("finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment