Skip to content

Instantly share code, notes, and snippets.

@internetimagery
Last active June 27, 2022 09:26
Show Gist options
  • Save internetimagery/13d95e16239c0759012dfb9231086869 to your computer and use it in GitHub Desktop.
Save internetimagery/13d95e16239c0759012dfb9231086869 to your computer and use it in GitHub Desktop.
Spawn manager. Launch a manager in another process. Useful when python is embedded in another application (ie maya) and cannot fork.
# Permission to use, copy, modify, and/or distribute this software for any purpose with or without
# fee is hereby granted.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
# AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# Example usage: python /path/to/this/file/spawn_manager.py --demo
# Or from something like maya...
# >>> from spawn_manager import launch_manager
# >>> from contextlib import closing
# >>> manager = launch_manager(executable="mayapy")
# >>> with closing(manager.Pool()) as pool:
# >>> print(pool.map(str, range(100)))
import os
import sys
import time
import pickle
import signal
import logging
import subprocess
import threading
import multiprocessing
from contextlib import closing
from multiprocessing.util import Finalize
from multiprocessing.reduction import ForkingPickler
from multiprocessing.managers import SyncManager, dispatch, listener_client
LOG = logging.getLogger(__name__)
def launch_manager(
initializer=None,
initargs=(),
manager_cls=SyncManager,
executable="",
env=None,
timeout=10,
):
"""
Spawn a manager in another process.
>>> from contextlib import closing
>>> from spawn_manager import launch_manager
>>> with closing(launch_manager(executable="mayapy") as manager:
>>> with closing(manager.Pool()) as pool:
>>> print(pool.map(str, range(100)))
"""
if not executable:
executable = sys.executable
# Launch server subprocess
proc = subprocess.Popen(
(
executable,
"-c",
"import imp, sys;imp.load_source(sys.argv[1], sys.argv[2])._launch_server()",
__name__,
__file__,
),
stdin=subprocess.PIPE,
env=env,
)
# Set up our connection to the subprocess. Send across our shared authorization and pipe.
here, there = multiprocessing.Pipe()
pickler = ForkingPickler(proc.stdin)
pickler.dump(bytes(multiprocessing.current_process().authkey))
pickler.dump(there)
proc.stdin.flush()
proc.stdin.close()
# Talk to our subprocess. Send over things we need to run.
# Recieve back an address to connect to the server.
with closing(here):
here.send(
(
initializer,
initargs,
manager_cls,
os.getpid(),
logging.getLogger().level,
),
)
if not here.poll(timeout):
raise RuntimeError("No server response")
address = here.recv()
# Start up our client
manager = manager_cls(address=address)
manager.connect()
# When we lose reference to the manager, or main process is complete. Shut down the server
def shutdown():
if proc.poll() is not None:
return
for msg, kill, args in (
("Stopping Server", _shutdown_server, (address,)),
("Interrupting Server", proc.send_signal, (signal.SIGINT,)),
("Terminating Server", proc.terminate, ()),
("Killing Server", proc.kill, ()),
):
LOG.debug(msg)
kill(*args)
for _ in range(50):
if proc.poll() is not None:
return
time.sleep(0.1)
# Register our finalizer as the shutdown method so manager can be optionally used
# as a context manager, and so it shuts the process down after GC.
manager.shutdown = Finalize(manager, shutdown)
return manager
def _shutdown_server(address, serializer="pickle"):
_, client = listener_client[serializer]
authkey = multiprocessing.current_process().authkey
with closing(client(address, authkey=authkey)) as conn:
dispatch(conn, None, "shutdown")
def _launch_server():
"""Run in the new process to kick off the manager"""
unpickler = pickle.Unpickler(getattr(sys.stdin, "buffer", sys.stdin))
# Authorize this process to access connections from our parent process.
# Then join the conversation...
process = multiprocessing.current_process()
process.authkey = unpickler.load()
with closing(unpickler.load()) as conn:
# Take our types etc to kick things off
initializer, initargs, manager_cls, parent, log_level = conn.recv()
multiprocessing.log_to_stderr(log_level)
process.name = "{0.__name__}Process".format(manager_cls)
# Build the manager server
manager = manager_cls()
server = manager.get_server()
# Send back connection details so the parent can connect
conn.send(server.address)
# Close our connection, and fire off any initializer
# Useful to set up an environment / config / import any slow to load frequently used modules etc
if initializer:
initializer(*initargs)
# Periodically check the parent process still lives.
# In case of sudden failure, and improper shutdown.
def parent_monitor():
while True:
time.sleep(10)
try:
os.kill(parent, 0)
except OSError:
LOG.warning("Parent dead. Shutting down")
_shutdown_server(server.address)
sys.exit() # server shutdown does this, but to be safe...
th = threading.Thread(target=parent_monitor)
th.setDaemon(True)
th.start()
# ... and start the server.
LOG.debug("Server Starting")
server.serve_forever()
LOG.debug("Server Shutting Down")
if __name__ == "__main__":
if len(sys.argv) != 3:
import argparse
parser = argparse.ArgumentParser(description="Spawn remote process manager")
parser.add_argument("--demo", action="store_true", default=False)
args = parser.parse_args()
if args.demo: # Run a demo
# Quick demo using the stuff
with launch_manager() as manager:
with closing(manager.Pool()) as pool:
print(pool.map(str, range(10)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment