Skip to content

Instantly share code, notes, and snippets.

@linuxdaemon
Created February 27, 2018 23:01
Show Gist options
  • Save linuxdaemon/dab6824f6a10c5f03bd995927f618fc2 to your computer and use it in GitHub Desktop.
Save linuxdaemon/dab6824f6a10c5f03bd995927f618fc2 to your computer and use it in GitHub Desktop.
###
### UNTESTED - EXAMPLE ONLY
###
import asyncio
import re
import shlex
import sys
from datetime import datetime
from subprocess import PIPE
class SubprocessStreamIO:
    """Drive a child process over its stdin/stdout pipes with asyncio.

    Each line read from the child's stdout is decoded, matched against the
    patterns registered through :meth:`wait_for`, and then printed with a
    timestamp.  Typical use is as an async context manager::

        async with SubprocessStreamIO("command arg1") as proc:
            reader = asyncio.ensure_future(proc.read_loop())
            await proc.await_command("STOP", "^done")

    NOTE: stderr is opened as a pipe but nothing in this class reads it
    (``error_loop`` is a placeholder), so a child that writes a lot to stderr
    can stall once the pipe buffer fills.
    """

    def __init__(self, command):
        """Store *command*; the process itself is spawned by :meth:`start`.

        Args:
            command: Command line as a single string.  It is split with
                ``shlex.split`` and executed without a shell.
        """
        self._command = command
        self._proc = None
        # (compiled regex, future) pairs registered by wait_for().
        self._line_futures = []
        # Strong references to in-flight on_line() tasks: the event loop only
        # keeps weak references, so unreferenced tasks may be collected early.
        self._line_tasks = set()
        self.loop = asyncio.get_event_loop()

    def _require_proc(self):
        """Return the running process or raise if it was never started."""
        if self._proc is None:
            raise RuntimeError("process has not been started; call start() first")
        return self._proc

    async def start(self):
        """Spawn the child process with stdin, stdout and stderr piped.

        Raises:
            ValueError: if the command string contains no program name.
        """
        argv = shlex.split(self._command)
        if not argv:
            raise ValueError("command must not be empty")
        # create_subprocess_exec takes the program and its arguments as
        # separate positional parameters, so the list must be unpacked.
        self._proc = await asyncio.create_subprocess_exec(
            *argv, stdin=PIPE, stdout=PIPE, stderr=PIPE
        )

    async def read_loop(self):
        """Read stdout line by line until EOF, dispatching each line.

        Every line is handed to :meth:`on_line` in its own task so that a
        slow handler does not hold up reading.
        """
        stdout = self._require_proc().stdout
        while True:
            line = await stdout.readline()
            if not line:
                break
            # Queue up the on_line call but don't block this coroutine
            task = asyncio.ensure_future(self.on_line(line))
            self._line_tasks.add(task)
            task.add_done_callback(self._line_tasks.discard)

    async def error_loop(self):
        """Placeholder for a stderr reader; not implemented."""
        # Probably want to check stderr as well
        raise NotImplementedError

    async def await_command(self, command, response_pattern):
        """Send *command* and wait for an output line matching a pattern.

        The waiter is registered before the command is written so a fast
        response cannot be missed.

        Args:
            command: Text to write to the child's stdin (sent verbatim; no
                newline is appended).
            response_pattern: Regular expression matched, case-insensitively,
                against the start of each output line.

        Returns:
            The ``re.Match`` object for the first matching line.
        """
        future = self.wait_for(response_pattern)
        try:
            await self.send_command(command)
            return await future
        finally:
            # On failure or cancellation, cancelling the future triggers its
            # done-callback, which unregisters the waiter.
            if not future.done():
                future.cancel()

    async def send_command(self, command):
        """Write *command* (UTF-8 encoded) to the child's stdin and flush it."""
        stdin = self._require_proc().stdin
        # StreamWriter.write() is a plain method; drain() is the awaitable
        # that waits for the buffer to be flushed.
        stdin.write(command.encode())
        await stdin.drain()

    async def regex_match(self, regex, text):
        """Return ``regex.match(text)``."""
        # Should probably use loop.run_in_executor() to avoid blocking the
        # main thread with long matches
        return regex.match(text)

    async def on_line(self, raw_line):
        """Handle one raw stdout line: resolve matching waiters, then log it.

        Args:
            raw_line: The line as bytes, as returned by ``readline()``.
        """
        # Undecodable bytes are replaced rather than raising, so a stray byte
        # in the child's output cannot kill the handler.
        line = raw_line.decode(errors="replace")
        # Iterate over a snapshot: done-callbacks remove entries from the list.
        for regex, fut in tuple(self._line_futures):
            match = await self.regex_match(regex, line)
            # This pattern matched, give whoever is awaiting this future the
            # match object
            if match and not fut.done():
                fut.set_result(match)
        self.log_line(line)

    def log_line(self, line):
        """Print *line* to stdout prefixed with the current ``[HH:MM:SS]``."""
        now = datetime.now()
        print(now.strftime("[%H:%M:%S]"), line)

    async def compile_regex(self, pattern, flags=0):
        """Return ``re.compile(pattern, flags)``."""
        # Should probably be done in a thread to avoid blocking the event loop
        return re.compile(pattern, flags)

    def wait_for(self, pattern):
        """Register a waiter for an output line matching *pattern*.

        This is a plain (non-async) method so callers can register the waiter
        before sending the command that triggers the response.

        Args:
            pattern: Regular expression source; compiled with
                ``re.IGNORECASE`` and matched from the start of each line.

        Returns:
            A future that resolves to the ``re.Match`` of the first matching
            line.  The waiter is unregistered as soon as the future is done
            (resolved or cancelled).
        """
        regex = re.compile(pattern, re.IGNORECASE)
        future = self.loop.create_future()
        waiter = (regex, future)

        def _callback(fut):
            self._line_futures.remove(waiter)

        self._line_futures.append(waiter)
        future.add_done_callback(_callback)
        return future

    async def __aenter__(self):
        """Start the process if it is not already running and return self."""
        if self._proc is None:
            await self.start()
        return self

    async def __aexit__(self, *exc_info):
        """Close the child's stdin and wait for it to exit."""
        if self._proc is None:
            return
        # write_eof() is a plain method, not a coroutine.
        self._proc.stdin.write_eof()
        await self._proc.wait()
async def main():
    """Run the example session: send ``STOP`` and wait for the shutdown line.

    The stdout reader runs as a background task while the command is sent.
    Once the context manager has exited, the reader is awaited so that it has
    finished before the event loop is closed and any exception it raised is
    surfaced here instead of being lost.
    """
    async with SubprocessStreamIO("command arg1 arg2") as proc:
        # Keep a reference: the event loop holds tasks only weakly, and the
        # task is awaited below.
        reader = asyncio.ensure_future(proc.read_loop())
        await proc.await_command("STOP", r"^Server\sstopping")
    # Server shutdown
    await reader
def run():
    """Run :func:`main` to completion on a fresh event loop, then close it.

    A new loop is created and installed explicitly instead of relying on
    ``asyncio.get_event_loop()``, which is deprecated when no loop is running
    and raises on recent Python versions.
    """
    loop = asyncio.new_event_loop()
    # Install it as the current loop so code that calls
    # asyncio.get_event_loop() picks up this loop.
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
# Script entry point: only start the example when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment