Created
July 4, 2022 09:41
-
-
Save internetimagery/2bfafab9562559b4ed5aff00f0e3335e to your computer and use it in GitHub Desktop.
Simple job submitter for multiprocessing.SyncManager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
from threading import Thread | |
from pickle import dumps, loads | |
from six.moves.queue import Empty | |
from itertools import chain, count | |
from multiprocessing.util import Finalize | |
from concurrent.futures import Future, ProcessPoolExecutor | |
def register_submit(manager_cls, timeout=10, id_generator=count(), future_store={}): | |
# Server Side | |
def process_tasks(queues, timeout): | |
submit_queue, result_queue = loads(queues) | |
def read(timeout=None): | |
while True: | |
try: | |
item = submit_queue.get(block=not timeout, timeout=timeout) | |
except Empty: | |
break | |
if item is None: | |
break | |
yield item | |
def chunk(): | |
try: | |
# Block until some work is ready, then kick off | |
# some process pools to perform work. | |
# If no work comes in within timeout, then release pools | |
# and wait for more work. | |
for item in read(): | |
with ProcessPoolExecutor() as pool: | |
for result in pool.map(_work, chain((item,), read(timeout))): | |
result_queue.put(result) | |
finally: | |
result_queue.put_nowait(None) | |
th = Thread(target=chunk) | |
th.start() | |
manager_cls.register("_process_tasks", process_tasks) | |
# Client Side | |
def submit(self, func, *args, **kwargs): | |
if not hasattr(self, "_submit_queue"): | |
self._submit_queue = submit_queue = self.Queue() | |
self._result_queue = self.Queue() | |
self.submit_shutdown = Finalize(self, lambda *_: submit_queue.put_nowait(None)) | |
# Read in futures and process results | |
def collect_results(): | |
for id_, result, err in iter(self._result_queue.get, None): | |
fut = future_store.pop(id_) | |
if err: | |
fut.set_exception(err) | |
else: | |
fut.set_result(result) | |
th = Thread(target=collect_results) | |
th.start() | |
self._process_tasks(dumps((self._submit_queue, self._result_queue)), timeout) | |
# Submit tasks to be processed | |
id_ = next(id_generator) | |
fut = future_store[id_] = Future() | |
self._submit_queue.put((id_, func, args, kwargs)) | |
return fut | |
manager_cls.submit = submit | |
def _work(item): | |
# Run inside process pool | |
id_, func, args, kwargs = item | |
err = None | |
try: | |
result = func(*args, **kwargs) | |
except Exception as err: | |
result = None | |
return id_, result, err | |
# DEMO ######## | |
from multiprocessing.managers import SyncManager | |
register_submit(SyncManager) | |
if __name__ == "__main__": | |
manager = SyncManager() | |
manager.start() | |
fut = manager.submit(str, 123) | |
print(fut, fut.result()) | |
manager.submit_shutdown() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment