Skip to content

Instantly share code, notes, and snippets.

@lihuanshuai
Forked from schlamar/processify.py
Last active November 8, 2018 21:05
Show Gist options
  • Save lihuanshuai/3aacdf99d0ea9d413815 to your computer and use it in GitHub Desktop.
Save lihuanshuai/3aacdf99d0ea9d413815 to your computer and use it in GitHub Desktop.
python multi process processify
import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue
def processify(func):
    '''Decorator to run a function in a separate process.

    Every argument and the return value must be *picklable*, because
    they are passed between the calling process and the child.
    The wrapper blocks until the child has finished, so the decorated
    function does not run in parallel with its caller.

    An exception raised by ``func`` is re-raised in the calling process
    as the same exception type. Its message is the original message
    followed by `` (in subprocess)`` and the traceback formatted in the
    child. If the child exits without delivering a result, a
    ``RuntimeError`` is raised instead of blocking forever.
    '''
    # Imported locally so the block is self-contained; the module was
    # renamed between Python 2 and Python 3.
    try:
        from queue import Empty  # Python 3
    except ImportError:
        from Queue import Empty  # Python 2

    poll_interval = 0.1  # seconds between child liveness checks

    def process_func(q, *args, **kwargs):
        # Runs in the child: send back either the return value or the
        # exception details as a (ret, error) pair.
        try:
            ret = func(*args, **kwargs)
        except Exception:
            ex_type, ex_value, tb = sys.exc_info()
            # Traceback objects cannot be pickled, so ship it as text.
            error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
            ret = None
        else:
            error = None
        q.put((ret, error))

    # Register the child entry point under a distinct module-level name
    # so it is picklable by name. Python 3 looks functions up by
    # __qualname__, so that has to match the registered name as well
    # (on Python 2 the extra attribute is simply ignored).
    process_func.__name__ = func.__name__ + 'processify_func'
    process_func.__qualname__ = process_func.__name__
    setattr(sys.modules[__name__], process_func.__name__, process_func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        q = Queue()
        p = Process(target=process_func, args=(q,) + args, kwargs=kwargs)
        p.start()

        # Fetch the result *before* joining: a child that has queued a
        # large object cannot exit until the data has been consumed, so
        # join-then-get can deadlock. Poll with a timeout so that a child
        # which died without answering is noticed.
        while True:
            try:
                ret, error = q.get(timeout=poll_interval)
                break
            except Empty:
                if p.is_alive():
                    continue
            # The child is gone; give a result it may have sent just
            # before exiting one last chance to arrive.
            try:
                ret, error = q.get(timeout=poll_interval)
            except Empty:
                p.join()
                raise RuntimeError(
                    'subprocess running %s exited with code %s '
                    'without returning a result'
                    % (func.__name__, p.exitcode))
            break
        p.join()

        if error is not None:
            ex_type, ex_value, tb_str = error
            # Format the exception itself: the ``.message`` attribute
            # does not exist on Python 3 exceptions.
            message = '%s (in subprocess)\n%s' % (ex_value, tb_str)
            raise ex_type(message)
        return ret

    return wrapper
@processify
def test_function():
    '''Demo helper: return the process id observed inside the function.'''
    pid = os.getpid()
    return pid
@processify
def test_exception():
    '''Demo helper: always fail with ``RuntimeError('xyz')``.'''
    failure = RuntimeError('xyz')
    raise failure
def test():
    '''Demonstrate the decorator.

    Prints the PID of the calling process, then the value returned by
    ``test_function``, and finally calls ``test_exception``, whose
    ``RuntimeError`` is not caught here and propagates to the caller.
    '''
    # print(...) with a single argument behaves like the Python 2 print
    # statement and is also valid Python 3.
    print(os.getpid())
    print(test_function())
    test_exception()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment