Skip to content

Instantly share code, notes, and snippets.

@nitori
Last active September 4, 2022 13:33
Show Gist options
  • Save nitori/82d2019b44f9d7cb955c03d732a09259 to your computer and use it in GitHub Desktop.
Save nitori/82d2019b44f9d7cb955c03d732a09259 to your computer and use it in GitHub Desktop.
Simple asyncio twitch bot with a textual based TUI
from __future__ import annotations
import asyncio
from rich.text import Text
from textual.app import App
from textual.widgets import ScrollView, Footer
from textual_inputs import TextInput
try:
import tomllib
except ImportError:
import toml as tomllib
class TwitchUi(App):
body: ScrollView
input: TextInput
lines: list[str]
async def on_load(self, event):
await self.bind("escape", "quit", "Quit")
await self.bind("enter", "submit", "Submit")
async def on_mount(self) -> None:
self.lines = []
await self.view.dock(Footer(), edge="bottom")
grid = await self.view.dock_grid(edge="left", name="left")
grid.add_column(fraction=1, name='main')
grid.add_row(fraction=1, name="content")
grid.add_row(size=3, name="input")
grid.add_areas(
area1="main,content",
area2="main,input",
)
self.body = ScrollView(gutter=1, name='content')
self.input = TextInput(name='input')
grid.place(
area1=self.body,
area2=self.input,
)
await self.input.focus()
async def action_submit(self):
if not self.input.value:
return
await self.send_message_callback(self.input.value)
self.input.value = ''
async def append_line(self, line):
self.lines.append(line)
self.set_timer(0, self.update_body)
async def update_body(self):
await self.body.update(Text('\n'.join(self.lines)))
def set_send_message_callback(self, callback):
self.send_message_callback = callback
class TwitchBot:
reader: asyncio.StreamReader = None
writer: asyncio.StreamWriter = None
def __init__(self, view: TwitchUi, username, oauth_token, channel):
self.view = view
self.view.set_send_message_callback(self.send_view_message)
self.username = username
self.oauth_token = oauth_token
self.channel = '#' + channel.lstrip('#')
self._shutdown_event = asyncio.Event()
self._writer_task = None
self._reader_task = None
async def start(self):
self.reader, self.writer = await asyncio.open_connection('irc.chat.twitch.tv', 6667)
self._writer_task = asyncio.create_task(self._write_loop())
self._reader_task = asyncio.create_task(self._read_loop())
await asyncio.wait([self._writer_task, self._reader_task])
async def _write_loop(self):
self._outgoing = asyncio.Queue()
while not self._shutdown_event.is_set():
line = await self._outgoing.get()
if line is None:
continue
self.writer.write(f'{line}\r\n'.encode('utf-8'))
await self.writer.drain()
async def sendline(self, line):
self._outgoing.put_nowait(line)
async def _read_loop(self):
await self.sendline(f'PASS {self.oauth_token}')
await self.sendline(f'NICK {self.username}')
await self.sendline(f'USER {self.username} * * :{self.username}')
while not self._shutdown_event.is_set():
try:
line = await self.reader.readuntil(b'\r\n')
except asyncio.IncompleteReadError:
await self.sendline(None)
self._shutdown_event.set()
break
if line == b'':
await self.sendline(None)
self._shutdown_event.set()
break
orig_line = line = line.decode('utf-8').strip('\r\n')
nickname = None
if line.startswith(':'):
prefix, line = line.split(' ', maxsplit=1)
prefix = prefix[1:]
if '!' in prefix:
nickname, _ = prefix.split('!', maxsplit=1)
if ' :' in line:
line, trailing = line.split(' :', maxsplit=1)
args = line.split(' ')
args.append(trailing)
else:
args = line.split(' ')
command = args.pop(0).upper()
if command in ('353', '372', '375', '376', '002', '003', '004'):
# some informational headers, that can be ignored
pass
elif command == '001':
if args[0].casefold() != self.username.casefold():
await self.view.append_line(f'Name was changed to {args[0]}')
self.username = args[0]
await self.view.append_line(f'Connected. Joining {self.channel} ... ')
await self.sendline(f'JOIN {self.channel}')
elif command == '366' \
and args[0].casefold() == self.username.casefold() \
and args[1].casefold() == self.channel.casefold():
await self.view.append_line(f'Joined {self.channel}')
elif command == 'PRIVMSG':
channel = args[0]
message = args[1]
await self.view.append_line(f'[{channel}] <{nickname}> {message}')
else:
self.view.log('[UNHANDLED CHAT MESSAGE]: ' + orig_line)
async def send_view_message(self, line):
await self.sendline(f'PRIVMSG {self.channel} :{line}')
await self.view.append_line(f'[{self.channel}] <{self.username}> {line}')
async def close(self):
self._shutdown_event.set()
self.writer.close()
await self.writer.wait_closed()
async def main():
with open('conf.toml', 'r', encoding='utf-8') as f:
config = tomllib.load(f)
twitch = config['twitch']
view = TwitchUi(log="textual.log")
bot = TwitchBot(view, twitch['username'], twitch['oauth_token'], twitch['channel'])
bot_task = asyncio.create_task(bot.start())
view_task = asyncio.create_task(view.process_messages())
_, pending = await asyncio.wait([bot_task, view_task], return_when=asyncio.FIRST_COMPLETED)
if bot_task in pending:
await bot.close()
await bot_task
if view_task in pending:
await view.shutdown()
await view_task
if __name__ == '__main__':
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment