Last active
April 10, 2025 19:52
-
-
Save frankcleary/f97fe244ef54cd75278e521ea52a697a to your computer and use it in GitHub Desktop.
Python ThreadPoolExecutor with bounded queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import ThreadPoolExecutor | |
from threading import BoundedSemaphore | |
class BoundedExecutor: | |
"""BoundedExecutor behaves as a ThreadPoolExecutor which will block on | |
calls to submit() once the limit given as "bound" work items are queued for | |
execution. | |
:param bound: Integer - the maximum number of items in the work queue | |
:param max_workers: Integer - the size of the thread pool | |
""" | |
def __init__(self, bound, max_workers): | |
self.executor = ThreadPoolExecutor(max_workers=max_workers) | |
self.semaphore = BoundedSemaphore(bound + max_workers) | |
"""See concurrent.futures.Executor#submit""" | |
def submit(self, fn, *args, **kwargs): | |
self.semaphore.acquire() | |
try: | |
future = self.executor.submit(fn, *args, **kwargs) | |
except: | |
self.semaphore.release() | |
raise | |
else: | |
future.add_done_callback(lambda x: self.semaphore.release()) | |
return future | |
"""See concurrent.futures.Executor#shutdown""" | |
def shutdown(self, wait=True): | |
self.executor.shutdown(wait) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from boundedexecutor import BoundedExecutor | |
def work(): | |
time.sleep(1) | |
print('work done') | |
if __name__ == '__main__': | |
executor = BoundedExecutor(10, 200) | |
for i in range(1000): | |
executor.submit(work) | |
print('work submitted') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thx. It's helpful. I have a need of such functionality because i can overflow a job queue is inside a pool.