Skip to content

Instantly share code, notes, and snippets.

@p7g
Created August 22, 2025 14:00
Show Gist options
  • Save p7g/090604579fbb9d0c7a5b119dabe8428b to your computer and use it in GitHub Desktop.
Save p7g/090604579fbb9d0c7a5b119dabe8428b to your computer and use it in GitHub Desktop.
I wanted to try writing a thread pool 🤷
import time
from concurrent.futures import as_completed
from functools import partial
from .pool import Pool
def wait_and_return(n):
    """Announce the job, sleep for *n* seconds, then hand *n* back.

    Args:
        n: number of seconds to sleep; also the value returned.

    Returns:
        The same *n* that was passed in.
    """
    # The newline is part of the string and ``end`` is empty, so the whole
    # line goes to print() as a single piece of text.
    # NOTE(review): presumably done to keep output from concurrent jobs from
    # interleaving mid-line — confirm.
    announcement = f"start {n}\n"
    print(announcement, end="")
    time.sleep(n)
    return n
# Demo: submit ten jobs (job i sleeps for i seconds) to a pool limited to five
# threads and print each result as soon as its future completes.
with Pool(5) as pool:
    futs = [pool.execute(partial(wait_and_return, i)) for i in range(10)]
    for fut in as_completed(futs):
        print(f"done {fut.result()}\n", end="")
import queue
import threading
from collections import deque
from concurrent.futures import Future
class PoolClosed(Exception):
    """Raised by ``Pool.execute`` when the pool has already been closed."""
class Pool:
    """Thread pool that runs zero-argument callables and reports via Futures.

    Worker threads are created lazily: a new one is started only when a job is
    submitted while no existing worker is known to be idle, and never more
    than ``size`` in total. ``close()`` (also called on leaving a ``with``
    block) stops accepting work, lets the workers drain whatever is still
    queued, and joins them.

    Relies on ``queue.Queue.shutdown`` / ``queue.ShutDown`` (Python 3.13+).
    """

    def __init__(self, size=1):
        """Create an empty pool allowed to grow to *size* worker threads.

        Args:
            size: maximum number of worker threads; must be at least 1.

        Raises:
            ValueError: if *size* is smaller than 1.
        """
        # Explicit raise instead of ``assert`` so the check survives ``-O``.
        if size < 1:
            raise ValueError(f"size must be at least 1, got {size!r}")
        self._size = size
        # Guards _closed, _workers and the submit/close hand-off.
        self._lock = threading.Lock()
        self._workers = []
        self._jobq = queue.Queue()
        self._closed = False
        # One permit is released each time a worker finishes a job; execute()
        # consumes a permit instead of starting another thread.
        self._idle = threading.Semaphore(0)

    def __enter__(self):
        """Return the pool itself for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the pool; exceptions from the ``with`` body are not suppressed."""
        self.close()

    def _worker(self):
        """Thread body: run queued jobs until the queue is shut down and empty."""
        while True:
            try:
                func, fut = self._jobq.get()
            except queue.ShutDown:
                break
            try:
                # False means the future was cancelled while still queued.
                # Skipping it avoids InvalidStateError from set_result(), which
                # would otherwise kill this worker thread.
                if fut.set_running_or_notify_cancel():
                    try:
                        result = func()
                    except BaseException as exc:
                        # BaseException (not just Exception) so that e.g.
                        # SystemExit raised by a job still resolves the future
                        # instead of leaving waiters blocked forever.
                        fut.set_exception(exc)
                    else:
                        fut.set_result(result)
            finally:
                self._jobq.task_done()
                # Advertise that this worker is free for the next job.
                self._idle.release()

    def _new_worker(self):
        """Start one more worker thread unless the pool is at capacity.

        The caller must hold ``self._lock``.
        """
        if len(self._workers) >= self._size:
            return
        t = threading.Thread(target=self._worker)
        t.start()
        self._workers.append(t)

    def execute(self, func):
        """Queue *func* to be called with no arguments on a worker thread.

        Args:
            func: the callable to run.

        Returns:
            A ``concurrent.futures.Future`` that receives the return value of
            *func* or the exception it raised. A future cancelled before a
            worker picks it up is skipped without running *func*.

        Raises:
            PoolClosed: if the pool has been closed.
            TypeError: if *func* is not callable.
        """
        if self._closed:
            raise PoolClosed
        if not callable(func):
            raise TypeError(
                f"func must be callable, got {type(func).__name__}"
            )
        fut = Future()
        with self._lock:
            # Re-check under the lock: close() may have run since the check
            # above, and putting on a shut-down queue raises queue.ShutDown.
            if self._closed:
                raise PoolClosed
            # Only grow the pool when no worker has reported itself idle;
            # checking the queue length instead would leave a job stuck behind
            # a busy worker whenever the queue happens to be empty.
            if not self._idle.acquire(blocking=False):
                self._new_worker()
            self._jobq.put_nowait((func, fut))
        return fut

    def close(self):
        """Stop accepting jobs, let queued jobs finish, and join the workers.

        Calling ``close()`` on an already closed pool does nothing.
        """
        with self._lock:
            if self._closed:
                return
            self._closed = True
            # Non-immediate shutdown: workers keep draining queued jobs and
            # get queue.ShutDown only once the queue is empty.
            self._jobq.shutdown()
            workers = list(self._workers)
        # Join outside the lock so nothing else is blocked while we wait.
        for t in workers:
            t.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment