TCAS RA analysis
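A Python script that mines the ACAS CSV logs written under /var/globe_history (the YYYY/MM/DD/acas/acas.csv.gz layout used by readsb/tar1090 setups) for TCAS Resolution Advisory events. RA messages are grouped into per-aircraft-pair sessions that end on a "Clear of Conflict" message or after a 60-second timeout, deduplicated so the same event reported by both aircraft is counted once, and optionally enriched from tar1090's aircraft.csv.gz database. It can report the total number of RA events, the top aircraft by encounter count, and a DBSCAN-based hotspot polygon exported as GeoJSON, all optionally restricted to a bounding box. A usage sketch follows the code.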
#!/usr/bin/env python3

import os
import sys
import gzip
import json
import pandas as pd
from datetime import datetime, timedelta
from shapely.geometry import shape, MultiPoint, mapping
from collections import Counter
import argparse
import numpy as np
from sklearn.cluster import DBSCAN
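# Third-party dependencies: pandas, shapely, numpy, scikit-learn
# (e.g. pip install pandas shapely numpy scikit-learn).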
# ------------------------ FIR LOADING ------------------------ #

def load_fir_data(fir_data_file):
    with open(fir_data_file, 'r') as file:
        geojson_data = json.load(file)

    fir_regions = []
    if geojson_data.get("type") == "FeatureCollection":
        for feature in geojson_data.get("features", []):
            if feature.get("type") == "Feature":
                geometry = feature.get("geometry")
                properties = feature.get("properties", {})
                if geometry and geometry.get("type") in ["Polygon", "MultiPolygon"]:
                    polygon = shape(geometry)
                    fir_regions.append({
                        "name": properties.get("AV_NAME", "Unnamed FIR"),
                        "polygon": polygon
                    })
    return fir_regions
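# Note: the FIR polygons are loaded and passed through to process_adsb_file(),
# but this version does not use them for filtering; main() only reports how
# many regions were loaded.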
# ------------------------ AIRCRAFT DB LOADING ------------------------ #

def load_aircraft_db(db_path="/usr/local/share/tar1090/aircraft.csv.gz"):
    print(f"Loading aircraft database from: {db_path}")
    aircraft_info = {}
    if not os.path.exists(db_path):
        return aircraft_info
    try:
        with gzip.open(db_path, 'rt') as f:
            for line in f:
                if not line.strip():
                    continue
                parts = line.strip().split(";")
                if len(parts) < 7:
                    continue
                hex_code = parts[0].upper()
                reg = parts[1] or ""
                type_code = parts[2] or ""
                long_type = parts[4] or ""
                owner = parts[6] or ""
                aircraft_info[hex_code] = {
                    "reg": reg,
                    "owner": owner,
                    "type_code": type_code,
                    "long_type": long_type
                }
    except Exception as e:
        print(f"❌ Error reading aircraft DB: {e}")
    return aircraft_info
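# Field layout assumed for the semicolon-separated database (as indexed above):
# 0 = ICAO hex, 1 = registration, 2 = type code, 4 = long type name, 6 = owner.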
# ------------------------ DATE RANGE ------------------------ #

def date_range(start_date, end_date):
    current = start_date
    while current <= end_date:
        yield current
        current += timedelta(days=1)
# ------------------------ CSV PROCESSING ------------------------ #

SESSION_TIMEOUT = 60  # seconds to auto-close RA session
# COOLDOWN removed per user's request

def _normalize_icao(code):
    if not code or pd.isna(code):
        return "UNKNOWN"
    return str(code).strip().upper()

def _parse_timestamp(event_date, timestamp_str):
    # normalize to datetime; accept ms or not
    try:
        return datetime.strptime(f"{event_date} {timestamp_str}", "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        return datetime.strptime(f"{event_date} {timestamp_str}", "%Y-%m-%d %H:%M:%S")
def process_adsb_file(file_path, fir_regions, search_hex=None):
    """
    Returns: encounters_list, coords_list
    Each encounter is a dict:
    {
        'icao1': ...,
        'icao2': ...,
        'date': 'YYYY-MM-DD',
        'start_time': 'HH:MM:SS' (or with .fff if present),
        'end_time': 'HH:MM:SS',
        'last_seen': datetime object,
        'latitude': float,
        'longitude': float
    }
    """
    if not os.path.exists(file_path):
        # print for debugging; caller will handle
        #print(f"Looking for file: {file_path}\n❌ File not found")
        return [], []

    try:
        with gzip.open(file_path, 'rt') as f:
            df = pd.read_csv(f, low_memory=False)
        if df.empty:
            #print(f"❌ {file_path} is empty, skipping")
            return [], []
    except Exception as e:
        #print(f"❌ Error reading {file_path}: {e}")
        return [], []

    encounters = []
    coords = []
    active_sessions = {}

    for idx, row in df.iterrows():
        # defensive checks
        if len(row) < 9:
            continue
        try:
            event_date = str(row.iloc[0]).strip()
            timestamp = str(row.iloc[1]).strip()
            icao1_raw = row.iloc[2]
            latitude = float(row.iloc[7])
            longitude = float(row.iloc[8])
        except Exception:
            continue

        # message may be in column 22 if present
        message = str(row.iloc[22]).strip() if len(row) > 22 and pd.notna(row.iloc[22]) else ""
        tidh_index = message.find("TIDh: ")
        icao2_candidate = (message[tidh_index+6:tidh_index+12].strip().upper() if tidh_index != -1 else "")

        # normalize icao codes
        icao1 = _normalize_icao(icao1_raw)
        icao2 = _normalize_icao(icao2_candidate)

        # construct canonical pair key (sorted) - this ensures pair symmetry
        pair_key = "-".join(sorted([icao1, icao2]))

        # parse timestamp into datetime for comparisons
        try:
            now_ts = _parse_timestamp(event_date, timestamp)
        except Exception:
            # if timestamp is unreadable, skip
            continue
        # ---------- RA SESSION LOGIC ----------
        # "Clear of Conflict" ends the active session (if present)
        if "Clear of Conflict" in message:
            if pair_key in active_sessions:
                session = active_sessions.pop(pair_key)
                # put end time as the timestamp of clear message (string)
                session['end_time'] = timestamp
                # keep last_seen as datetime for possible post-processing
                encounters.append(session)
            # If no active session, ignore clear (could be duplicate clear)
            continue

        # No cooldown checks (removed) — every message can start/extend session unless timeout closes it
        if pair_key in active_sessions:
            # check timeout since last_seen
            last_ts = active_sessions[pair_key]['last_seen']
            if (now_ts - last_ts).total_seconds() > SESSION_TIMEOUT:
                # treat as ended due to timeout, store old session
                session = active_sessions.pop(pair_key)
                session['end_time'] = last_ts.strftime("%H:%M:%S")
                encounters.append(session)
                # start a new session for this message
                active_sessions[pair_key] = {
                    'icao1': icao1,
                    'icao2': icao2,
                    'date': event_date,
                    'start_time': timestamp,
                    'last_seen': now_ts,
                    'latitude': latitude,
                    'longitude': longitude
                }
            else:
                # continue existing session: update last_seen and optionally location
                active_sessions[pair_key]['last_seen'] = now_ts
                # update lat/lon to most recent (approx)
                active_sessions[pair_key]['latitude'] = latitude
                active_sessions[pair_key]['longitude'] = longitude
        else:
            # start new session
            active_sessions[pair_key] = {
                'icao1': icao1,
                'icao2': icao2,
                'date': event_date,
                'start_time': timestamp,
                'last_seen': now_ts,
                'latitude': latitude,
                'longitude': longitude
            }

    # close any remaining active sessions (use last_seen as end)
    for session in active_sessions.values():
        session['end_time'] = session['last_seen'].strftime("%H:%M:%S")
        encounters.append(session)

    # coords list for hotspot generation
    for enc in encounters:
        coords.append((enc['longitude'], enc['latitude']))

    #print(f"✅ Processed {len(df)} rows, {len(encounters)} RA sessions in file {file_path}")
    return encounters, coords
# ------------------------ HOTSPOT GENERATION ------------------------ #

def generate_hotspot(coords, eps=0.01, min_samples=5, simplify_tol=0.01):
    if not coords:
        print("No coordinates for hotspot generation")
        return None, []

    X = np.array(coords)
    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(X)
    labels = clustering.labels_
    clusters = [X[labels == lbl] for lbl in set(labels) if lbl != -1]
    if not clusters:
        print("No clusters found")
        return None, []

    largest_cluster = max(clusters, key=lambda c: len(c))
    hull = MultiPoint(largest_cluster).convex_hull
    hull = hull.simplify(simplify_tol, preserve_topology=True)

    geojson = {
        "type": "FeatureCollection",
        "features": [{
            "type": "Feature",
            "properties": {"name": "TCAS RA Hotspot"},
            "geometry": mapping(hull)
        }]
    }
    return geojson, largest_cluster
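# Note on units: DBSCAN's eps above is measured in the same units as the input
# coordinates, i.e. degrees of longitude/latitude, not metres. As a rough rule
# of thumb (approximation, not part of the original logic), 0.01 deg of latitude
# is about 1.1 km, and a degree of longitude shrinks with cos(latitude), so
# eps=0.01 clusters points within roughly a kilometre of each other.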
# ------------------------ DEDUPE UTIL ------------------------ #

def _normalize_time_str(ts_str):
    # convert times like "HH:MM:SS(.fff)" to a canonical string (millisecond resolution when present)
    try:
        dt = datetime.strptime(ts_str, "%H:%M:%S.%f")
    except ValueError:
        try:
            dt = datetime.strptime(ts_str, "%H:%M:%S")
        except ValueError:
            return ts_str  # return as-is if unparsable
    return dt.strftime("%H:%M:%S.%f")[:-3]  # keep milliseconds if present (trim micro -> ms)

def dedupe_encounters(enc_list):
    """
    Deduplicate based on canonical pair key + date + normalized start + normalized end.
    Returns list of unique encounters (first occurrence kept).
    """
    seen = set()
    unique = []
    for enc in enc_list:
        a = _normalize_icao(enc.get('icao1'))
        b = _normalize_icao(enc.get('icao2'))
        pair_key = "-".join(sorted([a, b]))
        date = enc.get('date', '')
        start = _normalize_time_str(enc.get('start_time', ''))
        end = _normalize_time_str(enc.get('end_time', '')) if enc.get('end_time') else ''
        dedupe_key = (pair_key, date, start, end)
        if dedupe_key not in seen:
            seen.add(dedupe_key)
            # Normalize the stored encounter to stable values
            normalized = {
                'icao1': a,
                'icao2': b,
                'date': date,
                'start_time': start,
                'end_time': end,
                'latitude': enc.get('latitude'),
                'longitude': enc.get('longitude')
            }
            unique.append(normalized)
    return unique
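# Example (hypothetical hex codes): the same RA logged once as 4CA123 vs 400F55
# and again as 400F55 vs 4CA123 with matching date and times produces the same
# dedupe_key ("400F55-4CA123", date, start, end), so only the first copy is kept.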
# ------------------------ MAIN ------------------------ #

def main():
    parser = argparse.ArgumentParser(description="TCAS-RA Analyzer")
    parser.add_argument("-s", "--search", action="store_true", help="Search for a specific ICAO hex")
    parser.add_argument("-c", "--count", action="store_true", help="Show top aircraft by number of encounters")
    parser.add_argument("-z", "--hotspot", action="store_true", help="Generate TCAS RA hotspot polygon")
    parser.add_argument("-t", "--total", action="store_true", help="Show total number of TCAS RA events")
    parser.add_argument("--eps", type=float, default=0.01, help="DBSCAN eps value for hotspot clustering")
    parser.add_argument("--min-samples", type=int, default=5, help="DBSCAN min_samples for hotspot clustering")
    parser.add_argument("--simplify", type=float, default=0.01, help="Simplify tolerance for output polygon")
    parser.add_argument("--bbox", nargs=4, type=float, metavar=('XMIN', 'YMIN', 'XMAX', 'YMAX'),
                        help="Bounding box: xmin ymin xmax ymax", default=None)
    args = parser.parse_args()

    fir_file = os.path.join(os.path.dirname(__file__), "fir_boundaries.geojson")
    fir_regions = []
    if os.path.exists(fir_file):
        try:
            fir_regions = load_fir_data(fir_file)
        except Exception:
            fir_regions = []
    print(f"Loaded {len(fir_regions)} FIR regions")

    aircraft_db = load_aircraft_db()

    search_hex = None
    if args.search:
        search_hex = input("Enter ICAO hex to search: ").strip().upper()

    start_date_str = input("Enter start date (YYYY-MM-DD): ").strip()
    end_date_str = input("Enter end date (YYYY-MM-DD): ").strip()
    base_dir = input("Enter base directory (default: /var/globe_history): ").strip() or "/var/globe_history"

    try:
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
    except ValueError:
        print("Invalid date format. Use YYYY-MM-DD.")
        return
    all_encounters = []
    all_coords = []
    print(f"➡️ Processing files from {start_date} to {end_date} in {base_dir}")
    for single_date in date_range(start_date, end_date):
        path = os.path.join(base_dir, single_date.strftime("%Y/%m/%d/acas/acas.csv.gz"))
        encounters, coords = process_adsb_file(path, fir_regions, search_hex)
        all_encounters.extend(encounters)
        all_coords.extend(coords)

    # Deduplicate encounters globally (this prevents double counting the same event
    # if it was reported twice by both aircraft or duplicated across files)
    unique_encounters = dedupe_encounters(all_encounters)

    # ------------------------ APPLY BBOX FILTER GLOBALLY ------------------------ #
    filtered_encounters = unique_encounters
    filtered_coords = all_coords
    if args.bbox:
        min_lon, min_lat, max_lon, max_lat = args.bbox
        filtered_encounters = [
            enc for enc in unique_encounters
            if enc['longitude'] is not None and enc['latitude'] is not None
            and min_lon <= enc['longitude'] <= max_lon and min_lat <= enc['latitude'] <= max_lat
        ]
        filtered_coords = [
            (lon, lat) for lon, lat in all_coords
            if min_lon <= lon <= max_lon and min_lat <= lat <= max_lat
        ]
    # ------------------------ SEARCH OUTPUT ------------------------ #
    if search_hex:
        # only include encounters involving that hex
        matched = [enc for enc in filtered_encounters if search_hex in (enc['icao1'], enc['icao2'])]
        if matched:
            print(f"\n➡️ Found {len(matched)} encounters involving {search_hex}")
            for enc in matched:
                print(f"Encounter {enc['icao1']} vs {enc['icao2']} on {enc['date']} "
                      f"{enc['start_time']}-{enc['end_time']} "
                      f"(approx {enc['latitude']:.5f}, {enc['longitude']:.5f})")
        else:
            print(f"➡️ No encounters found for {search_hex}")

    # ------------------------ TOTAL EVENTS ------------------------ #
    if args.total:
        print(f"\n➡️ Total TCAS RA events from {start_date} to {end_date}: {len(filtered_encounters)}")

    # ------------------------ TOP COUNTS ------------------------ #
    if args.count:
        filtered_counter = Counter()
        # For each unique event, count +1 for each non-UNKNOWN aircraft;
        # if both aircraft are UNKNOWN, count the "UNKNOWN" bucket once
        for enc in filtered_encounters:
            a = enc['icao1']
            b = enc['icao2']
            if a == "UNKNOWN" and b == "UNKNOWN":
                filtered_counter["UNKNOWN"] += 1
            else:
                if a != "UNKNOWN":
                    filtered_counter[a] += 1
                if b != "UNKNOWN":
                    filtered_counter[b] += 1

        print(f"\n➡️ Top 10 aircraft by number of encounters (including UNKNOWN bucket) from {start_date} to {end_date}:")
        for icao, count in filtered_counter.most_common(10):
            info = aircraft_db.get(icao, {})
            reg = info.get("reg", "")
            owner = info.get("owner", "")
            long_type = info.get("long_type", "")
            type_code = info.get("type_code", "")
            type_str = f"{long_type} ({type_code})" if long_type or type_code else ""
            print(f"{icao}/{reg}: {count} encounter(s) {owner} | {type_str}")

    # ------------------------ HOTSPOT ------------------------ #
    if args.hotspot and filtered_coords:
        geojson, hotspot_points = generate_hotspot(filtered_coords, eps=args.eps, min_samples=args.min_samples, simplify_tol=args.simplify)
        if geojson:
            out_file = f"tcas_hotspot_{start_date}_{end_date}.geojson"
            with open(out_file, "w") as f:
                json.dump(geojson, f, indent=2)
            print(f"\n➡️ Hotspot GeoJSON saved to: {out_file}")

if __name__ == "__main__":
    main()
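Usage sketch. The script name, dates, and bounding box below are placeholders; the date and directory prompts are issued by the script itself, and -s additionally prompts for an ICAO hex. Assuming the gist is saved locally as tcas_ra.py, a run that prints totals, top aircraft, and a hotspot polygon for a British Isles bounding box might look like:

    python3 tcas_ra.py -t -c -z --bbox -11.0 49.0 2.5 61.5
    Enter start date (YYYY-MM-DD): 2025-09-01
    Enter end date (YYYY-MM-DD): 2025-09-07
    Enter base directory (default: /var/globe_history):

With -z, the hotspot GeoJSON is written to tcas_hotspot_<start>_<end>.geojson in the current directory.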