Skip to content

Instantly share code, notes, and snippets.

@fphammerle
Last active September 4, 2018 03:49
Show Gist options
  • Save fphammerle/c90562d012454796f91f84becb6a2715 to your computer and use it in GitHub Desktop.
Save fphammerle/c90562d012454796f91f84becb6a2715 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
> sstream = io.StringIO()
> subprocess.run(['hostname'], stdout=sstream, check=True)
io.UnsupportedOperation: fileno
"""
import io
import os
import select
import threading
def rselect(fd, timeout_seconds=None):
    """Block until *fd* becomes readable or the timeout elapses.

    Thin wrapper around ``select.select`` that watches a single file
    descriptor for readability only.

    Args:
        fd: File descriptor (or object accepted by ``select.select``) to watch.
        timeout_seconds: Maximum time to wait; ``None`` waits indefinitely.

    Returns:
        True if *fd* was reported readable, False if the timeout was reached.
    """
    # select.select returns (readable, writable, exceptional); only the
    # first list can be non-empty because nothing else is being watched.
    readable = select.select([fd], [], [], timeout_seconds)[0]
    return fd in readable
class SubprocessTee:
    """Duplicate everything written to a pipe into several binary sinks.

    Used as a context manager, an instance owns an OS-level pipe. The write
    end is exposed through ``fileno()``, so the instance can be handed to
    ``subprocess`` functions as ``stdout``/``stderr``. A background thread
    reads from the pipe and forwards each chunk to every sink.

    Sinks receive ``bytes``. For ``io.TextIOWrapper`` sinks (for example
    ``sys.stdout``) the bytes are written to the wrapper's ``.buffer``.
    """

    # compare with libc's setvbuf / BUFSIZ
    BUFFER_SIZE_BYTES = 8192
    # Upper bound for a single poll of the pipe; also bounds how long the
    # reader thread needs to notice that the context has been left.
    READ_TIMEOUT_SECONDS = 0.01

    def __init__(self, sinks):
        """Remember the iterable of writable objects to forward data to."""
        self._sinks = sinks

    def __enter__(self):
        """Create the pipe, start the forwarding thread and return self."""
        self._read_fd, self._write_fd = os.pipe()
        # Set by __exit__ to tell the reader that no new data is expected.
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop)
        self._thread.start()
        return self

    def _write(self, data):
        """Write one chunk of bytes to every sink."""
        for sink in self._sinks:
            if isinstance(sink, io.TextIOWrapper):
                # Text wrappers only accept str; bypass to the byte layer.
                sink.buffer.write(data)
            else:
                sink.write(data)

    def _loop(self):
        """Forward pipe data to the sinks until EOF or until stopped.

        The loop keeps reading for as long as data is available, even after
        the stop flag has been set, so output that is still buffered in the
        pipe when the context is left is not lost.
        """
        while True:
            try:
                if rselect(self._read_fd, self.READ_TIMEOUT_SECONDS):
                    data = os.read(self._read_fd, self.BUFFER_SIZE_BYTES)
                    if not data:
                        # EOF: every write end of the pipe has been closed.
                        return
                    self._write(data)
                elif self._stop.is_set():
                    # Pipe is idle and the context was left: nothing to drain.
                    return
            except OSError:
                # Reading the pipe or writing to a sink failed; stop forwarding.
                return

    def fileno(self):
        """Return the pipe's write-end descriptor (for use by subprocess)."""
        return self._write_fd

    def __exit__(self, exc_type, exc_value, traceback):
        """Drain pending data, stop the thread and close both pipe ends.

        The write end is closed first and the read end only after the reader
        thread has finished, so the thread never operates on a closed (and
        possibly reused) descriptor and no buffered output is discarded.
        Exceptions raised inside the ``with`` body are not suppressed.
        """
        try:
            os.close(self._write_fd)
        finally:
            self._stop.set()
            # wait for writes to stop before sinks are being closed
            self._thread.join()
            os.close(self._read_fd)
# usage / example
import io
import subprocess
import sys

# Run `hostname` once and tee its output to both standard streams, a file
# named 'hostname' and an in-memory buffer; then show what the two
# capturing sinks received.
bstream = io.BytesIO()
with open('hostname', 'bw') as file_out:
    sinks = [sys.stdout, sys.stderr, file_out, bstream]
    with SubprocessTee(sinks) as tee:
        subprocess.run(['hostname'], stdout=tee, check=True)
    captured = bstream.getvalue()
    print('hostname bstream: {!r}'.format(captured))
with open('hostname', 'r') as file_in:
    file_text = file_in.read()
    print('hostname file: {!r}'.format(file_text))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment