Last active
May 19, 2025 22:12
-
-
Save Geofferey/5215f1d3eb77efd37478a97e19d5c69b to your computer and use it in GitHub Desktop.
Take raw nmea and convert to XML compatible with ATAK CoT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
import socket | |
import sys | |
from datetime import datetime, timedelta, timezone | |
import string | |
import argparse | |
import os | |
import time | |
import threading | |
import queue # Explicitly import queue for clarity | |
from queue import Empty # Keep Empty for direct use | |
def clean_line_input(raw): | |
"""Removes leading nulls, ZWSP, and non-printable characters from the beginning and strips whitespace.""" | |
# Remove BOM and other zero-width characters often found at the beginning | |
clean = raw.lstrip('\x00\u200b\u200c\u200d\ufeff').strip() | |
# Remove any other leading non-printable characters | |
while clean and clean[0] not in string.printable: | |
clean = clean[1:] | |
return clean | |
def nmea_to_decimal(degrees_minutes, direction): | |
"""Converts NMEA DDDMM.MMMM or DDMM.MMMM to decimal degrees.""" | |
if not degrees_minutes or len(degrees_minutes) < 3: # Basic check | |
return None | |
try: | |
if direction in ['N', 'S']: # Latitude | |
degrees = int(degrees_minutes[0:2]) | |
minutes = float(degrees_minutes[2:]) | |
elif direction in ['E', 'W']: # Longitude | |
degrees = int(degrees_minutes[0:3]) | |
minutes = float(degrees_minutes[3:]) | |
else: # Invalid direction | |
return None | |
decimal = degrees + minutes / 60 | |
if direction in ['S', 'W']: | |
decimal = -decimal | |
return decimal | |
except ValueError: | |
# This handles cases like empty degrees_minutes or non-numeric content | |
print(f"[ERROR] Failed to convert NMEA coordinate part: '{degrees_minutes}', direction: '{direction}'") | |
return None | |
def parse_gga(sentence, quiet=False): | |
"""Parses a GGA NMEA sentence for position and altitude.""" | |
parts = sentence.strip().split(',') | |
if len(parts) < 10: # GGA has at least 10 fields before altitude sometimes | |
if not quiet: | |
print(f"[DEBUG] Incomplete GGA sentence (fields < 10): {parts}") | |
return None | |
# Check for common GGA identifiers (e.g., $GPGGA, $GNGGA) | |
if not parts[0].endswith("GGA"): | |
if not quiet: | |
print(f"[DEBUG] Not an GGA sentence: {repr(parts[0])}") | |
return None | |
# Field 6: GPS Quality Indicator | |
# 0 = No fix | |
# 1 = GPS fix | |
# 2 = DGPS fix | |
# etc. | |
if parts[6] == '0': | |
if not quiet: | |
print("[DEBUG] GGA sentence reports no fix (Fix Quality: 0)") | |
return None | |
# Field 2: Latitude (DDMM.MMMM) | |
# Field 3: N/S Indicator | |
# Field 4: Longitude (DDDMM.MMMM) | |
# Field 5: E/W Indicator | |
lat = nmea_to_decimal(parts[2], parts[3]) | |
lon = nmea_to_decimal(parts[4], parts[5]) | |
# Field 9: Altitude (MSL) | |
# Field 10: Units of altitude (M for meters) | |
try: | |
hae = float(parts[9]) if parts[9] else 0.0 # Height Above Ellipsoid (using MSL altitude as HAE proxy) | |
except ValueError: | |
if not quiet: | |
print(f"[WARN] GGA sentence has invalid altitude: {parts[9]}") | |
hae = 0.0 # Default HAE if conversion fails or empty | |
if lat is None or lon is None: | |
if not quiet: | |
print(f"[DEBUG] GGA sentence yielded invalid lat/lon after conversion.") | |
return None | |
return {'lat': lat, 'lon': lon, 'hae': hae} | |
def parse_rmc(sentence, quiet=False): | |
"""Parses an RMC NMEA sentence for speed and course.""" | |
parts = sentence.strip().split(',') | |
if len(parts) < 9: # RMC needs at least speed and course fields | |
if not quiet: | |
print(f"[DEBUG] Incomplete RMC sentence (fields < 9): {parts}") | |
return None | |
# Check for common RMC identifiers (e.g., $GPRMC, $GNRMC) | |
if not parts[0].endswith("RMC"): | |
if not quiet: | |
print(f"[DEBUG] Not an RMC sentence: {repr(parts[0])}") | |
return None | |
# Field 2: Status, A=Active, V=Void | |
if parts[2] != 'A': | |
if not quiet: | |
print("[DEBUG] RMC sentence status is not Active (A).") | |
return None | |
speed_knots_str = parts[7] | |
course_deg_str = parts[8] | |
speed_mps = None | |
course_deg = None | |
try: | |
if speed_knots_str: # Check if the string is not empty | |
speed_knots = float(speed_knots_str) | |
speed_mps = speed_knots * 0.514444 # Convert knots to m/s | |
else: | |
if not quiet: | |
print("[DEBUG] RMC sentence has empty speed field.") | |
except ValueError: | |
if not quiet: | |
print(f"[WARN] RMC sentence has invalid speed: {speed_knots_str}") | |
try: | |
if course_deg_str: # Check if the string is not empty | |
course_deg = float(course_deg_str) | |
else: | |
if not quiet: | |
print("[DEBUG] RMC sentence has empty course field.") | |
except ValueError: | |
if not quiet: | |
print(f"[WARN] RMC sentence has invalid course: {course_deg_str}") | |
# Only return data if at least one valid value was parsed | |
if speed_mps is not None or course_deg is not None: | |
return {'speed': speed_mps, 'course': course_deg} | |
else: | |
if not quiet: | |
print("[DEBUG] RMC sentence parsed but no valid speed or course found.") | |
return None | |
def make_atak_event(uid, callsign, lat, lon, hae, speed=None, course=None): | |
"""Creates an ATAK Cursor-on-Target XML event string.""" | |
now = datetime.now(timezone.utc) | |
time_str = now.strftime("%Y-%m-%dT%H:%M:%SZ") | |
stale_time = (now + timedelta(minutes=10)).strftime("%Y-%m-%dT%H:%M:%SZ") | |
if speed is None: speed=0.0 | |
if course is None: course=0 | |
return f'''<event version="2.0" uid="{uid}" type="a-n-G-E-S-E" time="{time_str}" start="{time_str}" stale="{stale_time}" how="m-g"> | |
<point lat="{lat:.6f}" lon="{lon:.6f}" hae="{hae:.1f}" ce="10.0" le="10.0"/> | |
<detail> | |
<track speed="{speed:.1f}" course="{course:.1f}"/> | |
<contact callsign="{callsign}"/> | |
<remarks>CoT GENERATED FROM RAW NMEA</remarks> | |
</detail> | |
</event>''' # Added newline consistency and ensured </event> is on its own line if remarks/track are multi-line | |
def connect_gps_with_retries(ip, port, timeout, quiet): | |
"""Connects to the GPS source with a specified number of retries.""" | |
attempt = 0 | |
while True: | |
attempt += 1 | |
if not quiet: | |
print(f"[RETRY] Attempt {attempt}: Connecting to GPS at {ip}:{port} ...") | |
try: | |
gps_socket = socket.create_connection((ip, port), timeout=5) # Connection timeout | |
# Set read timeout for operations on the socket's file object | |
gps_socket.settimeout(timeout if timeout > 0 else None) | |
gps_file = gps_socket.makefile('r', encoding='utf-8', errors='ignore') # Specify encoding | |
if not quiet: | |
print("[CONNECTED] GPS connection established.") | |
return gps_socket, gps_file | |
except socket.timeout: # This timeout is for create_connection | |
print(f"[ERROR] Timeout connecting to GPS source: {ip}:{port}") | |
except Exception as e: | |
print(f"[ERROR] Could not connect to GPS source: {e}") | |
# if max_retries and attempt >= max_retries: | |
# print("[FAIL] Max retries reached for GPS connection. Exiting.") | |
# sys.exit(1) | |
# time.sleep(min(30, 2**attempt)) # Exponential backoff capped at 30s | |
def process_nmea_line(line, quiet, data_queue): | |
"""Processes a single NMEA line, parses GGA or RMC, and puts data into the queue.""" | |
if not line: | |
if not quiet: | |
print(f"[DEBUG] Skipped empty or non-printable line.") | |
return | |
if not quiet: | |
print(f"[RAW] {line}") | |
parsed_data = None | |
data_type = None | |
# Case-insensitive check for sentence type | |
if line.upper().startswith('$') and len(line.split(',')) > 0: | |
sentence_id = line.split(',')[0] | |
if sentence_id.endswith("GGA"): | |
parsed_data = parse_gga(line, quiet=quiet) | |
if parsed_data: | |
data_type = 'gga' | |
elif sentence_id.endswith("RMC"): | |
parsed_data = parse_rmc(line, quiet=quiet) | |
if parsed_data: | |
data_type = 'rmc' | |
# Add other parsers here if needed (e.g., elif sentence_id.endswith("VTG") for another way to get course/speed) | |
if parsed_data and data_type: | |
if not quiet: | |
print(f"[PARSED {data_type.upper()}] {parsed_data}") | |
data_queue.put({'type': data_type, 'data': parsed_data}) | |
else: | |
if not quiet and line.startswith('$'): # Only print skip for actual NMEA-like lines | |
print(f"[SKIP] Could not parse a valid GGA/RMC or sentence type not supported/relevant from: {line}") | |
def gps_reader_thread(gps_ip, gps_port, timeout, quiet, data_queue, stop_event): | |
"""Reads NMEA sentences from a GPSD-like source and puts parsed data into a queue.""" | |
gps_sock = None | |
gps_file = None | |
while not stop_event.is_set(): | |
if gps_sock is None or gps_file is None: | |
try: | |
gps_sock, gps_file = connect_gps_with_retries(gps_ip, gps_port, timeout, quiet) | |
except SystemExit: # connect_gps_with_retries can sys.exit | |
stop_event.set() # Signal other threads to stop | |
break | |
try: | |
raw_line = gps_file.readline() | |
if not raw_line: # EOF or stream closed | |
if not quiet: | |
print("[WARN] GPS stream closed or EOF detected, attempting to reconnect...") | |
# Close and nullify to trigger reconnection logic | |
if gps_file: gps_file.close() | |
if gps_sock: gps_sock.close() | |
gps_sock, gps_file = None, None | |
time.sleep(1) # Brief pause before reconnection attempt | |
continue | |
except socket.timeout: | |
if not quiet: | |
print("[DEBUG] GPS read timeout, continuing...") | |
# This timeout means no data was received in the 'timeout' duration. | |
# Check if connection is still alive or attempt reconnect. | |
# For simplicity, we can just continue and let readline try again. | |
# A more robust check might try sending a keep-alive or fully reconnecting. | |
continue | |
except Exception as e: | |
if not quiet: | |
print(f"[ERROR] Exception while reading GPS data: {e}, attempting to reconnect...") | |
if gps_file: gps_file.close() | |
if gps_sock: gps_sock.close() | |
gps_sock, gps_file = None, None | |
time.sleep(1) # Brief pause | |
continue | |
line = clean_line_input(raw_line) | |
process_nmea_line(line, quiet, data_queue) | |
# Cleanup on stop | |
if gps_file: gps_file.close() | |
if gps_sock: gps_sock.close() | |
if not quiet: print("[INFO] GPS reader thread stopped.") | |
def listener_reader_thread(listen_ip, listen_port, quiet, data_queue, stop_event): | |
"""Listens for incoming NMEA sentences on a TCP port and processes them.""" | |
try: | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.bind((listen_ip, listen_port)) | |
sock.listen(1) | |
if not quiet: | |
print(f"[LISTEN] Listening for incoming NMEA on {listen_ip}:{listen_port}") | |
except Exception as e: | |
print(f"[ERROR] Failed to start listener socket on {listen_ip}:{listen_port} - {e}") | |
stop_event.set() # Signal other threads to stop | |
return | |
sock.settimeout(1.0) # Timeout for accept() to check stop_event | |
client_sock = None | |
client_file = None | |
while not stop_event.is_set(): | |
if client_sock is None: | |
try: | |
client_sock, addr = sock.accept() | |
client_sock.settimeout(5.0) # Set read timeout for client socket | |
client_file = client_sock.makefile('r', encoding='utf-8', errors='ignore') | |
if not quiet: | |
print(f"[LISTEN] Accepted NMEA connection from {addr}") | |
except socket.timeout: # Timeout on accept() | |
continue | |
except Exception as e: | |
if not quiet: | |
print(f"[ERROR] Listener accept error: {e}") | |
# Potentially add a small delay before retrying accept | |
time.sleep(1) | |
continue | |
try: | |
raw_line = client_file.readline() | |
if not raw_line: # Client disconnected | |
if not quiet: | |
print("[LISTEN] NMEA client disconnected, waiting for new connection...") | |
if client_file: client_file.close() | |
if client_sock: client_sock.close() | |
client_file, client_sock = None, None | |
continue | |
except socket.timeout: | |
if not quiet: | |
print("[DEBUG] NMEA client read timeout. Checking connection or waiting for more data...") | |
# If timeout, client might still be connected but sending slowly. | |
# Or, connection might be dead. A more robust check could be added. | |
# For now, just continue to try reading again. | |
continue | |
except Exception as e: | |
if not quiet: | |
print(f"[ERROR] Exception while reading from NMEA client: {e}") | |
if client_file: client_file.close() | |
if client_sock: client_sock.close() | |
client_file, client_sock = None, None | |
continue | |
line = clean_line_input(raw_line) | |
process_nmea_line(line, quiet, data_queue) | |
# Cleanup on stop | |
if client_file: client_file.close() | |
if client_sock: client_sock.close() | |
sock.close() | |
if not quiet: print("[INFO] Listener reader thread stopped.") | |
def sender_thread(atak_ip, atak_port, uid, callsign, tcp_mode, quiet, data_queue, stop_event): | |
"""Sends ATAK CoT messages based on data received from the queue.""" | |
sock_send = None | |
# State variables | |
latest_gga_data = None # Holds {'lat': ..., 'lon': ..., 'hae': ...} | |
latest_rmc_data = None # Holds {'speed': ..., 'course': ...} | |
last_sent_position_tuple = None # Holds (lat, lon, hae) of the last sent CoT | |
def connect_atak_tcp(): | |
nonlocal sock_send | |
if sock_send: | |
try: | |
sock_send.close() | |
except Exception: | |
pass # Ignore errors on close | |
try: | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.settimeout(5.0) # Connection and send timeout | |
s.connect((atak_ip, atak_port)) | |
if not quiet: | |
print(f"[CONNECTED] Connected to ATAK TCP at {atak_ip}:{atak_port}") | |
return s | |
except Exception as e: | |
print(f"[ERROR] Could not connect to ATAK TCP at {atak_ip}:{atak_port} - {e}") | |
return None | |
if not tcp_mode: # UDP | |
sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
else: # TCP - initial connection attempt | |
sock_send = connect_atak_tcp() | |
if not sock_send: | |
# If initial TCP connection fails, we might retry in the loop or exit. | |
# For now, the loop will attempt to resend, which for TCP means re-establishing. | |
print(f"[WARN] Initial ATAK TCP connection failed. Will retry on next data send.") | |
while not stop_event.is_set(): | |
try: | |
item = data_queue.get(timeout=1.0) # Check queue with timeout | |
except Empty: | |
continue # No data, loop to check stop_event | |
item_type = item.get('type') | |
item_data = item.get('data') | |
if not item_data: # Should not happen if parsers are well-behaved | |
continue | |
if item_type == 'gga': | |
# Check for essential keys, though parse_gga should ensure them if not None | |
if 'lat' not in item_data or 'lon' not in item_data or 'hae' not in item_data: | |
if not quiet: print(f"[WARN] Received malformed GGA data: {item_data}") | |
continue | |
if item_data['lat'] is None or item_data['lon'] is None: # Check from nmea_to_decimal | |
if not quiet: print(f"[DEBUG] Received GGA data with invalid coordinates: {item_data}") | |
continue | |
latest_gga_data = item_data | |
current_position_tuple = (latest_gga_data['lat'], latest_gga_data['lon'], latest_gga_data['hae']) | |
if current_position_tuple != last_sent_position_tuple: | |
speed_to_send = latest_rmc_data.get('speed') if latest_rmc_data else None | |
course_to_send = latest_rmc_data.get('course') if latest_rmc_data else None | |
xml = make_atak_event(uid, callsign, | |
latest_gga_data['lat'], latest_gga_data['lon'], latest_gga_data['hae'], | |
speed=speed_to_send, course=course_to_send) | |
if not quiet: | |
print(f"[XML] Generated ATAK CoT:\n{xml}") | |
try: | |
if tcp_mode: | |
if sock_send is None: # Attempt to reconnect if TCP socket is dead | |
if not quiet: print("[INFO] ATAK TCP socket is not connected. Attempting to reconnect...") | |
sock_send = connect_atak_tcp() | |
if sock_send: # If connection (re)established or was fine | |
sock_send.sendall(xml.encode('utf-8')) | |
else: # Connection failed | |
print(f"[ERROR] Failed to send to {atak_ip}:{atak_port} (TCP connection issue)") | |
continue # Skip this send, will try to reconnect on next data | |
else: # UDP | |
sock_send.sendto(xml.encode('utf-8'), (atak_ip, atak_port)) | |
if not quiet: | |
print(f"[SENT] ATAK CoT sent to {atak_ip}:{atak_port}") | |
last_sent_position_tuple = current_position_tuple # Update only on successful send | |
except socket.timeout: | |
print(f"[ERROR] Timeout sending to {atak_ip}:{atak_port} (TCP)") | |
if tcp_mode and sock_send: # Close broken TCP socket | |
try: sock_send.close() | |
except: pass | |
sock_send = None | |
except Exception as e: | |
print(f"[ERROR] Failed to send to {atak_ip}:{atak_port} — {e}") | |
if tcp_mode and sock_send: # Close broken TCP socket on other errors too | |
try: sock_send.close() | |
except: pass | |
sock_send = None | |
else: | |
if not quiet: | |
print("[SKIP] Position unchanged from last sent. No CoT update sent.") | |
elif item_type == 'rmc': | |
latest_rmc_data = item_data # Update latest RMC data | |
if not quiet: | |
print(f"[INFO] Updated RMC data: Speed={latest_rmc_data.get('speed')}, Course={latest_rmc_data.get('course')}") | |
data_queue.task_done() # Signal that the item from the queue has been processed | |
# Cleanup on stop | |
if sock_send: | |
sock_send.close() | |
if not quiet: print("[INFO] Sender thread stopped.") | |
def main(): | |
parser = argparse.ArgumentParser(description="Convert NMEA (GGA/RMC) to ATAK CoT. Reads from GPSD or listens for NMEA.") | |
parser.add_argument("-g", "--gpsd", help="GPSD IP and port (e.g., 127.0.0.1:2947). If -l is used, this is ignored.") | |
parser.add_argument("-l", "--listen", help="Listen IP:port for incoming raw NMEA (e.g., 0.0.0.0:5000). Overrides --gpsd.") | |
parser.add_argument("-a", "--atak", default="127.0.0.1:8087", help="ATAK IP and port (default 127.0.0.1:8087)") | |
parser.add_argument("-c", "--callsign", default="NMEA-Track", help="Callsign for CoT (default NMEA-Track)") | |
parser.add_argument("-u", "--uid", default="nmea-track-001", help="UID for CoT (default nmea-track-001)") | |
parser.add_argument("-t", "--tcp", action="store_true", help="Send ATAK CoT over TCP instead of UDP (default UDP)") | |
parser.add_argument("-q", "--quiet", action="store_true", help="Quiet mode, suppress debug and informational output") | |
parser.add_argument("-T", "--timeout", type=int, default=10, help="GPSD socket read timeout in seconds (default 10). Use 0 for blocking.") | |
args = parser.parse_args() | |
if not args.listen and not args.gpsd: | |
print("[ERROR] You must specify a source: either --gpsd IP:PORT or --listen IP:PORT") | |
sys.exit(1) | |
if args.listen and args.gpsd and not args.quiet: | |
print("[INFO] --listen specified, --gpsd will be ignored.") | |
data_src_ip, data_src_port = None, None | |
is_listen_mode = False | |
if args.listen: | |
is_listen_mode = True | |
try: | |
data_src_ip, port_str = args.listen.split(':') | |
data_src_port = int(port_str) | |
except ValueError: | |
print("[ERROR] Invalid listen format. Use IP:port (e.g., 0.0.0.0:5000)") | |
sys.exit(1) | |
elif args.gpsd: # Only use gpsd if listen is not set | |
try: | |
data_src_ip, port_str = args.gpsd.split(':') | |
data_src_port = int(port_str) | |
except ValueError: | |
print("[ERROR] Invalid gpsd format. Use IP:port (e.g., 127.0.0.1:2947)") | |
sys.exit(1) | |
else: # Should be caught by the initial check, but as a safeguard | |
print("[ERROR] No data source specified.") | |
sys.exit(1) | |
try: | |
atak_ip, atak_port_str = args.atak.split(':') | |
atak_port = int(atak_port_str) | |
except ValueError: | |
print("[ERROR] Invalid ATAK format. Use IP:port (e.g., 127.0.0.1:8087)") | |
sys.exit(1) | |
data_queue = queue.Queue() | |
stop_event = threading.Event() | |
reader_thread_obj = None | |
if is_listen_mode: | |
if not args.quiet: print(f"[CONFIG] Mode: NMEA Listener ({data_src_ip}:{data_src_port})") | |
reader_thread_obj = threading.Thread(target=listener_reader_thread, | |
args=(data_src_ip, data_src_port, args.quiet, data_queue, stop_event), | |
daemon=True) | |
else: # GPSD mode | |
if not args.quiet: print(f"[CONFIG] Mode: GPSD Client ({data_src_ip}:{data_src_port})") | |
reader_thread_obj = threading.Thread(target=gps_reader_thread, | |
args=(data_src_ip, data_src_port, args.timeout, args.quiet, data_queue, stop_event), | |
daemon=True) | |
sender_thread_obj = threading.Thread(target=sender_thread, | |
args=(atak_ip, atak_port, args.uid, args.callsign, args.tcp, args.quiet, data_queue, stop_event), | |
daemon=True) | |
if not args.quiet: | |
print(f"[CONFIG] ATAK Target: {atak_ip}:{atak_port} ({'TCP' if args.tcp else 'UDP'})") | |
print(f"[CONFIG] Callsign: {args.callsign}, UID: {args.uid}") | |
if args.quiet: print("[CONFIG] Quiet mode enabled.") | |
reader_thread_obj.start() | |
sender_thread_obj.start() | |
try: | |
while not stop_event.is_set(): | |
# Keep main thread alive to handle KeyboardInterrupt and allow daemon threads to run | |
# Check if threads are alive; if not, something went wrong. | |
if not reader_thread_obj.is_alive() or not sender_thread_obj.is_alive(): | |
if not args.quiet and not stop_event.is_set(): # Avoid printing if already stopping | |
print("[ERROR] A critical thread has terminated unexpectedly. Setting stop event.") | |
stop_event.set() # Signal other threads to stop | |
break | |
time.sleep(1) | |
except KeyboardInterrupt: | |
if not args.quiet: | |
print("\n[STOP] Keyboard interrupt received. Shutting down gracefully...") | |
stop_event.set() | |
except Exception as e: # Catch any other unexpected errors in main | |
if not args.quiet: | |
print(f"\n[FATAL] Unexpected error in main: {e}. Shutting down...") | |
stop_event.set() | |
if not args.quiet: print("[INFO] Waiting for threads to complete...") | |
if reader_thread_obj.is_alive(): reader_thread_obj.join(timeout=5) | |
if sender_thread_obj.is_alive(): sender_thread_obj.join(timeout=5) | |
if not args.quiet: | |
if reader_thread_obj.is_alive(): print("[WARN] Reader thread did not exit cleanly.") | |
if sender_thread_obj.is_alive(): print("[WARN] Sender thread did not exit cleanly.") | |
print("[EXIT] Program terminated.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment