Last active
March 12, 2025 19:37
-
Star
(149)
You must be signed in to star a gist -
Fork
(32)
You must be signed in to fork a gist
-
-
Save schlamar/2311116 to your computer and use it in GitHub Desktop.
processify
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import traceback | |
from functools import wraps | |
from multiprocessing import Process, Queue | |
def processify(func): | |
'''Decorator to run a function as a process. | |
Be sure that every argument and the return value | |
is *pickable*. | |
The created process is joined, so the code does not | |
run in parallel. | |
''' | |
def process_func(q, *args, **kwargs): | |
try: | |
ret = func(*args, **kwargs) | |
except Exception: | |
ex_type, ex_value, tb = sys.exc_info() | |
error = ex_type, ex_value, ''.join(traceback.format_tb(tb)) | |
ret = None | |
else: | |
error = None | |
q.put((ret, error)) | |
# register original function with different name | |
# in sys.modules so it is pickable | |
process_func.__name__ = func.__name__ + 'processify_func' | |
setattr(sys.modules[__name__], process_func.__name__, process_func) | |
@wraps(func) | |
def wrapper(*args, **kwargs): | |
q = Queue() | |
p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs) | |
p.start() | |
ret, error = q.get() | |
p.join() | |
if error: | |
ex_type, ex_value, tb_str = error | |
message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str) | |
raise ex_type(message) | |
return ret | |
return wrapper | |
@processify | |
def test_function(): | |
return os.getpid() | |
@processify | |
def test_deadlock(): | |
return range(30000) | |
@processify | |
def test_exception(): | |
raise RuntimeError('xyz') | |
def test(): | |
print os.getpid() | |
print test_function() | |
print len(test_deadlock()) | |
test_exception() | |
if __name__ == '__main__': | |
test() |
Here I added the init and setup files to make it a module: https://github.com/dgerosa/processify
Personally, I've made this, which doesn't require that things can be pickled:
import multiprocessing as mp
def forked(fn):
"""
Does not work on Windows (except WSL2), since the fork syscall is not supported here.
fork creates a new process which inherits all of the memory without it being copied.
Memory is copied on write instead, meaning it is very cheap to create a new process
"""
def call(*args, **kwargs):
ctx = mp.get_context('fork')
q = ctx.Queue(1)
is_error = ctx.Value('b', False)
def target():
try:
q.put(fn(*args, **kwargs))
except BaseException as e:
is_error.value = True
q.put(e)
ctx.Process(target=target).start()
result = q.get()
if is_error.value:
raise result
return result
return call
# You can use forked as a decorator:
@forked
def add(x, y):
return x + y
https://stackoverflow.com/a/72490867/5031798 also works, without needing to do all the lower level forking ourselves, but I'm not sure how to turn it into a decorator
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sadly, this seems to break with an Apple M1 chip :( If you run it with
multiprocessing.set_start_method('fork')
in many situations it simply crashes, and I have not been able to figure out why. If you try with'spawn'
or'forkserver'
, the following happens (because thesys.modules
trick does not work in those instances, as the subprocess does not copy oversys.modules
which is a CPython-level object):Running @Chiron1991's py3 version with Python 3.8.2 under Rosetta:
I've been trying to find a fix but so far no luck.