Skip to content

Instantly share code, notes, and snippets.

@synodriver
Last active May 29, 2023 05:04
Show Gist options
  • Save synodriver/d36217cdc92228b0f1ec463c510e0f50 to your computer and use it in GitHub Desktop.
Save synodriver/d36217cdc92228b0f1ec463c510e0f50 to your computer and use it in GitHub Desktop.
asynchronous ttyd
"""
Copyright (c) 2008-2023 synodriver <[email protected]>
"""
import asyncio
import os
import pty
import sys
async def read_stdout(proc):
while line := await proc.stdout.readline():
print(line.decode())
async def read_stderr(proc):
while line := await proc.stderr.readline():
print(line.decode(), file=sys.stderr)
async def aiowrite(fd: int, data: bytes, loop=None):
if loop is None:
loop = asyncio.get_running_loop()
future = loop.create_future()
def write_cb(fd, data):
os.write(fd, data)
loop.remove_writer(fd)
future.set_result(None)
loop.add_writer(fd, write_cb, fd, data)
await future
async def main():
master, slave = pty.openpty()
proc = await asyncio.create_subprocess_shell("sh", stdin=slave, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, close_fds=True)
asyncio.create_task(read_stdout(proc))
asyncio.create_task(read_stderr(proc))
# proc.stdin.write(b"top\n")
# await proc.stdin.drain()
await aiowrite(master, b"top\n")
# await asyncio.sleep(1)
await proc.wait()
# while True:
# print(os.write(master, b""))
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment