Created
May 28, 2021 12:23
-
-
Save valsteen/7f4b85d6c31e08b77131ceb5654a09cb to your computer and use it in GitHub Desktop.
Task queue executor with queue size limit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
from concurrent.futures import ThreadPoolExecutor | |
from queue import Queue | |
from time import sleep | |
class ThreadPoolExecutorWithQueueSizeLimit(ThreadPoolExecutor):
    """Thread pool executor whose pending-work queue has a size limit.

    After the base class is initialised, the executor's work queue is
    replaced by a ``queue.Queue`` built with ``maxsize=max_queue_size``.
    Bounding the queue keeps the memory used by the producer under control:
    the producer cannot pile up an arbitrary backlog of pending tasks.

    Inspired by https://stackoverflow.com/a/48327162/34871

    NOTE: this assigns to the private ``_work_queue`` attribute of
    ``ThreadPoolExecutor``, so it relies on a standard-library
    implementation detail rather than on a public API.
    """

    def __init__(self, max_queue_size: int, *args, **kwargs):
        """Initialise the pool, then install the size-limited work queue.

        Args:
            max_queue_size: Forwarded to ``queue.Queue`` as ``maxsize``.
                Per the ``queue`` module documentation, a value less than
                or equal to zero makes the queue unbounded.
            *args: Passed through unchanged to ``ThreadPoolExecutor``.
            **kwargs: Passed through unchanged to ``ThreadPoolExecutor``
                (for example ``max_workers``).
        """
        super().__init__(*args, **kwargs)
        bounded_queue = Queue(maxsize=max_queue_size)
        # Must happen after the base initialiser runs, otherwise the base
        # class would overwrite this attribute with its own queue.
        self._work_queue = bounded_queue
def task(task_arg, *, min_delay_ms: int = 100, max_delay_ms: int = 1000) -> str:
    """Simulate a unit of work by sleeping for a random duration.

    Prints a "start" line, sleeps for a random whole number of milliseconds
    drawn from ``[min_delay_ms, max_delay_ms]`` (both ends inclusive), prints
    a "done" line, and returns a result string.

    Args:
        task_arg: Any value identifying the task; it is only printed and
            formatted into the result string.
        min_delay_ms: Lower bound of the simulated delay, in milliseconds.
        max_delay_ms: Upper bound of the simulated delay, in milliseconds.

    Returns:
        The string ``"result for <task_arg>"``.

    Raises:
        ValueError: If ``min_delay_ms > max_delay_ms`` (raised by
            ``random.randint``) or if the drawn delay is negative (raised
            by ``time.sleep``).
    """
    print("start", task_arg)
    # randint yields milliseconds; sleep() expects seconds.
    sleep(random.randint(min_delay_ms, max_delay_ms) / 1000)
    print("done", task_arg)
    return "result for {}".format(task_arg)
def main():
    """Demonstrate back-pressure on the producer with a bounded executor.

    Schedules 99 tasks (arguments 1 through 99) on a pool of 5 workers whose
    work queue is limited to 10 items, printing a line before each
    submission and a final "done" once the pool has shut down.
    """
    with ThreadPoolExecutorWithQueueSizeLimit(max_queue_size=10, max_workers=5) as executor:
        for task_arg in range(1, 100):
            print("scheduling", task_arg)
            # submit() forwards extra positional arguments straight to the
            # callable, so the value is passed as-is. Wrapping it in a
            # 1-tuple (the threading.Thread ``args=`` idiom) would hand the
            # task a tuple such as ``(1,)`` instead of ``1``.
            # This call blocks while the work queue already holds its
            # maximum of 10 pending items.
            executor.submit(task, task_arg)
    # The 'with' block is exited only once all tasks have been executed.
    print("done")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment