
@abersheeran
Last active May 11, 2024 06:36
Python: use a separate thread pool in each of multiple processes to handle tasks
from concurrent.futures import ThreadPoolExecutor
import gc
import multiprocessing
import queue
import signal
import threading
from typing import Callable, ParamSpec

from loguru import logger

P = ParamSpec("P")
class MultiWorker:
    """
    Use a separate thread pool in each of multiple processes to handle tasks.

    ```python
    with MultiWorker(
        processes=settings.receive_processes, threads=settings.receive_workers
    ) as workers:
        ...
        workers.submit(fn, *args, **kwargs)
    ```
    """
    def __init__(
        self, processes: int = 4, threads: int = 32, *, maxsize: int = 0
    ) -> None:
        self._threads = threads
        self.queue = multiprocessing.Queue(maxsize)
        # Cap the number of worker processes at the machine's CPU count.
        self.processes = [
            multiprocessing.Process(
                target=self._run, args=(threads, self.queue), daemon=True
            )
            for _ in range(min(processes, multiprocessing.cpu_count()))
        ]
    @staticmethod
    def _run(threads: int, q: multiprocessing.Queue) -> None:
        # Make the child process's garbage collection settings explicit.
        gc.enable()
        gc.set_threshold(700, 10, 10)

        shutdown = False

        def request_shutdown(*args, **kwargs) -> None:
            nonlocal shutdown
            shutdown = True

        signal.signal(signal.SIGINT, request_shutdown)
        signal.signal(signal.SIGTERM, request_shutdown)

        # The semaphore bounds how many tasks have been handed to the
        # executor, so at most `threads` tasks are in flight in this process.
        semaphore = threading.Semaphore(threads)

        def release_worker(future) -> None:
            semaphore.release()

        with ThreadPoolExecutor(max_workers=threads) as workers:
            while not shutdown and semaphore.acquire(blocking=True, timeout=None):
                try:
                    # Wake up periodically so the shutdown flag set by the
                    # signal handlers is honored even when the queue is empty.
                    fn, args, kwargs = q.get(block=True, timeout=1)
                except queue.Empty:
                    semaphore.release()
                    continue
                future = workers.submit(fn, *args, **kwargs)
                future.add_done_callback(release_worker)
    def submit(self, fn: Callable[P, None], *args: P.args, **kwargs: P.kwargs) -> None:
        # Respawn any child process that has died before enqueueing the task.
        for idx, process in enumerate(tuple(self.processes)):
            if process.is_alive():
                continue
            logger.warning("Child process [{}] died unexpectedly".format(process.pid))
            process = multiprocessing.Process(
                target=self._run, args=(self._threads, self.queue), daemon=True
            )
            self.processes[idx] = process
            process.start()
        self.queue.put((fn, args, kwargs))
    def start(self) -> None:
        for process in self.processes:
            process.start()

    def close(self, block: bool = True) -> None:
        for process in self.processes:
            process.terminate()
        if block:
            for process in self.processes:
                process.join()

    def __enter__(self) -> "MultiWorker":
        self.start()
        return self

    def __exit__(self, *args) -> None:
        self.close()
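For reference, here is a minimal runnable sketch of how the class is meant to be used; the `handle` task function, the pool sizes, and the trailing sleep are illustrative assumptions rather than part of the gist.

```python
import time

def handle(item: int) -> None:
    # Hypothetical task; stands in for real work such as processing a message.
    print(f"processed {item}")

if __name__ == "__main__":
    with MultiWorker(processes=2, threads=8) as workers:
        for i in range(100):
            workers.submit(handle, i)
        # close() terminates the children rather than draining the queue,
        # so sleep briefly as a crude stand-in for real completion tracking.
        time.sleep(2)
```

Note that submitted callables travel through a `multiprocessing.Queue`, so they must be picklable — in practice, defined at module level rather than as lambdas or nested functions.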