Created
February 17, 2025 13:39
-
-
Save jones2126/88d995dc5b471f0e12031bc23f90a911 to your computer and use it in GitHub Desktop.
Script to read incoming GPS data from SkyTraQ PX1172RDP gnss device and report message types and Hz
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ''' | |
| This program opens a serial connection to a PX1172RDP rover device (on ROVER_PORT at BAUD_RATE), continuously reads incoming | |
| data for a specified duration (RUN_DURATION), identifies message types (e.g., RTCM or various NMEA sentences), counts how many | |
| of each type are received, and then prints a summary report of the message frequencies at the end. It can optionally print the | |
| raw messages and their hexadecimal representations during data collection if PRINT_MESSAGES is enabled. | |
| ''' | |
| import serial | |
| import time | |
| from collections import defaultdict | |
| # --- CONFIGURATION --- | |
| ROVER_PORT = "COM6" | |
| BAUD_RATE = 115200 | |
| RUN_DURATION = 30 # Duration in seconds to collect data | |
| PRINT_MESSAGES = False # Set to True to print each message, False to suppress output | |
| # Dictionary to track message types and counts | |
| message_counts = defaultdict(int) | |
| def identify_message_type(line): | |
| """Identify message type based on the first few characters.""" | |
| if not line: | |
| return "Unknown" | |
| if line.startswith(b"\xD3"): # RTCM messages start with 0xD3 | |
| return "RTCM 3.x" | |
| message_str = line.decode(errors='ignore').strip() | |
| if message_str.startswith("$"): | |
| return message_str.split(',')[0] # Extract NMEA message type (e.g., "$GNGGA") | |
| return "Other Binary" | |
| try: | |
| with serial.Serial(ROVER_PORT, BAUD_RATE, timeout=1) as ser: | |
| print(f"Listening to {ROVER_PORT} (PX1172RDP Rover)...\n") | |
| start_time = time.time() | |
| end_time = start_time + RUN_DURATION | |
| while time.time() < end_time: | |
| line = ser.readline().strip() | |
| if line: | |
| message_type = identify_message_type(line) | |
| message_counts[message_type] += 1 # Increment count for that message type | |
| if PRINT_MESSAGES: | |
| print(f"Type : {message_type}") | |
| print(f" Raw Bytes : {line}") # Print raw binary data | |
| print(f" Hex String : {line.hex()}") # Print hex representation | |
| print("-" * 50) | |
| # Print Summary Report | |
| print("\n--- NMEA Message Summary ---") | |
| for msg_type, count in sorted(message_counts.items()): | |
| message_hz = count / RUN_DURATION if RUN_DURATION > 0 else 0 | |
| print(f"{msg_type}: {message_hz:.2f} Hz ({count} messages)") | |
| print("\n--- Completed NMEA Test ---") | |
| except serial.SerialException as e: | |
| print(f"Error opening {ROVER_PORT}: {e}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample output