Last active
July 8, 2020 07:00
-
-
Save jvkersch/656a935d012be7d41fa662930ef936ea to your computer and use it in GitHub Desktop.
traits-futures implementation of a future tied to a running process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
import os | |
import stat | |
import subprocess | |
import threading | |
from traits.api import ( | |
Any, Bool, Button, Dict, Event, HasStrictTraits, | |
Instance, Int, List, Property, Str, Tuple, Union, | |
observe | |
) | |
from traitsui.api import CodeEditor, HGroup, Label, UItem, View | |
from traits_futures.api import TraitsExecutor | |
from traits_futures.i_job_specification import IJobSpecification | |
from traits_futures.base_future import BaseFuture | |
STARTED = "started" | |
STDOUT = "stdout" | |
STDERR = "stderr" | |
CANCELLED = "cancelled" | |
FINISHED = "finished" | |
class BackgroundProcessTask: | |
""" Start a process and monitor its standard output. | |
""" | |
DEFAULT_KWARGS = { | |
"stdout": subprocess.PIPE, | |
"stderr": subprocess.PIPE, | |
"encoding": "utf-8", | |
} | |
def __init__(self, cmd, args, kwargs): | |
kwargs = {**kwargs, **self.DEFAULT_KWARGS} # ehh.. | |
self.cmd = cmd | |
self.args = args | |
self.kwargs = kwargs | |
def __call__(self, send, cancelled): | |
self._proc = proc = subprocess.Popen( | |
self.cmd, | |
*self.args, | |
**self.kwargs, | |
) | |
stdout_send = functools.partial(send, STDOUT) | |
stderr_send = functools.partial(send, STDERR) | |
stdout_thread = threading.Thread( | |
target=self._output_thread, | |
args=(proc.stdout, stdout_send), | |
daemon=True, # How to avoid this? | |
) | |
stderr_thread = threading.Thread( | |
target=self._output_thread, | |
args=(proc.stderr, stderr_send), | |
daemon=True, | |
) | |
stdout_thread.start() | |
stderr_thread.start() | |
send(STARTED, proc.pid) | |
# TODO how to handle cancel event? | |
proc.wait() | |
stdout_thread.join() # Needs a timeout | |
stderr_thread.join() | |
send(FINISHED, proc.returncode) | |
def _output_thread(self, stream, send): | |
for line in stream: | |
send(line) | |
stream.close() | |
@IJobSpecification.register | |
class BackgroundProcess(HasStrictTraits): | |
cmd = List(Str) | |
args = Tuple() | |
kwargs = Dict(Str(), Any()) | |
def background_job(self): | |
return BackgroundProcessTask( | |
cmd=self.cmd, | |
args=self.args, | |
# Convert TraitsDict to a regular dict | |
kwargs=dict(self.kwargs), | |
) | |
def future(self, cancel, message_receiver): | |
return ProcessFuture( | |
_cancel=cancel, | |
_message_receiver=message_receiver, | |
) | |
class ProcessFuture(BaseFuture): | |
""" A future associated to a subprocess Popen object. | |
""" | |
done = Bool() | |
#: The PID of the process, or None if the process has not been started yet. | |
pid = Union(None, Int) | |
#: Fired when new output is available on stdout. | |
stdout = Event(Str) | |
#: Fired when new output is available on stderr. | |
stderr = Event(Str) | |
#: The return code of the process, or None if the process is still running. | |
returncode = Union(None, Int) | |
@property | |
def result(self): | |
return self._result | |
@property | |
def exception(self): | |
raise NotImplementedError() | |
def _process_started(self, message): | |
self.pid = message | |
def _process_stdout(self, message): | |
self.stdout = message | |
def _process_stderr(self, message): | |
self.stderr = message | |
def _process_finished(self, message): | |
self.done = True | |
self.returncode = message | |
def _process_cancelled(self, message): | |
self.done = True | |
self.returncode = -1 | |
class ExtendedTraitsExecutor(TraitsExecutor): | |
def submit_process(self, cmd, *args, **kwargs): | |
task = BackgroundProcess( | |
cmd=cmd, | |
args=args, | |
kwargs=kwargs, | |
) | |
return self.submit(task) | |
class ProcessViewer(HasStrictTraits): | |
start = Button("Start new process") | |
kill = Button("Zap!") | |
executor = Instance(ExtendedTraitsExecutor, ()) | |
n_running = Property(Int, depends_on="futures") | |
futures = List(Instance(ProcessFuture)) | |
#: The output from all processes, combined. | |
output = Str() | |
view = View( | |
HGroup( | |
UItem("start"), | |
UItem("kill"), | |
), | |
UItem("output", editor=CodeEditor(lexer="text")), | |
HGroup( | |
Label("# of processes running:"), | |
UItem("n_running"), | |
), | |
resizable=True, | |
width=800, | |
height=600, | |
) | |
def _get_n_running(self): | |
return len(self.futures) | |
@observe("start") | |
def _start_job(self, event): | |
future = self.executor.submit_process( | |
["./process_with_lots_of_output.py"] | |
) | |
self.futures.append(future) | |
@observe("kill") | |
def _stop_job(self, event): | |
if len(self.futures) > 0: | |
self.futures[-1].cancel() | |
@observe("futures:items:done") | |
def _remove_future(self, event): | |
pid = event.object.pid | |
returncode = event.object.returncode | |
self.output += f"Process {pid} finished with code {returncode}\n" | |
del self.futures[self.futures.index(event.object)] | |
@observe("futures:items:[stdout,stderr]") | |
def _update_output(self, event): | |
pid = event.object.pid | |
message = event.new | |
if len(message) > 8: | |
message = message[:8] + "(...)" | |
stream = event.name | |
self.output += f"Process {pid} {stream}: {message}\n" | |
@observe("futures:items:pid") | |
def _update_start_message(self, event): | |
pid = event.object.pid | |
self.output += f"Process started: {pid}\n" | |
if __name__ == '__main__': | |
import textwrap | |
with open("process_with_lots_of_output.py", "w", encoding="utf-8") as fp: | |
fp.write(textwrap.dedent("""\ | |
#!/usr/bin/env python | |
import sys | |
import time | |
for letter in "abcdefgh": | |
print(letter*10000, file=sys.stdout) | |
print(letter*10000, file=sys.stderr) | |
time.sleep(1.0) | |
""")) | |
st = os.stat('process_with_lots_of_output.py') | |
os.chmod('process_with_lots_of_output.py', st.st_mode | stat.S_IEXEC) | |
try: | |
viewer = ProcessViewer() | |
viewer.configure_traits() | |
finally: | |
os.unlink("process_with_lots_of_output.py") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment