Created
November 7, 2020 14:22
-
-
Save gsw945/c6bc7f9ae819ddee0f49a2a1498cacdf to your computer and use it in GitHub Desktop.
python subprocess realtime output
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# from: https://www.stefaanlippens.net/python-asynchronous-subprocess-pipe-reading/ | |
import sys | |
from subprocess import Popen, PIPE | |
import random | |
import time | |
from threading import Thread | |
from queue import Queue | |
class ClassName(object):
    '''Placeholder class that only remembers the argument it was built with.'''

    def __init__(self, arg):
        # Zero-argument super() resolves to the same parent initialiser.
        super().__init__()
        # The constructor argument is stored unchanged.
        self.arg = arg
class AsynchronousFileReader(Thread):
    '''
    Helper class to implement asynchronous reading of a file
    in a separate thread. Pushes read lines on a queue to
    be consumed in another thread.

    fd    -- file-like object exposing a callable ``readline``; it may be
             a text stream or a binary stream.
    queue -- ``queue.Queue`` instance that receives every line read.
    '''

    def __init__(self, fd, queue):
        assert isinstance(queue, Queue)
        assert callable(fd.readline)
        super(AsynchronousFileReader, self).__init__()
        self._fd = fd
        self._queue = queue

    def run(self):
        '''The body of the thread: read lines and put them on the queue.'''
        # At end of file readline() returns '' on a text stream and b'' on
        # a binary stream.  Both are falsy, so testing truthiness ends the
        # loop for either kind of stream; comparing against '' alone never
        # matches b'' and would keep the thread spinning forever.
        while True:
            line = self._fd.readline()
            if not line:
                break
            self._queue.put(line)

    def eof(self):
        '''
        Check whether there is no more content to expect.

        True only when the reader thread is not running and every line it
        queued has already been taken off the queue.
        '''
        return not self.is_alive() and self._queue.empty()
def consume(command):
    '''
    Example of how to consume standard output and standard error of
    a subprocess asynchronously without risk of deadlocking.

    command -- argument list handed to ``Popen``.

    Every line the subprocess writes is printed with a prefix naming the
    stream it arrived on.  Returns the exit status of the subprocess.
    '''
    # Text mode: lines arrive as str and end of file reads as '', which is
    # what the reader threads stop on.  The with-block guarantees that the
    # pipes are closed and the child is waited for, even on an error.
    with Popen(command, stdout=PIPE, stderr=PIPE,
               universal_newlines=True) as process:
        # Launch the asynchronous readers of the process' stdout and stderr.
        stdout_queue = Queue()
        stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
        stdout_reader.start()
        stderr_queue = Queue()
        stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue)
        stderr_reader.start()

        # Check the queues if we received some output (until there is
        # nothing more to get).
        while not stdout_reader.eof() or not stderr_reader.eof():
            # Show what we received from standard output.
            while not stdout_queue.empty():
                line = stdout_queue.get()
                print('Received line on standard output: ' + repr(line))
            # Show what we received from standard error.
            while not stderr_queue.empty():
                line = stderr_queue.get()
                print('Received line on standard error: ' + repr(line))
            # Sleep a bit before asking the readers again.
            time.sleep(0.005)

        # Let's be tidy and join the threads we've started; this must
        # happen before the pipes are closed on leaving the with-block.
        stdout_reader.join()
        stderr_reader.join()

    # Leaving the with-block waited for the child, so the status is set.
    return process.returncode
def produce(items=10):
    '''
    Demo producer: write ``items`` numbered lines, each one to a randomly
    picked stream (standard output or standard error), pausing for a
    random interval after every line.
    '''
    for index in range(items):
        stream = random.choice((sys.stdout, sys.stderr))
        message = 'Line %d on %s\n' % (index, stream)
        stream.write(message)
        # Flush straight away so the line leaves this process immediately.
        stream.flush()
        pause = random.uniform(.1, 1)
        time.sleep(pause)
if __name__ == '__main__':
    # The main flow:
    # if there is a command line argument 'produce', act as a producer,
    # otherwise be a consumer (which launches a producer as subprocess).
    if len(sys.argv) == 2 and sys.argv[1] == 'produce':
        produce(10)
    else:
        # Re-run this script with the interpreter that is executing it; a
        # bare 'python' may be missing from PATH or be another version.
        consume([sys.executable, sys.argv[0], 'produce'])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
from subprocess import Popen, PIPE | |
import random | |
import time | |
from threading import Thread | |
from queue import Queue | |
from multiprocessing import Pool as ProcessPool | |
from multiprocessing.dummy import Pool as ThreadPool | |
class ClassName(object):
    '''Stub class whose instances keep the single constructor argument.'''

    def __init__(self, arg):
        # Parent initialiser first, via the zero-argument form of super().
        super().__init__()
        self.arg = arg
class AsynchronousFileReader(Thread):
    '''
    Helper class to implement asynchronous reading of a file
    in a separate thread. Pushes read lines on a queue to
    be consumed in another thread.

    fd         -- file-like object exposing a callable ``readline``.
    queue      -- ``queue.Queue`` instance that receives every line read.
    empty_char -- value of ``readline()`` that ends reading ('' or b'').
                  Any empty read also ends reading, so a value that does
                  not match the stream type cannot cause an endless loop.
    '''

    def __init__(self, fd, queue, empty_char=''):
        assert isinstance(queue, Queue)
        assert callable(fd.readline)
        super(AsynchronousFileReader, self).__init__()
        # Daemon thread: it does not keep the interpreter alive on exit.
        self.daemon = True
        self._fd = fd
        self._queue = queue
        self._empty_char = empty_char

    def run(self):
        '''The body of the thread: read lines and put them on the queue.'''
        while True:
            line = self._fd.readline()
            # An empty read ('' or b'') means end of file whatever the
            # stream type is; the configured sentinel is honoured as well.
            if not line or line == self._empty_char:
                break
            self._queue.put(line)

    def eof(self):
        '''
        Check whether there is no more content to expect.

        True only when the reader thread is not running and every line it
        queued has already been taken off the queue.
        '''
        return not self.is_alive() and self._queue.empty()
def consume(command):
    '''
    Example of how to consume standard output and standard error of
    a subprocess asynchronously without risk of deadlocking.

    command -- argument list handed to ``Popen``.

    Every line the subprocess writes is printed (and flushed) with a
    prefix naming the stream it arrived on.  Returns the exit status of
    the subprocess.
    '''
    is_text = True
    # End-of-file marker matching the pipe mode selected above.
    empty_char = '' if is_text else b''

    def show(queue, prefix):
        # Print everything currently waiting on one queue.
        while not queue.empty():
            line = queue.get()
            print(prefix + repr(line), flush=True)

    # The with-block guarantees that the pipes are closed and the child is
    # waited for, even when an error interrupts the loop below.
    with Popen(command, stdout=PIPE, stderr=PIPE,
               universal_newlines=is_text) as process:
        # Launch the asynchronous readers of the process' stdout and stderr.
        stdout_queue = Queue()
        stdout_reader = AsynchronousFileReader(
            process.stdout, stdout_queue, empty_char)
        stdout_reader.start()
        stderr_queue = Queue()
        stderr_reader = AsynchronousFileReader(
            process.stderr, stderr_queue, empty_char)
        stderr_reader.start()

        def show_pending():
            show(stdout_queue, 'Received line on standard output: ')
            show(stderr_queue, 'Received line on standard error: ')

        # Keep polling until both readers have reached end of file.  The
        # loop must not stop merely because the process has exited: its
        # last lines may still be in the pipes or waiting on the queues.
        while stdout_reader.is_alive() or stderr_reader.is_alive():
            show_pending()
            # Sleep a bit before asking the readers again.
            time.sleep(0.005)

        # Let's be tidy and join the threads we've started, then print
        # whatever they queued after the last poll.
        stdout_reader.join()
        stderr_reader.join()
        show_pending()

        # Both pipes are at end of file; collect the real exit status so
        # that the function never returns None.
        return_code = process.wait()
    return return_code
def get_crawler_py():
    '''
    Return the path ``crawler/run_all.py`` located three directory levels
    above this file (the parent of the parent of this file's directory).
    '''
    base = os.path.abspath(__file__)
    # Strip the file name, then climb two more directory levels.
    for _ in range(3):
        base = os.path.dirname(base)
    return os.path.join(base, 'crawler', 'run_all.py')
# Source of a small stand-alone script, passed to the interpreter with
# "-c" by the __main__ block below.  It prints a few lines separated by
# CPU-bound busy-wait loops and finally re-raises a saved RuntimeError,
# so the child ends with an uncaught exception.
demo_code = '''\
import sys
def wait(n=5):
    for j in range(0, n * 100):
        for i in range(0, 9999):
            a = i * 0.33 * i / 0.23
            b = a - 2
print('等待')
wait(1)
ee = None
try:
    raise RuntimeError("demo exception")
except Exception as e:
    # print(e)
    ee = e
    pass
print('等待')
wait(29)
print('hello')
wait(3)
raise ee
'''
if __name__ == '__main__':
    # Run the embedded demo script with the interpreter that is executing
    # this file and stream its output line by line.
    # To stream the crawler instead, pass [sys.executable, get_crawler_py()].
    consume([sys.executable, "-c", demo_code])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment