Last active
June 27, 2022 09:26
-
-
Save internetimagery/13d95e16239c0759012dfb9231086869 to your computer and use it in GitHub Desktop.
Spawn manager. Launch a manager in another process. Useful when python is embedded in another application (ie maya) and cannot fork.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Permission to use, copy, modify, and/or distribute this software for any purpose with or without | |
# fee is hereby granted. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO | |
# THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE | |
# AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER | |
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE | |
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
# Example usage: python /path/to/this/file/spawn_manager.py --demo | |
# Or from something like maya... | |
# >>> from spawn_manager import launch_manager | |
# >>> from contextlib import closing | |
# >>> manager = launch_manager(executable="mayapy") | |
# >>> with closing(manager.Pool()) as pool: | |
# >>> print(pool.map(str, range(100))) | |
import os | |
import sys | |
import time | |
import pickle | |
import signal | |
import logging | |
import subprocess | |
import threading | |
import multiprocessing | |
from contextlib import closing | |
from multiprocessing.util import Finalize | |
from multiprocessing.reduction import ForkingPickler | |
from multiprocessing.managers import SyncManager, dispatch, listener_client | |
LOG = logging.getLogger(__name__) | |
def launch_manager( | |
initializer=None, | |
initargs=(), | |
manager_cls=SyncManager, | |
executable="", | |
env=None, | |
timeout=10, | |
): | |
""" | |
Spawn a manager in another process. | |
>>> from contextlib import closing | |
>>> from spawn_manager import launch_manager | |
>>> with closing(launch_manager(executable="mayapy") as manager: | |
>>> with closing(manager.Pool()) as pool: | |
>>> print(pool.map(str, range(100))) | |
""" | |
if not executable: | |
executable = sys.executable | |
# Launch server subprocess | |
proc = subprocess.Popen( | |
( | |
executable, | |
"-c", | |
"import imp, sys;imp.load_source(sys.argv[1], sys.argv[2])._launch_server()", | |
__name__, | |
__file__, | |
), | |
stdin=subprocess.PIPE, | |
env=env, | |
) | |
# Set up our connection to the subprocess. Send across our shared authorization and pipe. | |
here, there = multiprocessing.Pipe() | |
pickler = ForkingPickler(proc.stdin) | |
pickler.dump(bytes(multiprocessing.current_process().authkey)) | |
pickler.dump(there) | |
proc.stdin.flush() | |
proc.stdin.close() | |
# Talk to our subprocess. Send over things we need to run. | |
# Recieve back an address to connect to the server. | |
with closing(here): | |
here.send( | |
( | |
initializer, | |
initargs, | |
manager_cls, | |
os.getpid(), | |
logging.getLogger().level, | |
), | |
) | |
if not here.poll(timeout): | |
raise RuntimeError("No server response") | |
address = here.recv() | |
# Start up our client | |
manager = manager_cls(address=address) | |
manager.connect() | |
# When we lose reference to the manager, or main process is complete. Shut down the server | |
def shutdown(): | |
if proc.poll() is not None: | |
return | |
for msg, kill, args in ( | |
("Stopping Server", _shutdown_server, (address,)), | |
("Interrupting Server", proc.send_signal, (signal.SIGINT,)), | |
("Terminating Server", proc.terminate, ()), | |
("Killing Server", proc.kill, ()), | |
): | |
LOG.debug(msg) | |
kill(*args) | |
for _ in range(50): | |
if proc.poll() is not None: | |
return | |
time.sleep(0.1) | |
# Register our finalizer as the shutdown method so manager can be optionally used | |
# as a context manager, and so it shuts the process down after GC. | |
manager.shutdown = Finalize(manager, shutdown) | |
return manager | |
def _shutdown_server(address, serializer="pickle"): | |
_, client = listener_client[serializer] | |
authkey = multiprocessing.current_process().authkey | |
with closing(client(address, authkey=authkey)) as conn: | |
dispatch(conn, None, "shutdown") | |
def _launch_server(): | |
"""Run in the new process to kick off the manager""" | |
unpickler = pickle.Unpickler(getattr(sys.stdin, "buffer", sys.stdin)) | |
# Authorize this process to access connections from our parent process. | |
# Then join the conversation... | |
process = multiprocessing.current_process() | |
process.authkey = unpickler.load() | |
with closing(unpickler.load()) as conn: | |
# Take our types etc to kick things off | |
initializer, initargs, manager_cls, parent, log_level = conn.recv() | |
multiprocessing.log_to_stderr(log_level) | |
process.name = "{0.__name__}Process".format(manager_cls) | |
# Build the manager server | |
manager = manager_cls() | |
server = manager.get_server() | |
# Send back connection details so the parent can connect | |
conn.send(server.address) | |
# Close our connection, and fire off any initializer | |
# Useful to set up an environment / config / import any slow to load frequently used modules etc | |
if initializer: | |
initializer(*initargs) | |
# Periodically check the parent process still lives. | |
# In case of sudden failure, and improper shutdown. | |
def parent_monitor(): | |
while True: | |
time.sleep(10) | |
try: | |
os.kill(parent, 0) | |
except OSError: | |
LOG.warning("Parent dead. Shutting down") | |
_shutdown_server(server.address) | |
sys.exit() # server shutdown does this, but to be safe... | |
th = threading.Thread(target=parent_monitor) | |
th.setDaemon(True) | |
th.start() | |
# ... and start the server. | |
LOG.debug("Server Starting") | |
server.serve_forever() | |
LOG.debug("Server Shutting Down") | |
if __name__ == "__main__": | |
if len(sys.argv) != 3: | |
import argparse | |
parser = argparse.ArgumentParser(description="Spawn remote process manager") | |
parser.add_argument("--demo", action="store_true", default=False) | |
args = parser.parse_args() | |
if args.demo: # Run a demo | |
# Quick demo using the stuff | |
with launch_manager() as manager: | |
with closing(manager.Pool()) as pool: | |
print(pool.map(str, range(10))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment