Last active
August 29, 2015 14:09
-
-
Save z-------------/fa7ee2bba8195fc668c9 to your computer and use it in GitHub Desktop.
Decode latitude/longitude data from CPR-encoded hex strings
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import socket | |
| import math | |
def nround(n):
    """Round `n` to the nearest integer, sending exact halves upward.

    Any multiple of 0.5 (whole numbers included) goes through `math.ceil`,
    so 2.5 -> 3 and -0.5 -> 0; everything else uses the built-in `round`.
    """
    on_half_step = (n % 0.5 == 0)
    return math.ceil(n) if on_half_step else round(n)
def hex2bin(hex_str):
    """Convert a hexadecimal string to its binary string representation.

    Every hex digit contributes exactly four bits, so leading zero bits are
    preserved and bit positions in the result line up with the encoded
    message (e.g. "18" -> "00011000").

    Args:
        hex_str: string of hexadecimal digits.

    Returns:
        A string of "0"/"1" characters of length 4 * len(hex_str).

    Raises:
        ValueError: if `hex_str` is not valid hexadecimal (raised by `int`).
    """
    bin_str = bin(int(hex_str, 16))[2:]
    # Pad to the full width of the input; padding only to an even length
    # would drop leading zeros and shift every bit index.
    return bin_str.zfill(4 * len(hex_str))
def NL(lat):
    """Return the number of CPR longitude zones (NL) for latitude `lat`.

    Implements
        NL = floor(2*pi / acos(1 - (1 - cos(pi / (2*NZ))) / cos(lat)**2))
    with NZ = 15, plus the boundary cases the formula cannot evaluate
    reliably in floating point.

    Args:
        lat: latitude in degrees; the sign is ignored.

    Returns:
        An int between 1 (beyond +/-87 degrees) and 59 (near the equator).
    """
    nz = 15
    abs_lat = abs(lat)

    # At and beyond 87 degrees the acos argument reaches or passes -1.
    if math.isclose(abs_lat, 87.0):
        return 2
    if abs_lat > 87.0:
        return 1

    numerator = 1 - math.cos(math.pi / (2 * nz))
    denominator = math.cos(math.pi * abs_lat / 180) ** 2
    # Clamp so rounding error just below 87 degrees cannot leave acos' domain.
    angle = math.acos(max(-1.0, 1 - numerator / denominator))
    # At the equator the quotient is 60 up to rounding; NL never exceeds 59.
    return min(59, math.floor(2 * math.pi / angle))
def extract_latlon_b(bin_str):
    """Slice the encoded latitude and longitude bit fields out of `bin_str`.

    Returns a dict whose "lat" entry is bits 22..38 and whose "lon" entry is
    bits 39..55 of the input (17 characters each when the input is long
    enough; shorter inputs yield shorter or empty strings).
    """
    lat_bits = bin_str[22:39]
    lon_bits = bin_str[39:56]
    return {"lat": lat_bits, "lon": lon_bits}
def calc_latlon(cpr1, cpr2):
    """Decode a global position from a pair of CPR-encoded messages.

    Args:
        cpr1: hex string of the message paired with the 60-zone latitude
            grid (the caller in this file passes the frame whose format
            flag is 0).
        cpr2: hex string of the message paired with the 59-zone latitude
            grid (the caller passes the frame whose format flag is 1).

    Returns:
        [latitude, longitude] in degrees, computed from the second message,
        with southern latitudes negative and longitude in [-180, 180).
        Returns None when the two latitudes fall in different longitude-zone
        bands (NL values differ), because the pair cannot then be decoded
        consistently.
    """
    cpr_max = 131072  # 2**17, the range of a 17-bit CPR value

    first_fields = extract_latlon_b(hex2bin(cpr1))
    second_fields = extract_latlon_b(hex2bin(cpr2))
    lat0 = int(first_fields["lat"], 2)
    lat1 = int(second_fields["lat"], 2)
    lon0 = int(first_fields["lon"], 2)
    lon1 = int(second_fields["lon"], 2)

    # Latitude zone index. floor() is required: int() truncates toward zero
    # and gives the wrong index whenever the expression is negative.
    j = math.floor((59 * lat0 - 60 * lat1) / cpr_max + 0.5)
    rlat0 = 6 * (j % 60 + lat0 / cpr_max)
    rlat1 = (360 / 59) * (j % 59 + lat1 / cpr_max)

    # Values in the upper quarter of the circle are southern latitudes.
    if rlat0 >= 270:
        rlat0 -= 360
    if rlat1 >= 270:
        rlat1 -= 360

    nl0 = NL(rlat0)
    nl1 = NL(rlat1)
    if nl0 != nl1:
        # The two messages straddle a longitude-zone boundary.
        return None

    ni = max(1, nl1 - 1)
    dlon1 = 360 / ni
    # Longitude zone index; the +0.5 followed by floor() is the rounding.
    m = math.floor((lon0 * (nl1 - 1) - lon1 * nl1) / cpr_max + 0.5)
    lon = dlon1 * (m % ni + lon1 / cpr_max)
    if lon >= 180:
        lon -= 360  # express western longitudes as negative values

    return [rlat1, lon]
def analyse_stream(stream):
    """Record position frames from `stream` and print decodable positions.

    Each item of `stream` is a text frame. Its first character is dropped,
    and the remainder is used only if it is 28 hex characters long and its
    top five bits equal 17 (downlink format). Characters 2..7 are taken as
    the aircraft address and characters 8..21 as the message payload. A
    payload whose top five bits equal 11 (type code) is stored in the
    module-level `craft_frames` under the address, in slot 0 or 1 according
    to payload bit 21.

    After all frames are processed, every address with both slots filled is
    printed together with the result of `calc_latlon`.

    Frames that are not valid hexadecimal are skipped instead of raising.

    Args:
        stream: iterable of frame strings.
    """
    for frame in stream:
        frame = frame[1:]  # drop the single leading marker character
        if len(frame) != 28:
            continue

        cpr_data = frame[8:-6]
        try:
            first_byte = int(frame[0:2], 16)
            payload = int(cpr_data, 16)
        except ValueError:
            # Corrupt or non-hex frame: ignore it and keep processing.
            continue

        # Top five bits of the first byte are the downlink format (DF).
        if first_byte >> 3 != 17:
            continue

        icao = frame[2:8]
        if icao not in craft_frames:
            craft_frames[icao] = [None] * 2  # one slot per flag value

        # The payload is 56 bits wide; its top five bits are the type code.
        if payload >> 51 != 11:
            continue

        # Bit 21 counted from the most significant end is value bit 55 - 21.
        flag = (payload >> 34) & 1
        craft_frames[icao][flag] = cpr_data

    for icao in craft_frames:
        if craft_frames[icao][0] and craft_frames[icao][1]:
            print(icao + ": " + str(calc_latlon(craft_frames[icao][0], craft_frames[icao][1])))
# Most recent payload per aircraft address: craft_frames[addr] == [slot0, slot1].
craft_frames = {}

# Placeholders: replace both with the address and port of the frame source.
ip = "INSERT_IP_HERE"
port = INSERT_PORT_HERE

client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    client_socket.connect((ip, port))
    pending = ""  # text received so far that does not yet end a frame
    while True:
        chunk = client_socket.recv(512)
        if not chunk:
            # An empty read means the peer closed the connection.
            break
        pending += chunk.decode("utf-8")
        # recv() boundaries do not respect frame boundaries: hand over only
        # complete frames and keep the unterminated tail for the next read.
        frames = pending.split(";\r\n")
        pending = frames.pop()
        if frames:
            analyse_stream(frames)
finally:
    client_socket.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment