Created
April 17, 2022 18:03
-
-
Save KotRikD/1f36a3675d64467340bdde94aa529048 to your computer and use it in GitHub Desktop.
simple asyncio streams api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Original script was written by @lenforiee.
'''
import asyncio, time | |
import re | |
import traceback | |
from typing import TYPE_CHECKING, Union | |
# Server name: used as the prefix of server-originated replies (":<NAME> ...")
# and as the host part of a client's "nick!nick@<NAME>" mask.
NAME: str = "kuriso!irc"

# Splits incoming data into lines: a line feed with an optional preceding
# carriage return.
WHITE_SPACE: "re.Pattern" = re.compile(r"\r?\n")
def safe_name(string: Union[str, bytes]):
    """Return the normalised ("safe") form of a name.

    The value is lower-cased, every space is turned into an underscore and
    any remaining trailing whitespace is stripped. Because spaces are
    replaced first, a trailing space survives as a trailing underscore.

    Args:
        string: The name to normalise, either text or bytes.

    Returns:
        The normalised name, of the same type as the input.
    """
    # Pick the literals matching the input type so one code path serves both.
    if isinstance(string, bytes):
        space, underscore = b" ", b"_"
    else:
        space, underscore = " ", "_"

    lowered = string.lower()
    return lowered.replace(space, underscore).rstrip()
class BanchoChannel:
    """Represents a single Bancho text channel and the users inside it."""

    def __init__(self, name: str, desc: str, destruct: bool = True) -> None:
        """Create a channel with no users.

        Args:
            name: Channel name as shown to clients (e.g. "#osu").
            desc: Human-readable channel description.
            destruct: Stored as ``destructable``; nothing in this file reads
                it -- presumably marks whether the channel may be removed
                (TODO confirm).
        """
        self.name: str = name
        self.description: str = desc
        self.destructable: bool = destruct
        # Normalised form of the name (see the module-level safe_name()).
        self.safe_name: str = safe_name(name)
        # Members currently in the channel.
        self.users = set()
        # NOTE(review): never read in this file -- looks like a channel
        # password placeholder; confirm before relying on it.
        self._key: bytes = b""

    def on_user_join(self, user) -> None:
        """Register ``user`` as a member of this channel."""
        self.users.add(user)

    def on_user_leave(self, user) -> None:
        """Drop ``user`` from this channel.

        Raises:
            KeyError: If ``user`` is not currently a member.
        """
        self.users.remove(user)
# Registry of the channels this server knows about, keyed by channel name.
CHANNELS = {
    channel.name: channel
    for channel in (
        BanchoChannel(name="#osu", desc="osu main channel"),
    )
}
# Custom Bancho IRC exception.
class BanchoIRCException(Exception):
    """Protocol error carrying an IRC numeric code and its reply text."""

    def __init__(self, code_error: int, error: str):
        """Store the numeric reply code and the text that follows it.

        Args:
            code_error: Numeric reply code (e.g. 421).
            error: Reply text sent after the code.
        """
        super().__init__(code_error, error)
        self.code: int = code_error
        self.error: str = error

    def __str__(self):
        """Return the quoted (``repr``) form of the error text."""
        return f"{self.error!r}"
class IRCClient():
    """State and command handlers for one connected IRC peer.

    Incoming bytes are fed to :meth:`data_received`, which splits them into
    lines and dispatches each line to the ``handler_<command>`` coroutine of
    the same (lower-cased) name. Replies are written straight to the peer's
    writer through :meth:`add_queue`.
    """

    def __init__(self, reader=None, writer=None, loop=None):
        """Create the client state for one connection.

        Args:
            reader: Accepted for symmetry with the stream callback; it is not
                used by this class.
            writer: Object replies are written to. It must provide
                ``write(bytes)`` and ``close()`` (presumably an
                ``asyncio.StreamWriter``).
            loop: Event loop to remember on the instance. Defaults to the
                running loop, or ``None`` when constructed outside of one.
        """
        # Do not depend on a module-level ``loop`` global: resolve the loop
        # from the caller or from the running coroutine instead.
        if loop is None:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
        self.loop = loop
        self.nickname: str = "KotRik"
        self.ping_time: int = int(time.time())
        self.queue: bytearray = bytearray()  # Bytearray is much faster than bytes.
        self.safe_nick: str = ""
        self.socket = writer
        self.channels = []

    def __str__(self):
        """Return the client mask in ``nick!nick@server`` form."""
        return f"{self.nickname}!{self.nickname}@{NAME}"

    def dequeue(self):
        """Return everything buffered in ``queue`` and reset the buffer."""
        buffer, self.queue = self.queue, bytearray()
        return buffer

    def add_queue(self, message: str):
        """Send one line to the peer, appending the CRLF terminator.

        Despite the name, the line is written to the socket immediately and
        does not go through ``queue``.
        """
        self.socket.write((message + "\r\n").encode())

    def simple_hello(self):
        """Send the welcome banner and message of the day."""
        self.add_queue(f":{NAME} 001 {self.nickname} :Welcome to the Internet Relay Network {str(self)}!")
        self.add_queue(f":{NAME} 251 :There are 0 users and 0 services on 1 server")
        self.add_queue(f":{NAME} 375 :- {NAME} Message of the day -")
        self.add_queue(f":{NAME} 372 {self.nickname} :- Please irc alive {time.time()}")
        self.add_queue(f":{NAME} 376 :End of MOTD command")

    async def data_received(self, data):
        """Split a chunk of received bytes into lines and run each command.

        Args:
            data: Raw bytes read from the peer.

        Anything after the last line break is discarded, so a line that is
        not newline-terminated within this chunk is not processed.
        """
        # Invalid UTF-8 from a peer must not raise out of the read loop.
        message = data.decode("utf-8", errors="replace")
        for cmd in WHITE_SPACE.split(message)[:-1]:
            print(cmd)
            if not cmd:
                # Empty lines carry no command; ignore them silently.
                continue
            await self._dispatch(cmd)

    async def _dispatch(self, cmd: str) -> None:
        """Run the handler for one non-empty line and report failures."""
        # partition() copes with commands that have no arguments ("QUIT"),
        # where a two-way split would raise ValueError.
        command, _, args = cmd.partition(" ")
        if command == "CAP":
            return
        # Errors are handled per command so that one bad line does not drop
        # the remaining lines of the same chunk.
        try:
            handler = getattr(self, f"handler_{command.lower()}", None)
            if not handler:
                raise BanchoIRCException(421, f"{command} :Unknown Command!")
            await handler(args)
        except BanchoIRCException as e:
            self.add_queue(f":{NAME} {e.code} {e.error}")
        except Exception as e:
            # add_queue terminates the reply with CRLF so it cannot run into
            # the next message on the wire.
            self.add_queue(f":{NAME} ERROR {repr(e)}")
            traceback.print_exc()

    async def handler_nick(self, nickname):
        """Handle NICK: set the nickname and send the welcome banner."""
        if not nickname:
            return self.add_queue(f":{NAME} 431 :No nickname was found!")
        self.nickname = nickname
        self.safe_nick = safe_name(self.nickname)
        # NOTE: nickname collisions are not checked; there is no client
        # registry in this file to check against.
        self.add_queue(f":{NAME} 001 {self.nickname} :Welcome to the Internet Relay Network {str(self)}!")
        self.add_queue(f":{NAME} 251 :There are 1 users and 0 services on 1 server")
        self.add_queue(f":{NAME} 375 :- {NAME} Message of the day -")
        self.add_queue(f":{NAME} 372 {self.nickname} :- {time.time()}")
        self.add_queue(f":{NAME} 376 :End of MOTD command")

    async def handler_ping(self, _):
        """Handle PING: record the time and answer with PONG."""
        self.ping_time = int(time.time())
        self.add_queue(f":{NAME} PONG :{NAME}")

    async def handler_privmsg(self, args):
        """Handle PRIVMSG: relay a message to the other members of a channel.

        Raises:
            BanchoIRCException: 411 when no target is given, 412 when no text
                is given, 403 when the channel does not exist, 404 when the
                sender has not joined the channel.
        """
        channel, _, msg = args.partition(" ")
        if not channel:
            raise BanchoIRCException(411, ":No recipient given (PRIVMSG)")
        if not msg:
            raise BanchoIRCException(412, ":No text to send")
        if not channel.startswith(("#", "$")):
            # Direct user-to-user messages are not implemented: targets that
            # are not channels are ignored.
            return
        chan = CHANNELS.get(channel)
        if chan is None:
            raise BanchoIRCException(403, f"{channel} :Cannot send message to not existing channel")
        if chan not in self.channels:
            raise BanchoIRCException(404, f"{channel} :Cannot send the message to channel!")
        print("adding to queue!")
        # The sender does not get an echo of its own message.
        for client in chan.users:
            if client != self:
                client.add_queue(f":{str(self)} PRIVMSG {channel} {msg}")

    async def handler_part(self, channel: str):
        """Handle PART: announce the departure and leave the channel.

        Replies with 403 when the channel is unknown or was never joined.
        """
        chan = CHANNELS.get(channel)
        if chan is None or chan not in self.channels:
            self.add_queue(f":{NAME} 403 {channel} {channel}")
            return
        # Announce first so the leaving client also receives its own PART.
        for client in chan.users:
            client.add_queue(f":{str(self)} PART :{channel}")
        self.part_channel(chan)

    async def handler_join(self, channel: str):
        """Handle JOIN: enter a channel, announce it and send the names list.

        Raises:
            BanchoIRCException: 403 when the channel does not exist.
        """
        chan = CHANNELS.get(channel)
        if chan is None:
            raise BanchoIRCException(403, f"{channel} :No channel named {channel} has been found!")
        self.join_channel(chan)
        for client in chan.users:
            client.add_queue(f":{str(self)} JOIN :{chan.name}")
        nicks = " ".join(client.nickname for client in chan.users)
        self.add_queue(f":{NAME} 353 {self.nickname} = {chan.name} :{nicks}")
        self.add_queue(f":{NAME} 366 {self.nickname} {chan.name} :End of /NAMES list")

    async def handler_user(self, args):
        """Handle USER: accepted and ignored."""
        # Not really useful for me.
        pass

    async def handler_quit(self, args):
        """Handle QUIT: notify joined channels, leave them, close the socket."""
        resp = f":{str(self)} QUIT :{args.lstrip(':')}"
        # Iterate over a copy: part_channel() mutates self.channels. Going
        # through part_channel() keeps self.channels in step with the
        # channels' user sets, so a later connection_lost() finds nothing
        # left to remove.
        for chan in list(self.channels):
            for client in chan.users:
                client.add_queue(resp)
            self.part_channel(chan)
        self.socket.close()

    def join_channel(self, channel):
        """Add this client to ``channel`` unless it is already a member."""
        if channel and channel not in self.channels:
            self.channels.append(channel)
            channel.on_user_join(self)

    def part_channel(self, channel):
        """Remove this client from ``channel`` if it is a member."""
        if channel and channel in self.channels:
            self.channels.remove(channel)
            channel.on_user_leave(self)

    def connection_lost(self) -> None:
        """Release everything tied to the connection.

        The client is removed from every channel it joined, so a peer that
        disconnects without sending QUIT does not linger in the channels'
        user sets. Safe to call more than once.
        """
        print("connection lost")
        for chan in list(self.channels):
            self.part_channel(chan)
        if self.socket is not None:
            self.socket.close()
async def simple_streams_server(reader, writer):
    """Serve one IRC connection until the peer disconnects.

    Used as the ``client_connected_cb`` of ``asyncio.start_server``: reads
    chunks from the peer, hands them to an :class:`IRCClient` and flushes the
    replies.

    Args:
        reader: Stream the peer's data is read from.
        writer: Stream replies are written to.
    """
    client = IRCClient(reader, writer)
    # The peer address does not change during the connection; look it up once.
    addr = writer.get_extra_info('peername')
    try:
        print("peer connected")
        while True:
            dequeue = client.dequeue()
            if dequeue:
                writer.write(dequeue)
            data = await reader.read(1024)
            if not data:
                # read() returns b"" at EOF; without this check the loop
                # would spin forever once the peer closes the connection.
                break
            try:
                await client.data_received(data)
            except BanchoIRCException as e:
                # The writer only accepts bytes, so format and encode the
                # reply instead of writing the raw error text.
                writer.write(f":{NAME} {e.code} {e.error}\r\n".encode())
            # Decoded only for the log line; never let bad bytes raise here.
            message = data.decode(errors="replace")
            print(f"Received {message!r} from {addr!r}")
            await writer.drain()
    except ConnectionError:
        # Covers resets as well as aborted connections and broken pipes.
        print("peer forcely disconnected")
    finally:
        client.connection_lost()
# Create and install the event loop explicitly: calling
# asyncio.get_event_loop() with no running loop is deprecated.
# ``loop`` stays a module-level name because other code in this file reads it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# start_server() no longer accepts a ``loop`` argument (removed in Python
# 3.10); it binds to the loop that runs the coroutine.
coro = asyncio.start_server(simple_streams_server, '127.0.0.1', 6667)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment