Last active
June 27, 2022 08:08
-
-
Save internetimagery/03a8a665ec742cf7154a421c216c6de8 to your computer and use it in GitHub Desktop.
Remote Executor (ProcessPoolExecutor / ThreadPoolExecutor manager)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Permission to use, copy, modify, and/or distribute this software for any purpose with or without | |
# fee is hereby granted. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO | |
# THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE | |
# AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER | |
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE | |
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
# Access to PoolExecutors from multiprocessing managers | |
# from queue import Queue | |
from six.moves.queue import Queue | |
from itertools import count | |
from functools import partial | |
from threading import Thread, Lock | |
from weakref import WeakKeyDictionary | |
from multiprocessing.managers import SyncManager, MakeProxyType | |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
def register_executors(manager_cls=SyncManager):
    """
    Attach the Thread and Process Pool Executors to the given Manager class.

    To keep things simple a SyncManager (or a subclass of one) is assumed: the Queue and
    Iterator typeids are expected to be registered on it already.
    """
    # Typeids that get a creation method on the manager.
    creatable = (
        ("ThreadPoolExecutor", ThreadPoolExecutor),
        ("ProcessPoolExecutor", ProcessPoolExecutor),
    )
    for typeid, pool_cls in creatable:
        manager_cls.register(typeid, pool_cls, proxytype=ExecutorProxy)

    # Utility typeids, not exposed: registered without a creation method, they only name
    # the proxy class to use for that typeid.
    internal = (
        ("Executor", ExecutorProxy),
        ("Future", FutureProxy),
    )
    for typeid, proxy_cls in internal:
        manager_cls.register(typeid, proxytype=proxy_cls, create_method=False)
# Generated proxy base exposing the Future methods that are forwarded to the manager process.
BaseFutureProxy = MakeProxyType(
    "BaseFutureProxy",
    ("add_done_callback", "cancel", "cancelled", "done", "exception", "result", "running"),
)


class FutureProxy(BaseFutureProxy):
    """
    Proxy for a Future held by a manager process.

    Callbacks given to add_done_callback are not sent to the manager. They are stored in
    this process under an integer id, and only a partial of _call_id carrying that id is
    registered on the remote future. When it fires, the id is put on a manager queue; a
    watcher thread in this process reads the id and runs the stored callback here.
    """

    # manager -> (queue proxy, watcher thread) serving that manager
    __callback_management = WeakKeyDictionary()
    # Guards id allocation and the watcher / store bookkeeping below
    __callback_management_lock = Lock()
    # Source of the ids that travel through the queue
    __callback_management_id = count()
    # id -> (callback, future proxy) still waiting to be triggered
    __callback_management_store = {}

    @classmethod
    def __callback_management_loop(cls, queue):
        """
        Run stored callbacks for every id read from *queue*.

        The loop stops when reading the queue raises EOFError. An exception raised by a
        callback is not caught here, so it ends the thread; add_done_callback starts a
        replacement thread the next time it needs one.
        """
        while True:
            try:
                id_ = queue.get()
            except EOFError:
                break
            func, fut = cls.__callback_management_store.pop(id_)
            func(fut)

    def add_done_callback(self, func):
        """
        Call *func* with this proxy once the remote future is done.

        If the future is already done, *func* is called immediately in the calling thread.
        Otherwise it is stored locally and run later by the watcher thread of this proxy's
        manager.

        Raises RuntimeError if the proxy has no manager attached.
        """
        if self.done():
            func(self)
            return

        manager = self._manager
        if not manager:
            # Explicit raise instead of an assert, so the check survives "python -O".
            raise RuntimeError("We should have a manager connected")

        with self.__callback_management_lock:
            id_ = next(self.__callback_management_id)
            try:
                queue, thread = self.__callback_management[manager]
            except KeyError:
                queue, thread = manager.Queue(), None

            # Start or rebuild a fallen thread
            if not thread or not thread.is_alive():
                thread = Thread(target=self.__callback_management_loop, args=(queue,))
                # The loop blocks in queue.get() indefinitely; as a non-daemon thread it
                # would keep the interpreter from ever exiting.
                thread.daemon = True
                thread.start()
                self.__callback_management[manager] = (queue, thread)

            # Must be stored before the remote registration: the id may come back at once.
            self.__callback_management_store[id_] = (func, self)

        try:
            self._callmethod("add_done_callback", (partial(_call_id, queue, id_),))
        except Exception:
            # The remote side never learned about this id, so drop the orphaned entry.
            self.__callback_management_store.pop(id_, None)
            raise


def _call_id(queue, id_, _fut):
    """Put *id_* on *queue*; the future passed by the done-callback machinery is ignored."""
    queue.put(id_)  # Run on manager process, trigger callback on main process
# Generated proxy base exposing the Executor entry points.
BaseExecutorProxy = MakeProxyType(
    "BaseExecutorProxy",
    ("map", "submit", "__enter__", "__exit__"),
)
# Typeid used for the return value of each listed method.
BaseExecutorProxy._method_to_typeid_ = dict(
    map="Iterator",
    submit="Future",
    __enter__="Executor",
)


class ExecutorProxy(BaseExecutorProxy):
    """Executor proxy whose __exit__ never forwards exception details."""

    def __exit__(self, *exc_info):
        # Whatever the with-block raised, the remote __exit__ is told nothing went wrong.
        no_exception = (None,) * 3
        self._callmethod("__exit__", no_exception)
# Demo Usage
#
# The helpers and the manager class live at module level, not under the __main__ guard:
# they are pickled by reference, so a manager process started with the "spawn" method must
# be able to find them when it re-imports this module. The helper is also not named
# "count", which would shadow the itertools.count import used by FutureProxy.
def demo_count(num):
    """Return the list [0, 1, ..., num - 1]."""
    return list(range(num))


def demo_whereami(fut):
    """Print the result of *fut* along with the process this callback is running in."""
    import multiprocessing

    print("Got result", fut.result(), "From", multiprocessing.current_process())


class ExecutorManager(SyncManager):
    """Manager subclass used by the demo, so SyncManager's own registry is left untouched."""


if __name__ == "__main__":
    register_executors(ExecutorManager)

    manager = ExecutorManager()
    manager.start()
    try:
        with manager.ThreadPoolExecutor() as pool:
            numbers = list(range(8))
            for seq in pool.map(demo_count, numbers):
                print(">>", seq)

            fut = pool.submit(demo_count, 4)
            print(fut.result())
            # The future is already done here, so the callback runs right away.
            fut.add_done_callback(demo_whereami)
    finally:
        # Stop the manager process explicitly instead of relying on interpreter exit.
        manager.shutdown()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment