Created
February 27, 2018 23:01
-
-
Save linuxdaemon/dab6824f6a10c5f03bd995927f618fc2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### | |
### UNTESTED - EXAMPLE ONLY | |
### | |
import asyncio | |
import shlex | |
import sys | |
import re | |
from subprocess import PIPE | |
class SubprocessStreamIO: | |
def __init__(self, command): | |
self._command = command | |
self._proc = None | |
self._line_futures = [] | |
self.loop = asyncio.get_event_loop() | |
async def start(self): | |
self._proc = await asyncio.create_subprocess_exec(shlex.split(self._command), stdin=PIPE, stdout=PIPE, stderr=PIPE) | |
async def read_loop(self): | |
while True: | |
line = await self._proc.stdout.readline() | |
if not line: | |
break | |
asyncio.ensure_future(self.on_line(line)) # Queue up the on_line call but don't block this coroutine | |
async def error_loop(self): | |
# Probably want to check stderr as well | |
raise NotImplementedError | |
async def await_command(self, command, response_pattern): | |
# Send a command and await a specific pattern response | |
future = self.wait_fut(response_pattern) | |
await self.send_command(command) | |
return await future # Return the matched pattern | |
async def send_command(self, command): | |
await self._proc.stdin.write(command.encode()) | |
async def regex_match(self, regex, text): | |
# Should probably use loop.run_in_executor() to avoid blocking the main thread with long matches | |
return regex.match(text) | |
async def on_line(self, raw_line): | |
line = raw_line.decode() | |
for regex, fut in self._line_futures: | |
match = await self.regex_match(regex, line) | |
# This pattern matched, give whoever is awaiting this future the match object | |
if match and not fut.done(): | |
fut.set_result(match) | |
self.log_line(line) | |
def log_line(self, line): | |
# Log the line to stdout | |
now = datetime.now() | |
print(now.strftime("[%H:%M:%S]"), line) | |
async def compile_regex(self, pattern, flags=0): | |
# Should probably be done in a thread to avoid blocking the event loop | |
return re.compile(pattern, flags) | |
def wait_for(self, pattern): | |
future = self.loop.create_future() | |
regex = await self.compile_regex(pattern, re.IGNORECASE) | |
waiter = regex, future | |
def _callback(fut): | |
self._line_futures.remove(waiter) | |
self._line_futures.append(waiter) | |
future.add_done_callback(_callback) | |
return future | |
async def __aenter__(self): | |
return self | |
async def __aexit__(self, *exc_info): | |
await self._proc.stdin.write_eof() | |
await self._proc.wait() | |
async def main(): | |
async with SubprocessStreamIO("command arg1 arg2") as proc: | |
asyncio.ensure_future(proc.read_loop()) | |
await proc.await_command("STOP", r"^Server\sstopping") | |
# Server shutdown | |
def run(): | |
loop = asyncio.get_event_loop() | |
try: | |
loop.run_until_complete(main()) | |
finally: | |
loop.close() | |
if __name__ == '__main__': | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment