Created
August 30, 2020 19:57
-
-
Save leimao/7d8f30c1832bab8c0dddcf0cd13362d4 to your computer and use it in GitHub Desktop.
EdgeDB AsyncIO Tutorial ChatBot: https://youtu.be/SyiTd4rLb2s
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from typing import IO | |
import asyncio | |
import sys | |
import contextlib | |
import aiofiles.threadpool | |
from chat_streams import split_lines, write, handle_writes | |
async def handle_reads(reader: asyncio.StreamReader) -> None: | |
async for message in split_lines(reader): | |
text = message.decode() | |
print(f"Received {text!r}") | |
if text == "quit\n": | |
break | |
async def stream_file_to_queue(file: IO[str], queue: asyncio.Queue[bytes]) -> None: | |
loop = asyncio.get_event_loop() | |
async for message in aiofiles.threadpool.wrap(file, loop=loop): | |
await queue.put(message.encode()) | |
async def send_file(file: IO[str]) -> None: | |
write_queue: asyncio.Queue[bytes] = asyncio.Queue() | |
reader, writer = await asyncio.open_connection("127.0.0.1", 8888) | |
read_handler = asyncio.create_task(handle_reads(reader)) | |
write_handler = asyncio.create_task(handle_writes(writer, write_queue)) | |
copy_handler = asyncio.create_task(stream_file_to_queue(file, write_queue)) | |
done, pending = await asyncio.wait([read_handler, write_handler, copy_handler], return_when=asyncio.FIRST_COMPLETED) | |
print("Closing the connection") | |
for task in pending: | |
task.cancel() | |
with contextlib.suppress(asyncio.CancelledError): | |
await task | |
if __name__ == "__main__": | |
asyncio.run(send_file(sys.stdin)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import asyncio | |
import contextlib | |
from typing import Dict, Callable | |
from chat_streams import split_lines, handle_writes | |
users: Dict[str, asyncio.Queue[bytes]] = {} | |
async def handle_connection(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: | |
queue: asyncio.Queue[bytes] = asyncio.Queue() | |
write_handler = asyncio.create_task(handle_writes(writer, queue)) | |
ctx = { | |
"addr": str(writer.get_extra_info("peername")), | |
"my_nick": "", | |
} | |
try: | |
await handle_commands(reader, queue, ctx) | |
finally: | |
my_nick = ctx["my_nick"] | |
if my_nick in users: | |
del users[my_nick] | |
print("Closing the connection") | |
await queue.put(b"") | |
with contextlib.suppress(asyncio.CancelledError): | |
await write_handler | |
async def handle_commands(reader: asyncio.StreamReader, queue: asyncio.Queue[bytes], ctx: Dict[str, str]) -> None: | |
addr = ctx["addr"] | |
my_nick = ctx["my_nick"] | |
await queue.put(b"Welcome! Please introduce yourself. \n Format: I'm [username]") | |
async for message in split_lines(reader): | |
text = message.decode() | |
print(f"Received {text!r} from {addr!r}") | |
if text == "quit\n": | |
await queue.put(message) | |
break | |
if text.startswith("I'm "): | |
command, my_nick = text.split(" ", 1) | |
users[my_nick] = queue | |
elif text.startswith("@"): | |
if not my_nick: | |
await queue.put(b"Please introduce yourself.") | |
continue | |
at_nick, user_message = text.split(" ", 1) | |
nick = at_nick[1:] | |
if nick not in users: | |
await queue.put(b"Unknown user: " + nick.encode()) | |
continue | |
user_message = f"<{my_nick}> {user_message}" | |
await users[nick].put(user_message.encode()) | |
async def main() -> None: | |
server = await asyncio.start_server(handle_connection, "127.0.0.1", 8888) | |
addr = server.sockets[0].getsockname() if server.sockets else "unknown" | |
print(f"Serving on {addr}") | |
async with server: | |
await server.serve_forever() | |
if __name__ == "__main__": | |
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from typing import AsyncIterator | |
import asyncio | |
import sys | |
async def split_lines(reader: asyncio.StreamReader) -> AsyncIterator[bytes]: | |
data = b"" | |
try: | |
while data := data + await reader.read(100): | |
if b"\n" in data: | |
message, data = data.split(b"\n", 1) | |
yield message | |
except ConnectionResetError: | |
pass | |
if data: | |
yield data | |
async def write(writer: asyncio.StreamWriter, message: bytes) -> None: | |
print("Sending bytes: ", end="") | |
if not message.endswith(b"\n"): | |
message += b"\n" | |
# simulate network slowness | |
# sending bytes one by one | |
for ch in message: | |
# simulated latency | |
await asyncio.sleep(0.1) | |
writer.write(bytes([ch])) | |
print(f"{hex(ch)[2:].upper():0>2}", end="") | |
sys.stdout.flush() | |
if ch == 10: | |
print() | |
await writer.drain() | |
async def handle_writes(writer: asyncio.StreamWriter, queue: asyncio.Queue[bytes]) -> None: | |
try: | |
while (message := await queue.get()) != b"": | |
await write(writer, message) | |
finally: | |
await writer.drain() | |
writer.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment