Skip to content

Instantly share code, notes, and snippets.

@James-E-A
Last active August 9, 2025 16:07
Show Gist options
  • Save James-E-A/9d34fd86bee247eb169cdd71ce7e3183 to your computer and use it in GitHub Desktop.
Save James-E-A/9d34fd86bee247eb169cdd71ce7e3183 to your computer and use it in GitHub Desktop.
keep Windows awake until process exits
import argparse
import asyncio
import ctypes
import concurrent
from contextlib import AsyncExitStack, ExitStack, asynccontextmanager, closing, contextmanager
import csv
import functools
import inspect
import logging
import os
from selectors import DefaultSelector, EVENT_READ
import socket
import subprocess
import sys
import threading
import time
try:
import tkinter, tkinter.messagebox, tkinter.simpledialog
except ImportError:
tkinter = None
import traceback
# Bit flags for the ``mode`` argument of wakelock() / surrogate_wakelock().
# They may be OR-ed together; wakelock() rejects a zero mode or unknown bits.
WAKELOCK_SUSPEND = (1 << 0)  # mapped to ES_SYSTEM_REQUIRED on Windows
WAKELOCK_SCREEN = (1 << 1)  # mapped to ES_DISPLAY_REQUIRED on Windows
def surrogate_wakelock(target_pid, mode=WAKELOCK_SUSPEND, *, _poll_delay=30):
    """Hold a wakelock for as long as the process ``target_pid`` is running.

    The target does not have to be a child of this process.  If it is not
    alive when this function is called, a warning is logged and nothing else
    happens.  Otherwise the wakelock selected by ``mode`` is held and the
    target is polled every ``_poll_delay`` seconds until it is gone.
    """
    if pid_alive(target_pid):
        with wakelock(mode):
            while True:
                if not pid_alive(target_pid):
                    break
                time.sleep(_poll_delay)
    else:
        logging.warning("target process was invalid, or already died")
if sys.platform == "win32":
    Kernel32 = ctypes.windll.Kernel32

    # EXECUTION_STATE flags accepted by Kernel32.SetThreadExecutionState().
    ES_AWAYMODE_REQUIRED = 0x00000040
    ES_CONTINUOUS = 0x80000000
    ES_DISPLAY_REQUIRED = 0x00000002
    ES_SYSTEM_REQUIRED = 0x00000001
    ES_USER_PRESENT = 0x00000004  # deprecated

    # Python codec name matching the OEM code page reported by GetOEMCP();
    # it is used to decode the text output of TASKLIST.
    _OEM_CP = Kernel32.GetOEMCP()
    OEM_ENCODING = {
        # Code page 65001 is UTF-8; 65000 is UTF-7 (the two were confused before).
        65000: "utf-7",
        65001: "utf-8",
        1252: "windows-1252",
        20127: "us-ascii",
    }.get(_OEM_CP, f"cp{_OEM_CP:03d}")  # e.g. 437 -> "cp437"
@contextmanager
def wakelock(mode=WAKELOCK_SCREEN):
    """Context manager that keeps Windows awake while the ``with`` body runs.

    ``mode`` must be a non-zero combination of WAKELOCK_SUSPEND and
    WAKELOCK_SCREEN; otherwise ValueError is raised when the context is
    entered.  The request is made with SetThreadExecutionState() from a
    dedicated daemon thread named "wakelock" that lives as long as the
    ``with`` body does.
    """
    # https://github.com/python/cpython/blob/3.13/Lib/selectors.py#L239-L240
    if (not mode) or (mode & ~(WAKELOCK_SUSPEND | WAKELOCK_SCREEN)):
        raise ValueError("Invalid wakelock mode: {!r}".format(mode))
    # https://github.com/python/cpython/blob/3.13.0/Lib/selectors.py#L406-L407
    # Translate the portable mode bits into Windows EXECUTION_STATE bits.
    flags = ((mode & WAKELOCK_SUSPEND and ES_SYSTEM_REQUIRED)
             | (mode & WAKELOCK_SCREEN and ES_DISPLAY_REQUIRED))
    with ExitStack() as ctx:
        # The decorators turn ``t`` into a context manager whose body runs in
        # a helper thread (receiving ``flags`` as its argument) and enter it
        # immediately; leaving the ExitStack ends that thread.
        @ctx.enter_context
        @_threading_contextmanager(flags, _threading_name="wakelock", _threading_daemon=True)
        def t(flags):
            # this thread will hold the wakelock as long as it exists...
            Kernel32.SetThreadExecutionState(flags | ES_CONTINUOUS)
            try:
                yield threading.current_thread()
            finally:
                # ...and the wakelock will be released by its death
                # NOTE(review): ``return`` inside ``finally`` discards any
                # exception thrown into this generator at the ``yield``
                # (the helper thread exits its context by raising
                # SystemExit) -- confirm that swallowing it is intended.
                return
        yield
def pid_alive(pid):
    """Return True if TASKLIST reports a process whose PID equals ``pid``.

    TASKLIST is queried because os.kill(pid, 0) does NOT work for liveness
    detection on Windows.
    """
    rows = _tasklist(f"PID eq {pid}")
    if not rows:
        return False
    # Sanity checks: a PID filter should match exactly one process, and it
    # should be the one that was asked for.
    assert len(rows) == 1
    row = rows[0]
    assert int(row["PID"]) == pid
    return True
def _tasklist(*filters):
    """Run ``TASKLIST /FO CSV`` and return its rows as a list of dicts.

    Each entry of ``filters`` is passed to TASKLIST as its own ``/FI``
    argument.  The output is decoded with OEM_ENCODING and parsed with
    csv.DictReader.  A non-zero exit status raises
    subprocess.CalledProcessError (via ``check=True``).
    """
    command = ["TASKLIST"]
    for criterion in filters:
        command += ["/FI", criterion]
    command += ["/FO", "CSV"]
    with PopenEx(command, stdout=subprocess.PIPE, check=True, encoding=OEM_ENCODING) as proc:
        reader = csv.DictReader(proc.stdout)
        return list(reader)
else:
@contextmanager
def wakelock(mode=WAKELOCK_SCREEN):
    """Stub for platforms other than Windows: always raises NotImplementedError.

    References kept for a future implementation:
      https://systemd.io/INHIBITOR_LOCKS/#:~:text=Inhibit(),-is%20the
      https://github.com/elogind/elogind/blob/v255.17/man/org.freedesktop.login1.xml#L688-L702
      https://developer.apple.com/documentation/iokit/1557082-iopmassertioncreatewithpropertie
    """
    raise NotImplementedError("Mac & Linux TODO")
def pid_alive(pid):
    """Return True if a process with id ``pid`` currently exists (POSIX).

    The check sends signal 0 with os.kill(), which performs the existence and
    permission checks without delivering a signal.  Previously the result of
    that probe was thrown away and NotImplementedError was always raised.

    Returns:
        True if the process exists (including when it belongs to another
        user and the probe is refused with PermissionError), False if there
        is no such process or ``pid`` is not a positive process id.
    """
    # kill() gives pid 0 and negative pids a process-group / broadcast
    # meaning, so they can never name a single target process.
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists; we are merely not allowed to signal it.
        return True
    return True
@contextmanager
def PopenEx(*args, check=False, **kwargs):
    """subprocess.Popen as a context manager, with an optional exit-status check.

    All positional and keyword arguments other than ``check`` are passed to
    subprocess.Popen unchanged, and the Popen object is yielded.

    Args:
        check: if true, raise subprocess.CalledProcessError after the ``with``
            body finishes cleanly and the process exited with a non-zero
            status.

    Raises:
        subprocess.CalledProcessError: see ``check``.
        Whatever subprocess.Popen raises (for example FileNotFoundError) and
        whatever the ``with`` body raises; neither is masked by the status
        check, matching subprocess.run(check=True).
    """
    with subprocess.Popen(*args, **kwargs) as p:
        yield p
    # Reached only when Popen started and neither the body nor
    # Popen.__exit__ raised, so ``p`` is always bound here.
    # No need to call wait(), as p.__exit__() already handled that.
    # https://github.com/python/cpython/blob/v3.13.0/Lib/subprocess.py#L404-L420
    if check and p.returncode:
        # Popen's keyword for the command line is "args", not "cmd".
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = args[0]
        raise subprocess.CalledProcessError(p.returncode, cmd)
@contextmanager
def socketpair(*args, **kwargs):
    """Context manager around socket.socketpair() that closes both sockets on exit.

    Positional and keyword arguments are forwarded to socket.socketpair()
    (previously they were accepted but silently ignored).  Yields the
    ``(sock1, sock2)`` tuple of connected sockets.
    """
    sock1, sock2 = socket.socketpair(*args, **kwargs)
    with sock1, sock2:
        yield sock1, sock2
def thread_start_immed(*args, _threading_daemon=True, **kwargs):
    """Return a decorator that immediately runs the decorated callable in a thread.

    ``args`` and the remaining ``kwargs`` become the callable's arguments.
    Keyword arguments spelled ``_threading_<name>`` are removed from
    ``kwargs`` and passed on as ``<name>=...`` (for example
    ``_threading_name``); ``_threading_daemon`` sets the ``daemon`` flag.
    The result is a functools.partial of ``_thread_start`` that expects the
    target callable.
    """
    prefix = '_threading_'
    thread_options = {}
    for name in list(kwargs):
        if name.startswith(prefix):
            thread_options[name.removeprefix(prefix)] = kwargs.pop(name)
    return functools.partial(
        _thread_start,
        args=args,
        kwargs=kwargs,
        daemon=_threading_daemon,
        **thread_options,
    )
def _thread_start(target, _run_coroutine=None, **kwargs):
if _run_coroutine is None:
_run_coroutine = inspect.iscoroutinefunction(target)
if _run_coroutine:
target = _make_self_running(target)
t = threading.Thread(target=target, **kwargs)
t.start()
return t
def _make_self_running(target):
    """Wrap coroutine function ``target`` in a plain callable that runs it.

    The result is a functools.partial of ``_async_call`` bound to ``target``,
    updated with functools.update_wrapper so it carries ``target``'s metadata.
    """
    runner = functools.partial(_async_call, target)
    functools.update_wrapper(runner, target)
    return runner
def _async_call(f, *args, **kwargs):
asyncio_kw = {
key[9:]: kwargs.pop(key) for key in [
key for key in kwargs.keys()
if key.startswith('_asyncio_')
]
}
return asyncio.run(
f(*args, **kwargs),
**asyncio_kw
)
def _threading_contextmanager(*args, **kwargs):
    """Return a decorator that runs a context manager's body in a helper thread.

    ``args`` / ``kwargs`` are given to ``thread_start_immed``.  The returned
    ``wrap`` is itself a ``@contextmanager``: ``wrap(func)`` is a context
    manager that starts a thread, enters ``func(...)`` there, yields the value
    produced by entering it, and on exit tells the thread to leave the
    context and joins it.  Generator and async-generator functions are first
    turned into (async) context managers.

    The two sides talk over a socket pair using single bytes:
    thread -> caller: b"\\x00" entered fine, b"\\x01" entering raised;
    caller -> thread: b"\\x00" body finished, b"\\x01" body raised.
    """
    @contextmanager
    def wrap(func):
        if inspect.isgeneratorfunction(func):
            func = contextmanager(func)
        elif inspect.isasyncgenfunction(func):
            func = asynccontextmanager(func)
        with socketpair() as (sock1, sock2):
            # sock1 is used by the calling thread, sock2 by the helper thread.
            t = None
            try:
                # Filled in by the helper thread before it reports back.
                _exc = None
                _result = None
                # Decorating starts the thread right away; ``t`` is then the
                # started thread object, not the coroutine function.
                @thread_start_immed(*args, **kwargs)
                @functools.wraps(func) # set thread name, if possible
                async def t(*args, **kwargs):
                    loop = asyncio.get_running_loop()
                    sock2.setblocking(False)
                    async with AsyncExitStack() as ctx:
                        try:
                            nonlocal _result
                            _result = await _enter_context(ctx, func(*args, **kwargs))
                        except Exception as exc:
                            nonlocal _exc
                            _exc = exc
                            sock2.send(b"\x01")
                        else:
                            sock2.send(b"\x00")
                        finally:
                            # Wait for the caller's verdict byte, then leave
                            # the exit stack by raising SystemExit with it.
                            # NOTE: int.from_bytes() without ``byteorder``
                            # needs Python 3.11 or newer.
                            sys.exit(int.from_bytes(await loop.sock_recv(sock2, 1)))
                try:
                    with DefaultSelector() as selector:
                        selector.register(sock1, EVENT_READ)
                        # Block until the helper thread reports its status.
                        # NOTE(review): select() has no timeout here, so this
                        # presumably waits forever if the thread never sends
                        # a byte -- confirm.
                        for key, events in selector.select():
                            assert key.fileobj is sock1
                            assert events & EVENT_READ
                            code = sock1.recv(1)[0]
                            if code:
                                # Entering failed in the thread: re-raise here.
                                raise _exc
                            else:
                                yield _result
                            break
                        else:
                            # select() returned without any ready socket.
                            assert False
                except:
                    # Deliberately bare: whatever went wrong, tell the thread
                    # first, then let the exception continue.
                    sock1.send(b"\x01")
                    raise
                else:
                    sock1.send(b"\x00")
            finally:
                if t is not None:
                    #sock1.send(b"*")
                    t.join()
    return wrap
async def _enter_context(stack, obj):
if hasattr(obj, '__aenter__'):
return await stack.enter_async_context(obj)
return stack.enter_context(obj)
def main(argv):
    """Command-line entry point: keep the system awake until a target process exits.

    Args:
        argv: full argument vector; ``argv[0]`` (the program name) is skipped.

    Options: ``--gui/--no-gui``, ``--quiet/--no-quiet``, ``--poll-delay
    [SECONDS]`` and an optional positional ``target_pid``.  When no pid is
    given it is asked for, through a Tk dialog in GUI mode or on the console
    otherwise.  GUI mode defaults to "on" only when tkinter is available and
    no pid was given on the command line.
    """
    root_logger = logging.getLogger()
    default_poll_delay = 10 if not __debug__ else 1

    parser = argparse.ArgumentParser()
    parser.add_argument('--gui', action=argparse.BooleanOptionalAction)
    parser.add_argument('--quiet', action=argparse.BooleanOptionalAction, default=False)
    # ``const`` makes a bare "--poll-delay" fall back to the default instead
    # of producing None, which would later crash time.sleep().
    parser.add_argument('--poll-delay', type=float, nargs='?',
                        const=default_poll_delay, default=default_poll_delay)
    parser.add_argument('target_pid', type=int, nargs='?')
    options = parser.parse_args(argv[1:])

    gui = options.gui
    target_pid = options.target_pid
    poll_delay = options.poll_delay

    if gui and tkinter is None:
        # Fail with a clear usage error instead of an AttributeError on None.
        parser.error("--gui was requested, but tkinter is not available")
    if gui is None:
        # if --gui wasn't specified, but target_pid was, assume batch mode
        gui = (tkinter is not None) and (target_pid is None)

    if not options.quiet:
        root_logger.setLevel(logging.DEBUG)
    # NOTE(review): the original indentation was lost; this filter is assumed
    # to be installed regardless of --quiet -- confirm.
    root_logger.addFilter(LoggingWarningsAreFatal())

    with ExitStack() as ctx:
        if gui:
            # Report the outcome (and any log records) through Tk dialogs.
            ctx.enter_context(TkReportBlock())
            ctx.enter_context(_with_logging_handler(root_logger, TkLogHandler))

        if target_pid is None:
            prompt = "target pid?"
            if gui:
                target_pid = tkinter.simpledialog.askinteger(None, prompt)
            else:
                target_pid = int(input(f"{prompt}\n> "))
            if target_pid is None:
                # The dialog was cancelled; do not pass None on as a pid.
                raise ValueError("no target pid was provided")

        surrogate_wakelock(target_pid, _poll_delay=poll_delay)
class LoggingWarningsAreFatal(logging.Filter):
    """Logging filter that turns records at WARNING level or above into exceptions."""

    def filter(self, record):
        """Raise UserWarning for WARNING-or-higher records; accept all others.

        The exception carries the fully formatted message (``msg % args``),
        not just the raw format string.
        """
        if record.levelno >= logging.WARNING:
            raise UserWarning(record.getMessage())
        return True
class TkLogHandler(logging.Handler):
    """Logging handler that shows each record in a tkinter message box.

    ERROR and above use showerror, WARNING uses showwarning, INFO uses
    showinfo; records below INFO are ignored.
    """

    def __init__(self, parent=None, *args, **kwargs):
        """Remember the Tk ``parent`` for dialogs; other arguments go to logging.Handler."""
        self.tk_parent = parent
        super().__init__(*args, **kwargs)

    def emit(self, record):
        """Show ``record`` in the dialog matching its level."""
        if record.levelno >= logging.ERROR:
            show = tkinter.messagebox.showerror
        elif record.levelno >= logging.WARNING:
            show = tkinter.messagebox.showwarning
        elif record.levelno >= logging.INFO:
            show = tkinter.messagebox.showinfo
        else:
            return
        # getMessage() applies %-style arguments; record.msg alone would show
        # the raw format string.
        show(message=record.getMessage(), parent=self.tk_parent)
@contextmanager
def TkReportBlock(parent=None, done_message="done"):
    """Context manager that reports the outcome of its body in a Tk message box.

    An Exception raised by the body is shown with showerror (its ``str()``,
    or the formatted traceback when that is empty) and then suppressed.  If
    showing it with ``parent`` fails, the dialog is retried without a parent.
    When the body finishes normally, ``done_message`` is shown with showinfo.
    """
    try:
        yield
    except Exception as failure:
        text = str(failure)
        if not text:
            text = "".join(traceback.format_exception(failure))
        try:
            tkinter.messagebox.showerror(message=text, parent=parent)
        except Exception:
            tkinter.messagebox.showerror(message=text)
        return
    tkinter.messagebox.showinfo(message=done_message, parent=parent)
@contextmanager
def _with_logging_handler(logger_or_handler, handler=None):
if handler is None and (isinstance(logger_or_handler, logging.Handler) or issubclass(logger_or_handler, logging.Handler)):
# 1-argument form
logger, handler = logger.getLogger(), logger_or_handler
else:
# 2-argument form
logger = logger_or_handler
if issubclass(handler, logging.Handler):
handler = handler()
logger.addHandler(handler)
try:
yield handler
finally:
logger.removeHandler(handler)
# Script entry point: run main() with the full argument vector (main() itself
# skips argv[0]) when this file is executed directly rather than imported.
if __name__ == '__main__':
    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment