Created
August 25, 2012 22:42
-
-
Save zed/3471699 to your computer and use it in GitHub Desktop.
call Python callback from a non-python thread in a C extension module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/c_extension_module*.so | |
/build/* | |
/.do_built* | |
/log | |
/all.did |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# test using 2.4-3.3 python | |
for XY in 2.4 2.5 2.6 2.7 3.0 3.1 3.2 3.3; do | |
PYTHON=python$XY redo test | |
done |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define Py_LIMITED_API | |
#define PY_SSIZE_T_CLEAN | |
// should be before any standard headers | |
#include "Python.h" | |
#include <omp.h> | |
#ifdef __cplusplus | |
extern "C" { | |
#endif | |
static void* | |
non_python_thread(void *python_callback) { | |
int error_flag = 0; | |
int tid = omp_get_thread_num(); | |
PyObject* tmp; | |
#ifdef WITH_THREAD | |
PyGILState_STATE state = PyGILState_Ensure(); | |
#else // serialize access if python compiled without threads | |
#pragma omp critical | |
{ | |
#endif | |
// call python callback | |
tmp = PyObject_CallFunction((PyObject*)python_callback, "i", tid); | |
if (tmp == NULL) { | |
PyErr_PrintEx(0); | |
error_flag = 1; // error occured | |
} | |
Py_XDECREF(tmp); | |
#ifdef WITH_THREAD | |
PyGILState_Release(state); | |
#else | |
} // end omp critical | |
#endif | |
return error_flag ? NULL : python_callback; | |
} | |
static PyObject * | |
spawn_non_python_thread(PyObject * self_unused, PyObject *args) { | |
PyObject* ret; | |
PyObject* python_callback; | |
int i, N, error_flag = 0; | |
/* it should be safe to call PyEval_InitThreads(): either current | |
function holds GIL or it is started from the main thread (and | |
there is no GIL yet) */ | |
#ifdef WITH_THREAD | |
PyEval_InitThreads(); | |
#endif | |
if (!PyArg_ParseTuple(args, "Oi:spawn_non_python_thread", | |
&python_callback, &N)) | |
return NULL; // propagate exception | |
Py_INCREF(python_callback); // hold on to it until threads are finished | |
Py_BEGIN_ALLOW_THREADS | |
omp_set_num_threads(N); | |
#pragma omp parallel for | |
for (i = 0; i < N; ++i) | |
if (!non_python_thread(python_callback)) | |
error_flag = 1; | |
Py_END_ALLOW_THREADS | |
// check whether non_python_thread failed | |
if (!error_flag) { // no error | |
ret = Py_None; Py_INCREF(ret); | |
} | |
else { | |
ret = NULL; | |
PyErr_SetString(PyExc_RuntimeError, "callback failed"); | |
} | |
Py_DECREF(python_callback); | |
return ret; | |
} | |
static PyMethodDef | |
module_functions[] = { | |
{ "spawn_non_python_thread", spawn_non_python_thread, | |
METH_VARARGS, "func docstring" }, | |
{ NULL, NULL, 0, NULL } | |
}; | |
// http://python3porting.com/cextensions.html | |
#if PY_MAJOR_VERSION >= 3 | |
#define MOD_ERROR_VAL NULL | |
#define MOD_SUCCESS_VAL(val) val | |
#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) | |
#define MOD_DEF(ob, name, doc, methods) \ | |
static struct PyModuleDef moduledef = { \ | |
PyModuleDef_HEAD_INIT, name, doc, -1, methods, }; \ | |
ob = PyModule_Create(&moduledef); | |
#else | |
#define MOD_ERROR_VAL | |
#define MOD_SUCCESS_VAL(val) | |
#define MOD_INIT(name) void init##name(void) | |
#define MOD_DEF(ob, name, doc, methods) \ | |
ob = Py_InitModule3(name, methods, doc); | |
#endif | |
MOD_INIT(c_extension_module) | |
{ | |
PyObject *m = NULL; | |
MOD_DEF(m, "c_extension_module", "module docstring", module_functions) | |
if (m == NULL) | |
return MOD_ERROR_VAL; | |
return MOD_SUCCESS_VAL(m); | |
} | |
#ifdef __cplusplus | |
} | |
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# remove non-git files | |
git clean -x -f -d |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh | |
# | |
# A minimal alternative to djb redo that doesn't support incremental builds. | |
# For the full version, visit http://github.com/apenwarr/redo | |
# | |
# The author disclaims copyright to this source file and hereby places it in | |
# the public domain. (2010 12 14) | |
# | |
# By default, no output coloring. | |
green="" | |
bold="" | |
plain="" | |
if [ -n "$TERM" -a "$TERM" != "dumb" ] && tty <&2 >/dev/null 2>&1; then | |
green="$(printf '\033[32m')" | |
bold="$(printf '\033[1m')" | |
plain="$(printf '\033[m')" | |
fi | |
_dirsplit() | |
{ | |
base=${1##*/} | |
dir=${1%$base} | |
} | |
dirname() | |
( | |
_dirsplit "$1" | |
dir=${dir%/} | |
echo "${dir:-.}" | |
) | |
_dirsplit "$0" | |
export REDO=$(cd "${dir:-.}" && echo "$PWD/$base") | |
DO_TOP= | |
if [ -z "$DO_BUILT" ]; then | |
DO_TOP=1 | |
[ -n "$*" ] || set all # only toplevel redo has a default target | |
export DO_BUILT=$PWD/.do_built | |
: >>"$DO_BUILT" | |
echo "Removing previously built files..." >&2 | |
sort -u "$DO_BUILT" | tee "$DO_BUILT.new" | | |
while read f; do printf "%s\0%s.did\0" "$f" "$f"; done | | |
xargs -0 rm -f 2>/dev/null | |
mv "$DO_BUILT.new" "$DO_BUILT" | |
DO_PATH=$DO_BUILT.dir | |
export PATH=$DO_PATH:$PATH | |
rm -rf "$DO_PATH" | |
mkdir "$DO_PATH" | |
for d in redo redo-ifchange; do | |
ln -s "$REDO" "$DO_PATH/$d"; | |
done | |
[ -e /bin/true ] && TRUE=/bin/true || TRUE=/usr/bin/true | |
for d in redo-ifcreate redo-stamp redo-always; do | |
ln -s $TRUE "$DO_PATH/$d"; | |
done | |
fi | |
_find_dofile_pwd() | |
{ | |
dofile=default.$1.do | |
while :; do | |
dofile=default.${dofile#default.*.} | |
[ -e "$dofile" -o "$dofile" = default.do ] && break | |
done | |
ext=${dofile#default} | |
ext=${ext%.do} | |
base=${1%$ext} | |
} | |
_find_dofile() | |
{ | |
local prefix= | |
while :; do | |
_find_dofile_pwd "$1" | |
[ -e "$dofile" ] && break | |
[ "$PWD" = "/" ] && break | |
target=${PWD##*/}/$target | |
tmp=${PWD##*/}/$tmp | |
prefix=${PWD##*/}/$prefix | |
cd .. | |
done | |
base=$prefix$base | |
} | |
_run_dofile() | |
{ | |
export DO_DEPTH="$DO_DEPTH " | |
export REDO_TARGET=$PWD/$target | |
local line1 | |
set -e | |
read line1 <"$PWD/$dofile" || true | |
cmd=${line1#"#!/"} | |
if [ "$cmd" != "$line1" ]; then | |
/$cmd "$PWD/$dofile" "$@" >"$tmp.tmp2" | |
else | |
:; . "$PWD/$dofile" >"$tmp.tmp2" | |
fi | |
} | |
_do() | |
{ | |
local dir=$1 target=$2 tmp=$3 | |
if [ ! -e "$target" ] || [ -d "$target" -a ! -e "$target.did" ]; then | |
printf '%sdo %s%s%s%s\n' \ | |
"$green" "$DO_DEPTH" "$bold" "$dir$target" "$plain" >&2 | |
echo "$PWD/$target" >>"$DO_BUILT" | |
dofile=$target.do | |
base=$target | |
ext= | |
[ -e "$target.do" ] || _find_dofile "$target" | |
if [ ! -e "$dofile" ]; then | |
echo "do: $target: no .do file" >&2 | |
return 1 | |
fi | |
[ ! -e "$DO_BUILT" ] || [ ! -d "$(dirname "$target")" ] || | |
: >>"$target.did" | |
( _run_dofile "$target" "$base" "$tmp.tmp" ) | |
rv=$? | |
if [ $rv != 0 ]; then | |
printf "do: %s%s\n" "$DO_DEPTH" \ | |
"$dir$target: got exit code $rv" >&2 | |
rm -f "$tmp.tmp" "$tmp.tmp2" | |
return $rv | |
fi | |
mv "$tmp.tmp" "$target" 2>/dev/null || | |
! test -s "$tmp.tmp2" || | |
mv "$tmp.tmp2" "$target" 2>/dev/null | |
rm -f "$tmp.tmp2" | |
else | |
echo "do $DO_DEPTH$target exists." >&2 | |
fi | |
} | |
# Make corrections for directories that don't actually exist yet. | |
_dir_shovel() | |
{ | |
local dir base | |
xdir=$1 xbase=$2 xbasetmp=$2 | |
while [ ! -d "$xdir" -a -n "$xdir" ]; do | |
_dirsplit "${xdir%/}" | |
xbasetmp=${base}__$xbase | |
xdir=$dir xbase=$base/$xbase | |
echo "xbasetmp='$xbasetmp'" >&2 | |
done | |
} | |
_redo() | |
{ | |
set +e | |
for i in "$@"; do | |
_dirsplit "$i" | |
_dir_shovel "$dir" "$base" | |
dir=$xdir base=$xbase basetmp=$xbasetmp | |
( cd "$dir" && _do "$dir" "$base" "$basetmp" ) | |
[ "$?" = 0 ] || return 1 | |
done | |
} | |
_redo "$@" | |
[ "$?" = 0 ] || exit 1 | |
if [ -n "$DO_TOP" ]; then | |
echo "Removing stamp files..." >&2 | |
[ ! -e "$DO_BUILT" ] || | |
while read f; do printf "%s.did\0" "$f"; done <"$DO_BUILT" | | |
xargs -0 rm -f 2>/dev/null | |
fi |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import c_extension_module | |
def python_callback(tid): | |
print("python callback called %d" % tid) | |
def python_thread(): | |
print("python thread started %d" % tid) | |
print("python thread ended %d" % tid) | |
# `threading` might implicitly call PyEval_InitThreads() | |
try: | |
import threading | |
except ImportError: | |
print("threadless python %d" % tid) | |
else: | |
threading.Thread(target=python_thread).start() | |
# there should be no GIL yet (we have only one main thread) | |
c_extension_module.spawn_non_python_thread(python_callback, 4) | |
print("exit main thread") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from distutils.core import setup | |
from distutils.extension import Extension | |
setup( | |
name = 'Test python callback in a non-python thread', | |
ext_modules = [Extension("c_extension_module", | |
["c_extension_module.c"], | |
extra_compile_args=["-fopenmp"], | |
extra_link_args=["-fopenmp"] | |
)] | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# run main.py until expected error encountered or threadless python used | |
exec >&2 | |
: ${PYTHON:=python} | |
$PYTHON -V | |
# build C extension | |
$PYTHON setup.py build_ext --inplace --force >/dev/null | |
# run | |
for count in `seq 0 1000` | |
do | |
echo -n . | |
if $PYTHON main.py 2>&1 >/dev/null | tee log | | |
grep 'RuntimeError\|KeyError\|threadless\|Exception' | |
then | |
break | |
fi | |
done | |
cat log |
I came up with a simple workaround that can hide dirty tricks from end users (https://gist.github.com/3473376). We can import the threading module in the initializer of c_extension_module, i.e. PyImport_ImportModule("threading"). This can enforce Python's MainThread to be correctly logged by the threading module thus avoiding the KeyError issue. On OpenMP threads, my guess is that omp threads may have some special synchronization activities when initialized, and these additional activities forced Python's MainThread to schedule so that it is always captured by the standard threading module.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The KeyError() and deadlock cases can be avoided, check this out https://gist.github.com/3473376