@DiTo97
Last active February 21, 2025 12:41
A collection of safe process and system utilities
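import logging
import typing

import psutil

T = typing.TypeVar("T")


def safe_execute(function: typing.Callable[..., T], *args, timeout: int = 60, **kwargs) -> T:
    """Safely executes a function, even if it spawns child processes.

    Every child process of the current process that is still alive when the
    function returns or raises is terminated; any that has not exited after
    `timeout` seconds is killed.
    """
    # `timeout` is keyword-only and must precede **kwargs in the signature.
    try:
        return function(*args, **kwargs)
    finally:
        # Snapshot all descendants of the current process, grandchildren included.
        children = psutil.Process().children(recursive=True)
        for process in children:
            try:
                process.terminate()
            except psutil.NoSuchProcess:
                pass  # the child exited on its own in the meantime
        # wait_procs returns (gone, alive); only the survivors need a hard kill.
        _, alive = psutil.wait_procs(children, timeout=timeout)
        for process in alive:
            logging.warning("The process with PID %d is being killed", process.pid)
            try:
                process.kill()
            except psutil.NoSuchProcess:
                pass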
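
The terminate-then-kill escalation used in the `finally` block above can be illustrated on a single child with only the standard library. This is a minimal sketch and not part of the gist: `stop_gracefully` is a hypothetical helper name, it handles one `subprocess.Popen` object rather than a whole process tree, and the 5-second default is an arbitrary assumption.

```python
import subprocess
import sys


def stop_gracefully(process: subprocess.Popen, timeout: float = 5.0) -> int:
    """Ask a child to exit, then force-kill it if it is still alive after `timeout` seconds.

    Hypothetical helper for illustration only; returns the child's exit code.
    """
    process.terminate()  # polite request: SIGTERM on POSIX, TerminateProcess on Windows
    try:
        return process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()  # escalation: the child ignored or outlived the polite request
        return process.wait()


if __name__ == "__main__":
    # Spawn a child that would otherwise sleep for a minute, then stop it early.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    exit_code = stop_gracefully(child)
    print(f"child exited with code {exit_code}")
```

The same two-step shape is what `psutil.wait_procs` provides for many processes at once: it waits on the whole list up to the timeout and hands back the ones that are still alive, so only those need the hard kill.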