Skip to content

Instantly share code, notes, and snippets.

@equant
Created April 19, 2024 16:40
Show Gist options
  • Save equant/8fcc32c42721e68ceac41da3ada5fa34 to your computer and use it in GitHub Desktop.
Save equant/8fcc32c42721e68ceac41da3ada5fa34 to your computer and use it in GitHub Desktop.
import json
import base64
from pubsub import pub
import meshtastic.serial_interface
from meshtastic import mesh_pb2, storeforward_pb2, paxcount_pb2
from pymongo import MongoClient
from bson import Binary
from google.protobuf.json_format import MessageToJson
interface = meshtastic.serial_interface.SerialInterface()
client = MongoClient('mongodb://localhost:27017/')
db = client['meshtastic']
collection = db['packets']
def convert_bytes(value):
# Deprecated
"""Convert bytes to base64 encoded strings."""
if isinstance(value, bytes):
return base64.b64encode(value).decode('utf-8') # decode as UTF-8 string for JSON compatibility
return value
def prepare_for_mongodb(obj):
"""Recursively prepare data for MongoDB, converting or skipping non-serializable objects."""
if isinstance(obj, dict):
return {k: prepare_for_mongodb(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [prepare_for_mongodb(item) for item in obj]
elif isinstance(obj, bytes):
return Binary(obj) # Convert bytes to BSON Binary for MongoDB
elif hasattr(obj, 'DESCRIPTOR'): # Check if it's a Protobuf message
return json.loads(MessageToJson(obj)) # Convert Protobuf to dictionary
elif isinstance(obj, (str, int, float, bool)):
return obj
else:
return str(obj) # Convert unknown types to string or use `None` to skip
def flatten_decoded(packet):
"""Flatten the 'decoded' dictionary into the top-level dictionary."""
if 'decoded' in packet:
decoded_content = packet.pop('decoded') # Remove and retrieve 'decoded'
if isinstance(decoded_content, dict):
packet.update(decoded_content) # Merge its contents into the top-level dict
return packet
def write_to_db(packet):
try:
flattened_packet = flatten_decoded(packet) # First flatten the packet
packet_ready = prepare_for_mongodb(packet)
collection.insert_one(packet_ready)
print("Saved")
except Exception as e:
print("Failed to insert data:", e)
print(packet)
breakpoint()
def idToHex(nodeId):
return '!' + hex(nodeId)[2:]
def onReceive(packet, interface):
# Print all packets
#print(f"{packet} \n\n")
write_to_db(packet)
pub.subscribe(onReceive, 'meshtastic.receive')
while True:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment