-
-
Save EyalAr/7915597 to your computer and use it in GitHub Desktop.
from subprocess import Popen, PIPE | |
from time import sleep | |
# run the shell as a subprocess: | |
p = Popen(['python', 'shell.py'], | |
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False) | |
# issue command: | |
p.stdin.write('command\n') | |
# let the shell output the result: | |
sleep(0.1) | |
# get the output | |
while True: | |
output = p.stdout.read() # <-- Hangs here! | |
if not output: | |
print '[No more data]' | |
break | |
print output |
from subprocess import Popen, PIPE | |
from time import sleep | |
from fcntl import fcntl, F_GETFL, F_SETFL | |
from os import O_NONBLOCK, read | |
# run the shell as a subprocess: | |
p = Popen(['python', 'shell.py'], | |
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False) | |
# set the O_NONBLOCK flag of p.stdout file descriptor: | |
flags = fcntl(p.stdout, F_GETFL) # get current p.stdout flags | |
fcntl(p.stdout, F_SETFL, flags | O_NONBLOCK) | |
# issue command: | |
p.stdin.write('command\n') | |
# let the shell output the result: | |
sleep(0.1) | |
# get the output | |
while True: | |
try: | |
print read(p.stdout.fileno(), 1024), | |
except OSError: | |
# the os throws an exception if there is no data | |
print '[No more data]' | |
break |
from subprocess import Popen, PIPE | |
from time import sleep | |
from nbstreamreader import NonBlockingStreamReader as NBSR | |
# run the shell as a subprocess: | |
p = Popen(['python', 'shell.py'], | |
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False) | |
# wrap p.stdout with a NonBlockingStreamReader object: | |
nbsr = NBSR(p.stdout) | |
# issue command: | |
p.stdin.write('command\n') | |
# get the output | |
while True: | |
output = nbsr.readline(0.1) # 0.1 secs to let the shell output the result | |
if not output: | |
print '[No more data]' | |
break | |
print output |
from threading import Thread | |
from Queue import Queue, Empty | |
class NonBlockingStreamReader: | |
def __init__(self, stream): | |
''' | |
stream: the stream to read from. | |
Usually a process' stdout or stderr. | |
''' | |
self._s = stream | |
self._q = Queue() | |
def _populateQueue(stream, queue): | |
''' | |
Collect lines from 'stream' and put them in 'quque'. | |
''' | |
while True: | |
line = stream.readline() | |
if line: | |
queue.put(line) | |
else: | |
raise UnexpectedEndOfStream | |
self._t = Thread(target = _populateQueue, | |
args = (self._s, self._q)) | |
self._t.daemon = True | |
self._t.start() #start collecting lines from the stream | |
def readline(self, timeout = None): | |
try: | |
return self._q.get(block = timeout is not None, | |
timeout = timeout) | |
except Empty: | |
return None | |
class UnexpectedEndOfStream(Exception): pass |
import sys | |
while True: | |
s = raw_input("Enter command: ") | |
print "You entered: {}".format(s) | |
sys.stdout.flush() |
Stating the obvious: This will not work on Microsoft Windows ("No module named 'fcntl'").
Hi, So I was trying your code. And it ended without even waiting to get some input from a user. There seems to be wrong about your code.
Please can you take a look?
I ran client_thread.py. I have shell.py and nbstreamreader.py.
Output i received:
Enter command: You entered: command
[No more data]
Process finished with exit code 0
Same problem here, how can we use this code?
Hi @EyalAr I used your code with reference in GPL2 code here:
https://gitlab.com/cartolab/geomove_pipelines/blob/master/processing_scripts_and_models/pdal_pipeline_executor.py#L340
Thanks for shareing your solution.
Thanks for the code. It seems there is a small File Descriptor leak that is revealed when using multiprocessing. See below for example:
import subprocess
from multiprocessing import Pool
from nbstreamreader import NonBlockingStreamReader as NBSR
def test(x):
p = subprocess.Popen('ls', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
nbsr_out = NBSR(p.stdout)
nbsr_err = NBSR(p.stderr)
# with these lines commented out, FDs will leak
#p.stdout.close()
#p.stderr.close()
def error_callback(e):
print("err:", e)
with Pool(processes=4) as p:
i = p.map_async(test, [x for x in range(int(1e6))], error_callback=error_callback)
while not i.ready():
pass
print('success')
In Windows, nbstreamreader.py still blocks at the readline() if the stream stays open. The fcntl option also is not available. Seems like I'm sh * t out of luck.
@chumbichubago has you came across a solution?
Thank you so much