Last active
September 4, 2022 13:33
-
-
Save nitori/82d2019b44f9d7cb955c03d732a09259 to your computer and use it in GitHub Desktop.
Simple asyncio Twitch bot with a Textual-based TUI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import asyncio | |
from rich.text import Text | |
from textual.app import App | |
from textual.widgets import ScrollView, Footer | |
from textual_inputs import TextInput | |
try: | |
import tomllib | |
except ImportError: | |
import toml as tomllib | |
class TwitchUi(App):
    """Textual application with a scrolling message pane above a text input.

    Lines added through ``append_line`` are kept in ``lines`` and rendered
    into ``body``.  Submitting the input hands its text to the callback
    registered with ``set_send_message_callback``.
    """

    body: ScrollView   # pane showing the accumulated lines
    input: TextInput   # single text field at the bottom of the grid
    lines: list[str]   # every line appended so far, oldest first

    async def on_load(self, event):
        """Bind Escape to the ``quit`` action and Enter to ``submit``."""
        key_bindings = (
            ("escape", "quit", "Quit"),
            ("enter", "submit", "Submit"),
        )
        for key, action, label in key_bindings:
            await self.bind(key, action, label)

    async def on_mount(self) -> None:
        """Build the layout: footer, content pane and a three-row input area."""
        self.lines = []
        await self.view.dock(Footer(), edge="bottom")

        # One column, two rows: a flexible content row and a fixed input row.
        layout = await self.view.dock_grid(edge="left", name="left")
        layout.add_column(fraction=1, name='main')
        layout.add_row(fraction=1, name="content")
        layout.add_row(size=3, name="input")
        layout.add_areas(area1="main,content", area2="main,input")

        self.body = ScrollView(gutter=1, name='content')
        self.input = TextInput(name='input')
        layout.place(area1=self.body, area2=self.input)

        await self.input.focus()

    async def action_submit(self):
        """Pass non-empty input text to the send callback, then clear the field."""
        text = self.input.value
        if text:
            await self.send_message_callback(text)
            self.input.value = ''

    async def append_line(self, line):
        """Store ``line`` and schedule a redraw of the message pane."""
        self.lines.append(line)
        # The redraw is triggered through a zero-delay timer rather than
        # being awaited directly here.
        self.set_timer(0, self.update_body)

    async def update_body(self):
        """Re-render the message pane from all stored lines."""
        rendered = Text('\n'.join(self.lines))
        await self.body.update(rendered)

    def set_send_message_callback(self, callback):
        """Register the awaitable callable invoked with each submitted text."""
        self.send_message_callback = callback
