-
-
Save schlamar/2311116 to your computer and use it in GitHub Desktop.
import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue
def processify(func):
    '''Decorator to run a function as a separate process.

    Be sure that every argument and the return value is *picklable*.
    The created process is joined, so the decorated call does not run
    in parallel with the caller.
    '''
    def process_func(q, *args, **kwargs):
        # Runs in the child: execute func and ship (result, error) back
        # through the queue instead of letting an exception escape the
        # child process silently.
        try:
            ret = func(*args, **kwargs)
        except Exception:
            ex_type, ex_value, tb = sys.exc_info()
            # Traceback objects are not picklable; send formatted text.
            error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
            ret = None
        else:
            error = None
        q.put((ret, error))

    # Register the helper under a unique name in this module so it is
    # picklable by name (needed for 'spawn'/'forkserver' start methods).
    process_func.__name__ = func.__name__ + 'processify_func'
    setattr(sys.modules[__name__], process_func.__name__, process_func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        q = Queue()
        p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
        p.start()
        # Drain the queue *before* joining: joining first deadlocks when
        # the child blocks on the queue's feeder thread for large results
        # (see the multiprocessing programming guidelines).
        ret, error = q.get()
        p.join()
        if error:
            ex_type, ex_value, tb_str = error
            # str(ex_value) works on Python 2 and 3; the .message
            # attribute was removed in Python 3 and is unreliable on 2.
            message = '%s (in subprocess)\n%s' % (str(ex_value), tb_str)
            try:
                exception = ex_type(message)
            except Exception:
                # Exception types whose __init__ requires extra arguments
                # (e.g. sqlalchemy.exc.ProgrammingError) cannot be rebuilt
                # from a single message; fall back to a plain Exception.
                exception = Exception(
                    '%s\n(original exception type: %s)' % (message, ex_type))
            raise exception
        return ret
    return wrapper
@processify
def test_function():
    """Return the PID of the subprocess this call runs in."""
    pid = os.getpid()
    return pid
@processify
def test_deadlock():
    """Return a large result to exercise the get-before-join path."""
    big_result = range(30000)
    return big_result
@processify
def test_exception():
    """Raise inside the subprocess to exercise error propagation."""
    err = RuntimeError('xyz')
    raise err
def test():
    """Smoke-test processify: PID differs per call, large results do not
    deadlock, and subprocess exceptions propagate to the caller."""
    # Parenthesized single-argument prints run identically on
    # Python 2 (parenthesized expression) and Python 3 (function call).
    print(os.getpid())
    print(test_function())
    print(len(test_deadlock()))
    test_exception()


if __name__ == '__main__':
    test()
Nice utility, thanks!
It seems that if the decorated function returns a sufficiently large object, a deadlock can occur at the p.join()
line, for example:
@processify
def will_deadlock():
return range(30000)
if __name__ == '__main__':
will_deadlock() # deadlocks here
Simply removing the p.join()
line solves the problem. The multiprocessing doc mentioned a possible deadlock issue regarding joining processes that use queues, which seems to be relevant here.
Hello,
this is incredibly useful! Thanks! You should really upload it to PyPI...
Davide
Thank you for the great work, it is very helpful!
One problem I've had with it is when the original exception type has more parameters than usual (in my case it is sqlalchemy.exc.ProgrammingError
, which takes 4 arguments, so I've had TypeError: __init__() takes at least 4 arguments (2 given)
instead of the original exception). We can go this way:
if error:
ex_type, ex_value, tb_str = error
message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str)
try:
exception = ex_type(message)
except Exception:
# Failed to keep the original exception type
exception = Exception('%s\n(original exception type: %s)' % (message, ex_type))
raise exception
Can you please add a license to this code? I was putting together something like that, with splitting input and mapping it to pool of processes. It would be very helpful to use your code as a base. Thanks.
Thank you! Solved an issue I've been trying to resolve for the past ~4 hours or so in which my server kills a long-running Python script I was running. Wrapped the offending code in processify and it's taken my CPU / Mem usage down significantly (by a factor of 20-30).
Sorry I am pasting error here, I tried to spawn a process inside the flask route, I also removed join() from processify.py:
E00073.274: Exception escaped from start_client
Traceback (most recent call last):
File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/log.py", line 110, in g
return f(*args, **kwargs)
File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/pydevd_hooks.py", line 74, in start_client
sock, start_session = daemon.start_client((host, port))
File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/daemon.py", line 214, in start_client
with self.started():
File "/usr/lib/python3.6/contextlib.py", line 81, in __enter__
return next(self.gen)
File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/daemon.py", line 110, in started
self.start()
File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/daemon.py", line 145, in start
raise RuntimeError('already started')
RuntimeError: already started
I am getting a deadlock while using it.
Great module, very helpful, runs like a magic in my scripts with python 2.7.15. With python 3.7.5 I get following:
Traceback (most recent call last): File "<string>", line 1, in <module> File "C:\ProgramData\Anaconda2\envs\pyt3.7\lib\multiprocessing\spawn.py", line 105, in spawn_main exitcode = _main(fd) File "C:\ProgramData\Anaconda2\envs\pyt3.7\lib\multiprocessing\spawn.py", line 115, in _main self = reduction.pickle.load(from_parent) EOFError: Ran out of input
Any idea/suggestion what can be wrong here?
Fork for Python 3 here: https://gist.github.com/flbraun/8199fc1a41c2107982053aba809838c6
Sadly, this seems to break with an Apple M1 chip :( If you run it with multiprocessing.set_start_method('fork')
in many situations it simply crashes, and I have not been able to figure out why. If you try with 'spawn'
or 'forkserver'
, the following happens (because the sys.modules
trick does not work in those instances, as the subprocess does not copy over sys.modules
which is a CPython-level object):
Running @Chiron1991's py3 version with Python 3.8.2 under Rosetta:
(venv) george@MacBook-Pro autocomplete % python processify.py
29242
Traceback (most recent call last):
File "processify.py", line 80, in <module>
test()
File "processify.py", line 74, in test
print(test_function())
File "processify.py", line 43, in wrapper
p.start()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/context.py", line 283, in _Popen
return Popen(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'processify.<locals>.process_func'
I've been trying to find a fix but so far no luck.
Here I added the init and setup files to make it a module: https://github.com/dgerosa/processify
Personally, I've made this, which doesn't require that things can be pickled:
import multiprocessing as mp
def forked(fn):
    """Decorator: run *fn* in a forked child process and return its result.

    Does not work on Windows (except WSL2), since the fork syscall is not
    supported there. fork creates a new process which inherits all of the
    memory without it being copied; memory is copied on write instead, so
    creating the child is very cheap. Because the child is forked, *fn*
    itself need not be picklable, but its return value (or raised
    exception) travels through a Queue and therefore must be.
    """
    def call(*args, **kwargs):
        ctx = mp.get_context('fork')
        q = ctx.Queue(1)
        is_error = ctx.Value('b', False)

        def target():
            try:
                q.put(fn(*args, **kwargs))
            except BaseException as e:
                # Set the flag before putting: the parent reads the flag
                # only after q.get() has returned, so ordering is safe.
                is_error.value = True
                q.put(e)

        p = ctx.Process(target=target)
        p.start()
        # Drain the queue before joining to avoid the queue-feeder
        # deadlock on large results; then join so the child is reaped
        # instead of lingering as a zombie until interpreter exit.
        result = q.get()
        p.join()
        if is_error.value:
            raise result
        return result
    return call
# You can use forked as a decorator:
@forked
def add(x, y):
    """Add two values, computed inside a forked child process."""
    total = x + y
    return total
https://stackoverflow.com/a/72490867/5031798 also works, without needing to do all the lower level forking ourselves, but I'm not sure how to turn it into a decorator
so helpful! thank you!