Skip to content

Instantly share code, notes, and snippets.

@KotRikD
Created April 17, 2022 18:03
Show Gist options
  • Save KotRikD/1f36a3675d64467340bdde94aa529048 to your computer and use it in GitHub Desktop.
Save KotRikD/1f36a3675d64467340bdde94aa529048 to your computer and use it in GitHub Desktop.
simple asyncio streams api
'''
Simple asyncio streams IRC server.

The original script was written by @lenforiee.
'''
import asyncio, time
import re
import traceback
from typing import TYPE_CHECKING, Union
# Server identity: used as the prefix of server-originated IRC lines and as
# the host part of the client mask built by IRCClient.__str__.
NAME = "kuriso!irc"
# Despite its name, this pattern matches line terminators (LF or CRLF), not
# general whitespace; it is used to split received data into commands.
WHITE_SPACE = re.compile(r"\r?\n")
def safe_name(string: Union[str, bytes]):
    """Return a normalised form of *string*.

    The value is lower-cased, every space is replaced by an underscore and
    any remaining trailing whitespace is stripped. ``bytes`` input yields
    ``bytes`` output; ``str`` input yields ``str`` output.
    """
    # Pick separators of the same type as the input so one code path serves
    # both str and bytes.
    if isinstance(string, bytes):
        space, underscore = b" ", b"_"
    else:
        space, underscore = " ", "_"
    lowered = string.lower()
    underscored = lowered.replace(space, underscore)
    return underscored.rstrip()
class BanchoChannel:
    """Represents one bancho text channel and the users currently in it."""

    def __init__(self, name: str, desc: str, destruct: bool = True) -> None:
        """Create a channel.

        Args:
            name: Channel name as given by the caller (e.g. ``"#osu"``).
            desc: Channel description.
            destruct: Stored as ``destructable``; not read by this class.
        """
        self.name: str = name
        self.destructable: bool = destruct
        # Normalised form of the name produced by the module-level safe_name().
        self.safe_name: str = safe_name(self.name)
        self.description: str = desc
        # Members of the channel; a set so that joining twice is a no-op.
        self.users: set = set()
        self._key: bytes = b""

    def on_user_join(self, user) -> None:
        """Add *user* to the channel (no effect if already present)."""
        self.users.add(user)

    def on_user_leave(self, user) -> None:
        """Remove *user* from the channel.

        Uses ``discard`` so that leaving twice (e.g. an explicit QUIT followed
        by connection teardown) does not raise ``KeyError``.
        """
        self.users.discard(user)
# Registry of the channels this server knows about, keyed by channel name.
# The IRC command handlers look channels up here with CHANNELS.get(name).
CHANNELS = {
"#osu": BanchoChannel(name="#osu", desc="osu main channel")
}
# Custom Bancho IRC exception.
class BanchoIRCException(Exception):
    """Custom exception carrying an IRC numeric code and an error text.

    Attributes:
        code: The numeric reply code to send to the client.
        error: The text that follows the numeric in the reply line.
    """

    def __init__(self, code_error: int, error: str):
        # Forward the values to Exception so that ``args`` is populated
        # (needed for sensible pickling/copying and default tooling).
        super().__init__(code_error, error)
        self.code: int = code_error
        self.error: str = error

    def __str__(self):
        """Return the ``repr`` of the error text (i.e. the quoted string)."""
        return repr(self.error)
