Skip to content

Instantly share code, notes, and snippets.

@jvkersch
Last active July 8, 2020 07:00
Show Gist options
  • Save jvkersch/656a935d012be7d41fa662930ef936ea to your computer and use it in GitHub Desktop.
Save jvkersch/656a935d012be7d41fa662930ef936ea to your computer and use it in GitHub Desktop.
traits-futures implementation of a future tied to a running process
import functools
import os
import stat
import subprocess
import threading
from traits.api import (
Any, Bool, Button, Dict, Event, HasStrictTraits,
Instance, Int, List, Property, Str, Tuple, Union,
observe
)
from traitsui.api import CodeEditor, HGroup, Label, UItem, View
from traits_futures.api import TraitsExecutor
from traits_futures.i_job_specification import IJobSpecification
from traits_futures.base_future import BaseFuture
# Message-type tags. BackgroundProcessTask passes one of these as the first
# argument to ``send``; ProcessFuture defines a matching ``_process_<tag>``
# handler for each of them.
STARTED = "started"  # payload: PID of the launched process
STDOUT = "stdout"  # payload: one line read from the process's stdout
STDERR = "stderr"  # payload: one line read from the process's stderr
CANCELLED = "cancelled"  # the process was stopped on request
FINISHED = "finished"  # payload: return code of the process
class BackgroundProcessTask:
    """ Start a process and report its output and lifecycle as messages.

    Instances are callables taking ``(send, cancelled)``. ``send`` is called
    as ``send(message_type, payload)``; ``cancelled`` is expected to be a
    zero-argument callable returning True once cancellation has been
    requested (the traits-futures background-task contract).
    """

    #: Keyword arguments always forced onto ``subprocess.Popen``. The pipes
    #: are required because the reader threads iterate over ``proc.stdout``
    #: and ``proc.stderr``, so these values deliberately override any
    #: caller-supplied ones.
    DEFAULT_KWARGS = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "encoding": "utf-8",
    }

    #: Seconds between checks of the cancellation flag while waiting.
    POLL_INTERVAL = 0.1

    #: Seconds to wait after ``terminate()`` before resorting to ``kill()``.
    TERMINATE_TIMEOUT = 5.0

    #: Maximum seconds to wait for each reader thread to drain its pipe
    #: once the process has exited.
    JOIN_TIMEOUT = 5.0

    def __init__(self, cmd, args, kwargs):
        """ Store the ``Popen`` arguments.

        Parameters
        ----------
        cmd : list of str
            Program and arguments, passed as first argument to ``Popen``.
        args : tuple
            Extra positional arguments for ``Popen``.
        kwargs : dict
            Extra keyword arguments for ``Popen``; entries that clash with
            ``DEFAULT_KWARGS`` are overridden.
        """
        self.cmd = cmd
        self.args = args
        self.kwargs = {**kwargs, **self.DEFAULT_KWARGS}

    def __call__(self, send, cancelled):
        """ Run the process to completion or until cancellation.

        Sends ``STARTED`` with the PID, then ``STDOUT`` / ``STDERR`` for
        every line of output, and finally either ``FINISHED`` or
        ``CANCELLED`` with the process's return code.
        """
        self._proc = proc = subprocess.Popen(
            self.cmd,
            *self.args,
            **self.kwargs,
        )
        # Announce the PID before any reader thread exists, so that no
        # output message can be queued ahead of the STARTED message.
        send(STARTED, proc.pid)

        readers = [
            threading.Thread(
                target=self._output_thread,
                args=(stream, functools.partial(send, tag)),
                # Daemon threads: a reader stuck on a pipe that never
                # reaches EOF must not keep the interpreter alive.
                daemon=True,
            )
            for stream, tag in ((proc.stdout, STDOUT), (proc.stderr, STDERR))
        ]
        for reader in readers:
            reader.start()

        was_cancelled = self._wait(proc, cancelled)

        # The pipes hit EOF once the process is gone; the timeout guards
        # against descendants of the process keeping them open.
        for reader in readers:
            reader.join(self.JOIN_TIMEOUT)

        send(CANCELLED if was_cancelled else FINISHED, proc.returncode)

    def _wait(self, proc, cancelled):
        """ Wait for ``proc`` to exit; return True if it had to be stopped.
        """
        while True:
            try:
                proc.wait(timeout=self.POLL_INTERVAL)
            except subprocess.TimeoutExpired:
                # Still running: honour a pending cancellation request.
                if cancelled():
                    self._stop(proc)
                    return True
            else:
                return False

    def _stop(self, proc):
        """ Terminate ``proc``, escalating to ``kill()`` if it lingers.
        """
        proc.terminate()
        try:
            proc.wait(timeout=self.TERMINATE_TIMEOUT)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()

    def _output_thread(self, stream, send):
        """ Forward every line of ``stream`` to ``send``, then close it.

        The stream is closed even if ``send`` raises.
        """
        with stream:
            for line in stream:
                send(line)
@IJobSpecification.register
class BackgroundProcess(HasStrictTraits):
    """ Job specification for running a subprocess in the background.

    Provides the two factory methods used when the job is submitted:
    ``background_job`` for the callable that does the work, and ``future``
    for the object that receives its messages.
    """

    #: Program and arguments, handed to the task as ``cmd``.
    cmd = List(Str)

    #: Extra positional arguments handed to the task.
    args = Tuple()

    #: Extra keyword arguments handed to the task.
    kwargs = Dict(Str(), Any())

    def background_job(self):
        """ Return the BackgroundProcessTask that launches the process.
        """
        # Hand over a plain dict rather than the TraitsDict held by the
        # ``kwargs`` trait.
        plain_kwargs = dict(self.kwargs)
        return BackgroundProcessTask(self.cmd, self.args, plain_kwargs)

    def future(self, cancel, message_receiver):
        """ Return a ProcessFuture wired to the given cancel callable and
        message receiver.
        """
        wiring = {
            "_cancel": cancel,
            "_message_receiver": message_receiver,
        }
        return ProcessFuture(**wiring)
class ProcessFuture(BaseFuture):
    """ A future associated to a subprocess Popen object.

    The ``_process_<type>`` methods handle the messages sent by the
    background task (presumably dispatched by ``BaseFuture`` on the message
    type -- confirm against the traits-futures version in use).
    """

    #: Set to True once a "finished" or "cancelled" message was processed.
    done = Bool()

    #: The PID of the process, or None if the process has not been started yet.
    pid = Union(None, Int)

    #: Fired when new output is available on stdout.
    stdout = Event(Str)

    #: Fired when new output is available on stderr.
    stderr = Event(Str)

    #: The return code of the process, or None if the process is still running.
    returncode = Union(None, Int)

    @property
    def result(self):
        """ The stored ``_result`` value of this future.
        """
        return self._result

    @property
    def exception(self):
        """ Not supported for process futures; always raises.
        """
        raise NotImplementedError()

    def _process_started(self, message):
        """ Record the PID carried by the "started" message.
        """
        self.pid = message

    def _process_stdout(self, message):
        """ Fire the ``stdout`` event with the received line.
        """
        self.stdout = message

    def _process_stderr(self, message):
        """ Fire the ``stderr`` event with the received line.
        """
        self.stderr = message

    def _process_finished(self, message):
        """ Record the return code, then flag the future as done.
        """
        # ``returncode`` must be assigned first: observers of ``done`` run
        # synchronously and read ``returncode`` while handling the change.
        self.returncode = message
        self.done = True

    def _process_cancelled(self, message):
        """ Flag a cancelled process as done, with a return code of -1.
        """
        # Same ordering as in ``_process_finished``: observers of ``done``
        # must already see the final ``returncode``.
        self.returncode = -1
        self.done = True
class ExtendedTraitsExecutor(TraitsExecutor):
    """ TraitsExecutor with a convenience method for launching processes.
    """

    def submit_process(self, cmd, *args, **kwargs):
        """ Submit a subprocess job and return the value of ``submit``.

        ``cmd``, ``args`` and ``kwargs`` are bundled into a
        BackgroundProcess job specification, which is then submitted.
        """
        return self.submit(
            BackgroundProcess(cmd=cmd, args=args, kwargs=kwargs)
        )
class ProcessViewer(HasStrictTraits):
    """ Small TraitsUI window to start processes, cancel the most recent
    one, and show the combined, abbreviated output of all of them.
    """

    #: Starts a new background process.
    start = Button("Start new process")

    #: Cancels the most recently started process that is still listed.
    kill = Button("Zap!")

    #: Executor the process jobs are submitted to.
    executor = Instance(ExtendedTraitsExecutor, ())

    #: Number of futures currently tracked. ``futures[]`` (rather than plain
    #: ``futures``) makes the property update when items are appended or
    #: removed, not only when the whole list is replaced.
    n_running = Property(Int, depends_on="futures[]")

    #: Futures of the processes that have not finished yet.
    futures = List(Instance(ProcessFuture))

    #: The output from all processes, combined.
    output = Str()

    view = View(
        HGroup(
            UItem("start"),
            UItem("kill"),
        ),
        UItem("output", editor=CodeEditor(lexer="text")),
        HGroup(
            Label("# of processes running:"),
            UItem("n_running"),
        ),
        resizable=True,
        width=800,
        height=600,
    )

    def _get_n_running(self):
        """ Getter for ``n_running``: the number of tracked futures.
        """
        return len(self.futures)

    @observe("start")
    def _start_job(self, event):
        """ Launch the demo script and track its future.
        """
        future = self.executor.submit_process(
            ["./process_with_lots_of_output.py"]
        )
        self.futures.append(future)

    @observe("kill")
    def _stop_job(self, event):
        """ Request cancellation of the most recently started process.
        """
        if self.futures:
            self.futures[-1].cancel()

    @observe("futures:items:done")
    def _remove_future(self, event):
        """ Log the end of a process and stop tracking its future.
        """
        finished = event.object
        self.output += (
            f"Process {finished.pid} finished with code "
            f"{finished.returncode}\n"
        )
        self.futures.remove(finished)

    @observe("futures:items:[stdout,stderr]")
    def _update_output(self, event):
        """ Append an abbreviated copy of a line of process output.
        """
        pid = event.object.pid
        message = event.new
        # Keep the log readable: show only the first 8 characters.
        if len(message) > 8:
            message = message[:8] + "(...)"
        stream = event.name
        self.output += f"Process {pid} {stream}: {message}\n"

    @observe("futures:items:pid")
    def _update_start_message(self, event):
        """ Log the PID of a process once it becomes known.
        """
        pid = event.object.pid
        self.output += f"Process started: {pid}\n"
if __name__ == '__main__':
    import textwrap

    # Helper script that the viewer launches; it is written to the current
    # directory, made executable, and removed again when the UI closes.
    script_name = "process_with_lots_of_output.py"
    script_source = textwrap.dedent("""\
        #!/usr/bin/env python
        import sys
        import time
        for letter in "abcdefgh":
            print(letter*10000, file=sys.stdout)
            print(letter*10000, file=sys.stderr)
            time.sleep(1.0)
    """)

    with open(script_name, "w", encoding="utf-8") as fp:
        fp.write(script_source)

    # Add the executable bit on top of the file's current mode.
    current_mode = os.stat(script_name).st_mode
    os.chmod(script_name, current_mode | stat.S_IEXEC)

    try:
        viewer = ProcessViewer()
        viewer.configure_traits()
    finally:
        os.unlink(script_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment