Skip to content

Instantly share code, notes, and snippets.

@taikedz
Created September 2, 2024 10:13
Show Gist options
  • Save taikedz/1c814b7783daead02c76ff7b14b4ddda to your computer and use it in GitHub Desktop.
Save taikedz/1c814b7783daead02c76ff7b14b4ddda to your computer and use it in GitHub Desktop.
TeePipe
""" Utility to behave like "tee"
Similar to the system "tee" program, this writes the same output to two different channels.
In the present case, it writes to stdout/stderr as well as retaining the output for retrieval as a string.
To do this, we wrap the pipe, direct the system pipe to write data to an interim temp file, then read
that file's content back into memory and write it through to the wrapped pipe ourselves.
"""
import os
import tempfile
import threading
import time
from io import TextIOWrapper
class TeePipe(threading.Thread):
    """Tee a child process's output to a stream while also keeping it in memory.

    An instance can be passed as ``stdout=`` / ``stderr=`` to ``Popen()``: it
    exposes ``fileno()`` for a temp file the child writes to.  A background
    thread polls that file, forwards whatever it reads to ``added_stream`` and
    retains it so ``str(instance)`` returns everything captured so far.

    Use it as a context manager, or call ``open()`` / ``close()`` explicitly.
    Because it is a ``threading.Thread``, an instance can be opened only once.
    """

    # Seconds the pump thread sleeps between polls of the temp file.
    _POLL_INTERVAL = 0.01

    def __init__(self, added_stream):
        """Create an unopened tee that forwards to ``added_stream``.

        ``added_stream`` must provide ``write(str)`` and ``flush()``; it is
        typically the real ``sys.stdout`` or ``sys.stderr``.
        """
        threading.Thread.__init__(self, daemon=True)
        # We _need_ a real file here: Popen() hands the OS-level descriptor
        # straight to the child process, so no Python-level write() method is
        # ever called for us to intercept.  The file is created in open().
        self._tmp_name = None
        self._w_file = None
        self._r_file = None
        self._running = False
        self._data = []
        self._lock = threading.Lock()
        self._ext_stream:TextIOWrapper = added_stream

    def _pump(self):
        """Forward and retain whatever has been written to the temp file since the last read."""
        with self._lock:
            if self._r_file is None:
                return
            # Read from the temp file - it's where the child process is
            # writing to, progressively.
            data = self._r_file.read()
            if not data:
                # Nothing new; do not grow the buffer with empty strings.
                return
            # Write the data out to the wrapped stream - real stdout or
            # stderr typically.
            self._ext_stream.write(data)
            self._ext_stream.flush()
            # Also retain the data in-memory.
            self._data.append(data)

    def run(self):
        """Thread body: poll the temp file until ``close()`` clears the running flag."""
        # The flag is raised by open() *before* the thread starts, so a
        # close() that races with thread start-up cannot be overridden here.
        while self._running:
            self._pump()
            time.sleep(self._POLL_INTERVAL)

    def __str__(self):
        """Return all output captured so far as a single string."""
        with self._lock:
            return ''.join(self._data)

    def open(self):
        """Create the temp file and start the pump thread.

        Raises:
            RuntimeError: if this instance has already been opened (a thread
                can only be started once).
        """
        if self.ident is not None:
            raise RuntimeError("TeePipe cannot be reopened; create a new instance")
        with self._lock:
            if self._w_file is None:
                # mkstemp() creates the file atomically with private
                # permissions, unlike the deprecated, race-prone mktemp().
                fd, self._tmp_name = tempfile.mkstemp()
                self._w_file = os.fdopen(fd, 'w')
            if self._r_file is None:
                self._r_file = open(self._tmp_name, 'r')
        self._running = True
        self.start()

    def close(self):
        """Stop the pump thread, flush remaining output, and remove the temp file.

        Safe to call more than once, and safe to call on an unopened instance.
        """
        self._running = False
        # Wait for the pump thread so it can never touch a closed file.
        if self.is_alive() and threading.current_thread() is not self:
            self.join()
        try:
            # Final drain: pick up anything written after the last poll.
            self._pump()
        finally:
            with self._lock:
                if self._w_file is not None:
                    self._w_file.close()
                    self._w_file = None
                if self._r_file is not None:
                    self._r_file.close()
                    self._r_file = None
                if self._tmp_name is not None:
                    try:
                        os.unlink(self._tmp_name)
                    except OSError:
                        # Best-effort cleanup: the file may already be gone,
                        # or still be held open by a running child process.
                        pass
                    self._tmp_name = None

    def __enter__(self):
        """Open the tee and return it."""
        self.open()
        return self

    def __exit__(self, *a):
        """Close the tee; close() drains pending output, so no settling delay is needed."""
        self.close()

    def fileno(self):
        """Return the OS-level descriptor of the temp file for the child to write to.

        This is the system file descriptor which is passed directly into the
        C implementation; the system writes to it without calling any
        Python-level code.

        Raises:
            ValueError: if the tee is not currently open.
        """
        if self._w_file is None:
            raise ValueError("TeePipe is not open")
        return self._w_file.fileno()
if __name__ == "__main__":
    # Demo: run a shell loop that writes to both stdout and stderr, tee each
    # stream to the real terminal, then show what was captured in memory.
    import sys
    from subprocess import Popen

    with TeePipe(sys.stdout) as tsout, TeePipe(sys.stderr) as tserr:
        proc = Popen(
            ["bash", "-c", "for x in 1 2 3; do echo out $x; echo err $x >&2; sleep 0.5; done"],
            stdout=tsout,
            stderr=tserr,
        )
        # Neither stream is a PIPE, so communicate() returns (None, None);
        # it is called only to wait for the child to finish.
        proc.communicate()

    # Read the captured text only after both tees have been closed.
    print("==== STDOUT")
    print(repr(str(tsout)))
    print("==== STDERR")
    print(repr(str(tserr)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment