@mpentler
Last active September 21, 2025 20:43
TCAS RA analysis
#!/usr/bin/env python3
import os
import sys
import gzip
import json
import pandas as pd
from datetime import datetime, timedelta
from shapely.geometry import shape, MultiPoint, mapping
from collections import Counter
import argparse
import numpy as np
from sklearn.cluster import DBSCAN
# ------------------------ FIR LOADING ------------------------ #
def load_fir_data(fir_data_file):
    with open(fir_data_file, 'r') as file:
        geojson_data = json.load(file)
    fir_regions = []
    if geojson_data.get("type") == "FeatureCollection":
        for feature in geojson_data.get("features", []):
            if feature.get("type") == "Feature":
                geometry = feature.get("geometry")
                properties = feature.get("properties", {})
                if geometry and geometry.get("type") in ["Polygon", "MultiPolygon"]:
                    polygon = shape(geometry)
                    fir_regions.append({
                        "name": properties.get("AV_NAME", "Unnamed FIR"),
                        "polygon": polygon
                    })
    return fir_regions
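# A rough sketch (illustrative values only) of the GeoJSON shape this loader expects:
# a FeatureCollection whose features carry Polygon/MultiPolygon geometry and an "AV_NAME"
# property, e.g.
#   {"type": "FeatureCollection",
#    "features": [{"type": "Feature",
#                  "properties": {"AV_NAME": "EXAMPLE FIR"},
#                  "geometry": {"type": "Polygon",
#                               "coordinates": [[[-1.0, 50.0], [1.0, 50.0], [0.0, 52.0], [-1.0, 50.0]]]}}]}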
# ------------------------ AIRCRAFT DB LOADING ------------------------ #
def load_aircraft_db(db_path="/usr/local/share/tar1090/aircraft.csv.gz"):
    print(f"Loading aircraft database from: {db_path}")
    aircraft_info = {}
    if not os.path.exists(db_path):
        return aircraft_info
    try:
        with gzip.open(db_path, 'rt') as f:
            for line in f:
                if not line.strip():
                    continue
                parts = line.strip().split(";")
                if len(parts) < 7:
                    continue
                hex_code = parts[0].upper()
                reg = parts[1] or ""
                type_code = parts[2] or ""
                long_type = parts[4] or ""
                owner = parts[6] or ""
                aircraft_info[hex_code] = {
                    "reg": reg,
                    "owner": owner,
                    "type_code": type_code,
                    "long_type": long_type
                }
    except Exception as e:
        print(f"❌ Error reading aircraft DB: {e}")
    return aircraft_info
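# The parser above assumes tar1090's semicolon-separated aircraft.csv.gz layout and only reads
# fields 0 (hex), 1 (registration), 2 (type code), 4 (long type name) and 6 (owner); rows with
# fewer than 7 fields are skipped. An illustrative, made-up row:
#   4CA123;EI-ABC;B738;;BOEING 737-800;;EXAMPLE AIRLINE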
# ------------------------ DATE RANGE ------------------------ #
def date_range(start_date, end_date):
    current = start_date
    while current <= end_date:
        yield current
        current += timedelta(days=1)
# ------------------------ CSV PROCESSING ------------------------ #
SESSION_TIMEOUT = 60 # seconds to auto-close RA session
# COOLDOWN removed per user's request
def _normalize_icao(code):
    if not code or pd.isna(code):
        return "UNKNOWN"
    return str(code).strip().upper()

def _parse_timestamp(event_date, timestamp_str):
    # normalize to datetime; accept ms or not
    try:
        return datetime.strptime(f"{event_date} {timestamp_str}", "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        return datetime.strptime(f"{event_date} {timestamp_str}", "%Y-%m-%d %H:%M:%S")
def process_adsb_file(file_path, fir_regions, search_hex=None):
    """
    Returns: encounters_list, coords_list
    Each encounter is a dict:
        {
            'icao1': ...,
            'icao2': ...,
            'date': 'YYYY-MM-DD',
            'start_time': 'HH:MM:SS' (or with .fff if present),
            'end_time': 'HH:MM:SS',
            'last_seen': datetime object,
            'latitude': float,
            'longitude': float
        }
    """
    if not os.path.exists(file_path):
        # print for debugging; caller will handle
        #print(f"Looking for file: {file_path}\n❌ File not found")
        return [], []
    try:
        with gzip.open(file_path, 'rt') as f:
            df = pd.read_csv(f, low_memory=False)
        if df.empty:
            #print(f"❌ {file_path} is empty, skipping")
            return [], []
    except Exception as e:
        #print(f"❌ Error reading {file_path}: {e}")
        return [], []
    encounters = []
    coords = []
    active_sessions = {}
    for idx, row in df.iterrows():
        # defensive checks
        if len(row) < 9:
            continue
        try:
            event_date = str(row.iloc[0]).strip()
            timestamp = str(row.iloc[1]).strip()
            icao1_raw = row.iloc[2]
            latitude = float(row.iloc[7])
            longitude = float(row.iloc[8])
        except Exception:
            continue
        # RA message text may be in column 22 if present
        message = str(row.iloc[22]).strip() if len(row) > 22 and pd.notna(row.iloc[22]) else ""
        # the threat aircraft's hex id, when reported, follows the "TIDh: " marker in the message
        tidh_index = message.find("TIDh: ")
        icao2_candidate = (message[tidh_index+6:tidh_index+12].strip().upper() if tidh_index != -1 else "")
        # normalize icao codes
        icao1 = _normalize_icao(icao1_raw)
        icao2 = _normalize_icao(icao2_candidate)
        # construct canonical pair key (sorted) - this ensures pair symmetry
        pair_key = "-".join(sorted([icao1, icao2]))
        # parse timestamp into datetime for comparisons
        try:
            now_ts = _parse_timestamp(event_date, timestamp)
        except Exception:
            # if timestamp is unreadable, skip
            continue
        # ---------- RA SESSION LOGIC ----------
        # "Clear of Conflict" ends the active session (if present)
        if "Clear of Conflict" in message:
            if pair_key in active_sessions:
                session = active_sessions.pop(pair_key)
                # use the timestamp of the clear message (string) as the end time
                session['end_time'] = timestamp
                # keep last_seen as datetime for possible post-processing
                encounters.append(session)
            # If there is no active session, ignore the clear (could be a duplicate clear)
            continue
        # No cooldown checks (removed); every message can start/extend a session unless the timeout closes it
        if pair_key in active_sessions:
            # check timeout since last_seen
            last_ts = active_sessions[pair_key]['last_seen']
            if (now_ts - last_ts).total_seconds() > SESSION_TIMEOUT:
                # treat as ended due to timeout, store the old session
                session = active_sessions.pop(pair_key)
                session['end_time'] = last_ts.strftime("%H:%M:%S")
                encounters.append(session)
                # start a new session for this message
                active_sessions[pair_key] = {
                    'icao1': icao1,
                    'icao2': icao2,
                    'date': event_date,
                    'start_time': timestamp,
                    'last_seen': now_ts,
                    'latitude': latitude,
                    'longitude': longitude
                }
            else:
                # continue existing session: update last_seen and location
                active_sessions[pair_key]['last_seen'] = now_ts
                # update lat/lon to the most recent position (approx)
                active_sessions[pair_key]['latitude'] = latitude
                active_sessions[pair_key]['longitude'] = longitude
        else:
            # start new session
            active_sessions[pair_key] = {
                'icao1': icao1,
                'icao2': icao2,
                'date': event_date,
                'start_time': timestamp,
                'last_seen': now_ts,
                'latitude': latitude,
                'longitude': longitude
            }
    # close any remaining active sessions (use last_seen as end)
    for session in active_sessions.values():
        session['end_time'] = session['last_seen'].strftime("%H:%M:%S")
        encounters.append(session)
    # coords list for hotspot generation
    for enc in encounters:
        coords.append((enc['longitude'], enc['latitude']))
    #print(f"✅ Processed {len(df)} rows, {len(encounters)} RA sessions in file {file_path}")
    return encounters, coords
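# Illustrative example of a single returned encounter (values made up), matching the docstring
# above; 'last_seen' holds the datetime of the last RA message in the session and is dropped
# later by dedupe_encounters():
#   {'icao1': '4CA123', 'icao2': '3C6444', 'date': '2025-09-01',
#    'start_time': '14:02:11.250', 'end_time': '14:02:41',
#    'last_seen': datetime(2025, 9, 1, 14, 2, 41), 'latitude': 51.47123, 'longitude': -0.45210}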
# ------------------------ HOTSPOT GENERATION ------------------------ #
def generate_hotspot(coords, eps=0.01, min_samples=5, simplify_tol=0.01):
    if not coords:
        print("No coordinates for hotspot generation")
        return None, []
    X = np.array(coords)
    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(X)
    labels = clustering.labels_
    clusters = [X[labels == lbl] for lbl in set(labels) if lbl != -1]
    if not clusters:
        print("No clusters found")
        return None, []
    largest_cluster = max(clusters, key=lambda c: len(c))
    hull = MultiPoint(largest_cluster).convex_hull
    hull = hull.simplify(simplify_tol, preserve_topology=True)
    geojson = {
        "type": "FeatureCollection",
        "features": [{
            "type": "Feature",
            "properties": {"name": "TCAS RA Hotspot"},
            "geometry": mapping(hull)
        }]
    }
    return geojson, largest_cluster
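# Rough sense of scale for the defaults above (assuming the coordinates are plain lon/lat
# degrees, so eps is not a metric distance): eps=0.01 deg is roughly 1.1 km north-south, so
# RA positions within about a kilometre of one another chain into a cluster once at least
# min_samples of them are close together. Illustrative call with made-up coordinates:
#   geojson, pts = generate_hotspot([(-0.45, 51.47), (-0.452, 51.469), (-0.448, 51.472), ...],
#                                   eps=0.01, min_samples=5)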
# ------------------------ DEDUPE UTIL ------------------------ #
def _normalize_time_str(ts_str):
    # convert times like "HH:MM:SS(.fff)" to a canonical millisecond-resolution string
    try:
        dt = datetime.strptime(ts_str, "%H:%M:%S.%f")
    except ValueError:
        try:
            dt = datetime.strptime(ts_str, "%H:%M:%S")
        except ValueError:
            return ts_str  # return as-is if unparsable
    return dt.strftime("%H:%M:%S.%f")[:-3]  # always emit milliseconds (trim microseconds -> ms)
def dedupe_encounters(enc_list):
    """
    Deduplicate based on canonical pair key + date + normalized start + normalized end.
    Returns list of unique encounters (first occurrence kept).
    """
    seen = set()
    unique = []
    for enc in enc_list:
        a = _normalize_icao(enc.get('icao1'))
        b = _normalize_icao(enc.get('icao2'))
        pair_key = "-".join(sorted([a, b]))
        date = enc.get('date', '')
        start = _normalize_time_str(enc.get('start_time', ''))
        end = _normalize_time_str(enc.get('end_time', '')) if enc.get('end_time') else ''
        dedupe_key = (pair_key, date, start, end)
        if dedupe_key not in seen:
            seen.add(dedupe_key)
            # Normalize the stored encounter to stable values
            normalized = {
                'icao1': a,
                'icao2': b,
                'date': date,
                'start_time': start,
                'end_time': end,
                'latitude': enc.get('latitude'),
                'longitude': enc.get('longitude')
            }
            unique.append(normalized)
    return unique
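# Illustrative behaviour (made-up values): two copies of the same session, listed with the
# aircraft in opposite order, collapse to one entry because the pair key is sorted before
# comparison and the times normalize identically:
#   dedupe_encounters([
#       {'icao1': '4CA123', 'icao2': '3C6444', 'date': '2025-09-01',
#        'start_time': '14:02:11', 'end_time': '14:02:41'},
#       {'icao1': '3C6444', 'icao2': '4CA123', 'date': '2025-09-01',
#        'start_time': '14:02:11.000', 'end_time': '14:02:41'},
#   ])  # -> a single normalized encounter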
# ------------------------ MAIN ------------------------ #
def main():
    parser = argparse.ArgumentParser(description="TCAS-RA Analyzer")
    parser.add_argument("-s", "--search", action="store_true", help="Search for a specific ICAO hex")
    parser.add_argument("-c", "--count", action="store_true", help="Show top aircraft by number of encounters")
    parser.add_argument("-z", "--hotspot", action="store_true", help="Generate TCAS RA hotspot polygon")
    parser.add_argument("-t", "--total", action="store_true", help="Show total number of TCAS RA events")
    parser.add_argument("--eps", type=float, default=0.01, help="DBSCAN eps value for hotspot clustering")
    parser.add_argument("--min-samples", type=int, default=5, help="DBSCAN min_samples for hotspot clustering")
    parser.add_argument("--simplify", type=float, default=0.01, help="Simplify tolerance for output polygon")
    parser.add_argument("--bbox", nargs=4, type=float, metavar=('XMIN', 'YMIN', 'XMAX', 'YMAX'),
                        help="Bounding box: xmin ymin xmax ymax", default=None)
    args = parser.parse_args()
    fir_file = os.path.join(os.path.dirname(__file__), "fir_boundaries.geojson")
    fir_regions = []
    if os.path.exists(fir_file):
        try:
            fir_regions = load_fir_data(fir_file)
        except Exception:
            fir_regions = []
    print(f"Loaded {len(fir_regions)} FIR regions")
    aircraft_db = load_aircraft_db()
    search_hex = None
    if args.search:
        search_hex = input("Enter ICAO hex to search: ").strip().upper()
    start_date_str = input("Enter start date (YYYY-MM-DD): ").strip()
    end_date_str = input("Enter end date (YYYY-MM-DD): ").strip()
    base_dir = input("Enter base directory (default: /var/globe_history): ").strip() or "/var/globe_history"
    try:
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
    except ValueError:
        print("Invalid date format. Use YYYY-MM-DD.")
        return
    all_encounters = []
    all_coords = []
    print(f"➡️ Processing files from {start_date} to {end_date} in {base_dir}")
    for single_date in date_range(start_date, end_date):
        path = os.path.join(base_dir, single_date.strftime("%Y/%m/%d/acas/acas.csv.gz"))
        encounters, coords = process_adsb_file(path, fir_regions, search_hex)
        all_encounters.extend(encounters)
        all_coords.extend(coords)
    # Deduplicate encounters globally (this prevents double counting the same event
    # if it was reported twice by both aircraft or duplicated across files)
    unique_encounters = dedupe_encounters(all_encounters)
    # ------------------------ APPLY BBOX FILTER GLOBALLY ------------------------ #
    filtered_encounters = unique_encounters
    filtered_coords = all_coords
    if args.bbox:
        min_lon, min_lat, max_lon, max_lat = args.bbox
        filtered_encounters = [
            enc for enc in unique_encounters
            if enc['longitude'] is not None and enc['latitude'] is not None
            and min_lon <= enc['longitude'] <= max_lon and min_lat <= enc['latitude'] <= max_lat
        ]
        filtered_coords = [
            (lon, lat) for lon, lat in all_coords
            if min_lon <= lon <= max_lon and min_lat <= lat <= max_lat
        ]
    # ------------------------ SEARCH OUTPUT ------------------------ #
    if search_hex:
        # only include encounters involving that hex
        matched = [enc for enc in filtered_encounters if search_hex in (enc['icao1'], enc['icao2'])]
        if matched:
            print(f"\n➡️ Found {len(matched)} encounters involving {search_hex}")
            for enc in matched:
                print(f"Encounter {enc['icao1']} vs {enc['icao2']} on {enc['date']} "
                      f"{enc['start_time']}-{enc['end_time']} "
                      f"(approx {enc['latitude']:.5f}, {enc['longitude']:.5f})")
        else:
            print(f"➡️ No encounters found for {search_hex}")
    # ------------------------ TOTAL EVENTS ------------------------ #
    if args.total:
        print(f"\n➡️ Total TCAS RA events from {start_date} to {end_date}: {len(filtered_encounters)}")
    # ------------------------ TOP COUNTS ------------------------ #
    if args.count:
        filtered_counter = Counter()
        # For each unique event, count +1 for each non-UNKNOWN aircraft; if both sides are
        # UNKNOWN, count the "UNKNOWN" bucket once so the event is still represented
        for enc in filtered_encounters:
            a = enc['icao1']
            b = enc['icao2']
            # if both UNKNOWN, count "UNKNOWN" once
            if a == "UNKNOWN" and b == "UNKNOWN":
                filtered_counter["UNKNOWN"] += 1
            else:
                if a != "UNKNOWN":
                    filtered_counter[a] += 1
                if b != "UNKNOWN":
                    filtered_counter[b] += 1
        print(f"\n➡️ Top 10 aircraft by number of encounters (including UNKNOWN bucket) from {start_date} to {end_date}:")
        for icao, count in filtered_counter.most_common(10):
            info = aircraft_db.get(icao, {})
            reg = info.get("reg", "")
            owner = info.get("owner", "")
            long_type = info.get("long_type", "")
            type_code = info.get("type_code", "")
            type_str = f"{long_type} ({type_code})" if long_type or type_code else ""
            print(f"{icao}/{reg}: {count} encounter(s) {owner} | {type_str}")
    # ------------------------ HOTSPOT ------------------------ #
    if args.hotspot and filtered_coords:
        geojson, hotspot_points = generate_hotspot(filtered_coords, eps=args.eps, min_samples=args.min_samples, simplify_tol=args.simplify)
        if geojson:
            out_file = f"tcas_hotspot_{start_date}_{end_date}.geojson"
            with open(out_file, "w") as f:
                json.dump(geojson, f, indent=2)
            print(f"\n➡️ Hotspot GeoJSON saved to: {out_file}")
if __name__ == "__main__":
    main()
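The script is interactive: it prompts for a start date, end date and base directory, then walks <base_dir>/YYYY/MM/DD/acas/acas.csv.gz for each day and reports according to the -s, -c, -t and -z flags. For experimenting with the helpers without the prompts, a minimal non-interactive sketch, assuming the functions above are in scope (the date range and base directory here are only examples):

import os
from datetime import date
base_dir = "/var/globe_history"  # the script's default prompt value
encs, coords = [], []
for d in date_range(date(2025, 9, 1), date(2025, 9, 7)):  # example range only
    path = os.path.join(base_dir, d.strftime("%Y/%m/%d/acas/acas.csv.gz"))
    e, c = process_adsb_file(path, fir_regions=[], search_hex=None)
    encs.extend(e)
    coords.extend(c)
print(len(dedupe_encounters(encs)), "unique RA sessions")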