Skip to content

Instantly share code, notes, and snippets.

@gliviu
Last active May 18, 2018 21:03
Show Gist options
  • Save gliviu/bf5e8ea5271b98a99ff147ee1cb94706 to your computer and use it in GitHub Desktop.
Save gliviu/bf5e8ea5271b98a99ff147ee1cb94706 to your computer and use it in GitHub Desktop.
import asyncio
import sys
import time
import re
cmd_regexp = re.compile('^([a-zA-Z0-9_-]+?)(\\((.*)\\))?$')
async def read_command(loop):
line = await loop.run_in_executor(None, sys.stdin.readline)
line = line.rstrip() # remove new line
cmd = None
params = None
match = cmd_regexp.match(line)
if match:
cmd = match[1]
params = (match[3] or '').split(',')
params = list(filter(None, params))
params = list(map(lambda x: x.strip(), params))
return (cmd, params)
def do_some_work(seconds):
time.sleep(seconds)
return f'{seconds} seconds'
async def main_loop():
command_future = asyncio.ensure_future(read_command(loop))
for i in range(10):
res = await loop.run_in_executor(None, do_some_work, 3)
print(f'exec {res}')
if command_future.done():
(cmd, params) = command_future.result()
print(f'received {cmd} {params}')
if (cmd == 'exit'):
break
command_future = asyncio.ensure_future(read_command(loop))
if not command_future.done():
print('press ENTER to finish')
await command_future
loop.stop()
loop = asyncio.get_event_loop()
asyncio.ensure_future(main_loop())
loop.run_forever()
loop.close()
print('done')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment