Last active
July 14, 2024 21:09
-
-
Save ddelange/6517e3267fb74eeee804e3b1490b1c1d to your computer and use it in GitHub Desktop.
Handling the live output stream of a command
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from collections import deque | |
from concurrent.futures import ThreadPoolExecutor | |
from functools import partial | |
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen | |
def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    Each line the child writes to stdout/stderr is passed, without its trailing
    newline, to ``stdout_handler``/``stderr_handler`` as soon as it is read.
    The two streams are consumed by separate worker threads.

    Args:
        args: Command to run, forwarded to ``Popen``.
        stdout_handler: Callable invoked with every stdout line.
        stderr_handler: Callable invoked with every stderr line.
        check: When true, a non-zero exit status raises ``CalledProcessError``.
        text: Forwarded to ``Popen``; lines are ``str`` when true.
        stdout: Forwarded to ``Popen``. A handler is only fed when this yields
            a readable pipe (the default ``PIPE``).
        stderr: Forwarded to ``Popen``; e.g. ``STDOUT`` merges stderr into the
            stdout stream, in which case ``stderr_handler`` is never called.
        **kwargs: Any further ``Popen`` keyword arguments.

    Returns:
        CompletedProcess carrying ``args`` and the return code. The output is
        not captured on it; it has already been given to the handlers.

    Raises:
        CalledProcessError: If ``check`` is true and the exit status is non-zero.
    """

    def _pump(stream, handler):
        # Feed every line of `stream` to `handler`, removing only a real
        # trailing newline: the final line of the output may be unterminated,
        # and blindly slicing `[:-1]` would eat its last character.
        for line in stream:
            newline = "\n" if isinstance(line, str) else b"\n"
            handler(line[:-1] if line.endswith(newline) else line)

    with (
        Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process,
        ThreadPoolExecutor(2) as pool,  # two threads to handle the (live) streams separately
    ):
        streams = (
            (process.stdout, stdout_handler),
            (process.stderr, stderr_handler),
        )
        for stream, handler in streams:
            # Popen leaves the attribute as None when the stream is not a pipe
            # (e.g. stderr=STDOUT or stdout=DEVNULL); nothing to read then.
            if stream is not None:
                # NOTE: an exception raised by a handler stays inside the
                # returned Future, which is discarded; it is not re-raised here.
                pool.submit(_pump, stream, handler)
    # Leaving the `with` block joined both workers and waited for the child,
    # so this returns the exit status immediately and never None.
    retcode = process.wait()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from subprocess import PIPE, STDOUT, CalledProcessError, CompletedProcess, Popen | |
def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    check=True,
    text=True,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    The child's stderr is redirected into its stdout, so ``stdout_handler``
    receives every output line (without its trailing newline) as it is read.

    Args:
        args: Command to run, forwarded to ``Popen``.
        stdout_handler: Callable invoked with every line of merged output.
        check: When true, a non-zero exit status raises ``CalledProcessError``.
        text: Forwarded to ``Popen``; lines are ``str`` when true.
        **kwargs: Any further ``Popen`` keyword arguments.

    Returns:
        CompletedProcess carrying ``args`` and the return code. The output is
        not captured on it; it has already been given to the handler.

    Raises:
        CalledProcessError: If ``check`` is true and the exit status is non-zero.
    """
    with Popen(args, text=text, stdout=PIPE, stderr=STDOUT, **kwargs) as process:
        for line in process.stdout:
            # Remove only a real trailing newline: the last line of the output
            # may be unterminated, and `[:-1]` would eat its final character.
            newline = "\n" if isinstance(line, str) else b"\n"
            stdout_handler(line[:-1] if line.endswith(newline) else line)
    # wait() (unlike poll()) can never hand back None; after the `with` block
    # the child has already been waited for, so this returns immediately.
    retcode = process.wait()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The root logger only emits WARNING and above by default; lower the threshold
# so the INFO records shown in the expected output below actually appear.
logging.basicConfig(level=logging.INFO)

# Default handlers: output lines are forwarded to the logging module.
stream_command(["echo", "test"])
# INFO:root:test

# check=False: a failing command does not raise CalledProcessError.
stream_command("cat ./nonexist", shell=True, check=False)
# ERROR:root:cat: ./nonexist: No such file or directory

# Any callable taking one line can act as handler.
stream_command(["echo", "test"], stdout_handler=print)
# test
# Collects every line passed to `handler`.
stdout_lines = []


def handler(line):
    """Print *line*, log it at INFO level, and append it to ``stdout_lines``."""
    print(line)
    logging.info(line)
    stdout_lines.append(line)
# Each output line is printed, logged and collected by `handler`.
stream_command(["echo", "test"], stdout_handler=handler)
# test
# INFO:root:test

print(stdout_lines)
# ['test']
# stream to log file
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s:%(levelname)-7s %(filename)20s:%(lineno)-4d %(name)s:%(message)s",
    filename="./capture.log",
    filemode="w",
    encoding="utf-8",
    # basicConfig does nothing when the root logger already has handlers (any
    # earlier logging call installs one); force=True replaces them so records
    # really end up in the file.
    force=True,
)
logging.info("test from python")
stream_command(["echo", "test from subprocess"])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
reference answers: