Skip to content

Instantly share code, notes, and snippets.

@robcowie
Created September 17, 2025 13:17
Show Gist options
  • Save robcowie/e9b6fc1db90de278c33227b2035c37a7 to your computer and use it in GitHub Desktop.
Save robcowie/e9b6fc1db90de278c33227b2035c37a7 to your computer and use it in GitHub Desktop.
Observe messages on a NATS broker on the command line
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "nats-py",
# "rich",
# ]
# ///
"""NATS message subscriber.

Connect to a NATS broker, subscribe to a subject, and pretty-print every
received message in the console.
"""
import argparse
import asyncio
import json
import sys
from datetime import datetime
from typing import Any
import nats
from nats.errors import ConnectionClosedError, TimeoutError
from rich.console import Console, Group
from rich.panel import Panel
from rich.syntax import Syntax
from rich.text import Text
from rich.rule import Rule
# Shared rich console; every status line and message panel in this script is
# printed through it.
console = Console()
class NATSSubscriber:
    """Subscribe to a NATS subject and pretty-print every received message.

    Attributes:
        broker_uri: URI handed to ``nats.connect``.
        subject: Subject handed to ``subscribe``.
        nc: The connected client, or ``None`` until ``run`` has connected.
        message_count: Number of messages handled so far.
    """

    def __init__(self, broker_uri: str, subject: str):
        self.broker_uri = broker_uri
        self.subject = subject
        self.nc = None
        self.message_count = 0

    async def message_handler(self, msg):
        """Render one incoming message as a panel on the console.

        The panel shows the subject, a local receive timestamp, the reply
        subject and headers when present, and the formatted payload.
        """
        self.message_count += 1

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        fields = [("Subject", msg.subject), ("Timestamp", timestamp)]
        if msg.reply:
            fields.append(("Reply To", msg.reply))
        if msg.headers:
            fields.append(("Headers", msg.headers))

        # Build the Text from styled segments instead of parsing markup:
        # subject/reply/headers come from the broker and may contain
        # bracketed text that rich would treat as (possibly invalid) tags.
        metadata_text = Text()
        for index, (label, value) in enumerate(fields):
            if index:
                metadata_text.append("\n")
            metadata_text.append(f"{label}:", style="cyan")
            metadata_text.append(f" {value}")

        content_items = [
            metadata_text,
            Rule(style="dim"),
            Text("Payload:", style="bold"),
            self.format_data(msg.data),
        ]
        console.print(
            Panel(
                Group(*content_items),
                title=f"[bold blue]Message #{self.message_count}[/bold blue]",
                border_style="blue",
                expand=False,
            )
        )
        console.print()

    def format_data(self, data: bytes) -> Any:
        """Return a rich renderable for a message payload.

        * Valid UTF-8 that parses as JSON -> indented, syntax-highlighted JSON.
        * Other valid UTF-8 -> the text itself, in yellow.
        * Anything else -> a byte count plus a space-separated hex dump, in red.
        """
        try:
            decoded = data.decode("utf-8")
        except UnicodeDecodeError:
            # bytes.hex(sep) yields the same "aa bb cc" layout as slicing
            # the hex string two characters at a time.
            return Text(
                f"[Binary Data - {len(data)} bytes]\n{data.hex(' ')}", style="red"
            )

        try:
            json_data = json.loads(decoded)
        except json.JSONDecodeError:
            return Text(decoded, style="yellow")

        return Syntax(
            # ensure_ascii=False keeps non-ASCII payload text readable
            # instead of showing \uXXXX escapes.
            json.dumps(json_data, indent=2, ensure_ascii=False),
            "json",
            theme="monokai",
            line_numbers=False,
            background_color="default",
        )

    async def run(self):
        """Connect, subscribe, and stay alive until interrupted or closed.

        Connection errors are reported on the console rather than raised.
        Cancellation (which is how ``asyncio.run`` delivers Ctrl-C) is
        reported and then re-raised. The connection, if one was opened, is
        always closed before returning.
        """
        try:
            # Connect. User-supplied values are printed as styled Text, not
            # markup, so e.g. a bracketed IPv6 host is shown verbatim.
            console.print(
                Text(
                    f"Connecting to NATS broker at {self.broker_uri}...",
                    style="cyan",
                )
            )
            self.nc = await nats.connect(self.broker_uri)
            console.print("[green]Connected successfully![/green]")

            # Subscribe
            console.print(
                Text(f"Subscribing to subject: {self.subject}", style="cyan")
            )
            await self.nc.subscribe(self.subject, cb=self.message_handler)
            console.print()

            # Messages arrive via the callback; just keep the loop alive
            # until the client reports that it has been closed.
            while not self.nc.is_closed:
                await asyncio.sleep(1)
            console.print("[red]Connection to NATS broker closed![/red]")
        except ConnectionClosedError:
            console.print("[red]Connection to NATS broker closed![/red]")
        except TimeoutError:
            console.print("[red]Connection to NATS broker timed out![/red]")
        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run, Ctrl-C cancels the main task, so the
            # interrupt shows up here as CancelledError.
            console.print("\n[yellow]Received interrupt signal...[/yellow]")
            raise
        except Exception as e:
            console.print(Text(f"Error: {e}", style="red"))
        finally:
            if self.nc:
                console.print("[cyan]Closing connection...[/cyan]")
                await self.nc.close()
                console.print(
                    f"[green]Connection closed. Received {self.message_count} message(s).[/green]"
                )
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the subscriber script."""
    usage_examples = (
        "\n"
        "Examples:\n"
        '%(prog)s --broker-uri nats://localhost:4222 --subject ">"\n'
    )
    cli = argparse.ArgumentParser(
        epilog=usage_examples,
        # Raw formatter keeps the epilog's line breaks intact in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument(
        "--broker-uri",
        "-b",
        default="nats://localhost:4222",
        type=str,
        help="NATS broker URI",
    )
    cli.add_argument(
        "--subject",
        "-s",
        required=True,
        type=str,
        help="NATS subject to subscribe to (supports wildcards: * and >)",
    )
    cli.add_argument("--version", "-v", action="version", version="%(prog)s 1.0.0")
    return cli


def main():
    """Parse the command line and run the subscriber until it stops.

    Exits with status 0 on Ctrl-C and status 1 (after printing the error)
    on any other exception escaping the subscriber.
    """
    options = _build_parser().parse_args()
    client = NATSSubscriber(options.broker_uri, options.subject)
    try:
        asyncio.run(client.run())
    except KeyboardInterrupt:
        sys.exit(0)
    except Exception as exc:
        console.print(f"[red]Fatal error: {exc}[/red]")
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment