Created
September 2, 2024 10:13
-
-
Save taikedz/1c814b7783daead02c76ff7b14b4ddda to your computer and use it in GitHub Desktop.
TeePipe
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Utility that behaves like "tee".

Similar to the system "tee" program, it attempts to write the same output to two different channels.
In this case, it writes to stdout/stderr as well as retaining a copy for retrieval as a string.
To do this, we wrap the pipe: the child process writes its output to an interim temp file, and we read
that file's content back into memory and write it through to the wrapped pipe ourselves.
"""
import os
import tempfile
import threading
import time
from io import TextIOWrapper
class TeePipe(threading.Thread):
    """Tee a subprocess's output to a real stream while keeping a copy in memory.

    Pass an instance as ``stdout=``/``stderr=`` to ``subprocess.Popen``. Popen
    uses :meth:`fileno`, so the child writes straight into a temp file. A
    background daemon thread polls that file, forwards new text to the wrapped
    stream and accumulates it. ``str(instance)`` returns everything captured.

    Use it as a context manager, or call :meth:`open` / :meth:`close`
    explicitly. Each instance is single-use, because a ``threading.Thread``
    can only be started once.
    """

    # Seconds to sleep between polls of the temp file.
    _POLL_INTERVAL = 0.01

    def __init__(self, added_stream):
        """
        Args:
            added_stream: text stream (e.g. ``sys.stdout``) that receives a
                copy of everything written to this pipe's descriptor.
        """
        super().__init__(daemon=True)
        # We _need_ a real file here: Popen() passes the OS-level descriptor
        # from fileno() directly to C / the child, so no Python write() method
        # is ever called that we could intercept.
        self._tmp_name = None  # created securely in open()
        self._w_file = None
        self._r_file = None
        self._running = False
        self._data = []
        self._lock = threading.Lock()
        self._ext_stream: TextIOWrapper = added_stream

    def _pump(self):
        """Forward and retain any text newly written to the temp file.

        The caller must hold ``self._lock``. This is a no-op when the file
        is not open.
        """
        if self._r_file is None:
            return
        # Read from the temp file: the child process writes to it progressively.
        data = self._r_file.read()
        if not data:
            # Nothing new. Skip the write/flush and don't grow _data with "".
            return
        # Write through to the wrapped stream (typically the real stdout/stderr).
        self._ext_stream.write(data)
        self._ext_stream.flush()
        # Also retain the data in memory.
        self._data.append(data)

    def run(self):
        """Poll the temp file until close() clears the running flag."""
        while self._running:
            with self._lock:
                self._pump()
            time.sleep(self._POLL_INTERVAL)

    def __str__(self):
        """Return all text captured so far."""
        with self._lock:
            return ''.join(self._data)

    def open(self):
        """Create the temp file, open write/read handles and start the poller.

        Raises:
            RuntimeError: if called again after the thread was already started.
        """
        with self._lock:
            if self._w_file is None:
                # mkstemp creates the file atomically with owner-only
                # permissions, unlike the deprecated, race-prone mktemp().
                fd, self._tmp_name = tempfile.mkstemp(prefix="teepipe-")
                self._w_file = os.fdopen(fd, 'w')
            if self._r_file is None:
                self._r_file = open(self._tmp_name, 'r')
            # Set before start(). Otherwise a close() racing with thread
            # start-up could be overwritten by the thread itself.
            self._running = True
        self.start()

    def close(self):
        """Stop polling, capture any remaining output, and delete the temp file.

        Safe to call more than once, or when open() was never called.
        """
        self._running = False
        # Let the poller finish its current pass, so it can never touch the
        # file handles after they are closed below.
        if self.is_alive() and threading.current_thread() is not self:
            self.join()
        with self._lock:
            # Final drain: picks up output written after the last poll.
            self._pump()
            if self._w_file is not None:
                self._w_file.close()
                self._w_file = None
            if self._r_file is not None:
                self._r_file.close()
                self._r_file = None
            if self._tmp_name is not None:
                try:
                    os.unlink(self._tmp_name)
                except FileNotFoundError:
                    pass
                self._tmp_name = None

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *a):
        # close() joins the poller and drains the file one last time, so no
        # sleep is needed to let the temp file "catch up".
        self.close()

    def fileno(self):
        """Return the OS-level descriptor of the temp file's write handle.

        subprocess passes this number directly to the child, which writes to
        it without calling any Python-level methods.
        """
        return self._w_file.fileno()
if __name__ == "__main__":
    # Demo: run a shell loop that writes to both stdout and stderr, teeing each
    # stream to the terminal while capturing it, then print what was captured.
    import sys
    from subprocess import Popen

    with TeePipe(sys.stdout) as tsout, TeePipe(sys.stderr) as tserr:
        proc = Popen(
            ["bash", "-c", "for x in 1 2 3; do echo out $x; echo err $x >&2; sleep 0.5; done"],
            stdout=tsout,
            stderr=tserr,
        )
        # Output goes to the TeePipes rather than to PIPEs, so communicate()
        # would only return (None, None). wait() states the intent directly.
        proc.wait()

    print("==== STDOUT")
    print(repr(str(tsout)))
    print("==== STDERR")
    print(repr(str(tserr)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment