Last active
January 21, 2024 08:05
-
-
Save kylemcdonald/6e860400656433f8a274d915626837d0 to your computer and use it in GitHub Desktop.
Parse the ADSBX heatmap files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import re
from dataclasses import dataclass
from typing import List, Optional, Union

import numpy as np
def point_to_str(point):
    """Format the first 32-bit word of a heatmap record as a hex address.

    The low 24 bits are rendered as six lowercase hex digits. When bit 24
    (0x1000000) is set the result is prefixed with "~".

    Args:
        point: Integer-like value (Python int or NumPy integer scalar);
            bits above bit 24 are ignored.

    Returns:
        A string such as "a1b2c3" or "~a1b2c3".
    """
    # Coerce to a plain int so the "x" format spec and the bit masks behave
    # the same for NumPy scalars (including negative int32 values).
    word = int(point)
    address = f"{word & 0xFFFFFF:06x}"
    return f"~{address}" if word & 0x1000000 else address
@dataclass
class Slice:
    """One time slice of a heatmap file.

    Holds the slice's header timestamp together with every callsign and
    telemetry record that followed that header in the file.
    """

    # Time decoded from the slice header.
    timestamp: datetime.datetime
    # Callsign records found in this slice, in file order.
    callsigns: List["Callsign"]
    # Telemetry records found in this slice, in file order.
    telemetry: List["Telemetry"]
@dataclass
class Callsign:
    """A callsign record: hex address, flight identifier and squawk code."""

    # Six-digit hex address, optionally prefixed with "~".
    hex: str
    # Flight identifier; None when the record carries no flight text.
    flight: Optional[str]
    # Squawk code as a zero-padded string of at least four digits.
    squawk: str
@dataclass
class Telemetry:
    """A telemetry record: position, altitude and ground speed of one aircraft."""

    # Six-digit hex address, optionally prefixed with "~".
    hex: str
    # Name of the address/source type (e.g. "adsb_icao"), or "unknown".
    type: str
    # Latitude in degrees.
    lat: float
    # Longitude in degrees.
    lon: float
    # Barometric altitude in feet, or the string "ground".
    alt: Union[float, str]
    # Ground speed in knots; None when the file marks it as unavailable.
    gs: Optional[float]