class TwitchBot:
    """Small Twitch chat client that relays one channel to and from a view.

    The bot connects to ``irc.chat.twitch.tv``, authenticates, joins
    ``channel`` and forwards every ``PRIVMSG`` it receives to
    ``view.append_line``.  Text submitted in the view is sent to the channel
    through ``send_view_message``, which the constructor registers as the
    view's send callback.
    """

    reader: asyncio.StreamReader | None = None
    writer: asyncio.StreamWriter | None = None

    # Replies that are received during login/join and deliberately ignored.
    _IGNORED_COMMANDS = frozenset(
        {'353', '372', '375', '376', '002', '003', '004'}
    )

    def __init__(self, view: TwitchUi, username, oauth_token, channel):
        """Store the credentials and hook the bot up to ``view``.

        ``channel`` may be given with or without its leading ``#``.
        """
        self.view = view
        self.view.set_send_message_callback(self.send_view_message)
        self.username = username
        self.oauth_token = oauth_token
        self.channel = '#' + channel.lstrip('#')
        self._shutdown_event = asyncio.Event()
        # Created here (not in _write_loop) so that sendline() is usable no
        # matter when it is first called, e.g. from the UI before the
        # connection is up.
        self._outgoing = asyncio.Queue()
        self._writer_task = None
        self._reader_task = None

    async def start(self):
        """Connect, then run the read and write loops until both finish."""
        # NOTE(review): no ``ssl`` argument is passed, so the OAuth token is
        # sent in plaintext.  Twitch also documents a TLS endpoint (port
        # 6697) -- consider switching.
        self.reader, self.writer = await asyncio.open_connection('irc.chat.twitch.tv', 6667)
        if self._shutdown_event.is_set():
            # close() was called while the connection was still being
            # established; do not leave the fresh socket open.
            self.writer.close()
            return
        self._writer_task = asyncio.create_task(self._write_loop())
        self._reader_task = asyncio.create_task(self._read_loop())
        await asyncio.wait([self._writer_task, self._reader_task])

    def _request_shutdown(self):
        """Flag shutdown and wake the write loop if it is waiting on the queue."""
        self._shutdown_event.set()
        self._outgoing.put_nowait(None)

    async def _write_loop(self):
        """Send queued lines to the server until shutdown is requested.

        A ``None`` item is only a wake-up signal and is never written.
        """
        while not self._shutdown_event.is_set():
            line = await self._outgoing.get()
            if line is None:
                continue
            try:
                self.writer.write(f'{line}\r\n'.encode('utf-8'))
                await self.writer.drain()
            except ConnectionError:
                # The connection is gone; stop both loops instead of dying
                # with an unretrieved task exception.
                self._request_shutdown()
                break

    async def sendline(self, line):
        """Queue one line (without line terminator) for sending."""
        self._outgoing.put_nowait(line)

    @staticmethod
    def _parse_line(line):
        """Split one IRC line into ``(nickname, command, args)``.

        ``nickname`` is the part of the ``:prefix`` before ``!`` or ``None``
        when there is no such prefix.  ``command`` is upper-cased.  A
        trailing `` :text`` parameter becomes the last element of ``args``.
        """
        nickname = None
        if line.startswith(':'):
            # partition() tolerates a prefix-only line, where a two-way
            # unpack of split() would raise ValueError.
            prefix, _, line = line.partition(' ')
            prefix = prefix[1:]
            if '!' in prefix:
                nickname = prefix.split('!', maxsplit=1)[0]
        if ' :' in line:
            line, trailing = line.split(' :', maxsplit=1)
            args = line.split(' ')
            args.append(trailing)
        else:
            args = line.split(' ')
        command = args.pop(0).upper()
        return nickname, command, args

    async def _handle_message(self, raw_line, nickname, command, args):
        """React to one parsed server message.

        Messages that match no branch, including known commands with too
        few arguments, are written to the view's log as unhandled.
        """
        if command in self._IGNORED_COMMANDS:
            return
        if command == 'PING':
            # Keepalive: the server expects a PONG echoing its payload and
            # drops the connection if it does not get one.
            payload = f' :{args[-1]}' if args else ''
            await self.sendline(f'PONG{payload}')
        elif command == '001' and args:
            # The welcome reply carries the name the server registered.
            if args[0].casefold() != self.username.casefold():
                await self.view.append_line(f'Name was changed to {args[0]}')
                self.username = args[0]
            await self.view.append_line(f'Connected. Joining {self.channel} ... ')
            await self.sendline(f'JOIN {self.channel}')
        elif (
            command == '366'
            and len(args) >= 2
            and args[0].casefold() == self.username.casefold()
            and args[1].casefold() == self.channel.casefold()
        ):
            await self.view.append_line(f'Joined {self.channel}')
        elif command == 'PRIVMSG' and len(args) >= 2:
            channel, message = args[0], args[1]
            await self.view.append_line(f'[{channel}] <{nickname}> {message}')
        else:
            self.view.log('[UNHANDLED CHAT MESSAGE]: ' + raw_line)

    async def _read_loop(self):
        """Log in, then read and dispatch server lines until disconnected."""
        await self.sendline(f'PASS {self.oauth_token}')
        await self.sendline(f'NICK {self.username}')
        await self.sendline(f'USER {self.username} * * :{self.username}')
        while not self._shutdown_event.is_set():
            try:
                raw = await self.reader.readuntil(b'\r\n')
            except (
                asyncio.IncompleteReadError,
                asyncio.LimitOverrunError,
                ConnectionError,
            ):
                # EOF, an over-long line or a dropped connection all end
                # the session.
                raw = b''
            if raw == b'':
                self._request_shutdown()
                break
            # Undecodable bytes must not take the whole read loop down.
            text = raw.decode('utf-8', errors='replace').strip('\r\n')
            nickname, command, args = self._parse_line(text)
            await self._handle_message(text, nickname, command, args)

    async def send_view_message(self, line):
        """Send ``line`` to the channel and echo it into the view."""
        await self.sendline(f'PRIVMSG {self.channel} :{line}')
        await self.view.append_line(f'[{self.channel}] <{self.username}> {line}')

    async def close(self):
        """Request shutdown and close the connection if one was opened."""
        self._request_shutdown()
        if self.writer is None:
            # Not connected yet; start() closes the socket once it sees the
            # shutdown flag.
            return
        self.writer.close()
        try:
            await self.writer.wait_closed()
        except ConnectionError:
            # The peer already dropped the connection; nothing left to close.
            pass
async def main():
    """Load ``conf.toml``, then run the bot and the UI until either one stops.

    The configuration must contain a ``[twitch]`` table with ``username``,
    ``oauth_token`` and ``channel``.  When one of the two tasks finishes,
    the other is shut down and awaited before returning.
    """
    with open('conf.toml', 'r', encoding='utf-8') as f:
        # Parse from text with loads(): the stdlib tomllib.load() only
        # accepts binary files and raises TypeError on this text-mode
        # handle, while loads() works for both tomllib and the ``toml``
        # fallback.
        config = tomllib.loads(f.read())
    twitch = config['twitch']

    view = TwitchUi(log="textual.log")
    bot = TwitchBot(view, twitch['username'], twitch['oauth_token'], twitch['channel'])

    bot_task = asyncio.create_task(bot.start())
    view_task = asyncio.create_task(view.process_messages())
    _, pending = await asyncio.wait([bot_task, view_task], return_when=asyncio.FIRST_COMPLETED)

    # Whichever side is still running gets stopped and awaited.
    if bot_task in pending:
        await bot.close()
        await bot_task
    if view_task in pending:
        await view.shutdown()
        await view_task


if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment