Skip to content

Instantly share code, notes, and snippets.

@Geofferey
Last active May 19, 2025 22:12
Show Gist options
  • Save Geofferey/5215f1d3eb77efd37478a97e19d5c69b to your computer and use it in GitHub Desktop.
Save Geofferey/5215f1d3eb77efd37478a97e19d5c69b to your computer and use it in GitHub Desktop.
Take raw NMEA sentences and convert them to Cursor-on-Target (CoT) XML compatible with ATAK
#!/bin/env python3
import socket
import sys
from datetime import datetime, timedelta, timezone
import string
import argparse
import os
import time
import threading
import queue # Explicitly import queue for clarity
from queue import Empty # Keep Empty for direct use
def clean_line_input(raw):
    """Return *raw* without leading junk and without surrounding whitespace.

    Leading NUL characters, zero-width characters and a byte-order mark are
    stripped first, then surrounding whitespace, and finally any remaining run
    of leading characters that are not members of ``string.printable``.
    """
    text = raw.lstrip('\x00\u200b\u200c\u200d\ufeff').strip()
    # Walk past whatever non-printable prefix survived the two strips above.
    start = 0
    while start < len(text) and text[start] not in string.printable:
        start += 1
    return text[start:]
def nmea_to_decimal(degrees_minutes, direction):
    """Convert an NMEA ``DDMM.MMMM`` / ``DDDMM.MMMM`` coordinate to decimal degrees.

    Args:
        degrees_minutes: Coordinate text; two degree digits for latitude,
            three for longitude, followed by decimal minutes.
        direction: Hemisphere letter: ``'N'``/``'S'`` select latitude,
            ``'E'``/``'W'`` select longitude.

    Returns:
        The signed decimal-degree value (negative for ``'S'`` and ``'W'``), or
        ``None`` when the text is missing, too short, non-numeric, has an
        unknown direction, or describes a value outside the valid range.
    """
    if not degrees_minutes or len(degrees_minutes) < 3:
        return None

    if direction in ('N', 'S'):
        degree_digits, max_degrees = 2, 90
    elif direction in ('E', 'W'):
        degree_digits, max_degrees = 3, 180
    else:
        return None

    try:
        degrees = int(degrees_minutes[:degree_digits])
        minutes = float(degrees_minutes[degree_digits:])
    except ValueError:
        print(f"[ERROR] Failed to convert NMEA coordinate part: '{degrees_minutes}', direction: '{direction}'")
        return None

    decimal = degrees + minutes / 60
    # int()/float() happily accept signs, "nan" and "inf"; a corrupted sentence
    # can therefore parse cleanly yet describe an impossible position. The
    # chained comparison is False for NaN, so that case is rejected here too.
    if degrees < 0 or not 0 <= minutes < 60 or decimal > max_degrees:
        print(f"[ERROR] NMEA coordinate out of range: '{degrees_minutes}', direction: '{direction}'")
        return None

    return -decimal if direction in ('S', 'W') else decimal
def parse_gga(sentence, quiet=False):
    """Parse a GGA NMEA sentence into position and ellipsoidal height.

    Args:
        sentence: One comma-separated GGA sentence (any talker, e.g.
            ``$GPGGA`` or ``$GNGGA``).
        quiet: When true, suppress the diagnostic prints.

    Returns:
        ``{'lat': float, 'lon': float, 'hae': float}`` or ``None`` when the
        sentence is incomplete, is not GGA, reports fix quality ``0``, or has
        unusable coordinates. ``hae`` is the MSL altitude (field 9) plus the
        geoid separation (field 11) when that field is present and numeric;
        it is ``0.0`` when the altitude itself is missing or invalid.
    """
    parts = sentence.strip().split(',')
    if len(parts) < 10:
        if not quiet:
            print(f"[DEBUG] Incomplete GGA sentence (fields < 10): {parts}")
        return None
    if not parts[0].endswith("GGA"):
        if not quiet:
            print(f"[DEBUG] Not an GGA sentence: {repr(parts[0])}")
        return None
    # Field 6 is the fix-quality indicator; 0 means the receiver has no fix.
    if parts[6] == '0':
        if not quiet:
            print("[DEBUG] GGA sentence reports no fix (Fix Quality: 0)")
        return None

    # Fields 2/3: latitude + N/S, fields 4/5: longitude + E/W.
    lat = nmea_to_decimal(parts[2], parts[3])
    lon = nmea_to_decimal(parts[4], parts[5])
    if lat is None or lon is None:
        if not quiet:
            print(f"[DEBUG] GGA sentence yielded invalid lat/lon after conversion.")
        return None

    def read_metres(index, label):
        """Return field *index* as a float, or None if absent, empty or invalid."""
        text = parts[index] if index < len(parts) else ''
        if not text:
            return None
        try:
            return float(text)
        except ValueError:
            if not quiet:
                print(f"[WARN] GGA sentence has invalid {label}: {text}")
            return None

    altitude_msl = read_metres(9, "altitude")
    if altitude_msl is None:
        hae = 0.0  # No usable altitude: keep the historical default.
    else:
        # Field 9 is height above mean sea level; adding the geoid separation
        # from field 11 turns it into height above the WGS-84 ellipsoid.
        geoid_separation = read_metres(11, "geoid separation")
        hae = altitude_msl + (geoid_separation or 0.0)

    return {'lat': lat, 'lon': lon, 'hae': hae}
def parse_rmc(sentence, quiet=False):
    """Extract speed (m/s) and course (degrees) from an RMC NMEA sentence.

    Returns ``{'speed': float | None, 'course': float | None}`` when the
    sentence is an active (status ``A``) RMC carrying at least one numeric
    value among speed and course; returns ``None`` otherwise. Diagnostics are
    printed unless *quiet* is true.
    """
    def report(message):
        if not quiet:
            print(message)

    fields = sentence.strip().split(',')
    if len(fields) < 9:
        report(f"[DEBUG] Incomplete RMC sentence (fields < 9): {fields}")
        return None

    sentence_id = fields[0]
    if not sentence_id.endswith("RMC"):
        report(f"[DEBUG] Not an RMC sentence: {repr(sentence_id)}")
        return None

    # Field 2 is the receiver status flag: A = active, V = void.
    if fields[2] != 'A':
        report("[DEBUG] RMC sentence status is not Active (A).")
        return None

    def read_number(text, label):
        """Return *text* as a float, or None (with a diagnostic) if unusable."""
        if not text:
            report(f"[DEBUG] RMC sentence has empty {label} field.")
            return None
        try:
            return float(text)
        except ValueError:
            report(f"[WARN] RMC sentence has invalid {label}: {text}")
            return None

    knots = read_number(fields[7], "speed")
    heading = read_number(fields[8], "course")
    metres_per_second = None if knots is None else knots * 0.514444

    if metres_per_second is None and heading is None:
        report("[DEBUG] RMC sentence parsed but no valid speed or course found.")
        return None
    return {'speed': metres_per_second, 'course': heading}
def make_atak_event(uid, callsign, lat, lon, hae, speed=None, course=None):
    """Build an ATAK Cursor-on-Target (CoT) XML event string.

    Args:
        uid: Event UID, written to the ``uid`` attribute.
        callsign: Written to ``<contact callsign=...>``.
        lat, lon: Decimal degrees, formatted with six decimals.
        hae: Height in metres, formatted with one decimal.
        speed: Track speed; ``None`` is sent as ``0.0``.
        course: Track course; ``None`` is sent as ``0``.

    Returns:
        The event as a string. ``time`` and ``start`` are the current UTC
        time and ``stale`` is ten minutes later.
    """
    # Function-scope import keeps this block self-contained.
    from xml.sax.saxutils import escape

    def attr(value):
        """Escape *value* for use inside a double-quoted XML attribute."""
        return escape(str(value), {'"': '&quot;'})

    now = datetime.now(timezone.utc)
    time_str = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    stale_time = (now + timedelta(minutes=10)).strftime("%Y-%m-%dT%H:%M:%SZ")
    if speed is None:
        speed = 0.0
    if course is None:
        course = 0
    # uid and callsign come straight from the command line; unescaped, a
    # value containing &, < or " would yield malformed XML.
    return f'''<event version="2.0" uid="{attr(uid)}" type="a-n-G-E-S-E" time="{time_str}" start="{time_str}" stale="{stale_time}" how="m-g">
<point lat="{lat:.6f}" lon="{lon:.6f}" hae="{hae:.1f}" ce="10.0" le="10.0"/>
<detail>
<track speed="{speed:.1f}" course="{course:.1f}"/>
<contact callsign="{attr(callsign)}"/>
<remarks>CoT GENERATED FROM RAW NMEA</remarks>
</detail>
</event>'''
def connect_gps_with_retries(ip, port, timeout, quiet):
    """Connect to the GPS source, retrying indefinitely until it succeeds.

    Args:
        ip, port: Address of the NMEA source.
        timeout: Read timeout in seconds applied to the connected socket;
            a value of 0 or less selects blocking reads.
        quiet: When true, suppress the progress prints (errors still print).

    Returns:
        ``(socket, text_file)``, where the file is a UTF-8 reader over the
        socket with undecodable bytes ignored.

    The connect attempt itself uses a fixed 5 second timeout. Failed attempts
    are followed by an exponential back-off capped at 30 seconds.
    """
    attempt = 0
    while True:
        attempt += 1
        if not quiet:
            print(f"[RETRY] Attempt {attempt}: Connecting to GPS at {ip}:{port} ...")
        gps_socket = None
        try:
            gps_socket = socket.create_connection((ip, port), timeout=5)
            # From here on the timeout governs reads, not the connect.
            gps_socket.settimeout(timeout if timeout > 0 else None)
            gps_file = gps_socket.makefile('r', encoding='utf-8', errors='ignore')
        except socket.timeout:
            print(f"[ERROR] Timeout connecting to GPS source: {ip}:{port}")
        except Exception as e:
            print(f"[ERROR] Could not connect to GPS source: {e}")
        else:
            if not quiet:
                print("[CONNECTED] GPS connection established.")
            return gps_socket, gps_file

        # Do not leak a socket that connected but could not be set up.
        if gps_socket is not None:
            try:
                gps_socket.close()
            except OSError:
                pass
        # Without a pause, a refused connection is retried in a tight busy
        # loop. The exponent is clamped so the power stays small.
        time.sleep(min(30, 2 ** min(attempt, 5)))
def process_nmea_line(line, quiet, data_queue):
    """Parse one cleaned NMEA line and enqueue the result.

    GGA and RMC sentences are handed to ``parse_gga`` / ``parse_rmc``; a
    successful parse is put on *data_queue* as
    ``{'type': 'gga' | 'rmc', 'data': <parser result>}``. Empty lines and
    everything else are dropped, with diagnostics unless *quiet* is true.
    """
    if not line:
        if not quiet:
            print(f"[DEBUG] Skipped empty or non-printable line.")
        return
    if not quiet:
        print(f"[RAW] {line}")

    looks_like_nmea = line.startswith('$')
    kind = None
    payload = None
    if looks_like_nmea:
        sentence_id = line.split(',')[0]
        if sentence_id.endswith("GGA"):
            payload = parse_gga(line, quiet=quiet)
            kind = 'gga' if payload else None
        elif sentence_id.endswith("RMC"):
            payload = parse_rmc(line, quiet=quiet)
            kind = 'rmc' if payload else None

    if payload and kind:
        if not quiet:
            print(f"[PARSED {kind.upper()}] {payload}")
        data_queue.put({'type': kind, 'data': payload})
        return

    # Only sentences that at least look like NMEA earn a skip notice.
    if looks_like_nmea and not quiet:
        print(f"[SKIP] Could not parse a valid GGA/RMC or sentence type not supported/relevant from: {line}")
def gps_reader_thread(gps_ip, gps_port, timeout, quiet, data_queue, stop_event):
    """Read NMEA lines from a TCP source and feed parsed data to *data_queue*.

    Runs until *stop_event* is set. The connection is (re)established through
    ``connect_gps_with_retries``; each received line is cleaned with
    ``clean_line_input`` and passed to ``process_nmea_line``. EOF, a read
    timeout and any read error all close the connection and trigger a
    reconnect.
    """
    gps_sock = None
    gps_file = None

    def drop_connection():
        """Close reader and socket, ignoring close errors, and forget both."""
        nonlocal gps_sock, gps_file
        for handle in (gps_file, gps_sock):
            if handle is not None:
                try:
                    handle.close()
                except OSError:
                    pass
        gps_sock, gps_file = None, None

    while not stop_event.is_set():
        if gps_sock is None or gps_file is None:
            try:
                gps_sock, gps_file = connect_gps_with_retries(gps_ip, gps_port, timeout, quiet)
            except SystemExit:
                # Defensive: only matters if the connect helper gives up by exiting.
                stop_event.set()
                break

        try:
            raw_line = gps_file.readline()
        except socket.timeout:
            # A makefile() reader refuses all further reads once a timeout has
            # occurred, so retrying readline() on it can only fail again.
            # Treat the silence as a lost connection and reconnect right away.
            if not quiet:
                print("[DEBUG] GPS read timeout, reconnecting...")
            drop_connection()
            continue
        except Exception as e:
            if not quiet:
                print(f"[ERROR] Exception while reading GPS data: {e}, attempting to reconnect...")
            drop_connection()
            time.sleep(1)  # Brief pause before the reconnection attempt.
            continue

        if not raw_line:  # EOF: the peer closed the stream.
            if not quiet:
                print("[WARN] GPS stream closed or EOF detected, attempting to reconnect...")
            drop_connection()
            time.sleep(1)
            continue

        process_nmea_line(clean_line_input(raw_line), quiet, data_queue)

    drop_connection()
    if not quiet:
        print("[INFO] GPS reader thread stopped.")
def listener_reader_thread(listen_ip, listen_port, quiet, data_queue, stop_event):
    """Accept one TCP client at a time and process the NMEA lines it sends.

    Runs until *stop_event* is set. If the listening socket cannot be set up,
    *stop_event* is set and the function returns. Each received line is
    cleaned with ``clean_line_input`` and passed to ``process_nmea_line``.
    A client that disconnects, errors, or stays silent for 5 seconds is
    dropped and the next connection is awaited.
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((listen_ip, listen_port))
        sock.listen(1)
        if not quiet:
            print(f"[LISTEN] Listening for incoming NMEA on {listen_ip}:{listen_port}")
    except Exception as e:
        print(f"[ERROR] Failed to start listener socket on {listen_ip}:{listen_port} - {e}")
        if sock is not None:
            sock.close()  # Do not leak the half-configured socket.
        stop_event.set()  # Signal other threads to stop.
        return

    sock.settimeout(1.0)  # Lets accept() wake up regularly to check stop_event.
    client_sock = None
    client_file = None

    def drop_client():
        """Close the client reader and socket, ignoring close errors."""
        nonlocal client_sock, client_file
        for handle in (client_file, client_sock):
            if handle is not None:
                try:
                    handle.close()
                except OSError:
                    pass
        client_file, client_sock = None, None

    try:
        while not stop_event.is_set():
            if client_sock is None:
                try:
                    client_sock, addr = sock.accept()
                    client_sock.settimeout(5.0)  # Read timeout for this client.
                    client_file = client_sock.makefile('r', encoding='utf-8', errors='ignore')
                except socket.timeout:  # No client yet; re-check stop_event.
                    continue
                except Exception as e:
                    if not quiet:
                        print(f"[ERROR] Listener accept error: {e}")
                    drop_client()  # Discard a client that was only half set up.
                    time.sleep(1)
                    continue
                if not quiet:
                    print(f"[LISTEN] Accepted NMEA connection from {addr}")

            try:
                raw_line = client_file.readline()
            except socket.timeout:
                # A makefile() reader refuses all further reads once a timeout
                # has occurred, so the connection cannot simply be read again.
                # Drop the silent client and go back to accepting.
                if not quiet:
                    print("[DEBUG] NMEA client read timeout, dropping connection and waiting for a new one...")
                drop_client()
                continue
            except Exception as e:
                if not quiet:
                    print(f"[ERROR] Exception while reading from NMEA client: {e}")
                drop_client()
                continue

            if not raw_line:  # Client disconnected.
                if not quiet:
                    print("[LISTEN] NMEA client disconnected, waiting for new connection...")
                drop_client()
                continue

            process_nmea_line(clean_line_input(raw_line), quiet, data_queue)
    finally:
        # Release the port even if an unexpected exception escapes the loop.
        drop_client()
        sock.close()

    if not quiet:
        print("[INFO] Listener reader thread stopped.")
def sender_thread(atak_ip, atak_port, uid, callsign, tcp_mode, quiet, data_queue, stop_event):
    """Consume parsed NMEA items from *data_queue* and send CoT events to ATAK.

    Queue items are dicts ``{'type': 'gga' | 'rmc', 'data': {...}}``. RMC
    items only update the remembered speed/course. Each GGA item produces one
    CoT event (built by ``make_atak_event``) when the position differs from
    the last one sent successfully, or when the heartbeat interval has
    elapsed since that send. Events go out over UDP, or over a TCP connection
    that is re-established on demand when *tcp_mode* is true. Runs until
    *stop_event* is set; every dequeued item is acknowledged with
    ``task_done()``.
    """
    # make_atak_event() stamps each event stale 10 minutes after creation, so
    # an unchanged position is re-sent at half that interval; otherwise a
    # stationary source would expire on the receiving side.
    heartbeat_seconds = 300.0

    sock_send = None
    latest_rmc_data = None            # {'speed': ..., 'course': ...}
    last_sent_position_tuple = None   # (lat, lon, hae) of the last sent CoT
    last_sent_monotonic = None        # time.monotonic() of that send

    def close_sender_socket():
        """Close the outgoing socket, ignoring close errors, and forget it."""
        nonlocal sock_send
        if sock_send is not None:
            try:
                sock_send.close()
            except OSError:
                pass
            sock_send = None

    def connect_atak_tcp():
        """Open a fresh TCP connection to ATAK; return the socket or None."""
        close_sender_socket()
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(5.0)  # Applies to connect and to later sends.
            s.connect((atak_ip, atak_port))
        except Exception as e:
            if s is not None:
                s.close()  # Do not leak the socket of a failed attempt.
            print(f"[ERROR] Could not connect to ATAK TCP at {atak_ip}:{atak_port} - {e}")
            return None
        if not quiet:
            print(f"[CONNECTED] Connected to ATAK TCP at {atak_ip}:{atak_port}")
        return s

    def handle_item(item):
        """Apply one queue item: remember RMC data or send a CoT for GGA."""
        nonlocal sock_send, latest_rmc_data, last_sent_position_tuple, last_sent_monotonic
        item_type = item.get('type')
        item_data = item.get('data')
        if not item_data:  # Should not happen if parsers are well-behaved.
            return

        if item_type == 'rmc':
            latest_rmc_data = item_data
            if not quiet:
                print(f"[INFO] Updated RMC data: Speed={latest_rmc_data.get('speed')}, Course={latest_rmc_data.get('course')}")
            return
        if item_type != 'gga':
            return

        if 'lat' not in item_data or 'lon' not in item_data or 'hae' not in item_data:
            if not quiet:
                print(f"[WARN] Received malformed GGA data: {item_data}")
            return
        if item_data['lat'] is None or item_data['lon'] is None:
            if not quiet:
                print(f"[DEBUG] Received GGA data with invalid coordinates: {item_data}")
            return

        current_position_tuple = (item_data['lat'], item_data['lon'], item_data['hae'])
        now = time.monotonic()
        heartbeat_due = (last_sent_monotonic is None
                         or now - last_sent_monotonic >= heartbeat_seconds)
        if current_position_tuple == last_sent_position_tuple and not heartbeat_due:
            if not quiet:
                print("[SKIP] Position unchanged from last sent. No CoT update sent.")
            return

        speed_to_send = latest_rmc_data.get('speed') if latest_rmc_data else None
        course_to_send = latest_rmc_data.get('course') if latest_rmc_data else None
        xml = make_atak_event(uid, callsign,
                              item_data['lat'], item_data['lon'], item_data['hae'],
                              speed=speed_to_send, course=course_to_send)
        if not quiet:
            print(f"[XML] Generated ATAK CoT:\n{xml}")

        try:
            if tcp_mode:
                if sock_send is None:  # Reconnect lazily after an earlier failure.
                    if not quiet:
                        print("[INFO] ATAK TCP socket is not connected. Attempting to reconnect...")
                    sock_send = connect_atak_tcp()
                if sock_send is None:
                    print(f"[ERROR] Failed to send to {atak_ip}:{atak_port} (TCP connection issue)")
                    return  # Skip this event; the next one retries the connect.
                sock_send.sendall(xml.encode('utf-8'))
            else:
                sock_send.sendto(xml.encode('utf-8'), (atak_ip, atak_port))
        except socket.timeout:
            print(f"[ERROR] Timeout sending to {atak_ip}:{atak_port} (TCP)")
            if tcp_mode:
                close_sender_socket()
            return
        except Exception as e:
            print(f"[ERROR] Failed to send to {atak_ip}:{atak_port} — {e}")
            if tcp_mode:
                close_sender_socket()
            return

        if not quiet:
            print(f"[SENT] ATAK CoT sent to {atak_ip}:{atak_port}")
        # Recorded only after a successful send so failures are retried.
        last_sent_position_tuple = current_position_tuple
        last_sent_monotonic = now

    if not tcp_mode:
        sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    else:
        sock_send = connect_atak_tcp()
        if not sock_send:
            print(f"[WARN] Initial ATAK TCP connection failed. Will retry on next data send.")

    while not stop_event.is_set():
        try:
            item = data_queue.get(timeout=1.0)  # Wake regularly to check stop_event.
        except Empty:
            continue
        try:
            handle_item(item)
        finally:
            # Acknowledge every dequeued item, including skipped or failed ones.
            data_queue.task_done()

    close_sender_socket()
    if not quiet:
        print("[INFO] Sender thread stopped.")
def _parse_endpoint(value):
    """Split ``"host:port"`` into ``(host, port)``.

    Raises ValueError unless *value* contains exactly one colon followed by
    an integer port in the range 0-65535.
    """
    host, port_text = value.split(':')
    port = int(port_text)
    if not 0 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    return host, port


def main():
    """Parse the command line, start the reader and sender threads, and wait.

    The NMEA source is either a TCP listener (``--listen``, which takes
    precedence) or a TCP client connection (``--gpsd``). The process exits
    with status 1 when no source is given or an endpoint is malformed. The
    main thread then supervises the two daemon threads until one of them
    dies, the stop event is set, or Ctrl-C is pressed.
    """
    parser = argparse.ArgumentParser(description="Convert NMEA (GGA/RMC) to ATAK CoT. Reads from GPSD or listens for NMEA.")
    parser.add_argument("-g", "--gpsd", help="GPSD IP and port (e.g., 127.0.0.1:2947). If -l is used, this is ignored.")
    parser.add_argument("-l", "--listen", help="Listen IP:port for incoming raw NMEA (e.g., 0.0.0.0:5000). Overrides --gpsd.")
    parser.add_argument("-a", "--atak", default="127.0.0.1:8087", help="ATAK IP and port (default 127.0.0.1:8087)")
    parser.add_argument("-c", "--callsign", default="NMEA-Track", help="Callsign for CoT (default NMEA-Track)")
    parser.add_argument("-u", "--uid", default="nmea-track-001", help="UID for CoT (default nmea-track-001)")
    parser.add_argument("-t", "--tcp", action="store_true", help="Send ATAK CoT over TCP instead of UDP (default UDP)")
    parser.add_argument("-q", "--quiet", action="store_true", help="Quiet mode, suppress debug and informational output")
    parser.add_argument("-T", "--timeout", type=int, default=10, help="GPSD socket read timeout in seconds (default 10). Use 0 for blocking.")
    args = parser.parse_args()

    if not args.listen and not args.gpsd:
        print("[ERROR] You must specify a source: either --gpsd IP:PORT or --listen IP:PORT")
        sys.exit(1)
    if args.listen and args.gpsd and not args.quiet:
        print("[INFO] --listen specified, --gpsd will be ignored.")

    # Validating endpoints here, port range included, avoids a bad port
    # surfacing later as a repeated error inside a worker thread.
    is_listen_mode = bool(args.listen)
    try:
        data_src_ip, data_src_port = _parse_endpoint(args.listen if is_listen_mode else args.gpsd)
    except ValueError:
        if is_listen_mode:
            print("[ERROR] Invalid listen format. Use IP:port (e.g., 0.0.0.0:5000)")
        else:
            print("[ERROR] Invalid gpsd format. Use IP:port (e.g., 127.0.0.1:2947)")
        sys.exit(1)

    try:
        atak_ip, atak_port = _parse_endpoint(args.atak)
    except ValueError:
        print("[ERROR] Invalid ATAK format. Use IP:port (e.g., 127.0.0.1:8087)")
        sys.exit(1)

    data_queue = queue.Queue()
    stop_event = threading.Event()

    if is_listen_mode:
        if not args.quiet:
            print(f"[CONFIG] Mode: NMEA Listener ({data_src_ip}:{data_src_port})")
        reader_thread_obj = threading.Thread(
            target=listener_reader_thread,
            args=(data_src_ip, data_src_port, args.quiet, data_queue, stop_event),
            daemon=True)
    else:
        if not args.quiet:
            print(f"[CONFIG] Mode: GPSD Client ({data_src_ip}:{data_src_port})")
        reader_thread_obj = threading.Thread(
            target=gps_reader_thread,
            args=(data_src_ip, data_src_port, args.timeout, args.quiet, data_queue, stop_event),
            daemon=True)

    sender_thread_obj = threading.Thread(
        target=sender_thread,
        args=(atak_ip, atak_port, args.uid, args.callsign, args.tcp, args.quiet, data_queue, stop_event),
        daemon=True)

    if not args.quiet:
        print(f"[CONFIG] ATAK Target: {atak_ip}:{atak_port} ({'TCP' if args.tcp else 'UDP'})")
        print(f"[CONFIG] Callsign: {args.callsign}, UID: {args.uid}")
    if args.quiet:
        print("[CONFIG] Quiet mode enabled.")

    reader_thread_obj.start()
    sender_thread_obj.start()

    try:
        # The main thread only supervises: it keeps the process alive for the
        # daemon threads and turns Ctrl-C into an orderly stop.
        while not stop_event.is_set():
            if not reader_thread_obj.is_alive() or not sender_thread_obj.is_alive():
                if not args.quiet and not stop_event.is_set():
                    print("[ERROR] A critical thread has terminated unexpectedly. Setting stop event.")
                stop_event.set()
                break
            time.sleep(1)
    except KeyboardInterrupt:
        if not args.quiet:
            print("\n[STOP] Keyboard interrupt received. Shutting down gracefully...")
        stop_event.set()
    except Exception as e:
        if not args.quiet:
            print(f"\n[FATAL] Unexpected error in main: {e}. Shutting down...")
        stop_event.set()

    if not args.quiet:
        print("[INFO] Waiting for threads to complete...")
    if reader_thread_obj.is_alive():
        reader_thread_obj.join(timeout=5)
    if sender_thread_obj.is_alive():
        sender_thread_obj.join(timeout=5)

    if not args.quiet:
        if reader_thread_obj.is_alive():
            print("[WARN] Reader thread did not exit cleanly.")
        if sender_thread_obj.is_alive():
            print("[WARN] Sender thread did not exit cleanly.")
        print("[EXIT] Program terminated.")


# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment