Created
August 22, 2025 14:00
-
-
Save p7g/090604579fbb9d0c7a5b119dabe8428b to your computer and use it in GitHub Desktop.
I wanted to try writing a thread pool 🤷
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from concurrent.futures import as_completed | |
from functools import partial | |
from .pool import Pool | |
def wait_and_return(n):
    """Print a start marker for ``n``, sleep ``n`` seconds, and return ``n``.

    The newline is part of the message itself and ``end`` is emptied, so the
    text and its line break are passed to ``print`` as one string
    (presumably to keep lines from different threads from interleaving).
    """
    start_line = f"start {n}\n"
    print(start_line, end="")
    time.sleep(n)
    return n
# Demo: push ten sleeping jobs through a five-worker pool and report each
# result as soon as its future completes.
with Pool(5) as pool:
    # Bind each delay to the job up front with partial().
    futs = [pool.execute(partial(wait_and_return, delay)) for delay in range(10)]
    for finished in as_completed(futs):
        outcome = finished.result()
        print(f"done {outcome}\n", end="")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import queue | |
import threading | |
from collections import deque | |
from concurrent.futures import Future | |
class PoolClosed(Exception):
    """Raised when a job is submitted to a pool that has already been closed."""
class Pool:
    """A small thread pool that grows lazily up to ``size`` worker threads.

    Jobs are zero-argument callables handed to :meth:`execute`, which returns
    a :class:`concurrent.futures.Future` for the job's outcome. Workers are
    started on demand and live until :meth:`close` is called. The pool is a
    context manager; leaving the ``with`` block closes it.

    Relies on ``queue.Queue.shutdown`` / ``queue.ShutDown`` (Python 3.13+).
    """

    def __init__(self, size=1):
        """Create an empty pool.

        Args:
            size: Maximum number of worker threads; must be at least 1.

        Raises:
            ValueError: If ``size`` is smaller than 1.
        """
        # Explicit check instead of ``assert``: asserts vanish under ``-O``,
        # and a pool of size 0 would accept jobs but never run them.
        if size < 1:
            raise ValueError(f"size must be at least 1, got {size!r}")
        self._size = size
        self._lock = threading.Lock()
        self._workers = []
        self._jobq = queue.Queue()
        self._closed = False
        # One permit per job a worker has finished; a permit means some
        # worker has gone back to waiting on the queue and can take a job.
        self._idle = threading.Semaphore(0)

    def __enter__(self):
        """Return the pool itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the pool on leaving the ``with`` block."""
        self.close()

    def _worker(self):
        """Worker loop: run queued jobs until the queue is shut down."""
        while True:
            try:
                func, fut = self._jobq.get()
            except queue.ShutDown:
                break
            try:
                # Skip futures cancelled while still queued; calling
                # set_result() on them would raise InvalidStateError and
                # kill this worker.
                if not fut.set_running_or_notify_cancel():
                    continue
                try:
                    result = func()
                except BaseException as exc:
                    # Catch BaseException too, so the future is always
                    # resolved and the worker survives a job that raises
                    # e.g. SystemExit.
                    fut.set_exception(exc)
                else:
                    fut.set_result(result)
            finally:
                self._jobq.task_done()
                # Advertise that this worker is about to wait for work again.
                self._idle.release()

    def _new_worker(self):
        """Start one more worker thread unless the pool is already at ``size``."""
        # Cheap unlocked pre-check; repeated under the lock before spawning.
        if len(self._workers) >= self._size:
            return
        with self._lock:
            if len(self._workers) >= self._size:
                return
            t = threading.Thread(target=self._worker)
            t.start()
            self._workers.append(t)

    def execute(self, func):
        """Queue ``func`` to run on a worker thread.

        Args:
            func: A callable taking no arguments.

        Returns:
            A ``concurrent.futures.Future`` that receives the return value
            of ``func`` or the exception it raised.

        Raises:
            PoolClosed: If the pool has been closed.
            TypeError: If ``func`` is not callable.
        """
        if self._closed:
            raise PoolClosed
        if not callable(func):
            raise TypeError(f"func must be callable, got {type(func).__name__}")
        fut = Future()
        try:
            self._jobq.put_nowait((func, fut))
        except queue.ShutDown:
            # close() ran between the _closed check and the put; report it
            # the same way as any other use-after-close.
            raise PoolClosed from None
        # Start another worker only when no idle worker is available to take
        # this job. Checking the queue length instead would leave a job
        # stuck behind a busy worker whenever the queue happened to be empty.
        if not self._idle.acquire(blocking=False):
            self._new_worker()
        return fut

    def close(self):
        """Stop accepting jobs, let queued jobs finish, and join the workers.

        Calling ``close`` again after the pool is closed returns immediately.
        """
        with self._lock:
            if self._closed:
                return
            self._closed = True
            self._jobq.shutdown()
            # Snapshot under the lock so a concurrent _new_worker() cannot
            # mutate the list while it is being iterated.
            workers = list(self._workers)
        current = threading.current_thread()
        for t in workers:
            # A job that closes its own pool must not try to join itself.
            if t is not current:
                t.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment