Created
October 28, 2025 23:37
-
-
Save jeffmylife/469377fb66b0783e88c95f35eab252bc to your computer and use it in GitHub Desktop.
Talk to an n8n AI agent with streaming through the CLI via Python and uv
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = ["httpx>=0.27", "rich>=13.7", "typer>=0.12"] | |
| # /// | |
| """ | |
| n8n-chat.py | |
| Interactive CLI for n8n Chat Trigger webhooks. | |
| - Keeps a sticky sessionId across messages | |
| - Streams newline-delimited JSON events: {"type":"begin"/"item"/"end"...} | |
| - Pretty output with Rich | |
| - Slash-commands: /quit, /new <id>, /session, /save <path>, /help | |
| Usage: | |
| export N8N_CHAT_URL='https://<your>.n8n.cloud/webhook/<id>/chat' | |
| uv run n8n-chat.py --session demo-abc123 | |
| One-shot: | |
| uv run n8n-chat.py -s demo-abc123 -m "Hello world" | |
| Then type messages at the prompt. Ctrl+C or /quit to exit. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from typing import Optional | |
| import httpx | |
| import typer | |
| from rich.console import Console | |
| from rich.live import Live | |
| from rich.panel import Panel | |
| from rich.spinner import Spinner | |
| from rich.text import Text | |
# Typer application that the `chat` command below registers itself on;
# the shell-completion options are switched off.
app = typer.Typer(add_completion=False)

# Single Rich console shared by every function in this script.
console = Console()

# Default webhook URL, read once at import time; empty string when the
# N8N_CHAT_URL environment variable is unset.
DEFAULT_URL = os.getenv("N8N_CHAT_URL", "").strip()
def _parse_event(raw: bytes) -> Optional[dict]:
    """Decode one newline-delimited JSON line into an event dict.

    Returns None for blank lines, lines that are not valid JSON, and JSON
    values that are not objects, so the caller can simply skip them.
    """
    raw = raw.strip()
    if not raw:
        return None
    try:
        evt = json.loads(raw.decode("utf-8", errors="ignore"))
    except json.JSONDecodeError:
        # The line is already complete (it was split on "\n"), so more bytes
        # will never make it parse; drop it instead of retrying forever.
        return None
    return evt if isinstance(evt, dict) else None


def stream_chat(
    url: str,
    session_id: str,
    message: str,
    timeout: float = 60.0,
    show_events: bool = False,
) -> str:
    """Send one chat message and render the streamed reply in a live panel.

    The request is a JSON POST of ``{"sessionId": ..., "chatInput": ...}``.
    The response body is read as newline-delimited JSON events and dispatched
    on their ``"type"`` field: ``begin``, ``item`` (carries ``content``),
    ``end``/``final``/``done``, and ``error`` (carries ``message``). Unknown
    types and unparsable lines are ignored.

    Args:
        url: Chat webhook URL to POST to.
        session_id: Value sent as ``sessionId``.
        message: Value sent as ``chatInput``.
        timeout: Timeout in seconds handed to httpx.
        show_events: When true, echo every parsed event to ``sys.stderr``.

    Returns:
        The plain text shown in the panel (streamed content plus any error or
        "no tokens" notice), or ``""`` on an HTTP status >= 400, a timeout, or
        another httpx error.
    """
    payload = {"sessionId": session_id, "chatInput": message}
    out_text = Text()
    status = Spinner("dots", text="Waiting for reply...")
    saw_item = False

    def panel(body, border_style="cyan"):
        return Panel(body, title="Assistant", border_style=border_style)

    with Live(panel(status), console=console, refresh_per_second=18) as live:

        def handle(raw: bytes) -> None:
            """Apply a single raw event line to the live display."""
            nonlocal saw_item
            evt = _parse_event(raw)
            if evt is None:
                return
            if show_events:
                # Console.stderr is a bool flag, not a stream, so write to the
                # real sys.stderr (Rich's Live is expected to proxy it while
                # the display is active -- see Rich Live docs).
                sys.stderr.write(f"{evt}\n")
            etype = evt.get("type")
            if etype == "begin":
                live.update(panel(status))
            elif etype == "item":
                content = evt.get("content", "")
                if content:
                    out_text.append(str(content))
                    live.update(panel(out_text))
                    saw_item = True
            elif etype in ("end", "final", "done"):
                live.update(panel(out_text if saw_item else status))
            elif etype == "error":
                msg = evt.get("message") or "Unknown error"
                # Text.append does not parse markup tags; style explicitly so
                # no literal "[red]" ends up on screen or in the return value.
                out_text.append("\nError: ", style="red")
                out_text.append(f"{msg}\n")
                live.update(panel(out_text, "red"))
            # Any other event type is intentionally ignored.

        try:
            with httpx.stream(
                "POST",
                url,
                headers={"Content-Type": "application/json"},
                json=payload,
                timeout=timeout,
            ) as resp:
                if resp.status_code >= 400:
                    console.print(f"[red]HTTP {resp.status_code}[/red] from n8n")
                    # Best effort: show the error body when it can be read.
                    try:
                        body = resp.read().decode("utf-8", errors="ignore")
                    except (httpx.HTTPError, httpx.StreamError):
                        body = ""
                    if body:
                        # Server text is untrusted; do not parse it as markup.
                        console.print(body, markup=False)
                    return ""

                buffer = b""
                for chunk in resp.iter_bytes():
                    if not chunk:
                        continue
                    buffer += chunk
                    # Everything before the last "\n" is complete lines; the
                    # tail (possibly empty) stays buffered for the next chunk.
                    *complete, buffer = buffer.split(b"\n")
                    for raw in complete:
                        handle(raw)
                # The last event may arrive without a trailing newline.
                handle(buffer)

                if not saw_item and not out_text.plain:
                    out_text.append(
                        "No streamed tokens received. Check workflow output.\n",
                        style="dim",
                    )
                    live.update(panel(out_text, "yellow"))
        except httpx.TimeoutException:
            console.print("[red]Request timed out.[/red] Try --timeout higher.")
            return ""
        except httpx.HTTPError as e:
            # Exception text is kept out of markup parsing.
            console.print(Text.assemble(("HTTP error:", "red"), f" {e}"))
            return ""
    return out_text.plain
def header(url: str, session_id: str, first_user_msg: Optional[str] = None) -> Panel:
    """Build the green-bordered banner describing the chat session.

    The banner lists the URL and session id; when ``first_user_msg`` is
    given (non-empty) it is appended in italics after a "You: " label.
    """
    banner = Text()
    fixed_lines = (
        ("n8n chat session\n", "bold"),
        (f"URL: {url}\n", "dim"),
        (f"Session: {session_id}\n", "dim"),
    )
    for line, line_style in fixed_lines:
        banner.append(line, style=line_style)
    if first_user_msg:
        banner.append("\nYou: ", style="italic")
        banner.append(first_user_msg, style="italic")
    return Panel(banner, border_style="green")
def save_transcript(path: Path, convo: list[tuple[str, str]]) -> None:
    """Write the conversation to ``path`` as a UTF-8 plain-text log.

    ``convo`` is a list of ``(role, content)`` pairs such as
    ``[("user", "..."), ("assistant", "...")]``. Each pair becomes one
    ``ROLE: content`` line (role upper-cased), with a blank line between
    consecutive entries. An empty conversation produces an empty file.
    """
    entries = [f"{role.upper()}: {content}\n" for role, content in convo]
    # Joining newline-terminated entries with "\n" yields exactly one blank
    # line between entries and a single trailing newline.
    path.write_text("\n".join(entries), encoding="utf-8")
@app.command()
def chat(
    session: str = typer.Option(
        ..., "--session", "-s", help="Sticky sessionId to use."
    ),
    url: str = typer.Option(
        DEFAULT_URL, "--url", "-u", help="n8n Chat Trigger URL (or env N8N_CHAT_URL)."
    ),
    message: Optional[str] = typer.Option(
        None, "--message", "-m", help="Send one message, then drop into REPL."
    ),
    timeout: float = typer.Option(60.0, help="HTTP timeout seconds."),
    show_events: bool = typer.Option(
        False, "--show-events", help="Print raw event JSON lines to stderr."
    ),
) -> None:
    """Interactive chat loop against an n8n Chat Trigger webhook.

    Prints a session banner, optionally sends ``message`` as the first turn,
    then reads lines from the prompt until /quit, /exit, Ctrl+C or end of
    input. Lines that are not a recognised slash-command are sent to the
    webhook; each user line and each non-empty reply is recorded so /save can
    write the transcript.

    Exits with code 2 when no URL is available.
    """
    if not url:
        console.print("[red]Missing URL.[/red] Set --url or N8N_CHAT_URL.")
        raise typer.Exit(code=2)

    convo: list[tuple[str, str]] = []  # (role, content)

    def take_turn(session_id: str, text: str) -> None:
        """Send one user message and record both sides of the exchange."""
        convo.append(("user", text))
        reply = stream_chat(
            url, session_id, text, timeout=timeout, show_events=show_events
        )
        if reply:
            convo.append(("assistant", reply))

    console.print(header(url, session, message))

    try:
        # Optional first turn; inside the try so Ctrl+C here also exits cleanly.
        if message:
            take_turn(session, message)

        console.print(
            "[dim]Type your message. Commands: /quit, /new <id>, /session, /save <path>, /help[/dim]"
        )
        while True:
            user = console.input("[bold]You>[/bold] ").strip()
            if not user:
                continue

            if user in ("/quit", "/exit"):
                break

            # Split "<command> <argument>"; the input is already stripped, so
            # a bare "/new" or "/save" yields an empty argument.
            command, _, arg = user.partition(" ")
            arg = arg.strip()

            if command == "/new":
                if arg:
                    session = arg
                    console.print(f"[green]Session switched to[/green] {session}")
                else:
                    console.print("[yellow]Usage:[/yellow] /new <sessionId>")
                continue
            if user == "/session":
                console.print(f"Current session: [bold]{session}[/bold]")
                continue
            if command == "/save":
                path = Path(arg or "transcript.txt").expanduser().resolve()
                try:
                    save_transcript(path, convo)
                except OSError as exc:
                    # A bad path should not end the chat session.
                    console.print(
                        Text.assemble(("Could not save transcript:", "red"), f" {exc}")
                    )
                else:
                    console.print(f"[green]Saved transcript to[/green] {path}")
                continue
            if user == "/help":
                console.print("[dim]/quit, /new <id>, /session, /save <path>[/dim]")
                continue

            # Regular turn
            take_turn(session, user)
    except (KeyboardInterrupt, EOFError):
        # Ctrl+C or end of input (Ctrl+D / closed stdin) ends the session.
        console.print("\n[dim]Bye.[/dim]")
# Script entry point: hand control to the Typer app only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment