Created
April 19, 2024 16:40
-
-
Save equant/8fcc32c42721e68ceac41da3ada5fa34 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import base64 | |
from pubsub import pub | |
import meshtastic.serial_interface | |
from meshtastic import mesh_pb2, storeforward_pb2, paxcount_pb2 | |
from pymongo import MongoClient | |
from bson import Binary | |
from google.protobuf.json_format import MessageToJson | |
interface = meshtastic.serial_interface.SerialInterface() | |
client = MongoClient('mongodb://localhost:27017/') | |
db = client['meshtastic'] | |
collection = db['packets'] | |
def convert_bytes(value): | |
# Deprecated | |
"""Convert bytes to base64 encoded strings.""" | |
if isinstance(value, bytes): | |
return base64.b64encode(value).decode('utf-8') # decode as UTF-8 string for JSON compatibility | |
return value | |
def prepare_for_mongodb(obj): | |
"""Recursively prepare data for MongoDB, converting or skipping non-serializable objects.""" | |
if isinstance(obj, dict): | |
return {k: prepare_for_mongodb(v) for k, v in obj.items()} | |
elif isinstance(obj, list): | |
return [prepare_for_mongodb(item) for item in obj] | |
elif isinstance(obj, bytes): | |
return Binary(obj) # Convert bytes to BSON Binary for MongoDB | |
elif hasattr(obj, 'DESCRIPTOR'): # Check if it's a Protobuf message | |
return json.loads(MessageToJson(obj)) # Convert Protobuf to dictionary | |
elif isinstance(obj, (str, int, float, bool)): | |
return obj | |
else: | |
return str(obj) # Convert unknown types to string or use `None` to skip | |
def flatten_decoded(packet): | |
"""Flatten the 'decoded' dictionary into the top-level dictionary.""" | |
if 'decoded' in packet: | |
decoded_content = packet.pop('decoded') # Remove and retrieve 'decoded' | |
if isinstance(decoded_content, dict): | |
packet.update(decoded_content) # Merge its contents into the top-level dict | |
return packet | |
def write_to_db(packet): | |
try: | |
flattened_packet = flatten_decoded(packet) # First flatten the packet | |
packet_ready = prepare_for_mongodb(packet) | |
collection.insert_one(packet_ready) | |
print("Saved") | |
except Exception as e: | |
print("Failed to insert data:", e) | |
print(packet) | |
breakpoint() | |
def idToHex(nodeId): | |
return '!' + hex(nodeId)[2:] | |
def onReceive(packet, interface): | |
# Print all packets | |
#print(f"{packet} \n\n") | |
write_to_db(packet) | |
pub.subscribe(onReceive, 'meshtastic.receive') | |
while True: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment