Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save valsteen/7f4b85d6c31e08b77131ceb5654a09cb to your computer and use it in GitHub Desktop.
Save valsteen/7f4b85d6c31e08b77131ceb5654a09cb to your computer and use it in GitHub Desktop.
# Task queue executor with queue size limit.
import random
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from time import sleep
class ThreadPoolExecutorWithQueueSizeLimit(ThreadPoolExecutor):
    """Thread pool executor whose queue of pending work items is bounded.

    Limiting the queue size keeps the memory used by the producer under
    control: the work queue is a ``queue.Queue`` with a ``maxsize``, so
    adding a work item blocks while the queue is full.

    Inspired by https://stackoverflow.com/a/48327162/34871
    """

    def __init__(self, max_queue_size: int, *args, **kwargs) -> None:
        """Create the pool, then swap in a bounded work queue.

        Args:
            max_queue_size: Passed to ``queue.Queue`` as ``maxsize``, i.e. the
                maximum number of work items waiting to be picked up. Per the
                ``queue.Queue`` documentation, a value <= 0 means unbounded.
            *args: Forwarded unchanged to ``ThreadPoolExecutor.__init__``.
            **kwargs: Forwarded unchanged to ``ThreadPoolExecutor.__init__``.
        """
        super().__init__(*args, **kwargs)
        # NOTE: ``_work_queue`` is a private attribute of ThreadPoolExecutor.
        # It is replaced right after construction, before any work has been
        # submitted, so nothing is lost from the original queue.
        self._work_queue = Queue(maxsize=max_queue_size)
def task(task_arg):
    """Simulate a unit of work that takes a random amount of time.

    Prints a "start" line, sleeps for a random duration between 0.1 and 1
    second, prints a "done" line, and returns a result string.

    Args:
        task_arg: Value identifying the task; only used in the printed
            output and in the returned string.

    Returns:
        The string ``"result for <task_arg>"``.
    """
    print("start", task_arg)
    # randint() bounds are in milliseconds; sleep() expects seconds.
    sleep(random.randint(100, 1000) / 1000)
    print("done", task_arg)
    return "result for {}".format(task_arg)
def main():
    """Schedule 99 demo tasks on a pool of 5 workers with a 10-item queue.

    Prints a "scheduling" line before each submission and a final "done"
    line once the executor's ``with`` block has been left.
    """
    with ThreadPoolExecutorWithQueueSizeLimit(max_queue_size=10, max_workers=5) as executor:
        for task_arg in range(1, 100):
            print("scheduling", task_arg)
            # submit() forwards extra positional arguments straight to the
            # callable, so the value is passed as-is (not wrapped in a tuple).
            # This call blocks while 10 items are already queued.
            executor.submit(task, task_arg)
    # 'with' is exited only once all tasks are executed
    print("done")
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment