-
-
Save stuaxo/889db016e51264581b50 to your computer and use it in GitHub Desktop.
processify
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# modified from https://gist.github.com/schlamar/2311116#file-processify-py-L17 | |
# also see http://stackoverflow.com/questions/2046603/is-it-possible-to-run-function-in-a-subprocess-without-threading-or-writing-a-se | |
import inspect | |
import os | |
import sys | |
import traceback | |
from functools import wraps | |
from multiprocessing import Process, Queue | |
class Sentinel: | |
pass | |
def processify(func): | |
'''Decorator to run a function as a process. | |
Be sure that every argument and the return value | |
is *pickable*. | |
The created process is joined, so the code does not | |
run in parallel. | |
''' | |
def process_generator_func(q, *args, **kwargs): | |
result = None | |
error = None | |
it = iter(func()) | |
while error is None and result != Sentinel: | |
try: | |
result = next(it) | |
error = None | |
except StopIteration: | |
result = Sentinel | |
error = None | |
except Exception: | |
ex_type, ex_value, tb = sys.exc_info() | |
error = ex_type, ex_value, ''.join(traceback.format_tb(tb)) | |
result = None | |
q.put((result, error)) | |
def process_func(q, *args, **kwargs): | |
try: | |
result = func(*args, **kwargs) | |
except Exception: | |
ex_type, ex_value, tb = sys.exc_info() | |
error = ex_type, ex_value, ''.join(traceback.format_tb(tb)) | |
result = None | |
else: | |
error = None | |
q.put((result, error)) | |
def wrap_func(*args, **kwargs): | |
# register original function with different name | |
# in sys.modules so it is pickable | |
process_func.__name__ = func.__name__ + 'processify_func' | |
setattr(sys.modules[__name__], process_func.__name__, process_func) | |
q = Queue() | |
p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs) | |
p.start() | |
result, error = q.get() | |
p.join() | |
if error: | |
ex_type, ex_value, tb_str = error | |
message = '%s (in subprocess)\n%s' % (str(ex_value), tb_str) | |
raise ex_type(message) | |
return result | |
def wrap_generator_func(*args, **kwargs): | |
# register original function with different name | |
# in sys.modules so it is pickable | |
process_generator_func.__name__ = func.__name__ + 'processify_generator_func' | |
setattr(sys.modules[__name__], process_generator_func.__name__, process_generator_func) | |
q = Queue() | |
p = Process(target=process_generator_func, args=[q] + list(args), kwargs=kwargs) | |
p.start() | |
result = None | |
error = None | |
while error is None: | |
result, error = q.get() | |
if result == Sentinel: | |
break | |
yield result | |
p.join() | |
if error: | |
ex_type, ex_value, tb_str = error | |
message = '%s (in subprocess)\n%s' % (str(ex_value), tb_str) | |
raise ex_type(message) | |
@wraps(func) | |
def wrapper(*args, **kwargs): | |
if inspect.isgeneratorfunction(func): | |
return wrap_generator_func(*args, **kwargs) | |
else: | |
return wrap_func(*args, **kwargs) | |
return wrapper | |
@processify | |
def test_function(): | |
return os.getpid() | |
@processify | |
def test_generator_func(): | |
for msg in ["generator", "function"]: | |
yield msg | |
@processify | |
def test_deadlock(): | |
return range(30000) | |
@processify | |
def test_exception(): | |
raise RuntimeError('xyz') | |
def test(): | |
print(os.getpid()) | |
print(test_function()) | |
print(list(test_generator_func())) | |
print(len(test_deadlock())) | |
test_exception() | |
if __name__ == '__main__': | |
test() |
np, it really just built on the original by @schlamar
This should probably be inside a library, I haven't had to use it since I modified it in 2016, but it was handy at the time for providing isolation between my new code and a whole load of knarly old code that I didn't fully trust to not do something weird and break my own.
It does feel like this should be in a library, if that hasn't happened already.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for this - proved very useful for hacking around a CUDA memory leak that slowly accrued over many iterations of my code. Launching the code in the subprocess clears the memory when it terminates. Now I should probably figure out what is causing the leak ...