Created
February 25, 2014 20:59
-
-
Save soxofaan/9217628 to your computer and use it in GitHub Desktop.
Asynchronous file/pipe reading from http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading (python 3 version)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import subprocess | |
import random | |
import time | |
import threading | |
from queue import Queue | |
import collections | |
class AsynchronousFileReader(threading.Thread):
    '''
    Helper class to implement asynchronous reading of a file
    in a separate thread. Pushes read lines on a queue to
    be consumed in another thread.

    Parameters:
        fd: file-like object exposing a callable ``readline`` method.
        queue: ``queue.Queue`` instance that receives every line read.
    '''

    def __init__(self, fd, queue):
        assert isinstance(queue, Queue)
        # Use the builtin callable(): the collections.Callable alias was
        # removed from the collections namespace in Python 3.10.
        assert callable(fd.readline)
        threading.Thread.__init__(self)
        self._fd = fd
        self._queue = queue

    def run(self):
        '''The body of the thread: read lines and put them on the queue.'''
        # Stop on any empty read, so that both binary streams (b'') and
        # text streams ('') terminate at end of file instead of looping.
        while True:
            line = self._fd.readline()
            if not line:
                break
            self._queue.put(line)

    def eof(self):
        '''
        Check whether there is no more content to expect.

        Returns True only when the reader thread is not alive and the
        queue holds no unconsumed lines.
        '''
        return not self.is_alive() and self._queue.empty()
def consume(command):
    '''
    Example of how to consume standard output and standard error of
    a subprocess asynchronously without risk on deadlocking.

    Parameters:
        command: argument list passed to ``subprocess.Popen``.

    Every line received is printed as soon as it is seen on the
    corresponding queue. Returns None once both streams are exhausted
    and the subprocess has terminated.
    '''
    # Launch the command as subprocess. Using Popen as a context manager
    # closes both pipes and waits for the child on exit, also when an
    # exception interrupts the polling loop below.
    with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as process:
        # Launch the asynchronous readers of the process' stdout and stderr.
        stdout_queue = Queue()
        stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
        stdout_reader.start()
        stderr_queue = Queue()
        stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue)
        stderr_reader.start()

        # Check the queues if we received some output (until there is nothing more to get).
        while not stdout_reader.eof() or not stderr_reader.eof():
            # Show what we received from standard output.
            while not stdout_queue.empty():
                line = stdout_queue.get()
                print('Received line on standard output: ' + repr(line))
            # Show what we received from standard error.
            while not stderr_queue.empty():
                line = stderr_queue.get()
                print('Received line on standard error: ' + repr(line))
            # Sleep a bit before asking the readers again.
            time.sleep(.1)

        # Let's be tidy and join the threads we've started, before the
        # context manager closes the pipes they were reading from.
        stdout_reader.join()
        stderr_reader.join()
def produce(items=10):
    '''
    Dummy producer: write ``items`` numbered lines, each one to a
    randomly picked stream (standard output or standard error),
    pausing for a random interval after every line.
    '''
    for index in range(items):
        stream = random.choice((sys.stdout, sys.stderr))
        message = 'Line %d on %s\n' % (index, stream)
        stream.write(message)
        # Flush right away so a consumer on the other end of a pipe
        # sees the line without waiting for the buffer to fill.
        stream.flush()
        pause = random.uniform(.1, 1)
        time.sleep(pause)
if __name__ == '__main__':
    # Entry point: when the sole command line argument is 'produce',
    # act as the producer; in every other case act as the consumer,
    # which re-launches this very script as a producer subprocess.
    cli_args = sys.argv[1:]
    if cli_args == ['produce']:
        produce(10)
    else:
        producer_command = [sys.executable, sys.argv[0], 'produce']
        consume(producer_command)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
Could you help me?
I am looking for something similar, but for stdin and stdout. I need to keep stdin running until 'quit' is received, and I need to keep stdout running as long as the stdin pipe has not been closed.
It would be a great help if you could guide me.