Skip to content

Instantly share code, notes, and snippets.

@jones2126
Last active May 1, 2025 13:26
Show Gist options
  • Save jones2126/c4296ad9ce07ecba6ef4a1d4382bd4b7 to your computer and use it in GitHub Desktop.
Save jones2126/c4296ad9ce07ecba6ef4a1d4382bd4b7 to your computer and use it in GitHub Desktop.
GPS RTK Correction Data Using RTK2GO Mount Point

Overview

This script connects to a mount point on RTK2GO NTRIP caster to fetch RTCM correction data and forwards it to a GPS receiver connected via a serial port (COM6). It supports NTRIP v1 protocol, parses NMEA GPGGA sentences. For debugging it logs RTCM data.

Python Library Dependencies

  • socket, base64, serial, serial.tools.list_ports, time, threading, traceback, sys, binascii

How to Use

The script uses the following constants, defined at the top:

  • NTRIP_SERVER: "rtk2go.com" – The NTRIP caster hostname.
  • NTRIP_PORT: 2101 – Standard NTRIP port.
  • MOUNTPOINT: "cmuairlab01" – The mountpoint to connect to. Update based on your location. I'm near Pittsburgh.
  • USERNAME: "[email protected]" – User email for authentication.
  • PASSWORD: "none" – Password (RTK2GO typically uses "none").
  • SERIAL_PORT: "COM6" – Serial port for GPS communication (confirmed working).
  • BAUDRATE: 115200 – Baud rate for serial communication.

Test Mode (--test)

When you run the script with the --test flag (i.e., python fetch_rtk2go.py --test), it enables the test_server_connection() function, which is designed to troubleshoot connectivity issues with the RTK2GO NTRIP caster. The following features are enabled in this mode:

  • Connectivity Testing: Performs three distinct tests to diagnose connection problems:
    • Source Table Check:
      • Connects to rtk2go.com:2101 and sends a GET / HTTP/1.0 request to retrieve the source table.
      • Prints the first 200 characters of the response and checks for "200 OK", "SOURCETABLE 200 OK", or "ICY 200 OK" to confirm a successful connection.
      • Indicates success (✓) or failure (✗) with an error message if the connection fails.
    • Authentication Test:
      • Attempts to connect to rtk2go.com:2101 using the provided USERNAME (email) and PASSWORD ("none").
      • Sends NTRIP v1 headers with Basic Authentication (username:password encoded in Base64).
      • Prints the response and checks for "200 OK", "SOURCETABLE 200 OK", or "ICY 200 OK" to verify authentication.
      • Indicates success (✓) or failure (✗) with an error message.
    • Server Status Check:
      • Sends a GET /SNIP::STATUS HTTP/1.0 request to check the server’s status.
      • Prints the response (first 200 characters) and logs any errors if the request fails.
      • Indicates failure (✗) with an error message if the connection fails.

Key Points:

  • The --test mode does not fetch RTCM data or interact with the GPS serial port. It focuses solely on diagnosing network and authentication issues with the RTK2GO caster.
  • Each test uses a 5-second socket timeout to avoid hanging.
  • The script closes the socket after each test to ensure clean connections.

This mode is useful for verifying server availability, authentication credentials, and network connectivity before attempting to fetch RTCM corrections in the normal mode.

import socket
import base64
import serial
import serial.tools.list_ports
import time
import threading
import traceback
import sys
import binascii
# NTRIP server details
NTRIP_SERVER = "rtk2go.com"
NTRIP_PORT = 2101
MOUNTPOINT = "cmuairlab01"
USERNAME = "[email protected]"
PASSWORD = "none"
# Serial port details - COM6 is confirmed working
SERIAL_PORT = "COM6" # Set to the known working port
BAUDRATE = 115200
# Global variables for RTK status
latest_gps_data = {
"fix_type": None,
"satellites": None,
"hdop": None,
"altitude": None,
"last_report_time": 0
}
def read_nmea(gps_serial):
"""Read NMEA sentences from the GPS and parse them."""
while True:
try:
line = gps_serial.readline().decode('ascii', errors='ignore').strip()
if line.startswith("$GPGGA"):
# Just log the received GPGGA message
print(f"Received NMEA: {line}")
except Exception as e:
print(f"Error reading NMEA: {e}")
time.sleep(1)
def report_rtk_status():
"""Report RTK status every 5 seconds."""
while True:
time.sleep(5)
# Just report that we're receiving RTCM data
print("RTK status: Receiving RTCM corrections data")
def save_rtcm_data(data, count=0):
"""Save RTCM data to a file."""
filename = f"rtcm_data_{count}.bin"
try:
with open(filename, 'wb') as f:
f.write(data)
print(f"Saved {len(data)} bytes of RTCM data to {filename}")
except Exception as e:
print(f"Error saving RTCM data: {e}")
def report_rtk_status():
"""Report RTK status every 5 seconds."""
while True:
time.sleep(5)
fix_type = latest_gps_data["fix_type"]
satellites = latest_gps_data["satellites"]
hdop = latest_gps_data["hdop"]
altitude = latest_gps_data["altitude"]
if fix_type is None:
print("No GPGGA data received yet")
continue
fix_status = {
0: "No Fix",
1: "GPS Fix",
4: "RTK Fixed",
5: "RTK Float"
}.get(fix_type, f"Unknown Fix Type ({fix_type})")
print(f"RTK Status: {fix_status}, Satellites: {satellites}, HDOP: {hdop}, Altitude: {altitude}m")
def read_nmea(gps_serial):
"""Read NMEA sentences from the GPS and parse them."""
while True:
try:
line = gps_serial.readline().decode('ascii', errors='ignore').strip()
if line.startswith("$GPGGA"):
parse_gpgga(line)
except Exception as e:
print(f"Error reading NMEA: {e}")
time.sleep(1)
def fetch_rtcm():
"""Fetch RTCM data from NTRIP server and send to GPS."""
# Check source table - but assume mountpoint is available
print("Verifying mountpoint availability...")
check_source_table()
max_retries = 3
retry_count = 0
while retry_count < max_retries:
client = None
try:
print("Waiting 2 seconds before connecting to avoid rapid reconnections...")
time.sleep(2)
# Create socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.settimeout(10)
print(f"Attempting to connect to {NTRIP_SERVER}:{NTRIP_PORT}...")
client.connect((NTRIP_SERVER, NTRIP_PORT))
print(f"Connected to {NTRIP_SERVER}:{NTRIP_PORT}")
# Prepare authentication
auth_string = f"{USERNAME}:{PASSWORD}"
auth = base64.b64encode(auth_string.encode()).decode()
# Use NTRIP v1 protocol (most compatible)
print("Using NTRIP v1 protocol...")
headers = (
f"GET /{MOUNTPOINT} HTTP/1.0\r\n"
f"User-Agent: NTRIP GNSSViewer/1.0\r\n"
f"Authorization: Basic {auth}\r\n"
f"\r\n"
)
def save_rtcm_data(data, count=0):
"""Save RTCM data to a file instead of printing it."""
filename = f"rtcm_data_{count}.bin"
try:
with open(filename, 'wb') as f:
f.write(data)
print(f"Saved {len(data)} bytes of RTCM data to {filename}")
# Print just a sample of the data as hex
print(f"Sample RTCM data (first 32 bytes):")
for line in hexdump(data[:32]):
print(line)
except Exception as e:
print(f"Error saving RTCM data: {e}")
print("Sending NTRIP request headers...")
client.send(headers.encode())
# Receive and process initial response (just the header)
response_buffer = b""
while b"\r\n\r\n" not in response_buffer and len(response_buffer) < 4096:
try:
chunk = client.recv(1024)
if not chunk:
break
response_buffer += chunk
except socket.timeout:
break
# Split header from any data that might be included
header_end = response_buffer.find(b"\r\n\r\n")
if header_end >= 0:
header = response_buffer[:header_end+4]
initial_data = response_buffer[header_end+4:]
header_text = header.decode('utf-8', errors='ignore')
print("Server response:", header_text)
else:
header_text = response_buffer.decode('utf-8', errors='ignore')
print("Server response:", header_text)
initial_data = b""
# Check for successful response
if "200 OK" not in header_text:
print("Failed to connect to NTRIP server")
print(f"Response: {header_text}")
# Provide specific error feedback
if "401" in header_text:
print("Error: Authentication failed. Check username and password.")
elif "403" in header_text:
print("Error: Access forbidden. No permission for this mountpoint.")
elif "404" in header_text:
print("Error: Mountpoint not found. Check mountpoint name.")
else:
print("Error: Unknown error. Check connection settings.")
retry_count += 1
if retry_count < max_retries:
print(f"Retrying ({retry_count}/{max_retries})...")
time.sleep(5)
continue
print("NTRIP server connected successfully")
# Sample GPGGA message
gpgga = "$GPGGA,092750.000,5321.6802,N,00630.3372,W,1,8,1.03,61.7,M,55.2,M,,*76\r\n"
# Open serial port
try:
gps_serial = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1)
print(f"Opened serial port {SERIAL_PORT} at {BAUDRATE} baud")
# Start NMEA reading in a separate thread
nmea_thread = threading.Thread(target=read_nmea, args=(gps_serial,), daemon=True)
nmea_thread.start()
# Start status reporting in a separate thread
report_thread = threading.Thread(target=report_rtk_status, daemon=True)
report_thread.start()
# Send initial data if any
if initial_data:
gps_serial.write(initial_data)
print(f"Sent {len(initial_data)} bytes of RTCM data to GPS")
except Exception as e:
print(f"Error opening serial port: {e}")
print("Please check your GPS connection and port settings.")
break
# Send initial position to NTRIP server
client.send(gpgga.encode())
print(f"Sent initial position data to NTRIP server")
last_gpgga_time = time.time()
data_received = 0
# Main data receiving loop
while True:
# Send GPGGA update every 10 seconds
current_time = time.time()
if current_time - last_gpgga_time >= 10:
client.send(gpgga.encode())
print(f"Sent position update to NTRIP server")
last_gpgga_time = current_time
# Receive RTCM data with a short timeout
client.settimeout(1.0)
try:
data = client.recv(1024)
if not data:
print("Connection closed by server")
break
# Send to GPS and count bytes
gps_serial.write(data)
data_received += len(data)
print(f"Sent {len(data)} bytes of RTCM data to GPS (Total: {data_received} bytes)")
except socket.timeout:
# Timeout is normal, just continue
continue
except Exception as e:
print(f"Error receiving data: {e}")
break
print("Sending NTRIP request headers:")
print(headers)
client.send(headers.encode())
# Receive and process initial response
response_buffer = b""
while b"\r\n\r\n" not in response_buffer and len(response_buffer) < 4096:
try:
chunk = client.recv(1024)
if not chunk:
break
response_buffer += chunk
except socket.timeout:
break
# Split header from any data that might be included
header_end = response_buffer.find(b"\r\n\r\n")
if header_end >= 0:
header = response_buffer[:header_end+4]
initial_data = response_buffer[header_end+4:]
header_text = header.decode('utf-8', errors='ignore')
print("Server response:", header_text)
else:
header_text = response_buffer.decode('utf-8', errors='ignore')
print("Server response:", header_text)
initial_data = b""
# Check for successful response (either ICY 200 OK or HTTP/1.0 200 OK)
if "200 OK" not in header_text:
print("Failed to connect to NTRIP server")
print(f"Response: {header_text}")
# Parse error code to provide better feedback
if "401" in header_text:
print("Error: Authentication failed. Check your username and password.")
print("For RTK2GO, use your email as username and 'none' as password.")
elif "403" in header_text:
print("Error: Access forbidden. You may not have permission to access this mountpoint.")
elif "404" in header_text:
print("Error: Mountpoint not found. Check if the mountpoint name is correct.")
else:
print("Error: Unknown error. Please check connection settings.")
# Increase retry count and wait before retrying
retry_count += 1
if retry_count < max_retries:
print(f"Retrying ({retry_count}/{max_retries})...")
time.sleep(5) # Longer wait between retries
continue
print("NTRIP server connected successfully")
# Create a GPGGA message to send to the server
# This is a sample GPGGA message - replace with your actual coordinates
gpgga = "$GPGGA,092750.000,5321.6802,N,00630.3372,W,1,8,1.03,61.7,M,55.2,M,,*76\r\n"
# Try to open serial port if available
gps_serial = None
if SERIAL_PORT:
try:
gps_serial = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1)
print(f"Opened serial port {SERIAL_PORT} at {BAUDRATE} baud")
# Start NMEA reading thread if serial port is open
nmea_thread = threading.Thread(target=read_nmea, args=(gps_serial,), daemon=True)
nmea_thread.start()
report_thread = threading.Thread(target=report_rtk_status, daemon=True)
report_thread.start()
# Send initial data if any
if initial_data:
gps_serial.write(initial_data)
print(f"Sent {len(initial_data)} bytes to GPS")
except Exception as e:
print(f"Error opening serial port: {e}")
print("RTCM data will be saved to files instead of sent to GPS.")
gps_serial = None
# If we have initial data but no serial port, save it
if initial_data and not gps_serial:
save_rtcm_data(initial_data, data_count)
data_count += 1
# Send initial position
client.send(gpgga.encode())
print(f"Sent initial GPGGA position: {gpgga.strip()}")
last_gpgga_time = time.time()
last_save_time = time.time()
accumulated_data = b""
# Main data receiving loop
while True:
# Send GPGGA update every 10 seconds
current_time = time.time()
if current_time - last_gpgga_time >= 10:
client.send(gpgga.encode())
print(f"Sent GPGGA update: {gpgga.strip()}")
last_gpgga_time = current_time
# Receive RTCM data with a short timeout to allow for regular GPGGA updates
client.settimeout(1.0)
try:
data = client.recv(1024)
if not data:
print("Connection closed by server")
break
# If we have a serial port, send data there
if gps_serial:
gps_serial.write(data)
print(f"Sent {len(data)} bytes to GPS")
else:
# Otherwise accumulate data and save periodically
accumulated_data += data
if current_time - last_save_time >= 10 and accumulated_data:
save_rtcm_data(accumulated_data, data_count)
data_count += 1
accumulated_data = b""
last_save_time = current_time
else:
# Just print a brief message about received data
print(f"Received {len(data)} bytes of RTCM data")
except socket.timeout:
# Timeout is expected, just continue the loop
continue
except Exception as e:
print(f"Error receiving data: {e}")
break
# If we exit the loop successfully, reset retry count
retry_count = 0
break
except Exception as e:
print(f"Error: {e}")
print(traceback.format_exc())
retry_count += 1
if retry_count < max_retries:
print(f"Retrying ({retry_count}/{max_retries})...")
time.sleep(5) # Wait before retrying
finally:
if client:
try:
client.close()
except:
pass
print("NTRIP connection closed")
if retry_count >= max_retries:
print(f"Failed after {max_retries} attempts. Check your connection settings.")
def test_server_connection():
"""Test different connection options to find what works with RTK2GO."""
print("Testing different connection methods to troubleshoot connection issues...")
# Try to get source table
try:
print("\nTest 1: Checking source table on rtk2go.com:2101...")
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.settimeout(5)
client.connect(("rtk2go.com", 2101))
headers = "GET / HTTP/1.0\r\n\r\n"
client.send(headers.encode())
# Get initial response
response = client.recv(1024).decode('utf-8', errors='ignore')
print(f"Response: {response[:200]}...")
# Check for success
if "200 OK" in response or "SOURCETABLE 200 OK" in response or "ICY 200 OK" in response:
print("✓ Successfully connected to RTK2GO Caster!")
else:
print("✗ Connection issue with RTK2GO Caster.")
client.close()
except Exception as e:
print(f"✗ Connection test 1 failed: {e}")
# Try with email as username and 'none' as password
try:
print("\nTest 2: Trying connection with email and 'none' password...")
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.settimeout(5)
client.connect(("rtk2go.com", 2101))
# Encode credentials
auth_string = f"{USERNAME}:none"
auth = base64.b64encode(auth_string.encode()).decode()
headers = (
f"GET / HTTP/1.0\r\n"
f"User-Agent: NTRIP GNSSViewer/1.0\r\n"
f"Authorization: Basic {auth}\r\n"
f"\r\n"
)
client.send(headers.encode())
response = client.recv(1024).decode('utf-8', errors='ignore')
print(f"Response: {response[:200]}...")
if "200 OK" in response or "SOURCETABLE 200 OK" in response or "ICY 200 OK" in response:
print("✓ Authentication successful!")
else:
print("✗ Authentication issue.")
client.close()
except Exception as e:
print(f"✗ Connection test 2 failed: {e}")
# Try checking server status
try:
print("\nTest 3: Checking server status...")
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.settimeout(5)
client.connect(("rtk2go.com", 2101))
headers = "GET /SNIP::STATUS HTTP/1.0\r\n\r\n"
client.send(headers.encode())
response = client.recv(1024).decode('utf-8', errors='ignore')
print(f"Response: {response[:200]}...")
client.close()
except Exception as e:
print(f"✗ Connection test 3 failed: {e}")
print("\nTest complete. Please check the results above.")
# Print recommendations
print("\nRECOMMENDATIONS BASED ON RTK2GO BEST PRACTICES:")
print("1. Use rtk2go.com instead of the IP address")
print("2. Use port 2101 (standard NTRIP port)")
print("3. For authentication, use your email as username and 'none' as password")
print("4. Use NTRIP v1 protocol for best compatibility")
print("5. Check if your mountpoint is available in the source table")
print("6. Make sure your GGA position data is correct")
if __name__ == "__main__":
try:
if len(sys.argv) > 1 and sys.argv[1] == "--test":
test_server_connection()
else:
fetch_rtcm()
except KeyboardInterrupt:
print("\nProgram terminated by user")
except Exception as e:
print(f"Unhandled exception: {e}")
traceback.print_exc()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment