Skip to content

Instantly share code, notes, and snippets.

@ultrafunkamsterdam
Last active February 28, 2020 13:16
Show Gist options
  • Save ultrafunkamsterdam/da935bf2fddd1e30aa9f4b3cd9ef192a to your computer and use it in GitHub Desktop.
Save ultrafunkamsterdam/da935bf2fddd1e30aa9f4b3cd9ef192a to your computer and use it in GitHub Desktop.
python asyncio multiprocessing pool (cancellable)
import asyncio
import multiprocessing
class Pool(object):
def __init__(self, max_workers=3):
self._workers_available = {self._new_pool() for _ in range(max_workers)}
self._workers_busy = set()
self._can_rotate = asyncio.Event()
def _create_fresh_worker(self):
return multiprocessing.Pool(1)
async def apply(self, fn, *args):
"""
Like multiprocessing.Pool.apply_async, but:
* is an asyncio coroutine
* terminates the process if cancelled
"""
while not self._workers_available:
await self._can_rotate.wait()
self._can_rotate.clear()
worker = usable_worker = self._workers_available.pop()
self._workers_busy.add(worker)
loop = asyncio.get_event_loop()
fut = loop.create_future()
def _on_done(obj):
loop.call_soon_threadsafe(fut.set_result, obj)
def _on_err(err):
loop.call_soon_threadsafe(fut.set_exception, err)
worker.apply_async(fn, args, callback=_on_done, error_callback=_on_err)
try:
return await fut
except asyncio.CancelledError:
worker.terminate()
usable_worker = self._create_fresh_worker()
finally:
self._workers_busy.remove(worker)
self._workers_available.add(usable_worker)
self._can_rotate.set()
def shutdown(self):
for p in self._workers_busy | self._workers_available:
p.terminate()
self._workers_available.clear()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment