Created
June 2, 2023 18:42
-
-
Save nitori/6f861aabb57e191f47c895b2c9ef99a1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| import asyncio | |
| from textual.app import App, ComposeResult | |
| from textual import events | |
| from textual.widgets import Header, Footer, Input, TextLog | |
| class Prompt(Input): | |
| app: 'Terminal' | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| self._history = [] | |
| self._history_pointer = None | |
| def on_key(self, event: events.Key) -> None: | |
| if event.key == "enter" and self.value: | |
| self.app.append(self.value) | |
| if not self._history or self._history[-1] != self.value: | |
| self._history.append(self.value) | |
| self._history_pointer = None | |
| self.value = "" | |
| elif event.key == 'up' and self._history: | |
| if self._history_pointer is None: | |
| self._history_pointer = len(self._history) - 1 | |
| else: | |
| self._history_pointer = max(0, self._history_pointer - 1) | |
| self.value = self._history[self._history_pointer] | |
| self.cursor_position = len(self.value) | |
| elif event.key == 'down' and self._history: | |
| if self._history_pointer is None: | |
| return | |
| self._history_pointer = min(len(self._history), self._history_pointer + 1) | |
| if self._history_pointer == len(self._history): | |
| self.value = "" | |
| else: | |
| self.value = self._history[self._history_pointer] | |
| self.cursor_position = len(self.value) | |
| class OutputWidget(TextLog): | |
| pass | |
| class Terminal(App): | |
| textlog: OutputWidget | |
| prompt: Prompt | |
| network_task: asyncio.Task | |
| CSS_PATH = "vertical_layout.css" | |
| BINDINGS = [ | |
| ("d", "toggle_dark", "Toggle dark mode"), | |
| ("q", "quit", "Quit"), | |
| ("escape", "quit", "Quit"), | |
| ] | |
| def compose(self) -> ComposeResult: | |
| """Create child widgets for the app.""" | |
| self.textlog = OutputWidget(classes='output') | |
| self.prompt = Prompt(classes='input') | |
| yield Header() | |
| yield self.textlog | |
| yield self.prompt | |
| yield Footer() | |
| def on_mount(self) -> None: | |
| self.network_task = asyncio.create_task(fake_network_task(self)) | |
| self.network_task.add_done_callback(self.network_task_done) | |
| self.query_one(Prompt).focus() | |
| def network_task_done(self, task: asyncio.Task) -> None: | |
| try: | |
| task.result() | |
| except asyncio.CancelledError: | |
| pass | |
| except Exception as e: | |
| self.append(f"Network task failed: {e}. Restarting") | |
| self.network_task = asyncio.create_task(fake_network_task(self)) | |
| def action_toggle_dark(self) -> None: | |
| """An action to toggle dark mode.""" | |
| self.dark = not self.dark | |
| def append(self, text: str): | |
| self.textlog.write(text) | |
| async def fake_network_task(app: Terminal) -> None: | |
| """A fake network task.""" | |
| for _ in range(3): | |
| await asyncio.sleep(1) | |
| app.append("Hello world") | |
| raise RuntimeError("Something went wrong") | |
| async def main(): | |
| app = Terminal() | |
| await app.run_async() | |
| app.network_task.cancel() | |
| try: | |
| await app.network_task | |
| except asyncio.CancelledError: | |
| pass | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
vertical_layout.css