Skip to content

Instantly share code, notes, and snippets.

@EyalAr
Created December 11, 2013 18:17
Show Gist options
  • Save EyalAr/7915597 to your computer and use it in GitHub Desktop.
Save EyalAr/7915597 to your computer and use it in GitHub Desktop.
Demo code for my post about python's blocking stream reading functions.
from subprocess import Popen, PIPE
from time import sleep
# run the shell as a subprocess:
p = Popen(['python', 'shell.py'],
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False)
# issue command:
p.stdin.write('command\n')
# let the shell output the result:
sleep(0.1)
# get the output
while True:
output = p.stdout.read() # <-- Hangs here!
if not output:
print '[No more data]'
break
print output
from subprocess import Popen, PIPE
from time import sleep
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK, read
# run the shell as a subprocess:
p = Popen(['python', 'shell.py'],
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False)
# set the O_NONBLOCK flag of p.stdout file descriptor:
flags = fcntl(p.stdout, F_GETFL) # get current p.stdout flags
fcntl(p.stdout, F_SETFL, flags | O_NONBLOCK)
# issue command:
p.stdin.write('command\n')
# let the shell output the result:
sleep(0.1)
# get the output
while True:
try:
print read(p.stdout.fileno(), 1024),
except OSError:
# the os throws an exception if there is no data
print '[No more data]'
break
from subprocess import Popen, PIPE
from time import sleep
from nbstreamreader import NonBlockingStreamReader as NBSR
# run the shell as a subprocess:
p = Popen(['python', 'shell.py'],
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False)
# wrap p.stdout with a NonBlockingStreamReader object:
nbsr = NBSR(p.stdout)
# issue command:
p.stdin.write('command\n')
# get the output
while True:
output = nbsr.readline(0.1) # 0.1 secs to let the shell output the result
if not output:
print '[No more data]'
break
print output
from threading import Thread
from Queue import Queue, Empty
class NonBlockingStreamReader:
def __init__(self, stream):
'''
stream: the stream to read from.
Usually a process' stdout or stderr.
'''
self._s = stream
self._q = Queue()
def _populateQueue(stream, queue):
'''
Collect lines from 'stream' and put them in 'quque'.
'''
while True:
line = stream.readline()
if line:
queue.put(line)
else:
raise UnexpectedEndOfStream
self._t = Thread(target = _populateQueue,
args = (self._s, self._q))
self._t.daemon = True
self._t.start() #start collecting lines from the stream
def readline(self, timeout = None):
try:
return self._q.get(block = timeout is not None,
timeout = timeout)
except Empty:
return None
class UnexpectedEndOfStream(Exception): pass
import sys
while True:
s = raw_input("Enter command: ")
print "You entered: {}".format(s)
sys.stdout.flush()
@gogowitsch
Copy link

Stating the obvious: This will not work on Microsoft Windows ("No module named 'fcntl'").

@JkShah1992
Copy link

JkShah1992 commented Mar 26, 2018

Hi, So I was trying your code. And it ended without even waiting to get some input from a user. There seems to be wrong about your code.
Please can you take a look?
I ran client_thread.py. I have shell.py and nbstreamreader.py.

Output i received:

Enter command: You entered: command

[No more data]

Process finished with exit code 0

@sinamr88
Copy link

Same problem here, how can we use this code?

@luipir
Copy link

luipir commented Sep 28, 2018

Hi @EyalAr I used your code with reference in GPL2 code here:
https://gitlab.com/cartolab/geomove_pipelines/blob/master/processing_scripts_and_models/pdal_pipeline_executor.py#L340
Thanks for shareing your solution.

@zroach
Copy link

zroach commented Jun 7, 2019

Thanks for the code. It seems there is a small File Descriptor leak that is revealed when using multiprocessing. See below for example:

import subprocess
from multiprocessing import Pool
from nbstreamreader import NonBlockingStreamReader as NBSR

def test(x):
    p = subprocess.Popen('ls', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    nbsr_out = NBSR(p.stdout)
    nbsr_err = NBSR(p.stderr)

    # with these lines commented out, FDs will leak
    #p.stdout.close()
    #p.stderr.close()

def error_callback(e):
    print("err:", e)

with Pool(processes=4) as p:
    i = p.map_async(test, [x for x in range(int(1e6))], error_callback=error_callback)
    while not i.ready():
        pass
print('success')

@chumbichubago
Copy link

In Windows, nbstreamreader.py still blocks at the readline() if the stream stays open. The fcntl option also is not available. Seems like I'm sh * t out of luck.

@br1ansho3
Copy link

@chumbichubago has you came across a solution?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment