Last active
January 1, 2016 01:06
-
-
Save zackw/d9e6afcef851929b0d45 to your computer and use it in GitHub Desktop.
subprocess.Popen wrapper that kills descendants on subprocess exit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
# jobspawntest.py - experimental subprocess.Popen wrapper that kills | |
# descendants on subprocess exit. Windows testing needed. | |
# Copyright 2015 Zack Weinberg <[email protected]> | |
# Copying and distribution of this file, with or without modification, | |
# are permitted in any medium without royalty provided the copyright | |
# notice and this notice are preserved. This file is offered as-is, | |
# without any warranty. | |
__all__ = ('Job',) | |
import os | |
import subprocess | |
import signal | |
if os.name == 'posix': | |
import errno | |
def _do_popen(args, kwargs): | |
if 'preexec_fn' in kwargs: | |
caller_preexec = kwargs['preexec_fn'] | |
else: | |
caller_preexec = lambda: None | |
def child_call_setpgid_and_chain(): | |
os.setpgid(0, 0) | |
caller_preexec() | |
kwargs['preexec_fn'] = child_call_setpgid_and_chain | |
proc = subprocess.Popen(*args, **kwargs) | |
# The parent process must _also_ call setpgid() to prevent a race. | |
# See https://www.gnu.org/software/libc/manual/html_node/Launching-Jobs.html | |
# We may get EACCES here if the child has already called execve(); | |
# in that case it has also already called setpgid() so no worries. | |
pgid = proc.pid | |
try: | |
os.setpgid(pgid, pgid) | |
except OSError as e: | |
if e.errno != errno.EACCES: | |
raise | |
return (proc, pgid) | |
def _do_send_signal(job, signal): | |
os.killpg(job, signal) | |
def _do_terminate(job): | |
_do_send_signal(job, signal.SIGTERM) | |
def _do_kill(job): | |
_do_send_signal(job, signal.SIGKILL) | |
elif os.name == 'nt': | |
import sys | |
# Nested job objects were added in Windows 8, which identifies | |
# itself as 6.2 in getwindowsversion(). | |
ver = sys.getwindowsversion() | |
if ver.major > 6 or (ver.major == 6 and ver.minor >= 2): | |
_ADD_CREATIONFLAGS = 0x00000004 # CREATE_SUSPENDED | |
else: | |
_ADD_CREATIONFLAGS = 0x01000004 # CREATE_SUSPENDED|CREATE_BREAKAWAY | |
import ctypes | |
from ctypes.wintypes import HANDLE, LPVOID, UINT, BOOL, DWORD, LONG | |
def _ec_falsy_winerror(result, *etc): | |
if not result: | |
raise ctypes.WinError() | |
return result | |
def _ec_m1_winerror(result, *etc): | |
if result < 0: | |
raise ctypes.WinError() | |
return result | |
_kernel32 = ctypes.WinDLL("kernel32.dll") | |
_kernel32.CreateJobObjectW.argtypes = (LPVOID, LPVOID) | |
_kernel32.CreateJobObjectW.restype = HANDLE | |
_kernel32.CreateJobObjectW.errcheck = _ec_falsy_winerror | |
_kernel32.TerminateJobObject.argtypes = (HANDLE, UINT) | |
_kernel32.TerminateJobObject.restype = BOOL | |
_kernel32.TerminateJobObject.errcheck = _ec_falsy_winerror | |
_kernel32.AssignProcessToJobObject.argtypes = (HANDLE, HANDLE) | |
_kernel32.AssignProcessToJobObject.restype = BOOL | |
_kernel32.AssignProcessToJobObject.errcheck = _ec_falsy_winerror | |
_kernel32.CloseHandle.argtypes = (HANDLE,) | |
_kernel32.CloseHandle.restype = BOOL | |
_kernel32.CloseHandle.errcheck = _ec_falsy_winerror | |
# defensiveness against handle leakage | |
class wrap_HANDLE(object): | |
__slots__ = ('_h',) | |
def __init__(self, h): self._h = h | |
def __int__(self): return self._h | |
def __nonzero__(self): return bool(self._h) | |
def __del__(self, _CloseHandle=_kernel32.CloseHandle): | |
if self._h: | |
_CloseHandle(self._h) | |
self._h = 0 | |
close = __del__ | |
# subprocess.Popen retains the process handle but not the | |
# thread handle, which we need to resume the suspended thread. | |
# The only documented way to recover a thread handle appears | |
# to be using the "tool help" API, which, fortunately, is in | |
# kernel32 since XP. | |
class THREADENTRY32(ctypes.Structure): | |
_fields_ = [ | |
("dwSize", DWORD), | |
("cntUsage", DWORD), | |
("th32ThreadID", DWORD), | |
("th32OwnerProcessID", DWORD), | |
("tpBasePri", LONG), | |
("tpDeltaPri", LONG), | |
("dwFlags", DWORD), | |
] | |
LPTHREADENTRY32 = ctypes.POINTER(THREADENTRY32) | |
_kernel32.CreateToolhelp32Snapshot.argtypes = (DWORD, DWORD) | |
_kernel32.CreateToolhelp32Snapshot.restype = HANDLE | |
_kernel32.CreateToolhelp32Snapshot.errcheck = _ec_falsy_winerror | |
_kernel32.Thread32First.argtypes = (HANDLE, LPTHREADENTRY32) | |
_kernel32.Thread32First.restype = BOOL | |
_kernel32.Thread32First.errcheck = _ec_falsy_winerror | |
_kernel32.Thread32Next.argtypes = (HANDLE, LPTHREADENTRY32) | |
_kernel32.Thread32Next.restype = BOOL | |
#_kernel32.Thread32Next cannot fail | |
_kernel32.OpenThread.argtypes = (DWORD, BOOL, DWORD) | |
_kernel32.OpenThread.restype = HANDLE | |
_kernel32.OpenThread.errcheck = _ec_falsy_winerror | |
_kernel32.ResumeThread.argtypes = (HANDLE,) | |
_kernel32.ResumeThread.restype = DWORD | |
_kernel32.ResumeThread.errcheck = _ec_m1_winerror | |
def _resume_threads(pid): | |
thblock = THREADENTRY32() | |
thblock.dwSize = ctypes.sizeof(thblock) | |
pthblock = ctypes.pointer(thblock) | |
try: | |
# TH32CS_SNAPTHREAD (0x4) gives us all threads on the whole | |
# system, and we have to filter them. There's no way to get | |
# kernel32 to do that for us. | |
hsnap = _kernel32.CreateToolhelp32Snapshot(0x4, 0) | |
_kernel32.Thread32First(hsnap, pthblock) | |
while True: | |
if pthblock.th32OwnerProcessID == pid: | |
try: | |
hthread = _kernel32.OpenThread( | |
0x0002, # THREAD_SUSPEND_RESUME | |
False, pthblock.th32ThreadID) | |
_kernel32.ResumeThread(hthread) | |
finally: | |
_kernel32.CloseHandle(hthread) | |
if not _kernel32.Thread32Next(hsnap, pthblock): | |
break | |
finally: | |
_kernel32.CloseHandle(hsnap) | |
def _do_popen(args, kwargs): | |
job = _kernel32.CreateJobObjectW(None, None) | |
flags = kwargs.get('creationflags', 0) | |
flags |= _ADD_CREATIONFLAGS | |
kwargs['creationflags'] = flags | |
proc = subprocess.Popen(*args, **kwargs) | |
_kernel32.AssignProcessToJobObject(job, int(proc._handle)) | |
_resume_threads(proc.pid) | |
return (proc, wrap_HANDLE(job)) | |
def _do_send_signal(job, sig): | |
if sig == signal.SIGTERM: | |
_do_terminate(job) | |
else: | |
# There's no way to send CTRL_C_EVENT or CTRL_BREAK_EVENT to an | |
# entire job, as far as I can tell. | |
raise ValueError("Unsupported signal: {}".format(sig)) | |
def _do_terminate(job): | |
try: | |
hjob = int(job) | |
if hjob: | |
_kernel32.TerminateJobObject(hjob, 1) | |
job.close() | |
except OSError as e: | |
# Comments in Windows subprocess.terminate() say that | |
# "ERROR_ACCESS_DENIED (winerror 5) is received when the | |
# process already died." MSDN does not document whether | |
# this is true for job objects, but it seems plausible. | |
if e.winerror != 5: | |
raise | |
# We are not in a position to call GetExitCodeProcess here. | |
# Just leave it to subprocess.poll(). | |
def _do_kill(job): | |
_do_terminate(job) | |
else: | |
raise ValueError('sorry, not implemented: process groups for ostype "{}"' | |
.format(os.name)) | |
class Job(object): | |
"""A Job object wraps a subprocess.Popen object; it is functionally | |
identical, except that terminate() and kill() are applied to | |
all child processes _of_ the child process, as well as the | |
child process itself. Moreover, when the child process exits, | |
all of its children are killed. | |
On Unix, this is accomplished with process groups; on Windows, | |
with job objects. Descendant processes _can_ escape containment; | |
on Unix, by using setpgid(); on Windows, by being created as | |
"breakaway" processes. | |
On Unix, send_signal() is also applied to the process group; on | |
Windows, this only works for signal.SIGTERM (which is mapped to | |
terminate()). | |
""" | |
def __init__(self, *args, **kwargs): | |
if len(args) > 1: | |
raise TypeError("Job() optional arguments must be specified as " | |
"keyword arguments") | |
self._proc, self._job = _do_popen(args, kwargs) | |
def send_signal(self, signal): | |
return _do_send_signal(self._job, signal) | |
def terminate(self): | |
return _do_terminate(self._job) | |
def kill(self): | |
return _do_kill(self._job) | |
def poll(self): | |
rv = self._proc.poll() | |
if rv is not None and self._job is not None: | |
_do_terminate(self._job) | |
self._job = None | |
# Forward all other actions to _proc. | |
def __getattr__(self, aname): | |
return getattr(self._proc, aname) | |
if __name__ == '__main__': | |
import sys | |
import time | |
def test_main(): | |
if len(sys.argv) > 1 and sys.argv[1] == 'grandchild': | |
sys.stdout.write('grandchild: started\n') | |
time.sleep(2) | |
sys.stdout.write('grandchild: message should not appear\n') | |
sys.exit(1) | |
elif len(sys.argv) > 1 and sys.argv[1] == 'child': | |
sys.stdout.write('child: started\n') | |
proc = subprocess.Popen([ | |
sys.executable, __file__, 'grandchild']) | |
rc = proc.wait() | |
sys.stdout.write('child: grandchild exit {} (should not appear)\n' | |
.format(rc)) | |
else: | |
sys.stdout.write('parent: started\n') | |
job = Job([ | |
sys.executable, __file__, 'child']) | |
time.sleep(1) | |
job.terminate() | |
rc = job.wait() | |
sys.stdout.write('parent: child exit {}\n'.format(rc)) | |
test_main() |
Python 2.7.10, Win10 X64:
Traceback (most recent call last):
File "jobspawntest.py", line 84, in <module>
_kernel32.CreateJobObject.argtypes = (LPVOID, LPVOID)
File "C:\Python27\lib\ctypes\__init__.py", line 378, in __getattr__
func = self.__getitem__(name)
File "C:\Python27\lib\ctypes\__init__.py", line 383, in __getitem__
func = self._FuncPtr((name_or_ordinal, self))
AttributeError: function 'CreateJobObject' not found
> python jobspawntest.py
parent: started
Traceback (most recent call last):
File "jobspawntest.py", line 292, in <module>
test_main()
File "jobspawntest.py", line 286, in test_main
sys.executable, __file__, 'child'])
File "jobspawntest.py", line 244, in __init__
self._proc, self._job = _do_popen(args, kwargs)
File "jobspawntest.py", line 187, in _do_popen
_kernel32.AssignProcessToJobObject(job, proc._handle)
ctypes.ArgumentError: argument 2: <type 'exceptions.TypeError'>: wrong type
whack one mole, up comes the next one
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If this is working correctly, it will print
when run as a standalone. The number on the last line might vary.