Last active
November 11, 2019 17:29
-
-
Save nat-n/4db690e10eb833dd654bdae2c0ca22d0 to your computer and use it in GitHub Desktop.
A proof of concept for using asyncio to manage and dynamically interact with a subprocess via stdio.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
class Worker:
    """Run a subprocess and talk to it over its standard streams.

    Output is delivered chunk by chunk to async callbacks. Subclasses may
    define ``_handle_stdout`` and/or ``_handle_stderr`` coroutine methods;
    ``start`` uses them when no explicit callback is passed.

    Args:
        cmd: Sequence of program and arguments, unpacked into
            ``asyncio.create_subprocess_exec``.
        limit: Optional stream buffer limit forwarded to the subprocess
            streams; omitted when falsy.
        separator: Either a ``bytes`` delimiter (chunks are read up to and
            including it) or an ``int`` (chunks of exactly that many bytes).
    """

    def __init__(self, cmd, limit=None, separator=b"\n"):
        self.cmd = cmd
        self.limit = limit
        self.separator = separator

    async def send(self, data):
        """Write ``data`` (bytes) to the child's stdin and wait for the flush.

        ``start`` must have been awaited first.
        """
        self._proc.stdin.write(data)
        await self._proc.stdin.drain()

    async def start(self, on_stdout=None, on_stderr=None):
        """Spawn the subprocess and begin forwarding its output.

        Args:
            on_stdout: Coroutine function called with each stdout chunk.
                Defaults to ``self._handle_stdout`` when that exists.
            on_stderr: Coroutine function called with each stderr chunk.
                Defaults to ``self._handle_stderr`` when that exists.

        A stream is only piped when a callback for it is available.
        """
        if on_stdout is None and hasattr(self, "_handle_stdout"):
            on_stdout = self._handle_stdout
        if on_stderr is None and hasattr(self, "_handle_stderr"):
            on_stderr = self._handle_stderr

        proc_kwargs = {"stdin": asyncio.subprocess.PIPE}
        if on_stdout:
            proc_kwargs["stdout"] = asyncio.subprocess.PIPE
        if on_stderr:
            proc_kwargs["stderr"] = asyncio.subprocess.PIPE
        if self.limit:
            proc_kwargs["limit"] = self.limit

        self._proc = await asyncio.create_subprocess_exec(*self.cmd, **proc_kwargs)

        if on_stdout:
            self._stdout_task = asyncio.get_running_loop().create_task(
                self._buffer_stream(self._proc.stdout, on_stdout)
            )
        if on_stderr:
            self._stderr_task = asyncio.get_running_loop().create_task(
                self._buffer_stream(self._proc.stderr, on_stderr)
            )

    async def _buffer_stream(self, stream, callback):
        """Forward chunks from ``stream`` to ``callback`` until end of stream.

        The loop runs until EOF rather than until the process exits, so
        output still buffered when the child terminates is not lost.
        """
        while True:
            try:
                if isinstance(self.separator, bytes):
                    chunk = await stream.readuntil(separator=self.separator)
                else:
                    chunk = await stream.readexactly(self.separator)
            except asyncio.IncompleteReadError as error:
                # EOF reached before a full chunk: flush what is left and stop.
                if error.partial:
                    await callback(error.partial)
                return
            except asyncio.LimitOverrunError as error:
                # The chunk is larger than the stream limit. The data is still
                # in the buffer, so hand over the oversized part instead of
                # letting the reader task die.
                chunk = await stream.read(error.consumed)
            if not chunk:
                # A zero-length read (e.g. an integer separator of 0) can
                # never make progress; stop instead of spinning.
                return
            await callback(chunk)

    async def stop(self, grace_period=1):
        """Terminate the subprocess, killing it if it outlives the grace period.

        Args:
            grace_period: Maximum number of seconds to wait after
                ``terminate()`` before resorting to ``kill()``.

        Safe to call after the process has already exited.
        """
        already_exited = self._proc.returncode is not None
        if not already_exited:
            try:
                self._proc.terminate()
            except ProcessLookupError:
                # The process exited between the check and the signal.
                already_exited = True

        if hasattr(self, "_stdout_task"):
            self._stdout_task.cancel()
        if hasattr(self, "_stderr_task"):
            self._stderr_task.cancel()

        if already_exited:
            return

        try:
            # Return as soon as the child exits instead of always sleeping.
            await asyncio.wait_for(self._proc.wait(), timeout=grace_period)
        except asyncio.TimeoutError:
            try:
                self._proc.kill()
            except ProcessLookupError:
                pass
            # Reap the child so it does not linger as a zombie.
            await self._proc.wait()
class Lancelot(Worker):
    """Worker that replies to each stdout chunk with the next canned answer."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Index of the next answer to send.
        self.question_number = 0
        self.answers = (
            "My name is 'Sir Lancelot of Camelot'.\n",
            "To seek the Holy Grail.\n",
            "Blue.\n",
        )

    async def _handle_stdout(self, data):
        """Print the received chunk, then send the next answer if any remain."""
        print("OUT:", data.decode())
        if self.question_number >= len(self.answers):
            return
        reply = self.answers[self.question_number]
        await self.send(reply.encode())
        self.question_number += 1

    async def _handle_stderr(self, data):
        """Print a chunk received on the child's stderr."""
        print("ERR:", data.decode())
async def main():
    """Run the bridgekeeper script under a Lancelot worker and report its exit.

    Waits up to one second for the subprocess to finish. If it does, the
    output readers are allowed to drain before the return code is printed;
    if it does not, the subprocess is stopped rather than left running.
    """
    # Configure a new worker
    knight = Lancelot(("python", "./bridgekeeper.py"), separator=b"?")
    # Start the subprocess in this worker
    await knight.start()
    # Keep the main coroutine alive to give the worker time to work
    try:
        returncode = await asyncio.wait_for(knight._proc.wait(), timeout=1)
    except asyncio.TimeoutError:
        # Do not leave the child running once we give up on it.
        await knight.stop()
        return
    # Let the reader tasks deliver any output still buffered at exit.
    readers = [
        task
        for task in (
            getattr(knight, "_stdout_task", None),
            getattr(knight, "_stderr_task", None),
        )
        if task is not None
    ]
    await asyncio.gather(*readers)
    print("returncode:", returncode)
# asyncio.run creates, runs and closes the event loop; calling
# get_event_loop() with no running loop is deprecated.
asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Ask the three questions in order; each prompt ends with "?" so the parent
# process can use that character as its chunk separator.
prompts = (
    "What... is your name?",
    "What... is your quest?",
    "What... is your favorite color?",
)
name, quest, color = (input(prompt) for prompt in prompts)
# Report the answers through an uncaught exception, which writes to stderr
# and makes the process exit with a non-zero status.
raise Exception(f"You answered: {name!r}, {quest!r}, {color!r}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment