Created
December 11, 2013 18:17
-
-
Save EyalAr/7915597 to your computer and use it in GitHub Desktop.
Demo code for my post about python's blocking stream reading functions.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from subprocess import Popen, PIPE | |
from time import sleep | |
# Demo: reading a child's stdout with a plain read() call.

# spawn the interactive shell with all three standard streams piped:
p = Popen(
    ['python', 'shell.py'],
    shell = False,
    stdin = PIPE,
    stdout = PIPE,
    stderr = PIPE)
# send a single command to the shell:
p.stdin.write('command\n')
# give the shell a moment to answer:
sleep(0.1)
# keep printing whatever read() returns until it comes back empty
chunk = p.stdout.read() # <-- Hangs here!
while chunk:
    print(chunk)
    chunk = p.stdout.read()
print('[No more data]')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from subprocess import Popen, PIPE | |
from time import sleep | |
from fcntl import fcntl, F_GETFL, F_SETFL | |
from os import O_NONBLOCK, read | |
# run the shell as a subprocess: | |
p = Popen(['python', 'shell.py'], | |
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False) | |
# set the O_NONBLOCK flag of p.stdout file descriptor: | |
flags = fcntl(p.stdout, F_GETFL) # get current p.stdout flags | |
fcntl(p.stdout, F_SETFL, flags | O_NONBLOCK) | |
# issue command: | |
p.stdin.write('command\n') | |
# let the shell output the result: | |
sleep(0.1) | |
# get the output | |
while True: | |
try: | |
print read(p.stdout.fileno(), 1024), | |
except OSError: | |
# the os throws an exception if there is no data | |
print '[No more data]' | |
break |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from subprocess import Popen, PIPE | |
from time import sleep | |
from nbstreamreader import NonBlockingStreamReader as NBSR | |
# Demo: reading a child's stdout through a NonBlockingStreamReader.

# spawn the interactive shell with all three standard streams piped:
p = Popen(
    ['python', 'shell.py'],
    shell = False,
    stdin = PIPE,
    stdout = PIPE,
    stderr = PIPE)
# hand the child's stdout over to a NonBlockingStreamReader:
nbsr = NBSR(p.stdout)
# send a single command to the shell:
p.stdin.write('command\n')
# print lines until one fails to arrive within the timeout
line = nbsr.readline(0.1) # 0.1 secs to let the shell output the result
while line:
    print(line)
    line = nbsr.readline(0.1)
print('[No more data]')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread | |
from Queue import Queue, Empty | |
class NonBlockingStreamReader:
    '''
    Line reader that does not make the caller wait on the underlying stream.

    A daemon thread performs the blocking stream.readline() calls and puts
    every line it gets on a queue; readline() on this object only pulls
    from that queue.
    '''

    def __init__(self, stream):
        '''
        stream: the stream to read from.
                Usually a process' stdout or stderr.
        '''
        self._s = stream
        self._q = Queue()
        # set by the reader thread when it stops (end of stream or error)
        self._eof = False

        def _populateQueue(stream, queue):
            '''
            Collect lines from 'stream' and put them in 'queue'.
            '''
            try:
                while True:
                    line = stream.readline()
                    if not line:
                        # End of stream. Raising here would only dump a
                        # traceback from this thread; no caller could ever
                        # catch it. Stop quietly and let eof() report it.
                        break
                    queue.put(line)
            finally:
                self._eof = True

        self._t = Thread(target = _populateQueue,
                args = (self._s, self._q))
        self._t.daemon = True
        self._t.start() #start collecting lines from the stream

    def readline(self, timeout = None):
        '''
        Return the next collected line, or None if no line is available.

        timeout: None (the default) returns immediately; a number waits
                 up to that many seconds for a line to arrive.
        '''
        try:
            return self._q.get(block = timeout is not None,
                    timeout = timeout)
        except Empty:
            return None

    def eof(self):
        '''
        Return True once the reader thread has stopped and every line it
        collected has been handed out by readline().
        '''
        return self._eof and self._q.empty()


class UnexpectedEndOfStream(Exception):
    '''
    Signals that the wrapped stream ended.

    The reader no longer raises this from its background thread (nothing
    could catch it there); the name is kept so existing imports still work.
    '''
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
# Echo shell: keep prompting for a command and repeat it back.
while True:
    command = raw_input("Enter command: ")
    reply = "You entered: {}".format(command)
    print(reply)
    # push the reply out right away instead of leaving it in the buffer
    sys.stdout.flush()
Thanks for the code. It seems there is a small File Descriptor leak that is revealed when using multiprocessing. See below for example:
import subprocess
from multiprocessing import Pool
from nbstreamreader import NonBlockingStreamReader as NBSR
def test(x):
    # Run `ls` through the shell and attach a reader to each output pipe.
    child = subprocess.Popen(
        'ls',
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out_reader = NBSR(child.stdout)
    err_reader = NBSR(child.stderr)
    # with these lines commented out, FDs will leak
    #child.stdout.close()
    #child.stderr.close()
def error_callback(e):
    """Print the error object handed to this callback."""
    print("err:", e)
# Fan one million calls of test() out over four worker processes.
with Pool(processes=4) as pool:
    pending = pool.map_async(
        test,
        list(range(int(1e6))),
        error_callback=error_callback,
    )
    # spin until the async result reports it is ready
    while not pending.ready():
        pass
    print('success')
On Windows, nbstreamreader.py still blocks at readline() if the stream stays open. The fcntl option is also not available. Seems like I'm sh * t out of luck.
@chumbichubago have you come across a solution?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @EyalAr I used your code with reference in GPL2 code here:
https://gitlab.com/cartolab/geomove_pipelines/blob/master/processing_scripts_and_models/pdal_pipeline_executor.py#L340
Thanks for sharing your solution.