class IRCClient():
    """State and IRC command handlers for one connected client.

    Incoming bytes are fed to :meth:`data_received`, which splits them into
    lines and dispatches each line to the ``handler_<command>`` coroutine of
    the same (lower-cased) name. Replies are written straight to the stream
    writer passed in as ``writer``.
    """

    # Upper bound on bytes buffered while waiting for a line terminator, so a
    # peer that never sends a newline cannot grow the buffer without limit.
    _MAX_PENDING_BYTES = 8192

    def __init__(self, reader=None, writer=None, loop=None):
        """Create the client state.

        Args:
            reader: Accepted for symmetry with the stream callback; unused.
            writer: Stream writer used for every outgoing line.
            loop: Optional event loop to remember. When omitted, the running
                loop is used if there is one, otherwise ``None`` is stored.
        """
        if loop is None:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # Constructed outside of a running loop.
                loop = None
        self.loop = loop
        self.nickname: str = "KotRik"
        self.ping_time: int = int(time.time())
        self.queue: bytearray = bytearray()  # Bytearray is much faster than bytes.
        self.safe_nick: str = ""
        self.socket = writer
        self.channels: list = []
        # Bytes received so far that do not yet form a complete line.
        self._recv_buffer: bytearray = bytearray()

    def __str__(self):
        """Return the client mask ``nick!nick@<server name>``."""
        return f"{self.nickname}!{self.nickname}@{NAME}"

    def dequeue(self):
        """Return the queued bytes and reset the queue to empty."""
        buffer, self.queue = self.queue, bytearray()
        return buffer

    def add_queue(self, message: str):
        """Send *message* followed by CRLF.

        Despite the name, the line is written to the socket immediately;
        ``self.queue`` is not involved.
        """
        self.socket.write((message + "\r\n").encode())

    def _send_welcome(self, user_count: int, motd: str):
        """Send the 001/251/375/372/376 welcome burst."""
        self.add_queue(f":{NAME} 001 {self.nickname} :Welcome to the Internet Relay Network {str(self)}!")
        self.add_queue(f":{NAME} 251 :There are {user_count} users and 0 services on 1 server")
        self.add_queue(f":{NAME} 375 :- {NAME} Message of the day -")
        self.add_queue(f":{NAME} 372 {self.nickname} :- {motd}")
        self.add_queue(f":{NAME} 376 :End of MOTD command")

    def simple_hello(self):
        """Send the welcome burst with a keep-alive style MOTD line."""
        self._send_welcome(0, f"Please irc alive {time.time()}")

    async def data_received(self, data):
        """Parse *data* into lines and dispatch each one to its handler.

        Bytes after the last line terminator are kept and prepended to the
        next call, so commands split across reads are not lost. Each line is
        handled independently: a failure in one command is reported to the
        client and the remaining lines are still processed.

        Args:
            data: Raw bytes read from the peer.
        """
        self._recv_buffer += data
        end = self._recv_buffer.rfind(b"\n")
        if end == -1:
            # No complete line yet; drop the data if the peer is flooding us.
            if len(self._recv_buffer) > self._MAX_PENDING_BYTES:
                self._recv_buffer.clear()
            return

        complete = bytes(self._recv_buffer[:end + 1])
        del self._recv_buffer[:end + 1]
        # Only whole lines are decoded, so a multi-byte character split across
        # two reads is never cut in half; invalid bytes are replaced.
        message = complete.decode("utf-8", errors="replace")

        # `message` ends with a terminator, so the final split item is "".
        for cmd in WHITE_SPACE.split(message)[:-1]:
            if not cmd.strip():
                continue
            print(cmd)
            # partition() also copes with commands that carry no arguments.
            command, _, args = cmd.partition(" ")
            if command == "CAP":
                continue
            try:
                handler = getattr(self, f"handler_{command.lower()}", None)
                if not handler:
                    raise BanchoIRCException(421, f"{command} :Unknown Command!")
                await handler(args)
            except BanchoIRCException as e:
                self.socket.write(f":{NAME} {e.code} {e.error}\r\n".encode())
            except Exception as e:
                # Boundary handler: report to the peer and log the traceback.
                self.socket.write(f":{NAME} ERROR {repr(e)}\r\n".encode())
                traceback.print_exc()

    async def handler_nick(self, nickname):
        """Handle NICK: store the nickname and send the welcome burst."""
        if not nickname:
            self.add_queue(f":{NAME} 431 :No nickname was found!")
            return

        self.nickname = nickname
        self.safe_nick = safe_name(self.nickname)
        # NOTE: nickname collisions are not checked; there is no client
        # registry in this module to check against.
        self._send_welcome(1, f"{time.time()}")

    async def handler_ping(self, _):
        """Handle PING: record the time and answer with PONG."""
        self.ping_time = int(time.time())
        self.add_queue(f":{NAME} PONG :{NAME}")

    async def handler_privmsg(self, args):
        """Handle PRIVMSG: relay a message to the other users of a channel.

        Raises:
            BanchoIRCException: 411/412 when the target or text is missing,
                403 when the channel is unknown, 404 when this client has
                not joined the channel.
        """
        channel, _, msg = args.partition(" ")
        if not channel:
            raise BanchoIRCException(411, ":No recipient given (PRIVMSG)")
        if not msg:
            raise BanchoIRCException(412, ":No text to send")

        if channel.startswith(("#", "$")):
            chan = CHANNELS.get(channel)
            if not chan:
                raise BanchoIRCException(403, f"{channel} :Cannot send message to not existing channel")
            if chan not in self.channels:
                raise BanchoIRCException(404, f"{channel} :Cannot send the message to channel!")

            print("adding to queue!")
            line = f":{str(self)} PRIVMSG {channel} {msg}"
            for client in chan.users:
                # The sender does not get an echo of its own message.
                if client is not self:
                    client.add_queue(line)
        # NOTE: messages to non-channel targets are silently ignored; direct
        # user-to-user delivery is not implemented here.

    async def handler_part(self, channel: str):
        """Handle PART: announce the departure and leave the channel."""
        chan = CHANNELS.get(channel)
        if chan is None or chan not in self.channels:
            self.add_queue(f":{NAME} 403 {channel} {channel}")
            return

        # Notify everyone (including this client) before actually leaving.
        for client in tuple(chan.users):
            client.add_queue(f":{str(self)} PART :{channel}")
        self.part_channel(chan)

    async def handler_join(self, channel: str):
        """Handle JOIN: enter the channel, announce it and send the names list.

        Raises:
            BanchoIRCException: 403 when the channel does not exist.
        """
        chan = CHANNELS.get(channel)
        if not chan:
            raise BanchoIRCException(403, f"{channel} :No channel named {channel} has been found!")

        self.join_channel(chan)

        for client in chan.users:
            client.add_queue(f":{str(self)} JOIN :{chan.name}")

        nicks = " ".join(client.nickname for client in chan.users)
        self.add_queue(f":{NAME} 353 {self.nickname} = {chan.name} :{nicks}")
        self.add_queue(f":{NAME} 366 {self.nickname} {chan.name} :End of /NAMES list")

    async def handler_user(self, args):
        """Handle USER: accepted and ignored."""
        # Not really useful for me.
        pass

    async def handler_quit(self, args):
        """Handle QUIT: announce it, leave every channel and close the socket."""
        resp = f":{str(self)} QUIT :{args.lstrip(':')}"
        self._leave_all_channels(resp, notify_self=True)
        self.socket.close()

    def _leave_all_channels(self, quit_line: str, *, notify_self: bool):
        """Leave every joined channel and send *quit_line* once per peer."""
        recipients = set()
        # Iterate over a copy: part_channel() mutates self.channels.
        for chan in list(self.channels):
            recipients.update(chan.users)
            self.part_channel(chan)
        if not notify_self:
            recipients.discard(self)
        for client in recipients:
            client.add_queue(quit_line)

    def join_channel(self, channel):
        """Join *channel* unless it is falsy or already joined."""
        if channel and channel not in self.channels:
            self.channels.append(channel)
            channel.on_user_join(self)

    def part_channel(self, channel):
        """Leave *channel* if it is one this client has joined."""
        if channel and channel in self.channels:
            self.channels.remove(channel)
            channel.on_user_leave(self)

    def connection_lost(self) -> None:
        """Clean up after the connection ends.

        Removes this client from any channels it is still in (telling the
        remaining users) so later broadcasts do not target a closed socket,
        then closes the socket.
        """
        print("connection lost")
        self._leave_all_channels(f":{str(self)} QUIT :Connection lost", notify_self=False)
        self.socket.close()
async def simple_streams_server(reader, writer):
    """Serve one client connection until it closes.

    Used as the ``client_connected_cb`` of ``asyncio.start_server``: reads
    chunks from *reader*, hands them to an :class:`IRCClient` and flushes
    *writer* after each chunk.

    Args:
        reader: ``asyncio.StreamReader`` for the connection.
        writer: ``asyncio.StreamWriter`` for the connection.
    """
    client = IRCClient(reader, writer)
    addr = writer.get_extra_info('peername')
    try:
        print("peer connected")
        while True:
            dequeue = client.dequeue()
            if dequeue:
                writer.write(dequeue)

            data = await reader.read(1024)
            if not data:
                # read() returns b"" at EOF; without this check the loop
                # would spin forever once the peer has disconnected.
                break

            try:
                await client.data_received(data)
            except BanchoIRCException as e:
                # write() needs bytes, so format and encode the reply.
                writer.write(f":{NAME} {e.code} {e.error}\r\n".encode())

            # Decoding here is for logging only, so never let it raise.
            message = data.decode(errors="replace")
            print(f"Received {message!r} from {addr!r}")

            if writer.is_closing():
                # A handler (e.g. QUIT) closed the connection.
                break
            await writer.drain()
    except ConnectionError:
        print("peer forcely disconnected")
    finally:
        client.connection_lost()
        try:
            await writer.wait_closed()
        except (ConnectionError, OSError):
            # The transport is already gone; nothing left to wait for.
            pass
# Create and install the event loop explicitly: calling
# asyncio.get_event_loop() with no current loop is deprecated in recent
# Python versions. `loop` stays a module-level name so that other code in
# this module referring to it keeps working.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# The `loop` argument of asyncio.start_server() was removed in Python 3.10;
# the server binds to whichever loop runs the coroutine.
coro = asyncio.start_server(simple_streams_server, '127.0.0.1', 6667)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment