Skip to content

Instantly share code, notes, and snippets.

@internetimagery
Last active June 27, 2022 08:08
Show Gist options
  • Save internetimagery/03a8a665ec742cf7154a421c216c6de8 to your computer and use it in GitHub Desktop.
Save internetimagery/03a8a665ec742cf7154a421c216c6de8 to your computer and use it in GitHub Desktop.
Remote Executor (ProcessPoolExecutor / ThreadPoolExecutor manager)
# Permission to use, copy, modify, and/or distribute this software for any purpose with or without
# fee is hereby granted.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
# AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# Access to PoolExecutors from multiprocessing managers
# from queue import Queue
import logging
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
from itertools import count
from multiprocessing.managers import SyncManager, MakeProxyType
from threading import Thread, Lock
from weakref import WeakKeyDictionary

from six.moves.queue import Queue
def register_executors(manager_cls=SyncManager):
    """
    Add the Thread and Process pool executors to ``manager_cls``.

    The manager class is expected to be a SyncManager (or a subclass of it) that
    already has the Queue and Iterator typeids registered.
    """
    # Creatable from the client side, e.g. ``manager.ThreadPoolExecutor()``.
    creatable = (
        ("ThreadPoolExecutor", ThreadPoolExecutor),
        ("ProcessPoolExecutor", ProcessPoolExecutor),
    )
    for typeid, executor_cls in creatable:
        manager_cls.register(typeid, executor_cls, proxytype=ExecutorProxy)

    # Utility typeids, not exposed: no creation method is generated for them.
    proxy_only = (
        ("Executor", ExecutorProxy),
        ("Future", FutureProxy),
    )
    for typeid, proxy_cls in proxy_only:
        manager_cls.register(typeid, proxytype=proxy_cls, create_method=False)
# Generated proxy type forwarding the listed Future methods to the manager process.
BaseFutureProxy = MakeProxyType(
    "BaseFutureProxy", ("add_done_callback", "cancel", "cancelled", "done", "exception", "result", "running")
)


class FutureProxy(BaseFutureProxy):
    """
    Proxy for a Future held by the manager.

    ``add_done_callback`` is overridden so the callback itself runs in the calling
    process: only a small id is sent to the manager, and a watcher thread in this
    process runs the real callback when that id comes back through a manager queue.
    """

    # manager -> (queue proxy, watcher thread). Weak keys, so an entry does not keep
    # its manager alive.
    __callback_management = WeakKeyDictionary()
    # Guards id allocation and the creation / replacement of watcher threads.
    __callback_management_lock = Lock()
    # Source of unique ids identifying pending callbacks.
    __callback_management_id = count()
    # id -> (callback, future proxy) for callbacks that have not fired yet.
    __callback_management_store = {}

    @classmethod
    def __callback_management_loop(cls, queue):
        """
        Run callbacks whose ids arrive on ``queue`` until ``queue.get()`` raises EOFError.

        An exception raised by a callback is logged and ignored, mirroring
        ``concurrent.futures.Future.add_done_callback``. Without this, one failing
        callback would kill the thread and leave already queued ids unprocessed
        until a later ``add_done_callback`` call rebuilt it.
        """
        logger = logging.getLogger(__name__)
        while True:
            try:
                id_ = queue.get()
            except EOFError:
                # Presumably the manager connection has gone away; nothing more will arrive.
                break
            func, fut = cls.__callback_management_store.pop(id_)
            try:
                func(fut)
            except Exception:
                logger.exception("exception calling callback for %r", fut)

    def add_done_callback(self, func):
        """
        Arrange for ``func(self)`` to be called in this process once the future is done.

        If the future is already done, ``func`` is called immediately in the calling
        thread. Otherwise it is called from the watcher thread associated with this
        proxy's manager.
        """
        if self.done():
            func(self)
            return
        # Kick off a thread to watch for callbacks in a queue, and execute them as they come in.
        # The thread can still die (e.g. on a non-Exception error); rebuild it if it has.
        manager = self._manager
        assert manager, "We should have a manager connected"
        with self.__callback_management_lock:
            id_ = next(self.__callback_management_id)
            try:
                queue, thread = self.__callback_management[manager]
            except KeyError:
                queue, thread = manager.Queue(), None
            # Start or rebuild a fallen thread
            # NOTE(review): the thread is created without setting ``daemon`` and only
            # leaves its loop on EOFError -- confirm the manager is shut down before
            # interpreter exit so it cannot block shutdown.
            if not thread or not thread.is_alive():
                thread = Thread(target=self.__callback_management_loop, args=(queue,))
                thread.start()
                self.__callback_management[manager] = (queue, thread)
        # Store the callback before registering remotely, so the id can be resolved
        # even if the future completes right away.
        self.__callback_management_store[id_] = (func, self)
        self._callmethod("add_done_callback", (partial(_call_id, queue, id_),))
def _call_id(queue, id_, _fut):
queue.put(id_) # Run on manager process, trigger callback on main process
# Generated proxy type forwarding map/submit and the context-manager hooks to the manager.
BaseExecutorProxy = MakeProxyType(
    "BaseExecutorProxy",
    ("map", "submit", "__enter__", "__exit__"),
)
# Typeid of the proxy returned for each of these methods' results.
BaseExecutorProxy._method_to_typeid_ = dict(map="Iterator", submit="Future", __enter__="Executor")


class ExecutorProxy(BaseExecutorProxy):
    """Proxy for a pool executor living in the manager process."""

    def __exit__(self, *_):
        """
        Forward ``__exit__`` to the remote executor, always with empty exception info.

        Whatever exception details the ``with`` statement supplies are discarded
        (presumably because they may not be transferable to the manager -- confirm).
        """
        no_exception = (None, None, None)
        self._callmethod("__exit__", no_exception)
# Demo Usage
if __name__ == "__main__":

    def make_sequence(num):
        """Return ``[0, 1, ..., num - 1]``.

        Deliberately not called ``count``: that name would shadow the module-level
        ``itertools.count`` import.
        """
        return list(range(num))

    def whereami(fut):
        """Done-callback: print the future's result and the process the callback runs in."""
        import multiprocessing

        print("Got result", fut.result(), "From", multiprocessing.current_process())

    class ExecutorManager(SyncManager):
        pass

    # NOTE(review): everything here is defined under the __main__ guard, so the manager
    # process can presumably only resolve these names when it is forked -- confirm
    # before running the demo with the "spawn" start method.
    register_executors(ExecutorManager)
    manager = ExecutorManager()
    manager.start()
    try:
        with manager.ThreadPoolExecutor() as pool:
            numbers = list(range(8))
            for seq in pool.map(make_sequence, numbers):
                print(">>", seq)
            fut = pool.submit(make_sequence, 4)
            print(fut.result())
            fut.add_done_callback(whereami)
    finally:
        # Stop the manager process explicitly, not at interpreter exit.
        manager.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment