@delivrance
Last active June 13, 2024 09:18
Python async input
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def ainput(prompt: str = "") -> str:
    with ThreadPoolExecutor(1, "AsyncInput") as executor:
        return await asyncio.get_event_loop().run_in_executor(executor, input, prompt)


async def main():
    name = await ainput("What's your name? ")
    print(f"Hello, {name}!")


asyncio.run(main())
@Trogious

My version:

async def ainput(prompt: str = ''):
    with ThreadPoolExecutor(1, 'ainput') as executor:
        return (await asyncio.get_event_loop().run_in_executor(executor, input, prompt)).rstrip()

@delivrance
Author

Cannot remember why, as years have passed since.

This snippet has been useful in Pyrogram, where I did eventually switch to running input and getpass in the executor. The use of sys.stdin.readline and the thread initializer definitely looks unnecessary.

Gist updated with a simplified version.
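
For readers wondering what running getpass in the executor might look like, here is a minimal sketch that follows the same pattern as the gist's ainput. The name agetpass and the "AsyncGetpass" thread prefix are made up for illustration; this is not Pyrogram's actual code.

```python
import asyncio
import getpass
from concurrent.futures import ThreadPoolExecutor


async def agetpass(prompt: str = "Password: ") -> str:
    # Hypothetical helper: same idea as ainput, but the blocking call
    # is getpass.getpass, so the typed text is not echoed.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(1, "AsyncGetpass") as executor:
        # The worker thread blocks on the terminal; the event loop stays free.
        return await loop.run_in_executor(executor, getpass.getpass, prompt)
```

Inside a coroutine it would be used as `password = await agetpass()`.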

@sosi-deadeye

asyncio.to_thread was introduced with Python 3.9.

This makes it much easier to call blocking functions from async code.

async def ainput(prompt: str = ""):
    return await asyncio.to_thread(input, prompt)
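
To show that the event loop really stays responsive while input blocks in its thread, here is a small sketch that runs a background task alongside the to_thread version. The names ticker and demo are only for illustration, and it assumes Python 3.9 or newer.

```python
import asyncio


async def ainput(prompt: str = "") -> str:
    # input() blocks a worker thread, not the event loop.
    return await asyncio.to_thread(input, prompt)


async def ticker(ticks: int, delay: float, log: list) -> None:
    # Illustrative background task: records a counter while we wait for input.
    for i in range(ticks):
        log.append(i)
        await asyncio.sleep(delay)


async def demo(ticks: int = 3, delay: float = 0.5):
    log: list = []
    tick_task = asyncio.create_task(ticker(ticks, delay, log))
    name = await ainput("What's your name? ")  # ticker keeps running meanwhile
    await tick_task
    print(f"Hello, {name}! Ticks recorded while waiting: {log}")
    return name, log
```

To try it interactively, call `asyncio.run(demo())`; the ticker keeps advancing while the prompt waits for you to type.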

@jerrz450

Very nice idea for asynchronous input, thanks!