Last active
November 18, 2021 11:26
-
-
Save fxthomas/aa59e25df0bfa15d46a5bcb01b7017dd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
from subprocess import Popen | |
import subprocess | |
import win32job | |
import win32process | |
import win32api | |
class JobPopen(Popen): | |
"""Start a process in a new Win32 job object. | |
This `subprocess.Popen` subclass takes the same arguments as Popen and | |
behaves the same way. In addition to that, created processes will be | |
assigned to a new anonymous Win32 job object on startup, which will | |
guarantee that the processes will be terminated by the OS as soon as | |
either the Popen object, job object handle or parent Python process are | |
closed. | |
""" | |
class _winapijobhandler(object): | |
"""Patches the native CreateProcess function in the subprocess module | |
to assign created threads to the given job""" | |
def __init__(self, oldapi, job): | |
self._oldapi = oldapi | |
self._job = job | |
def __getattr__(self, key): | |
if key != "CreateProcess": | |
return getattr(self._oldapi, key) # Any other function is run as before | |
else: | |
return self.CreateProcess # CreateProcess will call the function below | |
def CreateProcess(self, *args, **kwargs): | |
hp, ht, pid, tid = self._oldapi.CreateProcess(*args, **kwargs) | |
win32job.AssignProcessToJobObject(self._job, hp) | |
win32process.ResumeThread(ht) | |
return hp, ht, pid, tid | |
def __init__(self, *args, **kwargs): | |
"""Start a new process using an anonymous job object. Takes the same arguments as Popen""" | |
# Create a new job object | |
self._win32_job = self._create_job_object() | |
# Temporarily patch the subprocess creation logic to assign created | |
# processes to the new job, then resume execution normally. | |
CREATE_SUSPENDED = 0x00000004 | |
kwargs.setdefault("creationflags", 0) | |
kwargs["creationflags"] |= CREATE_SUSPENDED | |
try: | |
_winapi = subprocess._winapi # Python 3 | |
_winapi_key = "_winapi" | |
except AttributeError: | |
_winapi = subprocess._subprocess # Python 2 | |
_winapi_key = "_subprocess" | |
try: | |
setattr(subprocess, _winapi_key, JobPopen._winapijobhandler(_winapi, self._win32_job)) | |
super(JobPopen, self).__init__(*args, **kwargs) | |
finally: | |
setattr(subprocess, _winapi_key, _winapi) | |
def _create_job_object(self): | |
"""Create a new anonymous job object""" | |
hjob = win32job.CreateJobObject(None, "") | |
extended_info = win32job.QueryInformationJobObject(hjob, win32job.JobObjectExtendedLimitInformation) | |
extended_info['BasicLimitInformation']['LimitFlags'] = win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | |
win32job.SetInformationJobObject(hjob, win32job.JobObjectExtendedLimitInformation, extended_info) | |
return hjob | |
def _close_job_object(self, hjob): | |
"""Close the handle to a job object, terminating all processes inside it""" | |
if self._win32_job: | |
win32api.CloseHandle(self._win32_job) | |
self._win32_job = None | |
# This ensures that no remaining subprocesses are found when the process | |
# exits from a `with JobPopen(...)` block. | |
def __exit__(self, exc_type, value, traceback): | |
super(JobPopen, self).__exit__(exc_type, value, traceback) | |
self._close_job_object(self._win32_job) | |
# Python does not keep a reference outside of the parent class when the | |
# interpreter exits, which is why we keep it here. | |
_Popen = subprocess.Popen | |
def __del__(self): | |
self._Popen.__del__(self) | |
self._close_job_object(self._win32_job) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment