-
-
Save schlamar/2311116 to your computer and use it in GitHub Desktop.
import os | |
import sys | |
import traceback | |
from functools import wraps | |
from multiprocessing import Process, Queue | |
def processify(func):
    '''Decorator to run a function as a process.

    Be sure that every argument and the return value
    is *picklable*.
    The created process is joined, so the code does not
    run in parallel.

    If ``func`` raises an ``Exception`` in the subprocess, an exception of
    the same type is raised in the calling process. Its message is the
    original message followed by `` (in subprocess)`` and the traceback
    text formatted in the child.
    '''

    def process_func(q, *args, **kwargs):
        # Runs in the child process: call func and put exactly one
        # (ret, error) tuple on the queue, whatever the outcome.
        try:
            ret = func(*args, **kwargs)
        except Exception:
            ex_type, ex_value, tb = sys.exc_info()
            # the traceback is shipped to the parent as formatted text
            error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
            ret = None
        else:
            error = None

        q.put((ret, error))

    # register original function with different name
    # in sys.modules so it is picklable
    process_func.__name__ = func.__name__ + 'processify_func'
    # Python 3 pickles functions by __qualname__; without this it would
    # still read 'processify.<locals>.process_func' and pickling would fail.
    # On Python 2 this is just a harmless extra attribute.
    process_func.__qualname__ = process_func.__name__
    setattr(sys.modules[__name__], process_func.__name__, process_func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        q = Queue()
        p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
        p.start()
        # fetch the result before joining, so the child is never waited on
        # while its result is still sitting unread in the queue
        ret, error = q.get()
        p.join()

        if error:
            ex_type, ex_value, tb_str = error
            # '%s' formatting works on Python 2 and 3; the old
            # ``ex_value.message`` attribute does not exist on Python 3
            message = '%s (in subprocess)\n%s' % (ex_value, tb_str)
            raise ex_type(message)

        return ret
    return wrapper
@processify
def test_function():
    '''Demo: return the PID of the process that executes this body.'''
    pid = os.getpid()
    return pid
@processify
def test_deadlock():
    '''Demo: send a large return value back through the result queue.

    Returns a list of the integers 0..29999.
    '''
    # On Python 3 a bare range() is a small lazy object, so it would no
    # longer exercise a large payload (presumably the point of this demo,
    # going by its name). list() gives the same result on Python 2.
    return list(range(30000))
@processify
def test_exception():
    '''Demo: always fail with ``RuntimeError('xyz')``.'''
    failure = RuntimeError('xyz')
    raise failure
def test():
    '''Run the demo functions and print their results.

    Prints this process's PID, the value returned by ``test_function()``
    and the length of the value returned by ``test_deadlock()``, then calls
    ``test_exception()``. Any exception raised by that last call is not
    caught here and propagates to the caller.
    '''
    # single-argument print(...) behaves the same on Python 2 and Python 3,
    # whereas the print statement is a SyntaxError on Python 3
    print(os.getpid())
    print(test_function())
    print(len(test_deadlock()))
    test_exception()


# run the demo only when executed as a script, not when imported
if __name__ == '__main__':
    test()
Sadly, this seems to break on an Apple M1 chip :( If you run it with `multiprocessing.set_start_method('fork')`, in many situations it simply crashes, and I have not been able to figure out why.
If you try with `'spawn'` or `'forkserver'`, the following happens (because the `sys.modules` trick does not work in those instances, as the subprocess does not copy over `sys.modules`, which is a CPython-level object):
Running @Chiron1991's py3 version with Python 3.8.2 under Rosetta:
(venv) george@MacBook-Pro autocomplete % python processify.py
29242
Traceback (most recent call last):
File "processify.py", line 80, in <module>
test()
File "processify.py", line 74, in test
print(test_function())
File "processify.py", line 43, in wrapper
p.start()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/context.py", line 283, in _Popen
return Popen(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'processify.<locals>.process_func'
I've been trying to find a fix but so far no luck.
Here I added the init and setup files to make it a module: https://github.com/dgerosa/processify
Personally, I've made this, which doesn't require that things can be pickled:
import multiprocessing as mp
def forked(fn):
    """
    Decorator that runs ``fn`` in a forked child process and returns its result.

    Does not work on Windows (except WSL2), since the fork syscall is not supported there.
    fork creates a new process which inherits all of the memory without it being copied.
    Memory is copied on write instead, meaning it is very cheap to create a new process.

    The return value (or the raised exception) is sent back through a queue,
    so it still has to be picklable; the arguments and ``fn`` itself do not.
    Anything ``fn`` raises, including ``BaseException`` subclasses, is
    re-raised in the calling process.
    """
    # local import: the surrounding snippet only imports multiprocessing
    from functools import wraps

    @wraps(fn)
    def call(*args, **kwargs):
        ctx = mp.get_context('fork')
        q = ctx.Queue(1)
        # shared flag telling the parent whether the queued item is an error
        is_error = ctx.Value('b', False)

        def target():
            try:
                q.put(fn(*args, **kwargs))
            except BaseException as e:
                # set the flag before queueing so the parent sees it after get()
                is_error.value = True
                q.put(e)

        proc = ctx.Process(target=target)
        proc.start()
        # read the result first, then join so the child is waited for
        # instead of being left behind after every call
        result = q.get()
        proc.join()
        if is_error.value:
            raise result
        return result

    return call
# You can use forked as a decorator:
@forked
def add(x, y):
    """Return the sum of ``x`` and ``y``."""
    total = x + y
    return total
https://stackoverflow.com/a/72490867/5031798 also works, without needing to do all the lower level forking ourselves, but I'm not sure how to turn it into a decorator
Fork for Python 3 here: https://gist.github.com/flbraun/8199fc1a41c2107982053aba809838c6