# bbox format: [min lat, min lon, max lat, max lon]
def parse_heatmap(fn, bbox=None, return_callsigns=True):
    """Parse an ADSBX heatmap file into a list of ``Slice`` objects.

    The file is read as a flat array of 32-bit words (host byte order).
    Everything before the first word equal to the magic number 0x0E7F7C9D
    is skipped (around 2KB of unknown data). After that the file is a
    sequence of slices.

    Each slice starts with a 16-byte header:
    - Word 0 (bytes 0-3) is the magic number 0x0E7F7C9D.
    - Words 1-2 (bytes 4-11) are a millisecond timestamp, high word first.
    - Word 3 (bytes 12-15) holds an update interval in its low 16 bits
      (not returned by this function).

    The header is followed by 16-byte records, each either a callsign or
    telemetry, interleaved in no particular order within the slice.
    - Word 0 (bytes 0-3): hex code in the low 24 bits, a "~" flag in
      bit 24 and the type index in the top 5 bits.
    - Word 1 (bytes 4-7): if greater than 2**30 this is a callsign record
      whose low 16 bits are the squawk; otherwise it is the latitude in
      millionths of a degree.
    - Callsign records: bytes 8-15 are the flight text (first byte 0
      means no flight).
    - Telemetry records: word 2 (bytes 8-11) is the longitude in
      millionths of a degree; word 3 (bytes 12-15) holds the signed
      altitude in 25 ft steps in its low 16 bits (-123 means "ground")
      and the signed ground speed in tenths of a knot in its high
      16 bits (-1 means unavailable).

    Args:
        fn: Path of the heatmap file to read.
        bbox: Optional ``[min lat, min lon, max lat, max lon]`` in degrees;
            telemetry outside the box is dropped. Callsigns are not filtered.
        return_callsigns: When False, callsign records are skipped.

    Returns:
        A list of ``Slice`` objects in file order. Timestamps are naive
        datetimes in the local timezone (``datetime.fromtimestamp``). An
        empty list is returned when the file holds no slice marker. A
        truncated trailing header or record is ignored.
    """
    slice_begin_marker = 0x0E7F7C9D
    callsign_threshold = 1073741824  # 2**30: word 1 above this marks a callsign
    type_names = (
        "adsb_icao",
        "adsb_icao_nt",
        "adsr_icao",
        "tisb_icao",
        "adsc",
        "mlat",
        "other",
        "mode_s",
        "adsb_other",
        "adsr_other",
        "tisb_trackfile",
        "tisb_other",
        "mode_ac",
    )

    with open(fn, "rb") as f:
        raw = f.read()

    # A 32-bit view needs a whole number of words; drop any stray tail bytes
    # instead of failing on a slightly truncated file.
    raw = raw[: len(raw) - len(raw) % 4]
    if not raw:
        return []

    points_u = np.frombuffer(raw, dtype=np.uint32)
    points = points_u.view(np.int32)
    n_words = len(points)

    # Skip the leading block: start at the first word equal to the marker.
    marker_hits = np.flatnonzero(points == slice_begin_marker)
    if marker_hits.size == 0:
        return []
    i = int(marker_hits[0])

    if bbox is not None:
        # Compare against the raw integer lat/lon (millionths of a degree).
        lat_min = bbox[0] * 1e6
        lon_min = bbox[1] * 1e6
        lat_max = bbox[2] * 1e6
        lon_max = bbox[3] * 1e6

    data = []
    # Every header and record is 4 words; require all 4 to be present.
    while i + 3 < n_words:
        callsigns = []
        telemetry = []

        # Word 1 counts units of 2**32 ms, word 2 the remaining ms.
        now = points_u[i + 2] / 1000 + points_u[i + 1] * 4294967.296
        timestamp = datetime.datetime.fromtimestamp(now)
        i += 4

        while i + 3 < n_words and points[i] != slice_begin_marker:
            p0 = points[i]
            p1 = points[i + 1]

            # callsign data
            if p1 > callsign_threshold:
                if return_callsigns:
                    offset = 4 * (i + 2)
                    flight = None
                    if raw[offset] != 0:
                        # latin-1 maps each byte to the same code point,
                        # matching a per-byte chr() conversion.
                        flight = raw[offset:offset + 8].decode("latin-1").strip()
                    squawk = str(p1 & 0xFFFF).zfill(4)
                    callsigns.append(
                        Callsign(hex=point_to_str(p0), flight=flight, squawk=squawk)
                    )
                i += 4
                continue

            lat = p1
            lon = points[i + 2]
            if bbox is not None and (
                lat < lat_min or lat > lat_max or lon < lon_min or lon > lon_max
            ):
                i += 4
                continue

            # lat/lon in degrees
            lat = lat / 1e6
            lon = lon / 1e6

            # hex code type: top 5 bits of 1st 32-bit int
            type_index = p0 >> 27 & 0x1F
            type_name = (
                type_names[type_index] if type_index < len(type_names) else "unknown"
            )

            # final chunk of data
            p3 = points[i + 3]

            # barometric altitude in feet: low 16 bits of 4th 32-bit int
            alt = p3 & 65535
            if alt & 32768:
                alt |= -65536  # sign-extend the 16-bit value
            if alt == -123:
                alt = "ground"
            else:
                alt *= 25

            # ground speed in knots: high 16 bits of 4th 32-bit int
            gs = p3 >> 16
            if gs == -1:
                gs = None
            else:
                gs = gs / 10

            telemetry.append(
                Telemetry(
                    hex=point_to_str(p0),
                    type=type_name,
                    lat=lat,
                    lon=lon,
                    alt=alt,
                    gs=gs,
                )
            )
            i += 4

        data.append(Slice(timestamp=timestamp, callsigns=callsigns, telemetry=telemetry))

    return data
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment