Last active
November 4, 2025 21:44
-
-
Save luizomf/9ec7533c6bafdc80352aa252cb01294e to your computer and use it in GitHub Desktop.
Esse é um código Python que pode ser usado como um digitador em apresentações. Você adiciona um arquivo qualquer, fala em LEXER qual a linguagem deseja syntax highlight e ele digita para você. Os comandos são ctrl+s, inicia o stream da digitação, p pausa, c continua, q termina o stream, ctrl+x sai do aplicativo. Foi feito com Python e Textual. E…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import os | |
| import pathlib | |
| import random | |
| import re | |
| import sys | |
| from collections.abc import AsyncIterator | |
| from contextlib import asynccontextmanager | |
| from datetime import UTC, datetime, timedelta | |
| from enum import Enum | |
| from typing import Any, ClassVar | |
| from textual.app import App, ComposeResult | |
| from textual.binding import Binding, BindingType | |
| from textual.widgets import TextArea | |
| from textual.widgets.text_area import Edit, EditResult | |
| os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "hide" # why???? | |
| import pygame | |
| from pygame import mixer | |
| from pynput.keyboard import Key, KeyCode, Listener | |
| from textual import events, work | |
| # Utils | |
| RE_SPACE = re.compile(r"\s+") | |
| RE_SPACES_TO_TAB = re.compile(r" {4}") | |
| # Você pode mudar o caminho, estou assumindo a pasta do arquivo atual | |
| ROOT_DIR = pathlib.Path(__file__).parent.resolve() | |
| # Quanto maior o número, mais rápido (o som não muda) | |
| SPEED = 2.13 | |
| LEXER = "python" # qualquer um do Pygments ao meu ver | |
| # IMPORTANTE | |
| # Este é o caminho do arquivo que queremos digitar | |
| CODE_PATH = pathlib.Path(__file__) | |
| # Vou ler o arquivo e trocar 4 espaços para tabs | |
| # Fica mais natural na digitação, se não ele fica pressionando espaço até | |
| # indentar. | |
| CODE = CODE_PATH.read_text().strip() | |
| CODE = RE_SPACES_TO_TAB.sub(" ", CODE) | |
| system_random = random.SystemRandom() | |
| # Você vai precisar adicionar algum arquivo .wav para o som do teclado, não | |
| # consegui colocar isso no gist (pasta ./sound/keyboard2.wav) | |
| SOUNDS_DIR = ROOT_DIR / "sound" | |
| KEY_SOUND = SOUNDS_DIR / "keyboard2.wav" | |
| SOUND_VOLUME = 0.99 # 0 a 1 | |
| # Um valor que será enviado sempre que o producer terminar de produzir valores | |
| # exhausted. Tem esses caracteres loucos para nunca confundir com algum | |
| # caractere real | |
| PRODUCER_ENDED_HINT = "--__PRODUCER_-_ENDED__--" | |
| # A única coisa que conseguiu tocar e pausar o áudio de maneira simples foi isso | |
| pygame.init() | |
| mixer.init() | |
| # Configurações dos valores das mensagens que vamos enviar | |
| class MessageValues(Enum): | |
| QUIT = "QUIT" | |
| UNPAUSE = "UNPAUSE" | |
| PAUSE = "PAUSE" | |
| MUTE = "MUTE" | |
| INCREASE_VOL = "INCREASE_VOL" | |
| DECREASE_VOL = "DECREASE_VOL" | |
| # Configurações do estado atual | |
| class StreamStates(Enum): | |
| STOPPED = "STOPPED" | |
| RUNNING = "RUNNING" | |
| PAUSED = "PAUSED" | |
| IDLE = "IDLE" | |
| # Teclas de atalho | |
| class KeyValues(Enum): | |
| p = "p" # pause | |
| q = "q" # quit | |
| c = "c" # continue / unpause | |
| u = "u" # continue / unpause | |
| m = "m" # mute / unmute | |
| v = "v" # decrease_volume | |
| V = "V" # increase_volume | |
| # Essa mensagens serão usadas no pub/sub para indicar o status atual do app | |
| # O owner só está ali porque eu estava depurando quem enviou a mensagem. | |
| class Message: | |
| def __init__(self, value: MessageValues, *, owner: str) -> None: | |
| self.value = value | |
| self.owner = owner | |
| @classmethod | |
| def send_quit(cls, owner: str) -> "Message": | |
| return cls(value=MessageValues.QUIT, owner=owner) | |
| @classmethod | |
| def send_pause(cls, owner: str) -> "Message": | |
| return cls(value=MessageValues.PAUSE, owner=owner) | |
| @classmethod | |
| def send_unpause(cls, owner: str) -> "Message": | |
| return cls(value=MessageValues.UNPAUSE, owner=owner) | |
| @classmethod | |
| def mute(cls, owner: str) -> "Message": | |
| return cls(value=MessageValues.MUTE, owner=owner) | |
| @classmethod | |
| def increase_volume(cls, owner: str) -> "Message": | |
| return cls(value=MessageValues.INCREASE_VOL, owner=owner) | |
| @classmethod | |
| def decrease_volume(cls, owner: str) -> "Message": | |
| return cls(value=MessageValues.DECREASE_VOL, owner=owner) | |
| # Nosso broadcaster para o pub/sub | |
| class Broadcast: | |
| def __init__(self) -> None: | |
| self._subscribers: set[asyncio.Queue[Message]] = set() | |
| self._lock = asyncio.Lock() | |
| self._closed = False | |
| async def publish(self, message: Message) -> None: | |
| async with self._lock: | |
| if self._closed: | |
| return | |
| await asyncio.gather(*(q.put(message) for q in self._subscribers)) | |
| @asynccontextmanager | |
| async def subscribe(self) -> AsyncIterator[asyncio.Queue[Message]]: | |
| q: asyncio.Queue[Message] = asyncio.Queue() | |
| async with self._lock: | |
| if self._closed: | |
| msg = "Broadcast is already closed" | |
| raise RuntimeError(msg) | |
| self._subscribers.add(q) | |
| try: | |
| yield q | |
| finally: | |
| async with self._lock: | |
| self._subscribers.discard(q) | |
| async def close(self) -> None: | |
| self._closed = True | |
| async with self._lock: | |
| for sub in self._subscribers: | |
| await sub.put(Message.send_quit("Broadcast")) | |
| self._subscribers.clear() | |
| # IMPORTANTE: qualquer sistema operacional que se prese, detecta isso como | |
| # vírus. Faz sentido, por que estamos capturando o teclado (daria um keylogger). | |
| # A única lib que encontrei que fez um bom trabalho no Mac para capturar as | |
| # teclas foi o pynput. Tome cuidado, porque isso captura as teclas no sistema | |
| # como um todo. Então, você vai estar digitando no navegador e mandando | |
| # comandos para o app se estiver com ele rodando. | |
| # capture_keys aqui não vai permitir digitação de nenhuma tecla enquanto | |
| # o listener estiver ouvindo. | |
| def keys_pressed_sync( | |
| keys_queue: asyncio.Queue[str], *, capture_keys: bool = False | |
| ) -> Listener: | |
| loop = asyncio.get_event_loop() | |
| queue: asyncio.Queue[str] = keys_queue | |
| def on_press(key: KeyCode | Key | None) -> Any: | |
| if isinstance(key, KeyCode) and key.char == KeyValues.q.value: | |
| loop.call_soon_threadsafe(queue.put_nowait, KeyValues.q.value) | |
| if isinstance(key, KeyCode) and key.char == KeyValues.p.value: | |
| loop.call_soon_threadsafe(queue.put_nowait, KeyValues.p.value) | |
| if isinstance(key, KeyCode) and key.char == KeyValues.c.value: | |
| loop.call_soon_threadsafe(queue.put_nowait, KeyValues.c.value) | |
| if isinstance(key, KeyCode) and key.char == KeyValues.u.value: | |
| loop.call_soon_threadsafe(queue.put_nowait, KeyValues.u.value) | |
| if isinstance(key, KeyCode) and key.char == KeyValues.m.value: | |
| loop.call_soon_threadsafe(queue.put_nowait, KeyValues.m.value) | |
| if isinstance(key, KeyCode) and key.char == KeyValues.v.value: | |
| loop.call_soon_threadsafe(queue.put_nowait, KeyValues.v.value) | |
| if isinstance(key, KeyCode) and key.char == KeyValues.V.value: | |
| loop.call_soon_threadsafe(queue.put_nowait, KeyValues.V.value) | |
| def on_release(key: KeyCode | Key | None) -> Any: | |
| if isinstance(key, KeyCode) and key.char == "q": | |
| return False | |
| return None | |
| return Listener(on_press=on_press, on_release=on_release, suppress=capture_keys) | |
| # Como o pynput usar thread e este app usa asyncio, fiz um certo malabarismo para | |
| # tentar manter tudo threadsafe. Ao meu ver, tudo funcionando até o momento. | |
| async def keys_queue_consumer( | |
| broadcast: Broadcast, keys_queue: asyncio.Queue[str] | |
| ) -> None: | |
| async with broadcast.subscribe() as q: | |
| while True: | |
| if not q.empty(): | |
| message = q.get_nowait() | |
| if message.value == MessageValues.QUIT: | |
| break | |
| q.task_done() | |
| if not keys_queue.empty(): | |
| key = keys_queue.get_nowait() | |
| keys_queue.task_done() | |
| if key == KeyValues.q.value: | |
| await broadcast.publish(Message.send_quit("keys_queue_consumer")) | |
| break | |
| if key == KeyValues.p.value: | |
| await broadcast.publish(Message.send_pause("keys_queue_consumer")) | |
| elif key in [KeyValues.c.value, KeyValues.u.value]: | |
| await broadcast.publish(Message.send_unpause("keys_queue_consumer")) | |
| elif key == KeyValues.m.value: | |
| await broadcast.publish(Message.mute("keys_queue_consumer")) | |
| elif key == KeyValues.V.value: | |
| await broadcast.publish( | |
| Message.increase_volume("keys_queue_consumer") | |
| ) | |
| elif key == KeyValues.v.value: | |
| await broadcast.publish( | |
| Message.decrease_volume("keys_queue_consumer") | |
| ) | |
| await asyncio.sleep(0) | |
| class KeyboardSound: | |
| def __init__( | |
| self, | |
| volume: float, | |
| sound_path: pathlib.Path, | |
| *, | |
| mute: bool = False, | |
| volume_step: float = 0.01, | |
| ) -> None: | |
| self._volume: float = volume | |
| self._last_volume = volume | |
| self.sound_path = sound_path | |
| self._mute = mute | |
| self.music = pygame.mixer.music | |
| self._volume_step = volume_step | |
| self.load() | |
| @property | |
| def last_volume(self) -> float: | |
| return self._last_volume | |
| @property | |
| def is_muted(self) -> float: | |
| return self._mute | |
| @property | |
| def volume(self) -> float: | |
| return self._volume | |
| @volume.setter | |
| def volume(self, volume: float) -> None: | |
| volume = min(1, volume) | |
| volume = max(0, volume) | |
| if self._volume == volume: | |
| return | |
| self._volume = volume | |
| self.music.set_volume(volume) | |
| if self._volume > 0: | |
| self._mute = False | |
| self._last_volume = volume | |
| else: | |
| self._mute = True | |
| def load(self) -> None: | |
| self.music.load(str(KEY_SOUND)) | |
| self.music.set_volume(SOUND_VOLUME) | |
| def toggle_mute(self) -> None: | |
| self._mute = not self._mute | |
| if self._mute: | |
| self.volume = 0 | |
| else: | |
| self.volume = self._last_volume | |
| def play(self, loops: int = -1) -> None: | |
| self.music.play(loops=loops) | |
| def stop(self) -> None: | |
| self.music.stop() | |
| def pause(self) -> None: | |
| self.music.pause() | |
| def unpause(self) -> None: | |
| self.music.unpause() | |
| def increase_volume(self) -> float: | |
| self.volume += self._volume_step | |
| return self.volume | |
| def decrease_volume(self) -> float: | |
| self.volume -= self._volume_step | |
| return self.volume | |
| # Aqui tocamos o som com o pygame e também sinalizamos o evento. Se o evento | |
| # estiver com set(), o app está liberado para rodar, se estiver em clear() | |
| # o app vai estar pausado. Foi a maneira mais simples de montar um play e pause | |
| # global. | |
| async def play_keyboard_sound( | |
| broadcast: Broadcast, play_pause_event: asyncio.Event, keyboard_sound: KeyboardSound | |
| ) -> None: | |
| keyboard_sound.play() | |
| play_pause_event.set() # set() está liberado, clear() está pausado | |
| async with broadcast.subscribe() as messages_queue: | |
| while True: | |
| message = await messages_queue.get() | |
| if message.value == MessageValues.QUIT: | |
| keyboard_sound.stop() | |
| messages_queue.task_done() | |
| play_pause_event.set() | |
| break | |
| if message.value == MessageValues.PAUSE: | |
| keyboard_sound.pause() | |
| play_pause_event.clear() | |
| elif message.value == MessageValues.UNPAUSE: | |
| keyboard_sound.unpause() | |
| play_pause_event.set() | |
| elif message.value == MessageValues.MUTE: | |
| keyboard_sound.toggle_mute() | |
| elif message.value == MessageValues.INCREASE_VOL: | |
| keyboard_sound.increase_volume() | |
| elif message.value == MessageValues.DECREASE_VOL: | |
| keyboard_sound.decrease_volume() | |
| messages_queue.task_done() | |
| # Aqui, vamos ler letra por letra do arquivo e jogar elas em uma async queue | |
| # posteriormente, outra função vai ler dessa queue e informar que já terminou de | |
| # ler. | |
| async def stream_out( | |
| stream_queue: asyncio.Queue[str], | |
| broadcast: Broadcast, | |
| play_pause_event: asyncio.Event, | |
| ) -> str: | |
| should_stop = False | |
| memory_buffer = "" # vou recriar o arquivo a partir do stream | |
| async with broadcast.subscribe() as q: | |
| async def read_keys() -> None: | |
| nonlocal should_stop | |
| while True: | |
| await asyncio.sleep(0) | |
| if not q.empty(): | |
| message = await q.get() | |
| q.task_done() | |
| if message.value == MessageValues.QUIT: | |
| should_stop = True | |
| break | |
| async def read_chars() -> None: | |
| nonlocal memory_buffer | |
| current_index = 0 | |
| while True: | |
| if current_index >= len(CODE): | |
| await broadcast.publish(Message.send_quit("stream_out")) | |
| break | |
| if should_stop: | |
| await broadcast.publish(Message.send_quit("stream_out")) | |
| break | |
| char = CODE[current_index] | |
| memory_buffer += char | |
| rand = system_random.uniform( | |
| 0.000001, system_random.uniform(0.00001, 0.5) | |
| ) | |
| await asyncio.sleep(rand / SPEED) | |
| await stream_queue.put(char) | |
| current_index += 1 | |
| await play_pause_event.wait() # Isso é o que pausa ou libera | |
| await asyncio.gather(read_keys(), read_chars()) | |
| await stream_queue.put(PRODUCER_ENDED_HINT) # Manda a última mensagem | |
| return memory_buffer | |
| # Este é um leitor bem simples da async queue onde vai ter as letras do arquivo. | |
| # O trabalho dele é ficar em loop conferindo constantemente se existe alguma | |
| # letra na queue, se tiver ele remove e lê. | |
| async def simple_read_stream( | |
| stream_queue: asyncio.Queue[str], | |
| broadcast: Broadcast, | |
| play_pause_event: asyncio.Event, | |
| ) -> None: | |
| memory_buffer = "" | |
| async with broadcast.subscribe() as messages_queue: | |
| while True: | |
| char = await stream_queue.get() | |
| stream_queue.task_done() | |
| # Se for a última mensagem, não queremos este valor. | |
| if char == PRODUCER_ENDED_HINT: | |
| break | |
| memory_buffer += char | |
| sys.stdout.write(char) | |
| sys.stdout.flush() | |
| if not messages_queue.empty(): | |
| message = await messages_queue.get() | |
| messages_queue.task_done() | |
| if message.value == MessageValues.QUIT: | |
| break | |
| await play_pause_event.wait() | |
| # Isso é a função principal que coloca tudo junto. | |
| async def start_streaming( | |
| stream_queue: asyncio.Queue[str], # Queue das letras do arquivo | |
| keys_queue: asyncio.Queue[str], # Queue das teclas pressionadas | |
| broadcast: Broadcast, # Nosso broadcaster manda mensagem para todo mundo | |
| play_pause_event: asyncio.Event, # O evento para pause e play | |
| keyboard_sound: KeyboardSound, # A classe com os controles de som | |
| *, | |
| # Desative o pynput para você mesmo gerenciar as teclas de atalho. | |
| # Por exemplo: o textual faz isso também, então não precisamos dos dois aqui. | |
| disable_pynput: bool = True, | |
| capture_keys: bool = False, # Captura as teclas pressionadas com pynput | |
| ) -> str: # Retorna tudo o que foi digitado | |
| if not disable_pynput: | |
| # Listener do pynput (ele não é sync, usa thread, mas minha função é sync) | |
| listener = keys_pressed_sync(keys_queue, capture_keys=capture_keys) | |
| listener.start() | |
| # Reunimos as queues | |
| await stream_queue.join() | |
| await keys_queue.join() | |
| # Roda tudo de uma vez | |
| async with asyncio.TaskGroup() as tg: | |
| tg.create_task(keys_queue_consumer(broadcast, keys_queue)) | |
| buffer = tg.create_task(stream_out(stream_queue, broadcast, play_pause_event)) | |
| tg.create_task(simple_read_stream(stream_queue, broadcast, play_pause_event)) | |
| tg.create_task(play_keyboard_sound(broadcast, play_pause_event, keyboard_sound)) | |
| # Fecha o broadcast | |
| await broadcast.close() | |
| # Retorna tudo o que foi digitado | |
| return await buffer | |
| # Peguei essa textarea na doc. Eu só queria desativar qualquer possibilidade | |
| # de teclas pressionadas serem capturadas. Por exemplo: mesmo com read_only, | |
| # o mouse e as setas funcionavam, isso muda o cursor de lugar e faz o stream | |
| # digitar onde o cursor estiver. | |
| class ExtendedTextArea(TextArea): | |
| def on_mount(self) -> None: | |
| # Garantindo que não poderemos clicar, selecionar nem mudar o cursor | |
| # de lugar | |
| res = super().on_mount() | |
| self.can_focus = False | |
| self.can_focus_children = False | |
| self._set_highlight_next_update() | |
| return res | |
| def _set_highlight_next_update(self) -> None: | |
| self._secs_to_next_update = 3 | |
| self._last_update = datetime.now(tz=UTC) | |
| self._next_update = self._last_update + timedelta( | |
| seconds=self._secs_to_next_update | |
| ) | |
| async def _on_mouse_down(self, event: events.MouseDown) -> None: | |
| event.prevent_default() | |
| def edit(self, edit: Edit) -> EditResult: | |
| # Tentando amenizar o uso de memória (ao meu ver pelo tree-sitter) | |
| should_update = datetime.now(tz=UTC) >= self._next_update | |
| result = edit.do(self, record_selection=False) | |
| self.wrapped_document.wrap(self.wrap_width, self.indent_width) | |
| self._refresh_size() | |
| edit.after(self) | |
| if should_update: | |
| self._build_highlight_map() | |
| self._set_highlight_next_update() | |
| return result | |
| # A UI feita com textual | |
| class Main(App): | |
| ENABLE_COMMAND_PALETTE = False | |
| # Teclas de atalho para a UI | |
| BINDINGS: ClassVar[list[BindingType]] = [ | |
| Binding("ctrl+q", "exit", "Exit", priority=True, show=False), | |
| Binding("ctrl+x", "exit", "Exit", priority=True), | |
| Binding("ctrl+s", "start_stream", "Start", show=True, priority=False), | |
| Binding(KeyValues.q.value, "stop_stream", "Stop", priority=False), | |
| Binding(KeyValues.q.value, "clear_editor", "Stop", priority=False), | |
| Binding(KeyValues.p.value, "pause", "Pause", priority=False), | |
| Binding(KeyValues.c.value, "continue", "Continue", priority=False), | |
| Binding(KeyValues.m.value, "toggle_mute", "Toggle Mute", priority=False), | |
| Binding(KeyValues.v.value, "decrease_volume", "- vol", priority=False), | |
| Binding(KeyValues.V.value, "increase_volume", "+ vol", priority=False), | |
| ] | |
| CSS = """ | |
| #editor { | |
| border: none; | |
| padding: 1 1 2 1; | |
| background: rgba(0, 0, 0, 0); | |
| } | |
| """ | |
| def compose(self) -> ComposeResult: | |
| self.editor = ExtendedTextArea( | |
| "", | |
| id="editor", | |
| language=LEXER, | |
| tab_behavior="indent", | |
| show_line_numbers=True, | |
| line_number_start=1, | |
| max_checkpoints=50, | |
| compact=True, | |
| highlight_cursor_line=True, | |
| read_only=True, | |
| theme="dracula", | |
| ) | |
| yield self.editor | |
| def on_mount(self) -> None: | |
| self._stream_state: StreamStates = StreamStates.IDLE | |
| self.theme = "tokyo-night" | |
| self.stream_queue = asyncio.Queue() | |
| self.keys_queue = asyncio.Queue() | |
| self.play_pause_event = asyncio.Event() | |
| self.broadcast = Broadcast() | |
| self.keyboard_sound = KeyboardSound( | |
| volume=SOUND_VOLUME, sound_path=KEY_SOUND, mute=False | |
| ) | |
| self.editor.styles.set_rule("background", None) | |
| @work | |
| async def action_continue(self) -> None: | |
| # Tecla: c | |
| await self.keys_queue.put(KeyValues.c.value) | |
| if self._stream_state in {StreamStates.IDLE, StreamStates.STOPPED}: | |
| self.clear_notifications() | |
| self.notify( | |
| "Stream is not running ☠️", | |
| title="Stream", | |
| severity="warning", | |
| ) | |
| return | |
| self.editor._build_highlight_map() # noqa: SLF001 | |
| @work | |
| async def action_pause(self) -> None: | |
| # Tecla: p | |
| await self.keys_queue.put(KeyValues.p.value) | |
| if self._stream_state in {StreamStates.IDLE, StreamStates.STOPPED}: | |
| self.clear_notifications() | |
| self.notify( | |
| "Nothing streaming 👀", | |
| title="Stream", | |
| severity="warning", | |
| ) | |
| return | |
| self.editor._build_highlight_map() # noqa: SLF001 | |
| @work | |
| async def action_stop_stream(self) -> None: | |
| # Tecla: q | |
| await self.keys_queue.put(KeyValues.q.value) | |
| if self._stream_state in {StreamStates.STOPPED}: | |
| self.clear_notifications() | |
| self.notify( | |
| "Your stream is already stopped 🚨", | |
| title="Stream", | |
| severity="warning", | |
| ) | |
| return | |
| self.editor._build_highlight_map() # noqa: SLF001 | |
| self._stream_state = StreamStates.STOPPED | |
| def action_exit(self) -> None: | |
| # Teclas: ctrl+x | |
| self.exit() | |
| @work | |
| async def action_toggle_mute(self) -> None: | |
| self.clear_notifications() | |
| if self._stream_state not in [StreamStates.RUNNING, StreamStates.PAUSED]: | |
| self.notify("Nothing playing 🔕", title="Volume", severity="warning") | |
| return | |
| # Tecla: m | |
| prev_mute_state = self.keyboard_sound.is_muted | |
| if not prev_mute_state: | |
| self.notify( | |
| "Keyboard sound is muted 🔇", | |
| title="Volume", | |
| severity="information", | |
| ) | |
| else: | |
| self.notify( | |
| "Keyboard sound is unmuted 🔈", | |
| title="Volume", | |
| severity="information", | |
| ) | |
| await self.keys_queue.put(KeyValues.m.value) | |
| @work | |
| async def action_increase_volume(self) -> None: | |
| self.clear_notifications() | |
| if self._stream_state not in [StreamStates.RUNNING, StreamStates.PAUSED]: | |
| self.notify( | |
| "Start a stream to change volume 🙏", title="Volume", severity="warning" | |
| ) | |
| return | |
| # Tecla: V | |
| await self.keys_queue.put(KeyValues.V.value) | |
| self.clear_notifications() | |
| self.notify( | |
| f"Volume {self.keyboard_sound.volume:.0%}", | |
| title="Volume", | |
| severity="information", | |
| ) | |
| @work | |
| async def action_decrease_volume(self) -> None: | |
| self.clear_notifications() | |
| if self._stream_state not in [StreamStates.RUNNING, StreamStates.PAUSED]: | |
| self.notify( | |
| "You need to start a stream 🤣", title="Volume", severity="warning" | |
| ) | |
| return | |
| # Tecla: v | |
| await self.keys_queue.put(KeyValues.v.value) | |
| self.notify( | |
| f"Volume {self.keyboard_sound.volume:.0%}", | |
| title="Volume", | |
| severity="information", | |
| ) | |
| @work(name="start_stream", group="stream", exclusive=True) | |
| async def action_start_stream(self) -> None: | |
| self.editor.clear() | |
| if self._stream_state in {StreamStates.RUNNING, StreamStates.PAUSED}: | |
| self.clear_notifications() | |
| self.notify( | |
| "The is a stream already running ❌", | |
| title="Stream", | |
| severity="error", | |
| ) | |
| return | |
| self.clear_notifications() | |
| self.notify("Streaming 😃", title="Stream", severity="information") | |
| self._stream_state = StreamStates.RUNNING | |
| self.stream_queue = asyncio.Queue() | |
| self.keys_queue = asyncio.Queue() | |
| self.play_pause_event = asyncio.Event() | |
| self.broadcast = Broadcast() | |
| self.run_worker( | |
| start_streaming( | |
| self.stream_queue, | |
| self.keys_queue, | |
| self.broadcast, | |
| self.play_pause_event, | |
| self.keyboard_sound, | |
| disable_pynput=True, # Vamos gerenciar as teclas de atalho | |
| capture_keys=False, # Nem temos pynput aqui | |
| ), | |
| name="stream_producer", | |
| group="stream_running", | |
| ) | |
| self.run_worker( | |
| self.read_stream(), name="stream_consumer", group="stream_running" | |
| ) | |
| async def read_stream(self) -> None: | |
| async with self.broadcast.subscribe() as messages_queue: | |
| while True: | |
| await asyncio.sleep(0) | |
| if not messages_queue.empty(): | |
| message = messages_queue.get_nowait() | |
| messages_queue.task_done() | |
| if message.value == MessageValues.QUIT: | |
| self._stream_state = StreamStates.IDLE | |
| self.clear_notifications() | |
| self.notify( | |
| "Stream stopped ✅", | |
| title="Stream", | |
| severity="information", | |
| ) | |
| self.editor._build_highlight_map() # noqa: SLF001 | |
| break | |
| if message.value == MessageValues.PAUSE: | |
| self._stream_state = StreamStates.PAUSED | |
| self.clear_notifications() | |
| self.notify( | |
| "Stream paused 👌", | |
| title="Stream", | |
| severity="information", | |
| ) | |
| if message.value == MessageValues.UNPAUSE: | |
| self._stream_state = StreamStates.RUNNING | |
| self.clear_notifications() | |
| self.notify( | |
| "Stream playing 😊", | |
| title="Stream", | |
| severity="information", | |
| ) | |
| if not self.stream_queue.empty(): | |
| char = self.stream_queue.get_nowait() | |
| self.editor.insert(char) | |
| self.stream_queue.task_done() | |
| if __name__ == "__main__": | |
| # COM UI | |
| # Roda o app com UI | |
| app = Main() | |
| result = app.run() | |
| # ALERTA | |
| # Não use com e sem UI ao mesmo tempo | |
| # SEM UI | |
| # Estava testando a velocidade. Isso roda sem UI (mais fácil) | |
| # Isso abaixo eram testes, não removi porque ainda vou olhar isso com calma | |
| # import time | |
| # from datetime import UTC, datetime, timedelta | |
| # | |
| # start = time.perf_counter() | |
| # | |
| # memory_buffer = asyncio.run( | |
| # start_streaming( | |
| # asyncio.Queue(), | |
| # asyncio.Queue(), | |
| # Broadcast(), | |
| # asyncio.Event(), | |
| # keyboard_sound=KeyboardSound( | |
| # volume=SOUND_VOLUME, sound_path=KEY_SOUND, mute=False | |
| # ), | |
| # capture_keys=True, | |
| # disable_pynput=False, # Aqui precisarei do pynput para teclas | |
| # ) | |
| # ) | |
| # num_chars = len(memory_buffer) | |
| # num_words = len(RE_SPACE.split(memory_buffer)) | |
| # | |
| # end = time.perf_counter() | |
| # | |
| # elapsed_sec = end - start | |
| # elapsed_min = elapsed_sec / 60 | |
| # | |
| # chars_per_sec = num_chars / elapsed_sec | |
| # chars_per_min = num_chars / elapsed_min | |
| # | |
| # words_per_sec = num_words / elapsed_sec | |
| # words_per_min = num_words / elapsed_min | |
| # | |
| # fake_date = datetime(1, 1, 1, 0, 0, 0, tzinfo=UTC) + timedelta(seconds=elapsed_sec) | |
| # | |
| # print("\n\n") | |
| # print(f"Time elapsed in seconds: {elapsed_sec:.2f}s.") | |
| # print(f"Time elapsed in minutes: {fake_date:%H:%M:%S}.") | |
| # print(f"Characters per second: {chars_per_sec:.0f}") | |
| # print(f"Characters per minute: {chars_per_min:.0f}") | |
| # print(f"Words per second: {words_per_sec:.0f}") | |
| # print(f"Words per minute: {words_per_min:.0f}") | |
| # print(f"Number of words: {num_words:.0f}") | |
| # print(f"Number of characters: {num_chars:.0f}") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [project] | |
| name = "react_agent" | |
| version = "0.0.1" | |
| description = "react_agent" | |
| readme = "README.md" | |
| license = "MIT" | |
| authors = [{ name = "Luiz Otávio" }] | |
| requires-python = ">=3.13" | |
| dependencies = [ | |
| "pygame>=2.6.1", | |
| "pynput>=1.8.1", | |
| "textual[syntax]>=6.1.0", | |
| ] | |
| [project.urls] | |
| Homepage = "https://www.otaviomiranda.com.br/" | |
| [tool.ruff] | |
| line-length = 88 | |
| target-version = "py313" | |
| fix = true | |
| show-fixes = true | |
| indent-width = 4 | |
| exclude = [".venv"] | |
| [tool.ruff.lint] | |
| select = ["ALL"] | |
| ignore = [ | |
| "T201", # Checks for print statements, | |
| "COM812", # Checks for the absence of trailing commas. | |
| "INP001", # Checks for packages that are missing an __init__.py file. | |
| "D", # All pydocstyle (D) | |
| "ANN401", # Checks that function arguments are annotated with a more specific type than Any. | |
| "ERA001", # Checks for commented-out Python code. | |
| "A004", # Shadowing Python Builtin | |
| ] | |
| [tool.ruff.lint.per-file-ignores] | |
| "tests/**/*.py" = ["ANN201", "S101"] | |
| [tool.ruff.format] | |
| quote-style = "double" | |
| indent-style = "space" | |
| line-ending = "lf" | |
| [tool.ruff.lint.isort] | |
| known-first-party = ["react_agent"] | |
| [tool.pyright] | |
| typeCheckingMode = "standard" | |
| pythonVersion = "3.13" | |
| include = ["src", "tests"] | |
| exclude = [ | |
| "**/venv", | |
| "**/.venv", | |
| "**/env", | |
| "**/.env", | |
| "**/node_modules", | |
| "**/__pycache__", | |
| ] | |
| venv = ".venv" | |
| venvPath = "." | |
| executionEnvironments = [{ root = "src" }] | |
| [tool.pytest.ini_options] | |
| addopts = "-s --color=yes --tb=short" | |
| pythonpath = ["src"] | |
| testpaths = ["tests"] | |
| [build-system] | |
| requires = ["setuptools", "wheel"] | |
| build-backend = "setuptools.build_meta" | |
| [tool.setuptools.packages.find] | |
| where = ["src"] | |
| [tool.setuptools] | |
| package-dir = { "" = "src" } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment