Skip to content

Instantly share code, notes, and snippets.

@anvarazizov
Last active February 27, 2026 04:17
Show Gist options
  • Select an option

  • Save anvarazizov/10d25b1eeb19ff937aeacd3db4935cd9 to your computer and use it in GitHub Desktop.

Select an option

Save anvarazizov/10d25b1eeb19ff937aeacd3db4935cd9 to your computer and use it in GitHub Desktop.
listener_public.py
"""Meshtastic LoRa Listener β€” Off-Grid AI with Tool Calling
full repo link: https://github.com/anvarazizov/meshtastic-listener-ollama
Architecture:
phi4-mini = smart router (decides which tool to call)
gemma3:12b = knowledge brain (for general Q&A)
HA API = home automation (sensors, device control)
Radio = LoRa mesh I/O
Flow:
  Radio msg → internet check
    Online: forward to Discord (cloud AI responds)
    Offline: phi4-mini routes → execute tools → respond via radio
  "SAY:" prefix → TTS on Home Assistant speaker
  "AI:" prefix → always routes to local AI
  "status" → Home Assistant sensor readout
"""
import json
import time
import os
import glob
import logging
import threading
import requests
from datetime import datetime
import meshtastic
import meshtastic.serial_interface
from pubsub import pub
# ===== CONFIGURATION (edit these) =====
SERIAL_PORT = "/dev/cu.usbmodem1101"  # Your Meshtastic radio serial port
CONFIG_PATH = "config.json"  # {"discord_webhook_url": "...", "known_nodes": {...}}
LOG_PATH = "messages.log"  # File the logging FileHandler appends to
OLLAMA_URL = "http://localhost:11434/api"  # Base URL; "/chat" and "/generate" are appended
ROUTER_MODEL = "phi4-mini"  # Lightweight intent classifier
BRAIN_MODEL = "gemma3:12b"  # Main AI brain
MAX_RADIO_MSG = 200  # LoRa message size limit used by send_radio when chunking
OUTBOX_DIR = "outbox"  # Drop .msg files here to send via radio
# Home Assistant config path - expects {"ha_token": "your-long-lived-token"}
HA_CONFIG_PATH = "ha_config.json"
HA_URL = "http://homeassistant.local:8123"
# TTS config - adjust entity IDs to match your setup
TTS_ENTITY = "tts.google_translate_en_com"
TTS_SPEAKER = "media_player.your_speaker_entity"  # Your HA media player
TTS_LANGUAGE = "uk"  # Language for TTS
# Known nodes - map Meshtastic node IDs to friendly names.
# load_config() merges the "known_nodes" object from CONFIG_PATH into this dict.
KNOWN_NODES = {
    # "!abcd1234": "Alice",
    # "!efgh5678": "Bob",
}
# Your own node ID (messages from this node are ignored)
MY_NODE_ID = "!00000000"  # Replace with your radio node ID
DISCORD_WEBHOOK_URL = None  # Filled in by load_config(); send_discord is a no-op while unset
INTERFACE = None  # Active radio interface; assigned and cleared by main()
# ===== TOOL DEFINITIONS (for phi4-mini router) =====
# Sent verbatim as the "tools" field of the chat request in route_with_phi().
# Each "name" must match a branch of the dispatch in route_with_phi(); a name
# without a branch is reported back as "Unknown tool: <name>".
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_home_status",
            "description": "Get current home sensor data: temperature, humidity, power status, weather, who is home",
            "parameters": {"type": "object", "properties": {}}
        }
    },
    {
        "type": "function",
        "function": {
            "name": "control_device",
            "description": "Control a home device (lights, switches, plugs). Use entity names like: living_room_lights, bedroom_lights, all_lights",
            "parameters": {
                "type": "object",
                "properties": {
                    "entity": {"type": "string", "description": "Device name"},
                    "action": {"type": "string", "enum": ["on", "off", "toggle"]}
                },
                "required": ["entity", "action"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "answer_question",
            "description": "Answer a general knowledge question, have a conversation, or provide information. Use this for anything that is NOT about home devices or sensors.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {"type": "string", "description": "The question to answer"}
                },
                "required": ["question"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "run_command",
            "description": "Run a system command on the server (e.g., check disk space, uptime, network status)",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {"type": "string", "description": "Shell command to run"}
                },
                "required": ["command"]
            }
        }
    }
]
# Safe commands whitelist - only these are allowed via radio.
# tool_run_command() compares the first word of the requested command against
# this list before executing it.
# NOTE(review): "cat" can read any file the process can read, and "top" is
# presumably interactive and would only end via the 10 s timeout - confirm
# these really belong on a list reachable by any radio sender.
SAFE_COMMANDS = ["uptime", "df", "date", "whoami", "hostname", "ollama", "cat", "ps", "top"]
# ===== LOGGING =====
# Log to both the console and LOG_PATH with a timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_PATH),
    ]
)
logger = logging.getLogger(__name__)
# ===== HELPERS =====
def load_config():
    """Load optional settings from CONFIG_PATH into module globals.

    Sets DISCORD_WEBHOOK_URL from the ``discord_webhook_url`` key and merges
    the ``known_nodes`` object into KNOWN_NODES. A missing, unreadable or
    malformed config file is logged and the built-in defaults are kept, so a
    bad config never prevents the listener from starting.
    """
    global DISCORD_WEBHOOK_URL
    try:
        with open(CONFIG_PATH, encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        logger.warning("Config not found, using defaults")
        return
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        logger.error(f"Config unreadable ({e}), using defaults")
        return

    if not isinstance(config, dict):
        logger.error("Config must be a JSON object, using defaults")
        return

    DISCORD_WEBHOOK_URL = config.get("discord_webhook_url")

    # "known_nodes": null (or any non-object) must not crash the merge.
    known_nodes = config.get("known_nodes") or {}
    if isinstance(known_nodes, dict):
        KNOWN_NODES.update(known_nodes)
    else:
        logger.warning("Config key known_nodes is not an object, ignoring it")
def send_discord(message: str):
    """Forward message to Discord via webhook (best-effort, never raises).

    Does nothing when no webhook URL is configured. Failures are logged
    instead of silently dropped; the message is truncated to Discord's
    content limit so an over-long text is not rejected outright.
    """
    if not DISCORD_WEBHOOK_URL:
        return
    max_content = 2000  # Discord rejects webhook content longer than this
    if len(message) > max_content:
        message = message[:max_content - 3] + "..."
    try:
        resp = requests.post(DISCORD_WEBHOOK_URL, json={"content": message}, timeout=10)
    except requests.exceptions.RequestException as e:
        # Log only the exception type: the full text can include the request
        # URL, and the webhook URL is a secret that must not land in the log.
        logger.warning(f"Discord send failed: {type(e).__name__}")
        return
    if resp.status_code >= 400:
        logger.warning(f"Discord webhook rejected message: HTTP {resp.status_code}")
def _split_for_radio(text: str, limit: int) -> list:
    """Split text into pieces of at most ``limit`` UTF-8 bytes each.

    Breaks at the last space inside the window when there is one, otherwise
    hard-cuts on a character boundary.
    """
    limit = max(limit, 4)  # always leave room for one UTF-8 character (max 4 bytes)
    pieces = []
    while text:
        raw = text.encode("utf-8")
        if len(raw) <= limit:
            pieces.append(text)
            break
        # Longest character prefix that fits in the byte budget;
        # errors="ignore" drops a trailing partially-cut character.
        window = raw[:limit].decode("utf-8", errors="ignore")
        cut = window.rfind(' ')
        if cut <= 0:
            cut = len(window)
        pieces.append(text[:cut])
        text = text[cut:].lstrip()
    return pieces


def send_radio(text: str, channel_index: int = 0):
    """Send text over LoRa, auto-chunking if needed.

    The size budget (MAX_RADIO_MSG) is applied to the UTF-8 encoded length,
    not the character count, so non-ASCII text (e.g. Cyrillic, 2 bytes per
    character) is not sent oversized. When several chunks are needed each
    one is prefixed with "[i/n] " and the prefix is counted against the
    budget. Send errors are logged per chunk; nothing is raised for them.

    Args:
        text: Message to transmit. An empty string sends nothing.
        channel_index: Meshtastic channel index passed to sendText.
    """
    # Snapshot the global: main() may set INTERFACE to None while reconnecting,
    # and this function runs on message-handler threads.
    radio = INTERFACE
    if not radio:
        logger.error("No radio interface")
        return

    chunks = _split_for_radio(text, MAX_RADIO_MSG)
    # Re-split with room reserved for the "[i/n] " prefix. The prefix length
    # depends on the chunk count, so repeat until the reservation is enough.
    reserved = 0
    while len(chunks) > 1:
        needed = len(f"[{len(chunks)}/{len(chunks)}] ")
        if needed <= reserved:
            break
        reserved = needed
        chunks = _split_for_radio(text, MAX_RADIO_MSG - reserved)

    total = len(chunks)
    for i, chunk in enumerate(chunks):
        if total > 1:
            chunk = f"[{i+1}/{total}] {chunk}"
        try:
            radio.sendText(chunk, channelIndex=channel_index)
            logger.info(f"Radio TX ch{channel_index}: {chunk[:80]}...")
            if i < total - 1:
                time.sleep(3)  # Pause between chunks to avoid radio congestion
        except Exception as e:
            logger.error(f"Radio send error: {e}")
def check_internet() -> bool:
    """Quick internet connectivity check.

    Returns True when an HTTPS GET to discord.com completes within 5 seconds
    (any HTTP status counts as reachable), False on any requests-level
    failure such as DNS errors, refused connections or timeouts.
    """
    try:
        requests.get("https://discord.com", timeout=5)
    except requests.exceptions.RequestException:
        # Only network-level failures mean "offline"; a bare except here would
        # also swallow SystemExit / KeyboardInterrupt.
        return False
    return True
def get_ha_headers():
    """Load Home Assistant auth headers from config.

    Returns:
        A dict with a Bearer ``Authorization`` header built from the
        ``ha_token`` key of HA_CONFIG_PATH, or None when the file is missing,
        unreadable, not valid JSON, or has no usable token.
    """
    try:
        # Context manager so the file handle is closed even on a parse error.
        with open(HA_CONFIG_PATH, encoding="utf-8") as f:
            token = json.load(f)["ha_token"]
    except (OSError, ValueError, KeyError, TypeError):
        # ValueError covers JSONDecodeError; TypeError covers a non-object top level.
        return None
    if not token:
        return None
    return {"Authorization": f"Bearer {token}"}
# ===== TOOL IMPLEMENTATIONS =====
def tool_get_home_status() -> str:
    """Read sensors from Home Assistant.

    Returns:
        "Label: state" pairs joined with " | " for every sensor that could be
        read, or a short diagnostic string: "HA config not found" when no
        token is available, "No HA sensors configured" when the sensor map is
        empty, "HA unreachable" when no sensor could be read.
    """
    headers = get_ha_headers()
    if not headers:
        return "HA config not found"
    # Map your HA entity IDs to friendly labels
    sensors = {
        # "sensor.living_room_temperature": "Temp",
        # "sensor.living_room_humidity": "Humidity",
        # "person.your_name": "You",
        # "weather.forecast_home": "Weather",
    }
    if not sensors:
        # Nothing was queried, so "unreachable" would be misleading.
        return "No HA sensors configured"

    parts = []
    for entity_id, label in sensors.items():
        try:
            r = requests.get(f"{HA_URL}/api/states/{entity_id}",
                             headers=headers, timeout=5)
            if r.status_code != 200:
                logger.warning(f"HA state {entity_id}: HTTP {r.status_code}")
                continue
            state = r.json().get("state", "?")
        except (requests.exceptions.RequestException, ValueError, AttributeError) as e:
            # One bad sensor must not hide the others; log and keep going.
            logger.warning(f"HA state {entity_id} failed: {type(e).__name__}")
            continue
        parts.append(f"{label}: {state}")
    return " | ".join(parts) if parts else "HA unreachable"
def tool_control_device(entity: str, action: str) -> str:
    """Control a Home Assistant device.

    Args:
        entity: Friendly name from the entity map (case-insensitive, spaces
            allowed) or a raw HA entity id.
        action: "on", "off" or "toggle" (case-insensitive).

    Returns:
        A short status string for the radio reply; errors are returned as
        text rather than raised.
    """
    headers = get_ha_headers()
    if not headers:
        return "HA config not found"
    # Map friendly names to HA entity IDs - customize for your setup
    entity_map = {
        "living_room_lights": "light.living_room",
        "bedroom_lights": "light.bedroom",
        "kitchen_lights": "light.kitchen",
        "all_lights": "light.all",
    }
    # Arguments come from an LLM tool call, so normalise before trusting them.
    entity = str(entity).strip()
    action = str(action).strip().lower()

    services = {"on": "turn_on", "off": "turn_off", "toggle": "toggle"}
    service = services.get(action)
    if service is None:
        # Previously any unrecognised action (including "ON") silently toggled.
        return f"Unknown action: {action[:20]}. Use on/off/toggle"

    ha_entity = entity_map.get(entity.lower().replace(" ", "_"), entity)
    if "." in ha_entity:
        domain = ha_entity.split(".")[0]
    else:
        # No domain given: assume a light and send a full entity id.
        domain = "light"
        ha_entity = f"{domain}.{ha_entity}"
    # NOTE(review): any domain the sender names is accepted here (e.g. a cover
    # or switch); consider restricting to entity_map values if that is a concern.
    try:
        r = requests.post(
            f"{HA_URL}/api/services/{domain}/{service}",
            headers=headers,
            json={"entity_id": ha_entity},
            timeout=10
        )
        if r.status_code == 200:
            return f"Done: {entity} β†’ {action}"
        else:
            return f"HA error: {r.status_code}"
    except Exception as e:
        return f"HA error: {str(e)[:80]}"
def tool_answer_question(question: str) -> str:
    """Ask the brain model (BRAIN_MODEL) a general-knowledge question.

    Posts a non-streaming generate request to Ollama and returns the trimmed
    answer, "No response" when the answer is empty, or a short "AI ..."
    error string on HTTP errors, timeouts and any other failure.
    """
    try:
        payload = {
            "model": BRAIN_MODEL,
            "prompt": question,
            "system": "You are a helpful AI responding via LoRa radio. Keep answers SHORT (under 350 chars). No markdown. Be direct.",
            "stream": False,
            "options": {"temperature": 0.7, "num_predict": 200}
        }
        reply = requests.post(f"{OLLAMA_URL}/generate", json=payload, timeout=60)
        if reply.status_code != 200:
            return f"AI error: HTTP {reply.status_code}"
        answer = reply.json().get("response", "").strip()
        if answer:
            return answer
        return "No response"
    except requests.exceptions.Timeout:
        return "AI timeout"
    except Exception as err:
        return f"AI error: {str(err)[:80]}"
def tool_run_command(command: str, allowed=None, timeout: float = 10) -> str:
    """Run safe system commands only.

    The command arrives over radio via an LLM tool call, so it is treated as
    untrusted: it is never handed to a shell. The text is split on "|" into
    pipeline stages, each stage is tokenised with shlex and executed directly
    (shell=False), and EVERY stage's program must be whitelisted. Shell
    metacharacters such as ";", "&&", "$()", backticks or redirections
    therefore have no special meaning and are passed as literal arguments.

    Args:
        command: Command line, optionally a "a | b" pipeline.
        allowed: Whitelisted program names; defaults to SAFE_COMMANDS.
        timeout: Total time budget in seconds for the whole pipeline.

    Returns:
        Up to 300 characters of the final stage's stdout, "No output" when it
        is empty, or a "Command not allowed: ..." / "Error: ..." string.
    """
    import shlex
    import subprocess

    if allowed is None:
        allowed = SAFE_COMMANDS
    if not isinstance(command, str):
        command = ""

    try:
        stages = [shlex.split(part) for part in command.split("|")]
    except ValueError as e:  # e.g. unbalanced quotes
        return f"Error: {str(e)[:80]}"

    # Validate the whole pipeline before running anything.
    for argv in stages:
        cmd_base = argv[0] if argv else ""
        if cmd_base not in allowed:
            return f"Command not allowed: {cmd_base}. Safe: {', '.join(allowed)}"

    try:
        deadline = time.monotonic() + timeout
        piped = ""  # first stage gets empty stdin so it cannot block waiting for input
        for argv in stages:
            remaining = max(deadline - time.monotonic(), 0.1)
            result = subprocess.run(argv, input=piped, capture_output=True,
                                    text=True, timeout=remaining)
            piped = result.stdout
        output = piped.strip()[:300]
        return output if output else "No output"
    except Exception as e:
        return f"Error: {str(e)[:80]}"
# ===== ROUTING ENGINE =====
def route_with_phi(user_message: str) -> str:
    """Use phi4-mini to decide which tool to call, then execute it.

    Sends the message plus TOOLS to the router model. When the router
    request fails or returns no tool calls and no text, the question is
    answered by tool_answer_question instead. Each requested tool runs in
    isolation: a failing or malformed call adds an error entry to the reply
    rather than discarding the results of tools that already ran.

    Returns:
        The router's own text (max 400 chars) when it called no tool, or the
        tool results joined with " | ".
    """
    try:
        resp = requests.post(f"{OLLAMA_URL}/chat", json={
            "model": ROUTER_MODEL,
            "messages": [{"role": "user", "content": user_message}],
            "tools": TOOLS,
            "stream": False
        }, timeout=30)
        if resp.status_code != 200:
            return tool_answer_question(user_message)
        msg = resp.json().get("message", {})
        tool_calls = msg.get("tool_calls", [])
        content = msg.get("content", "")
    except Exception as e:
        logger.error(f"Router error: {e}")
        return tool_answer_question(user_message)

    if not tool_calls:
        if content:
            return content[:400]
        return tool_answer_question(user_message)

    results = []
    for tc in tool_calls:
        name = ""
        try:
            func = tc.get("function") or {}
            name = func.get("name", "")
            args = func.get("arguments") or {}
            if isinstance(args, str):
                # Some models emit the arguments as a JSON-encoded string.
                try:
                    args = json.loads(args)
                except ValueError:
                    args = {}
            if not isinstance(args, dict):
                args = {}
            logger.info(f"Tool call: {name}({json.dumps(args)})")
            if name == "get_home_status":
                results.append(tool_get_home_status())
            elif name == "control_device":
                results.append(tool_control_device(
                    args.get("entity", "unknown"),
                    args.get("action", "off")
                ))
            elif name == "answer_question":
                results.append(tool_answer_question(
                    args.get("question", user_message)
                ))
            elif name == "run_command":
                results.append(tool_run_command(
                    args.get("command", "uptime")
                ))
            else:
                results.append(f"Unknown tool: {name}")
        except Exception as e:
            # Earlier tools may already have had side effects (e.g. a light
            # switched); keep their results and report this failure inline.
            logger.error(f"Tool error ({name}): {e}")
            results.append(f"Tool error ({name}): {str(e)[:80]}")
    return " | ".join(results)
# ===== MESSAGE HANDLERS =====
def handle_message(sender_id: str, sender_name: str, text: str, channel: int):
    """Main message handler with internet-aware routing.

    Routing, in order:
      * "SAY:" / "SAY " prefix - speak the rest on the Home Assistant speaker.
      * "AI:" / "AI " prefix  - always answer through the local router model.
      * a bare status keyword - reply with the Home Assistant sensor readout.
      * anything else         - forward to Discord when online, otherwise
                                answer through the local router model.

    Args:
        sender_id: Meshtastic node id of the sender (not used for routing).
        sender_name: Friendly name used in logs and Discord messages.
        text: Raw message text as received.
        channel: Radio channel index replies are sent on.
    """
    # Prefixes are matched on the stripped text, so payloads must be sliced
    # from the same stripped text (slicing the raw text mis-cuts the payload
    # whenever the message has leading whitespace).
    stripped = text.strip()
    text_lower = stripped.lower()

    # === SAY: prefix - TTS on Home Assistant speaker ===
    if text_lower.startswith(("say:", "say ")):
        msg = stripped[4:].strip()
        if not msg:
            send_radio("Usage: SAY: <message to read aloud>", channel_index=channel)
            return
        logger.info(f"TTS request from {sender_name}: {msg}")
        send_discord(f"πŸ”Š **TTS [{sender_name}]:** {msg}")
        try:
            headers = get_ha_headers()
            if not headers:
                send_radio("HA config not found", channel_index=channel)
                return
            r = requests.post(
                f"{HA_URL}/api/services/tts/speak",
                headers=headers,
                json={
                    "entity_id": TTS_ENTITY,
                    "media_player_entity_id": TTS_SPEAKER,
                    "message": msg,
                    "language": TTS_LANGUAGE
                },
                timeout=10
            )
            # Only claim success when HA actually accepted the call.
            if r.ok:
                send_radio("Done, message read aloud", channel_index=channel)
            else:
                send_radio(f"TTS error: HTTP {r.status_code}", channel_index=channel)
        except Exception as e:
            send_radio(f"TTS error: {str(e)[:80]}", channel_index=channel)
        return

    # === AI: prefix - always local AI ===
    if text_lower.startswith(("ai:", "ai ")):
        prompt = stripped[3:].strip()
        if not prompt:
            send_radio("Usage: AI: <your question>", channel_index=channel)
            return
        logger.info(f"AI request from {sender_name}: {prompt}")
        send_discord(f"🧠 **AI query [{sender_name}]:** {prompt}")
        result = route_with_phi(prompt)
        send_radio(result, channel_index=channel)
        send_discord(f"πŸ€– **AI β†’ radio:** {result[:500]}")
        return

    # === status - Home Assistant readout ===
    if text_lower in ("status", "статус", "ha", "home"):
        status = tool_get_home_status()
        send_radio(f"HA: {status}", channel_index=channel)
        send_discord(f"πŸ“Š **Status [{sender_name}]:** {status}")
        return

    # === Regular message - check internet ===
    has_internet = check_internet()
    if has_internet:
        # Online: forward to Discord for cloud AI
        send_discord(f"πŸ“‘ **LoRa [{sender_name}]:** {text}")
        logger.info("Online β€” forwarded to Discord")
    else:
        # Offline: route everything through local AI
        logger.info(f"OFFLINE β€” routing to local AI: {text}")
        send_radio("(offline mode)", channel_index=channel)
        result = route_with_phi(text)
        send_radio(result, channel_index=channel)
        logger.info(f"Offline response: {result[:100]}")
# ===== RADIO EVENT HANDLERS =====
def on_receive(packet, interface):
    """Pubsub callback for received text packets.

    Logs the message (with signal quality when present) and hands it to
    handle_message on a daemon thread so slow work (HTTP, LLM calls) does not
    block radio reception. Packets without decoded text and packets from
    MY_NODE_ID are ignored. Never raises; errors are logged.

    Args:
        packet: Packet dict; reads 'decoded'/'text', 'fromId', 'channel',
            'rxSnr' and 'rxRssi'.
        interface: Interface that received the packet (unused).
    """
    try:
        if 'decoded' not in packet or 'text' not in packet['decoded']:
            return
        # The key can be present with a None value; fall back in that case
        # too so the sender name is never None.
        sender_id = packet.get('fromId') or 'unknown'
        # Ignore our own messages
        if sender_id == MY_NODE_ID:
            return
        sender_name = KNOWN_NODES.get(sender_id, sender_id)
        text = packet['decoded']['text']
        channel = packet.get('channel', 0)
        snr = packet.get('rxSnr')
        rssi = packet.get('rxRssi')
        # "is not None": an SNR of exactly 0 dB is a valid reading, not "missing".
        signal = f" [SNR:{snr}dB RSSI:{rssi}dBm]" if snr is not None else ""
        logger.info(f"[{sender_name}] ch{channel}: {text}{signal}")
        # Handle in a thread to not block radio reception
        thread = threading.Thread(target=handle_message,
                                  args=(sender_id, sender_name, text, channel))
        thread.daemon = True
        thread.start()
    except Exception as e:
        logger.error(f"Error: {e}")
def check_outbox():
    """Check outbox directory for .msg files to send via radio.

    Files are processed in sorted name order. An optional first line
    "channel:N" selects the radio channel; the rest is the message body.
    Sent and empty files are deleted. A file whose channel header is not a
    number is renamed to "<name>.bad" so it is not retried on every poll.
    Other errors are logged and the file is left in place.
    """
    if not os.path.isdir(OUTBOX_DIR):
        return
    for filepath in sorted(glob.glob(os.path.join(OUTBOX_DIR, "*.msg"))):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read().strip()
            # Optional first line: "channel:N" to specify radio channel
            channel = 0
            lines = content.split('\n', 1)
            if lines[0].startswith('channel:'):
                try:
                    channel = int(lines[0].split(':')[1].strip())
                except ValueError:
                    # Without this the file would fail again every poll, forever.
                    logger.error(f"Outbox bad channel header in {filepath}: {lines[0][:40]!r}")
                    os.replace(filepath, filepath + ".bad")  # no longer matches *.msg
                    continue
                content = lines[1] if len(lines) > 1 else ''
            if content:
                logger.info(f"Outbox sending: {content[:80]}...")
                send_radio(content, channel_index=channel)
                send_discord(f"πŸ“‘ **Radio TX (outbox):** {content[:500]}")
            os.remove(filepath)
        except Exception as e:
            logger.error(f"Outbox error {filepath}: {e}")
def on_connection(interface, topic=pub.AUTO_TOPIC):
    """Pubsub callback: log that the radio connection is established."""
    logger.info("Connected to Meshtastic device")
def on_disconnect(interface, topic=pub.AUTO_TOPIC):
    """Pubsub callback: log that the radio connection was lost."""
    logger.warning("Disconnected β€” will reconnect...")
# ===== MAIN LOOP =====
WATCHDOG_INTERVAL = 300  # 5 min - check interface health


def _close_interface():
    """Close and forget the current radio interface, ignoring close errors."""
    global INTERFACE
    iface, INTERFACE = INTERFACE, None
    if iface is None:
        return
    try:
        iface.close()
    except Exception as e:
        logger.debug(f"Interface close failed: {e}")


def main():
    """Run the listener until interrupted with Ctrl-C.

    Loads the config, subscribes the pubsub handlers, then loops forever:
    open the serial interface, poll the outbox every 2 seconds, and every
    WATCHDOG_INTERVAL seconds check that the interface still reports myInfo.
    A stale interface or any connection error closes the interface and
    reconnects (after a 10 second pause for errors).
    """
    global INTERFACE
    load_config()
    logger.info("=" * 50)
    logger.info("Meshtastic Off-Grid AI Listener")
    logger.info(f"Router: {ROUTER_MODEL} | Brain: {BRAIN_MODEL}")
    logger.info(f"Serial: {SERIAL_PORT}")
    logger.info(f"Tools: {[t['function']['name'] for t in TOOLS]}")
    logger.info("=" * 50)
    pub.subscribe(on_receive, "meshtastic.receive.text")
    pub.subscribe(on_connection, "meshtastic.connection.established")
    pub.subscribe(on_disconnect, "meshtastic.connection.lost")
    while True:
        try:
            INTERFACE = meshtastic.serial_interface.SerialInterface(SERIAL_PORT)
            logger.info("Listening...")
            os.makedirs(OUTBOX_DIR, exist_ok=True)
            logger.info(f"Outbox: {OUTBOX_DIR}")
            watchdog_counter = 0
            while True:
                check_outbox()
                time.sleep(2)
                watchdog_counter += 2
                if watchdog_counter < WATCHDOG_INTERVAL:
                    continue
                watchdog_counter = 0
                try:
                    reason = None if (INTERFACE and INTERFACE.myInfo) else "Stale interface"
                except Exception as e:
                    reason = e
                if reason is None:
                    logger.debug("Watchdog: interface healthy")
                    continue
                if reason == "Stale interface":
                    logger.warning("Watchdog: interface stale, reconnecting...")
                logger.warning(f"Watchdog triggered reconnect: {reason}")
                _close_interface()
                break
        except KeyboardInterrupt:
            _close_interface()
            break
        except Exception as e:
            logger.error(f"Connection error: {e}")
            # Close before dropping the reference; otherwise the serial handle
            # stays open and the next reconnect attempt can find the port busy.
            _close_interface()
            time.sleep(10)


if __name__ == "__main__":
    main()
@anvarazizov
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment