Skip to content

Instantly share code, notes, and snippets.

@SCP002
Created October 23, 2025 20:36
Show Gist options
  • Save SCP002/626757100d5bd4197f6cf5728a34677d to your computer and use it in GitHub Desktop.
Save SCP002/626757100d5bd4197f6cf5728a34677d to your computer and use it in GitHub Desktop.
Python: Process runner utility for Windows with pseudoterminal support
import process
def main() -> None:
    """Run ``python --version`` through the process helper.

    ``raise_on_exitcode=True`` is passed so that a non-zero exit status is
    reported as an exception instead of being ignored.
    """
    process.start_process("python", ["--version"], raise_on_exitcode=True)
if __name__ == "__main__":
    # Script entry point: report any failure as plain text, then keep the
    # console open until the user confirms.
    try:
        main()
    except Exception as error:
        print(error)
    finally:
        print("Press <Enter> to exit...")
        input()
"""
Process runner utility for Windows with pseudoterminal support
Tested with Python 3.13.9
Package dependencies:
- pywinpty: For pseudoterminal support
"""
import asyncio
import dataclasses
import os
import pathlib
import subprocess
import sys
from collections.abc import Callable, Generator
from typing import Literal
@dataclasses.dataclass
class ProcessResult:
    """Outcome of a finished subprocess: its exit status and the text output kept for it."""

    # Exit status reported by the process.
    exit_code: int
    # Text kept from the standard output stream ("" when nothing was kept).
    stdout: str
    # Text kept from the standard error stream ("" when nothing was kept).
    stderr: str
    # Kept output of all streams joined in the order it was received.
    combined_out: str
def start_process(
    command: str | pathlib.Path,
    args: list[str | pathlib.Path] | None = None,
    *,
    cwd: str | pathlib.Path | None = None,
    on_line_callback: Callable | None = None,
    print_stdout: bool = True,
    print_stderr: bool = True,
    capture_stdout: bool = False,
    capture_stderr: bool = False,
    raise_on_exitcode: bool = False,
    pty: bool = False,
    wait: bool = True,
    new_console: bool = False,
) -> ProcessResult | None:
    """
    Synchronously start a subprocess.

    Three execution paths exist, checked in this order:

    1. `new_console` is True or `wait` is False: the process is started
       directly and its output is not piped. `on_line_callback`, `print_*`,
       `capture_*` and `pty` are ignored on this path.
    2. `pty` is True: the process runs in a Windows pseudoterminal. Only one
       merged output stream exists there, so output is printed if either
       `print_*` flag is set and captured if either `capture_*` flag is set.
    3. Otherwise: the process runs with piped stdout/stderr driven by
       `asyncio.run`, so this path must not be used from a thread that
       already has a running event loop.

    Args:
        command: The command to execute (e.g., 'python').
        args: A list of arguments for the command. None means no arguments.
        cwd: Set current working directory of the process to the given path.
        on_line_callback: Optional callback function to call for each complete line of output.
        print_stdout: Whether to print output to stdout
        print_stderr: Whether to print output to stderr
        capture_stdout: Whether to store stdout in return value
        capture_stderr: Whether to store stderr in return value
        raise_on_exitcode: If True, raises CalledProcessError for non-zero exit codes.
        pty: Whether to run process in pseudo-terminal.
        wait: If True, waits for process completion; if False, returns immediately.
        new_console: If True, starts process in a new detached console window.

    Returns:
        ProcessResult: The return code and output of the subprocess.
        None if `wait` is False. With `new_console` set and `wait` True the
        result carries the exit code with empty output fields.

    Raises:
        subprocess.CalledProcessError: On non-zero exit code if `raise_on_exitcode` is True
            (only possible when the process is waited for).
    """
    command_str = str(command)
    # `None` replaces the former mutable `[]` default; both mean "no arguments".
    args_str = [] if args is None else [str(arg) for arg in args]
    cwd_str = None if cwd is None else str(cwd)

    if new_console or not wait:
        return _start_subprocess(
            command_str, args_str, cwd=cwd_str, raise_on_exitcode=raise_on_exitcode, wait=wait, new_console=new_console
        )

    if pty:
        return _win_pty_start_process(
            command_str,
            args_str,
            cwd=cwd_str,
            on_line_callback=on_line_callback,
            print_output=print_stdout or print_stderr,
            capture_output=capture_stdout or capture_stderr,
            raise_on_exitcode=raise_on_exitcode,
        )

    # `new_console` is always False here; the True case returned above.
    return asyncio.run(
        _async_start_process(
            command_str,
            args_str,
            cwd=cwd_str,
            on_line_callback=on_line_callback,
            print_stdout=print_stdout,
            print_stderr=print_stderr,
            capture_stdout=capture_stdout,
            capture_stderr=capture_stderr,
            raise_on_exitcode=raise_on_exitcode,
            new_console=new_console,
        )
    )
def _win_pty_start_process(
    command: str,
    args: list[str],
    *,
    cwd: str | None = None,
    on_line_callback: Callable | None,
    print_output: bool,
    capture_output: bool,
    raise_on_exitcode: bool,
) -> ProcessResult:
    """
    Start a subprocess in windows-specific pseudoterminal.

    The pseudoterminal exposes a single output stream, so the result's
    `stdout` and `stderr` fields are always empty and any captured text is
    placed in `combined_out`. When this process' stdin is a terminal,
    keystrokes are forwarded to the child from a background daemon thread.

    Args:
        command: The command to execute (e.g., 'python').
        args: A list of arguments for the command.
        cwd: Set current working directory of the process to the given path.
        on_line_callback: Optional callback function to call for each complete line of output.
        print_output: Whether to print output to stdout.
        capture_output: Whether to store process output in return value.
        raise_on_exitcode: If True, raises CalledProcessError for non-zero exit codes.

    Returns:
        ProcessResult: The return code and output of the subprocess.

    Raises:
        subprocess.CalledProcessError: On non-zero exit code if `raise_on_exitcode` is True.
    """
    # Imported here rather than at module level: msvcrt is Windows-only and
    # winpty is a third-party package needed only on this code path.
    import msvcrt
    import threading
    import time

    import winpty

    cmdline = [command] + args

    # os.get_terminal_size() raises OSError when stdout is not attached to a
    # console (e.g. redirected to a file); fall back to a conventional 80x24.
    try:
        term_size = os.get_terminal_size()
    except OSError:
        term_size = os.terminal_size((80, 24))

    # NOTE(review): backend=1 presumably selects the WinPTY backend rather
    # than ConPTY - confirm against the pywinpty documentation.
    process = winpty.PtyProcess.spawn(cmdline, cwd=cwd, dimensions=(term_size.lines, term_size.columns), backend=1)

    def input_thread_func() -> None:
        """Forward console keystrokes to the child for as long as it is alive."""
        try:
            while process.isalive():
                if msvcrt.kbhit():
                    ch = msvcrt.getwch()
                    try:
                        process.write(ch)
                    except (BrokenPipeError, OSError):
                        # The child's input side is gone; stop forwarding.
                        break
                else:
                    # Poll the keyboard at a modest rate instead of spinning.
                    time.sleep(0.05)
        except Exception:
            # Deliberately best-effort: input forwarding must never take the
            # output loop down with it.
            pass

    if sys.stdin.isatty():
        input_thread = threading.Thread(target=input_thread_func, daemon=True)
        input_thread.start()

    line_builder = LineBuilder()
    combined_chunks: list[str] = []

    while True:
        try:
            data = process.read(1024)
        except EOFError:
            # The pty was closed: there is nothing more to read.
            break
        except Exception:
            if not process.isalive():
                break
            # Transient read failure while the child still runs: retry, but
            # back off briefly so repeated failures do not busy-loop.
            time.sleep(0.01)
            continue

        # `data` is text (it is written to sys.stdout and joined as str), so
        # the former `data == b""` comparison could never be true. An empty
        # read carries no output; just poll again.
        if not data:
            continue

        if capture_output:
            combined_chunks.append(data)
        if print_output:
            sys.stdout.write(data)
            sys.stdout.flush()
        if on_line_callback:
            for line in line_builder.feed(data):
                on_line_callback(line)

    exit_code = process.wait()
    result = ProcessResult(
        # Normalise a missing exit status to 0.
        exit_code=exit_code or 0,
        stdout="",
        stderr="",
        combined_out="".join(combined_chunks),
    )
    if raise_on_exitcode and result.exit_code != 0:
        raise subprocess.CalledProcessError(result.exit_code, cmdline, output=result.combined_out, stderr=None)
    return result
async def _async_start_process(
    command: str,
    args: list[str],
    *,
    cwd: str | None = None,
    on_line_callback: Callable | None,
    print_stdout: bool = True,
    print_stderr: bool = True,
    capture_stdout: bool = False,
    capture_stderr: bool = False,
    raise_on_exitcode: bool,
    new_console: bool = False,
) -> ProcessResult:
    """
    Asynchronously start a subprocess.

    stdout and stderr are both piped and read concurrently in chunks of up to
    1024 bytes. Each stream is decoded as UTF-8 with its own incremental
    decoder, so a multi-byte character split across two reads is decoded
    intact; invalid bytes are replaced with U+FFFD.

    `combined_out` contains only the streams that are being captured, in the
    order their chunks were received. `on_line_callback` receives lines
    (without the trailing "\\n") assembled from both streams through one
    shared line buffer.

    Args:
        command: The command to execute (e.g., 'python').
        args: A list of arguments for the command.
        cwd: Set current working directory of the process to the given path.
        on_line_callback: Optional callback function to call for each complete line of output.
        print_stdout: Whether to print output to stdout
        print_stderr: Whether to print output to stderr
        capture_stdout: Whether to store stdout in return value
        capture_stderr: Whether to store stderr in return value
        raise_on_exitcode: If True, raises CalledProcessError for non-zero exit codes.
        new_console: If True, starts process in a new detached console window.

    Returns:
        ProcessResult: The return code and output of the subprocess.

    Raises:
        subprocess.CalledProcessError: On non-zero exit code if `raise_on_exitcode` is True.
    """
    # Local import keeps this block self-contained; codecs is standard library.
    import codecs

    creation_flags = 0
    if new_console:
        creation_flags |= subprocess.CREATE_NEW_CONSOLE

    process = await asyncio.create_subprocess_exec(
        command,
        *args,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        creationflags=creation_flags,
    )

    # Reader tasks push (stream name, raw bytes) pairs; `None` marks the end
    # of that stream.
    output_queue = asyncio.Queue[tuple[Literal["stdout", "stderr"], bytes | None]]()
    stdout_chunks: list[str] = []
    stderr_chunks: list[str] = []
    combined_chunks: list[str] = []

    # One decoder per stream: decoding each chunk on its own would corrupt a
    # multi-byte character that straddles a read boundary.
    decoder_factory = codecs.getincrementaldecoder("utf-8")
    decoders = {
        "stdout": decoder_factory(errors="replace"),
        "stderr": decoder_factory(errors="replace"),
    }

    async def queue_output(stream: asyncio.StreamReader | None, stream_type: Literal["stdout", "stderr"]) -> None:
        """Copy raw chunks from one stream into the queue until end of stream."""
        if stream is None:
            return
        while True:
            chunk = await stream.read(1024)
            if chunk:
                await output_queue.put((stream_type, chunk))
            else:
                await output_queue.put((stream_type, None))
                break

    stdout_task = asyncio.create_task(queue_output(process.stdout, "stdout"))
    stderr_task = asyncio.create_task(queue_output(process.stderr, "stderr"))

    line_builder = LineBuilder()

    # Keep consuming while anything is queued or a reader may still produce.
    while not output_queue.empty() or not stdout_task.done() or not stderr_task.done():
        stream_type, chunk = await output_queue.get()

        if chunk is None:
            # End of stream: flush any bytes of an incomplete character.
            decoded_chunk = decoders[stream_type].decode(b"", final=True)
        else:
            decoded_chunk = decoders[stream_type].decode(chunk)

        # The decoder may hold back a partial character and return nothing.
        if decoded_chunk:
            if stream_type == "stdout":
                if print_stdout:
                    sys.stdout.write(decoded_chunk)
                    sys.stdout.flush()
                if capture_stdout:
                    stdout_chunks.append(decoded_chunk)
                    combined_chunks.append(decoded_chunk)
            elif stream_type == "stderr":
                if print_stderr:
                    sys.stderr.write(decoded_chunk)
                    sys.stderr.flush()
                if capture_stderr:
                    stderr_chunks.append(decoded_chunk)
                    combined_chunks.append(decoded_chunk)

            if on_line_callback:
                for line in line_builder.feed(decoded_chunk):
                    on_line_callback(line)

        output_queue.task_done()

    # Surface any exception raised inside the reader tasks.
    await asyncio.gather(stdout_task, stderr_task)
    exit_code = await process.wait()

    result = ProcessResult(
        exit_code=exit_code,
        stdout="".join(stdout_chunks),
        stderr="".join(stderr_chunks),
        combined_out="".join(combined_chunks),
    )
    if raise_on_exitcode and result.exit_code != 0:
        raise subprocess.CalledProcessError(
            result.exit_code, [command] + args, output=result.stdout, stderr=result.stderr
        )
    return result
def _start_subprocess(
    command: str,
    args: list[str],
    *,
    cwd: str | None = None,
    raise_on_exitcode: bool = False,
    wait: bool = True,
    new_console: bool = False,
) -> ProcessResult | None:
    """
    Launch a subprocess without piping or capturing any of its output.

    Args:
        command: The command to execute (e.g., 'python').
        args: A list of arguments for the command.
        cwd: Working directory for the process; None leaves it unset.
        raise_on_exitcode: If True, a non-zero exit code raises CalledProcessError.
        wait: If True, block until the process exits; if False, return right after launch.
        new_console: If True, the process is created with the CREATE_NEW_CONSOLE flag.

    Returns:
        ProcessResult: The exit code with all output fields empty.
        None if `wait` is False.

    Raises:
        subprocess.CalledProcessError: On non-zero exit code if `raise_on_exitcode` is True.
    """
    cmdline = [command, *args]
    # CREATE_NEW_CONSOLE is only looked up when requested, so the attribute
    # is never touched otherwise.
    flags = subprocess.CREATE_NEW_CONSOLE if new_console else 0

    child = subprocess.Popen(cmdline, cwd=cwd, creationflags=flags)
    if not wait:
        return None

    status = child.wait()
    if status != 0 and raise_on_exitcode:
        raise subprocess.CalledProcessError(status, cmdline, output="", stderr="")
    return ProcessResult(status, "", "", "")
class LineBuilder:
    """Collects text chunks and hands back the complete lines they contain.

    Text after the last newline stays in `buffer` until a later chunk
    completes it.
    """

    def __init__(self):
        # Text received so far that has not yet been terminated by "\n".
        self.buffer = ""

    def feed(self, chunk: str) -> Generator[str, None, None]:
        """Append `chunk` and yield each newly completed line without its "\\n".

        This is a generator: the chunk is only appended once iteration starts.
        """
        self.buffer = self.buffer + chunk
        while True:
            head, newline, remainder = self.buffer.partition("\n")
            if not newline:
                # No terminator left; keep the partial line buffered.
                return
            self.buffer = remainder
            yield head
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment