Skip to content

Instantly share code, notes, and snippets.

@joaofig
Created December 3, 2024 14:40
Show Gist options
  • Save joaofig/4b36efbf934b7999182c11765c5f5617 to your computer and use it in GitHub Desktop.
Save joaofig/4b36efbf934b7999182c11765c5f5617 to your computer and use it in GitHub Desktop.
Main function of the GPS interpolation code
def main():
traj_df = load_trajectories()
traj_ids = traj_df["traj_id"].to_numpy()
update_schema()
for traj_id in tqdm(traj_ids):
polyline_str = load_polyline(int(traj_id))
if polyline_str is not None and len(polyline_str):
signals_df = load_trajectory_signals(int(traj_id))
unique_df = signals_df.drop_duplicates(subset=["match_latitude",
"match_longitude"],
keep="first")
polyline = np.array(decode_polyline(polyline_str, order="latlon"))
gps_segments = compute_gps_segments(unique_df, polyline)
locations: List[DeadReckon] = []
for gps_segment in gps_segments:
segment_dx = sum(gps_segment.distances)
seq_ini = gps_segment.points[0].seq
seq_end = gps_segment.points[-1].seq
# Extract the kinematics data for this segment
seg_df = signals_df[(signals_df["signal_id"] >= seq_ini) &
(signals_df["signal_id"] <= seq_end)]
if seg_df.shape[0] > 1:
# Infer the point in-between using kinematics
dxs = integrate_speeds(seg_df)
dx = np.sum(dxs)
if dx > 0.0:
r = segment_dx / dx
corrected_dxs = r * dxs
signal_ids = seg_df["signal_id"].to_numpy()
timestamps = seg_df["time_stamp"].to_numpy() / 1000.0
dr = dead_reckon(gps_segment,
corrected_dxs,
signal_ids,
timestamps)
locations.extend(dr)
if len(locations) > 1:
insert_dead_reckon(locations)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment