Last active
August 9, 2025 16:07
-
-
Save James-E-A/9d34fd86bee247eb169cdd71ce7e3183 to your computer and use it in GitHub Desktop.
keep Windows awake until process exits
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import asyncio | |
import ctypes | |
import concurrent | |
from contextlib import AsyncExitStack, ExitStack, asynccontextmanager, closing, contextmanager | |
import csv | |
import functools | |
import inspect | |
import logging | |
import os | |
from selectors import DefaultSelector, EVENT_READ | |
import socket | |
import subprocess | |
import sys | |
import threading | |
import time | |
try: | |
import tkinter, tkinter.messagebox, tkinter.simpledialog | |
except ImportError: | |
tkinter = None | |
import traceback | |
# Bit flags selecting what a wakelock keeps awake; they may be OR-ed together.
WAKELOCK_SUSPEND = (1 << 0)  # translated to ES_SYSTEM_REQUIRED by the Windows wakelock()
WAKELOCK_SCREEN = (1 << 1)  # translated to ES_DISPLAY_REQUIRED by the Windows wakelock()
def surrogate_wakelock(target_pid, mode=WAKELOCK_SUSPEND, *, _poll_delay=30):
    """Hold a wakelock for as long as the process ``target_pid`` is alive.

    The target does not have to be a child of this process.  If it is not
    alive when this function is called, a warning is logged and nothing is
    locked.  Otherwise the wakelock selected by ``mode`` is held while the
    target's liveness is re-checked every ``_poll_delay`` seconds.
    """
    if pid_alive(target_pid):
        with wakelock(mode):
            # Poll rather than wait: the target need not be our child.
            while pid_alive(target_pid):
                time.sleep(_poll_delay)
    else:
        logging.warning("target process was invalid, or already died")
def _codepage_to_encoding(code_page):
    """Return the Python codec name for a Windows code page identifier.

    A few code pages whose Python codec name is not simply ``cp<number>``
    are mapped explicitly; everything else falls back to ``cpNNN``
    (zero-padded to at least three digits, e.g. 37 -> "cp037").
    """
    well_known = {
        65001: "utf-8",  # CP_UTF8
        65000: "utf-7",  # CP_UTF7 (this is NOT UTF-8)
        1252: "windows-1252",
        20127: "us-ascii",
    }
    return well_known.get(code_page, f"cp{code_page:03d}")


if sys.platform == "win32":
    Kernel32 = ctypes.windll.Kernel32
    # EXECUTION_STATE flags passed to SetThreadExecutionState
    ES_AWAYMODE_REQUIRED = 0x00000040
    ES_CONTINUOUS = 0x80000000
    ES_DISPLAY_REQUIRED = 0x00000002
    ES_SYSTEM_REQUIRED = 0x00000001
    ES_USER_PRESENT = 0x00000004  # deprecated
    # Encoding used to decode the output of console tools such as TASKLIST
    _OEM_CP = Kernel32.GetOEMCP()
    OEM_ENCODING = _codepage_to_encoding(_OEM_CP)
@contextmanager
def wakelock(mode=WAKELOCK_SCREEN):
    """Keep Windows awake for the duration of the ``with`` block.

    ``mode`` must be a non-empty OR-combination of WAKELOCK_SUSPEND and
    WAKELOCK_SCREEN; anything else raises ValueError.

    The execution state is requested from a dedicated helper thread, so the
    calling thread's own execution state is never modified.
    """
    # Reject an empty mode as well as any bits outside the known flags.
    if (not mode) or (mode & ~(WAKELOCK_SUSPEND | WAKELOCK_SCREEN)):
        raise ValueError("Invalid wakelock mode: {!r}".format(mode))
    # Translate the portable mode bits into EXECUTION_STATE flags.
    flags = 0
    if mode & WAKELOCK_SUSPEND:
        flags |= ES_SYSTEM_REQUIRED
    if mode & WAKELOCK_SCREEN:
        flags |= ES_DISPLAY_REQUIRED
    with ExitStack() as ctx:
        @ctx.enter_context
        @_threading_contextmanager(flags, _threading_name="wakelock", _threading_daemon=True)
        def t(flags):
            # This thread holds the wakelock for as long as it exists...
            Kernel32.SetThreadExecutionState(flags | ES_CONTINUOUS)
            try:
                yield threading.current_thread()
            finally:
                # ...and releases it explicitly on the way out.  (A bare
                # `return` here would silently swallow whatever exception
                # is in flight and is a SyntaxWarning on Python 3.14+.)
                Kernel32.SetThreadExecutionState(ES_CONTINUOUS)
        yield
def pid_alive(pid):
    """Return True if TASKLIST reports a process whose PID equals ``pid``."""
    # os.kill(pid, 0) does NOT work as a liveness probe on Windows, so the
    # task list is queried instead.
    matches = _tasklist(f"PID eq {pid}")
    if not matches:
        return False
    # A PID filter can match at most one process; sanity-check the row.
    assert len(matches) == 1
    row = matches[0]
    assert int(row["PID"]) == pid
    return True
def _tasklist(*filters):
    """Run TASKLIST with one ``/FI`` option per filter; return its CSV rows as dicts."""
    command = ["TASKLIST"]
    for criterion in filters:
        command += ["/FI", criterion]
    command += ["/FO", "CSV"]
    # check=True: a non-zero TASKLIST exit status raises CalledProcessError
    with PopenEx(command, stdout=subprocess.PIPE, check=True, encoding=OEM_ENCODING) as proc:
        return list(csv.DictReader(proc.stdout))
else: | |
@contextmanager
def wakelock(mode=WAKELOCK_SCREEN):
    """Placeholder for platforms other than Windows: not implemented yet.

    The body contains no ``yield``, so NotImplementedError is raised as soon
    as ``wakelock(...)`` is called, before any ``with`` block is entered.
    """
    # Reference material for a future Linux / macOS implementation:
    # https://systemd.io/INHIBITOR_LOCKS/#:~:text=Inhibit(),-is%20the
    # https://github.com/elogind/elogind/blob/v255.17/man/org.freedesktop.login1.xml#L688-L702
    # https://developer.apple.com/documentation/iokit/1557082-iopmassertioncreatewithpropertie
    raise NotImplementedError("Mac & Linux TODO")
def pid_alive(pid):
    """Return True if a process with the given PID exists (non-Windows variant).

    Uses the "signal 0" probe: ``os.kill(pid, 0)`` performs the existence and
    permission checks without delivering a signal.  A process that exists but
    belongs to another user counts as alive.  Note that a zombie (exited but
    not yet reaped) process still counts as existing.
    """
    if pid <= 0:
        # 0 and negative values address process groups, not a single process
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # the process exists; we are merely not allowed to signal it
        return True
    return True
@contextmanager
def PopenEx(*args, check=False, **kwargs):
    """Context-manager form of subprocess.Popen with an optional exit-status check.

    All positional and keyword arguments other than ``check`` are passed to
    subprocess.Popen unchanged, and the Popen object is yielded.  When the
    ``with`` body finishes without raising and ``check`` is true, a non-zero
    exit status raises subprocess.CalledProcessError (like
    subprocess.check_call).

    Errors from starting the process (e.g. FileNotFoundError) and exceptions
    raised inside the ``with`` body propagate unchanged; they are never
    replaced by CalledProcessError.
    """
    with subprocess.Popen(*args, **kwargs) as p:
        yield p
    # Only reached on a clean exit of the body.  No wait() is needed here:
    # Popen.__exit__ has already waited for the process.
    if check and p.returncode:
        # p.args is the command as given to Popen, whether it was passed
        # positionally or as the `args=` keyword.
        raise subprocess.CalledProcessError(p.returncode, p.args)
@contextmanager
def socketpair(*args, **kwargs):
    """Context-manager form of socket.socketpair().

    Arguments are forwarded to socket.socketpair().  Yields the two connected
    sockets as a tuple and closes both when the ``with`` block ends.
    """
    pair = socket.socketpair(*args, **kwargs)
    with ExitStack() as ctx:
        for sock in pair:
            # each socket is its own context manager; exiting closes it
            ctx.enter_context(sock)
        yield pair
def thread_start_immed(*args, _threading_daemon=True, **kwargs):
    """Build a decorator that immediately starts its target in a new thread.

    Keyword arguments whose name begins with ``_threading_`` are removed from
    ``kwargs`` and passed on (prefix stripped) as thread options; the
    remaining positional and keyword arguments become the target's own
    ``args`` / ``kwargs``.  The returned callable takes the target and hands
    everything to _thread_start().
    """
    prefix = '_threading_'
    thread_options = {}
    # iterate over a snapshot, since matching keys are popped while looping
    for name in list(kwargs):
        if name.startswith(prefix):
            thread_options[name[len(prefix):]] = kwargs.pop(name)
    return functools.partial(
        _thread_start,
        args=args,
        kwargs=kwargs,
        daemon=_threading_daemon,
        **thread_options
    )
def _thread_start(target, _run_coroutine=None, **kwargs):
    """Create a threading.Thread for ``target``, start it, and return it.

    ``_run_coroutine`` decides whether the target is wrapped by
    _make_self_running() first; when it is None, the decision is made by
    checking whether ``target`` is a coroutine function.  All other keyword
    arguments go to the threading.Thread constructor.
    """
    if _run_coroutine is None:
        wrap_for_asyncio = inspect.iscoroutinefunction(target)
    else:
        wrap_for_asyncio = _run_coroutine
    runnable = _make_self_running(target) if wrap_for_asyncio else target
    thread = threading.Thread(target=runnable, **kwargs)
    thread.start()
    return thread
def _make_self_running(target):
    """Wrap ``target`` in a plain callable that runs it through _async_call().

    The wrapper receives ``target``'s metadata via functools.update_wrapper.
    """
    runner = functools.partial(_async_call, target)
    functools.update_wrapper(runner, target)
    return runner
def _async_call(f, *args, **kwargs):
    """Call ``f(*args, **kwargs)`` and run the result with asyncio.run().

    Keyword arguments whose name begins with ``_asyncio_`` are not given to
    ``f``; they are passed (prefix stripped) to asyncio.run() instead.
    Returns whatever asyncio.run() returns.
    """
    prefix = '_asyncio_'
    run_options = {}
    # iterate over a snapshot, since matching keys are popped while looping
    for name in list(kwargs):
        if name.startswith(prefix):
            run_options[name[len(prefix):]] = kwargs.pop(name)
    coroutine = f(*args, **kwargs)
    return asyncio.run(coroutine, **run_options)
def _threading_contextmanager(*args, **kwargs):
    """Return a factory that enters a context manager on a helper thread.

    ``args`` / ``kwargs`` are handed to thread_start_immed(), which starts
    the helper thread.  The returned ``wrap(func)`` is itself a context
    manager:

    * on entry it starts the helper thread, which enters the context produced
      by ``func(...)`` and reports success or failure over a socket pair;
    * a failure is re-raised in the calling thread, otherwise the value
      produced by entering ``func``'s context is yielded to the ``with`` body;
    * on exit it sends one status byte back to the helper thread (1 if the
      body raised, else 0) and joins the thread.
    """
    @contextmanager
    def wrap(func):
        # Generator functions are promoted to (async) context-manager factories.
        if inspect.isgeneratorfunction(func):
            func = contextmanager(func)
        elif inspect.isasyncgenfunction(func):
            func = asynccontextmanager(func)
        # sock1 is used by the calling thread, sock2 by the helper thread.
        with socketpair() as (sock1, sock2):
            t = None
            try:
                # Slots the helper thread fills in (via nonlocal) before signalling.
                _exc = None
                _result = None
                @thread_start_immed(*args, **kwargs)
                @functools.wraps(func) # set thread name, if possible
                async def t(*args, **kwargs):
                    loop = asyncio.get_running_loop()
                    sock2.setblocking(False)
                    async with AsyncExitStack() as ctx:
                        try:
                            nonlocal _result
                            _result = await _enter_context(ctx, func(*args, **kwargs))
                        except Exception as exc:
                            # entering failed: stash the exception, then signal 1
                            nonlocal _exc
                            _exc = exc
                            sock2.send(b"\x01")
                        else:
                            # entered successfully: signal 0
                            sock2.send(b"\x00")
                        finally:
                            # Wait for the calling thread's status byte and
                            # leave via sys.exit() with that value.
                            sys.exit(int.from_bytes(await loop.sock_recv(sock2, 1)))
                try:
                    with DefaultSelector() as selector:
                        selector.register(sock1, EVENT_READ)
                        # Block until the helper thread has reported its status byte.
                        for key, events in selector.select():
                            assert key.fileobj is sock1
                            assert events & EVENT_READ
                            code = sock1.recv(1)[0]
                            if code:
                                # re-raise the helper thread's failure in the caller
                                raise _exc
                            else:
                                yield _result
                            break
                        else:
                            assert False
                except:
                    # Tell the helper thread that the caller failed, then propagate.
                    sock1.send(b"\x01")
                    raise
                else:
                    sock1.send(b"\x00")
            finally:
                if t is not None:
                    #sock1.send(b"*")
                    t.join()
    return wrap
async def _enter_context(stack, obj):
    """Enter ``obj`` on ``stack`` and return the value produced by entering it.

    Objects exposing ``__aenter__`` are entered as async context managers;
    everything else is entered as a regular context manager.
    """
    is_async_cm = hasattr(obj, '__aenter__')
    if not is_async_cm:
        return stack.enter_context(obj)
    return await stack.enter_async_context(obj)
def main(argv):
    """Command-line entry point: keep the system awake until a target process exits.

    ``argv`` is the full argument vector (program name first).  The target
    PID is taken from the command line or, when omitted, asked for
    interactively (in a Tk dialog in GUI mode, on stdin otherwise).
    GUI mode defaults to "on" only when tkinter is available and no PID was
    given on the command line.
    """
    rootLogger = logging.getLogger()
    default_poll_delay = 10 if not __debug__ else 1
    parser = argparse.ArgumentParser()
    parser.add_argument('--gui', action=argparse.BooleanOptionalAction)
    parser.add_argument('--quiet', action=argparse.BooleanOptionalAction, default=False)
    # const= makes a bare `--poll-delay` fall back to the default instead of
    # producing None (which would later crash inside time.sleep).
    parser.add_argument('--poll-delay', type=float, nargs='?',
                        const=default_poll_delay, default=default_poll_delay)
    parser.add_argument('target_pid', type=int, nargs='?')
    options = parser.parse_args(argv[1:])
    gui = options.gui
    target_pid = options.target_pid
    if gui and tkinter is None:
        parser.error("--gui requires tkinter, which is not available")
    if gui is None and tkinter is not None:
        # if --gui wasn't specified, but target_pid was, assume batch mode
        gui = (target_pid is None)
    poll_delay = options.poll_delay
    if not options.quiet:
        rootLogger.setLevel(logging.DEBUG)
    # NOTE(review): the source listing lost its indentation; confirm that this
    # filter is meant to be installed unconditionally rather than only when
    # --quiet is off.
    rootLogger.addFilter(LoggingWarningsAreFatal())
    with ExitStack() as ctx:
        if gui:
            ctx.enter_context(TkReportBlock())
            ctx.enter_context(_with_logging_handler(rootLogger, TkLogHandler))
        if target_pid is None:
            if gui:
                # returns None when the user cancels the dialog
                target_pid = tkinter.simpledialog.askinteger(None, "target pid?")
            else:
                target_pid = int(input("target pid?\n> "))
            if target_pid is None:
                raise ValueError("no target pid was provided")
        surrogate_wakelock(target_pid, _poll_delay=poll_delay)
class LoggingWarningsAreFatal(logging.Filter):
    """Logging filter that turns any record at WARNING level or above into an exception."""

    def filter(self, record):
        """Let records below WARNING through; raise UserWarning for the rest.

        The exception carries the fully formatted message (format string
        merged with its arguments), not the raw format string.
        """
        if record.levelno >= logging.WARNING:
            raise UserWarning(record.getMessage())
        return True
class TkLogHandler(logging.Handler):
    """Logging handler that shows records in tkinter message boxes.

    ERROR and above use ``showerror``, WARNING uses ``showwarning``, INFO
    uses ``showinfo``; records below INFO are ignored.
    """

    def __init__(self, parent=None, *args, **kwargs):
        """Remember the Tk parent for the dialogs; other arguments go to logging.Handler."""
        self.tk_parent = parent
        super().__init__(*args, **kwargs)

    def emit(self, record):
        """Display ``record`` in the message box matching its level."""
        if record.levelno >= logging.ERROR:
            show = tkinter.messagebox.showerror
        elif record.levelno >= logging.WARNING:
            show = tkinter.messagebox.showwarning
        elif record.levelno >= logging.INFO:
            show = tkinter.messagebox.showinfo
        else:
            return
        # getMessage() merges the format string with its arguments;
        # record.msg alone would display the raw "%s" placeholders.
        show(message=record.getMessage(), parent=self.tk_parent)
@contextmanager
def TkReportBlock(parent=None, done_message="done"):
    """Report the outcome of the ``with`` body in a tkinter message box.

    An Exception raised by the body is shown in an error box (its ``str()``,
    or the formatted traceback when that is empty) and is then suppressed.
    If the body finishes normally, ``done_message`` is shown in an info box.
    """
    try:
        yield
    except Exception as failure:
        text = str(failure)
        if not text:
            text = "".join(traceback.format_exception(failure))
        try:
            tkinter.messagebox.showerror(message=text, parent=parent)
        except Exception:
            # retry without a parent widget
            tkinter.messagebox.showerror(message=text)
        return
    tkinter.messagebox.showinfo(message=done_message, parent=parent)
@contextmanager
def _with_logging_handler(logger_or_handler, handler=None):
    """Attach a logging handler for the duration of the ``with`` block.

    Two call forms are supported:

    * ``_with_logging_handler(handler)`` attaches to the root logger;
    * ``_with_logging_handler(logger, handler)`` attaches to ``logger``.

    ``handler`` may be a logging.Handler instance or a Handler subclass; a
    subclass is instantiated with no arguments.  Yields the handler instance
    and removes it from the logger on exit.
    """
    def _is_handler_class(obj):
        # issubclass() raises TypeError for non-classes, so test for a class first
        return isinstance(obj, type) and issubclass(obj, logging.Handler)

    if handler is None and (
        isinstance(logger_or_handler, logging.Handler)
        or _is_handler_class(logger_or_handler)
    ):
        # 1-argument form
        logger, handler = logging.getLogger(), logger_or_handler
    else:
        # 2-argument form
        logger = logger_or_handler
    if _is_handler_class(handler):
        handler = handler()
    logger.addHandler(handler)
    try:
        yield handler
    finally:
        logger.removeHandler(handler)
# Script entry point: pass the full argv to main(), which parses argv[1:].
if __name__ == '__main__':
    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment