Skip to content

Instantly share code, notes, and snippets.

@gsw945
Created November 7, 2020 14:22
Show Gist options
  • Save gsw945/c6bc7f9ae819ddee0f49a2a1498cacdf to your computer and use it in GitHub Desktop.
Save gsw945/c6bc7f9ae819ddee0f49a2a1498cacdf to your computer and use it in GitHub Desktop.
python subprocess realtime output
# from: https://www.stefaanlippens.net/python-asynchronous-subprocess-pipe-reading/
import sys
from subprocess import Popen, PIPE
import random
import time
from threading import Thread
from queue import Queue
class ClassName(object):
"""docstring for ClassName"""
def __init__(self, arg):
super(ClassName, self).__init__()
self.arg = arg
class AsynchronousFileReader(Thread):
'''
Helper class to implement asynchronous reading of a file
in a separate thread. Pushes read lines on a queue to
be consumed in another thread.
'''
def __init__(self, fd, queue):
assert isinstance(queue, Queue)
assert callable(fd.readline)
super(AsynchronousFileReader, self).__init__()
self._fd = fd
self._queue = queue
def run(self):
'''The body of the tread: read lines and put them on the queue.'''
for line in iter(self._fd.readline, ''):
self._queue.put(line)
def eof(self):
'''Check whether there is no more content to expect.'''
return not self.is_alive() and self._queue.empty()
def consume(command):
'''
Example of how to consume standard output and standard error of
a subprocess asynchronously without risk on deadlocking.
'''
# Launch the command as subprocess.
process = Popen(command, stdout=PIPE, stderr=PIPE)
# Launch the asynchronous readers of the process' stdout and stderr.
stdout_queue = Queue()
stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
stdout_reader.start()
stderr_queue = Queue()
stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue)
stderr_reader.start()
# Check the queues if we received some output (until there is nothing more to get).
while not stdout_reader.eof() or not stderr_reader.eof():
# Show what we received from standard output.
while not stdout_queue.empty():
line = stdout_queue.get()
print('Received line on standard output: ' + repr(line))
# Show what we received from standard error.
while not stderr_queue.empty():
line = stderr_queue.get()
print('Received line on standard error: ' + repr(line))
# Sleep a bit before asking the readers again.
time.sleep(0.005)
# Let's be tidy and join the threads we've started.
stdout_reader.join()
stderr_reader.join()
# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()
def produce(items=10):
'''
Dummy function to randomly render a couple of lines
on standard output and standard error.
'''
for i in range(items):
output = random.choice([sys.stdout, sys.stderr])
output.write('Line %d on %s\n' % (i, output))
output.flush()
time.sleep(random.uniform(.1, 1))
if __name__ == '__main__':
# The main flow:
# if there is an command line argument 'produce', act as a producer
# otherwise be a consumer (which launches a producer as subprocess).
if len(sys.argv) == 2 and sys.argv[1] == 'produce':
produce(10)
else:
consume(['python', sys.argv[0], 'produce'])
import os
import sys
from subprocess import Popen, PIPE
import random
import time
from threading import Thread
from queue import Queue
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool
class ClassName(object):
"""docstring for ClassName"""
def __init__(self, arg):
super(ClassName, self).__init__()
self.arg = arg
class AsynchronousFileReader(Thread):
'''
Helper class to implement asynchronous reading of a file
in a separate thread. Pushes read lines on a queue to
be consumed in another thread.
'''
def __init__(self, fd, queue, empty_char):
assert isinstance(queue, Queue)
assert callable(fd.readline)
super(AsynchronousFileReader, self).__init__()
self.daemon = True
self._fd = fd
self._queue = queue
self._empty_char = empty_char
def run(self):
'''The body of the tread: read lines and put them on the queue.'''
for line in iter(self._fd.readline, self._empty_char):
if line != self._empty_char:
self._queue.put(line)
def eof(self):
'''Check whether there is no more content to expect.'''
return not self.is_alive() and self._queue.empty()
def consume(command):
'''
Example of how to consume standard output and standard error of
a subprocess asynchronously without risk on deadlocking.
'''
# Launch the command as subprocess.
is_text = True
process = Popen(command, stdout=PIPE, stderr=PIPE, universal_newlines=is_text)
empty_char = '' if is_text else b''
# Launch the asynchronous readers of the process' stdout and stderr.
stdout_queue = Queue()
stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue, empty_char)
stdout_reader.start()
def show_stdout():
# Show what we received from standard output.
while not stdout_queue.empty():
line = stdout_queue.get()
print('Received line on standard output: ' + repr(line), flush=True)
stderr_queue = Queue()
stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue, empty_char)
stderr_reader.start()
def show_stderr():
# Show what we received from standard error.
while not stderr_queue.empty():
line = stderr_queue.get()
print('Received line on standard error: ' + repr(line), flush=True)
pool = ThreadPool(2)
exec_func = lambda func: func()
return_code = None
# Check the queues if we received some output (until there is nothing more to get).
while not stdout_reader.eof() or not stderr_reader.eof():
return_code = process.poll()
if return_code is not None:
break
# show_stdout()
# show_stderr()
pool.map(exec_func, [show_stdout, show_stderr])
# Sleep a bit before asking the readers again.
time.sleep(0.005)
# Let's be tidy and join the threads we've started.
stdout_reader.join()
stderr_reader.join()
# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()
return return_code
def get_crawler_py():
scheduler_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
crawler_dir = os.path.join(os.path.dirname(scheduler_dir), 'crawler')
crawler_py = os.path.join(crawler_dir, 'run_all.py')
return crawler_py
demo_code = '''\
import sys
def wait(n=5):
for j in range(0, n * 100):
for i in range(0, 9999):
a = i * 0.33 * i / 0.23
b = a - 2
print('等待')
wait(1)
ee = None
try:
raise RuntimeError("demo exception")
except Exception as e:
# print(e)
ee = e
pass
print('等待')
wait(29)
print('hello')
wait(3)
raise ee
'''
if __name__ == '__main__':
command = [sys.executable, "-c", demo_code]
# command = [sys.executable, get_crawler_py()]
consume(command)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment