Created
September 17, 2025 13:17
-
-
Save robcowie/e9b6fc1db90de278c33227b2035c37a7 to your computer and use it in GitHub Desktop.
Observe messages on a NATS broker on the command line
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.13" | |
| # dependencies = [ | |
| # "nats-py", | |
| # "rich", | |
| # ] | |
| # /// | |
| """NATS Message Subscriber. | |
| Connect to a NATS broker and display messages in the console. | |
| """ | |
| import argparse | |
| import asyncio | |
| import json | |
| import sys | |
| from datetime import datetime | |
| from typing import Any | |
| import nats | |
| from nats.errors import ConnectionClosedError, TimeoutError | |
| from rich.console import Console, Group | |
| from rich.panel import Panel | |
| from rich.syntax import Syntax | |
| from rich.text import Text | |
| from rich.rule import Rule | |
| console = Console() | |
| class NATSSubscriber: | |
| def __init__(self, broker_uri: str, subject: str): | |
| self.broker_uri = broker_uri | |
| self.subject = subject | |
| self.nc = None | |
| self.message_count = 0 | |
| async def message_handler(self, msg): | |
| """Handle incoming NATS messages with pretty printing.""" | |
| self.message_count += 1 | |
| metadata_lines = [] | |
| metadata_lines.append(f"[cyan]Subject:[/cyan] {msg.subject}") | |
| # metadata_lines.append(f"[cyan]Message #:[/cyan] {self.message_count}") | |
| metadata_lines.append( | |
| f"[cyan]Timestamp:[/cyan] {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}" | |
| ) | |
| if msg.reply: | |
| metadata_lines.append(f"[cyan]Reply To:[/cyan] {msg.reply}") | |
| if msg.headers: | |
| metadata_lines.append(f"[cyan]Headers:[/cyan] {msg.headers}") | |
| metadata_text = Text.from_markup("\n".join(metadata_lines)) | |
| data_display = self.format_data(msg.data) | |
| content_items = [ | |
| metadata_text, | |
| Rule(style="dim"), | |
| Text("Payload:", style="bold"), | |
| data_display, | |
| ] | |
| console.print( | |
| Panel( | |
| Group(*content_items), | |
| title=f"[bold blue]Message #{self.message_count}[/bold blue]", | |
| border_style="blue", | |
| expand=False, | |
| ) | |
| ) | |
| console.print() | |
| def format_data(self, data: bytes) -> Any: | |
| """Format message data for display.""" | |
| try: | |
| decoded = data.decode("utf-8") | |
| try: | |
| json_data = json.loads(decoded) | |
| return Syntax( | |
| json.dumps(json_data, indent=2), | |
| "json", | |
| theme="monokai", | |
| line_numbers=False, | |
| background_color="default", | |
| ) | |
| except json.JSONDecodeError: | |
| return Text(decoded, style="yellow") | |
| except UnicodeDecodeError: | |
| hex_data = data.hex() | |
| formatted_hex = " ".join( | |
| hex_data[i : i + 2] for i in range(0, len(hex_data), 2) | |
| ) | |
| return Text( | |
| f"[Binary Data - {len(data)} bytes]\n{formatted_hex}", style="red" | |
| ) | |
| async def run(self): | |
| """Run the main loop for the subscriber.""" | |
| try: | |
| # Connect | |
| console.print( | |
| f"[cyan]Connecting to NATS broker at {self.broker_uri}...[/cyan]" | |
| ) | |
| self.nc = await nats.connect(self.broker_uri) | |
| console.print("[green]Connected successfully![/green]") | |
| # Subscribe | |
| console.print(f"[cyan]Subscribing to subject: {self.subject}[/cyan]") | |
| sub = await self.nc.subscribe(self.subject, cb=self.message_handler) | |
| console.print() | |
| while True: | |
| await asyncio.sleep(1) | |
| except ConnectionClosedError: | |
| console.print("[red]Connection to NATS broker closed![/red]") | |
| except TimeoutError: | |
| console.print("[red]Connection to NATS broker timed out![/red]") | |
| except KeyboardInterrupt: | |
| console.print("\n[yellow]Received interrupt signal...[/yellow]") | |
| except Exception as e: | |
| console.print(f"[red]Error: {e}[/red]") | |
| finally: | |
| if self.nc: | |
| console.print("[cyan]Closing connection...[/cyan]") | |
| await self.nc.close() | |
| console.print( | |
| f"[green]Connection closed. Received {self.message_count} message(s).[/green]" | |
| ) | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| Examples: | |
| %(prog)s --broker-uri nats://localhost:4222 --subject ">" | |
| """, | |
| ) | |
| parser.add_argument( | |
| "--broker-uri", | |
| "-b", | |
| type=str, | |
| default="nats://localhost:4222", | |
| help="NATS broker URI", | |
| ) | |
| parser.add_argument( | |
| "--subject", | |
| "-s", | |
| type=str, | |
| required=True, | |
| help="NATS subject to subscribe to (supports wildcards: * and >)", | |
| ) | |
| parser.add_argument("--version", "-v", action="version", version="%(prog)s 1.0.0") | |
| args = parser.parse_args() | |
| subscriber = NATSSubscriber(args.broker_uri, args.subject) | |
| try: | |
| asyncio.run(subscriber.run()) | |
| except KeyboardInterrupt: | |
| sys.exit(0) | |
| except Exception as e: | |
| console.print(f"[red]Fatal error: {e}[/red]") | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment