Skip to content

Instantly share code, notes, and snippets.

@dzid26
Last active June 17, 2025 16:09
Show Gist options
  • Save dzid26/52c7c88e0dc46c6e6e2d6bc3003aff9f to your computer and use it in GitHub Desktop.
Save dzid26/52c7c88e0dc46c6e6e2d6bc3003aff9f to your computer and use it in GitHub Desktop.
Move event filters. Added curve fitting.
#!/usr/bin/env python3
import argparse
import os
import pickle
from aiohttp import Fingerprint
import numpy as np
import matplotlib.pyplot as plt
from functools import partial
from tqdm import tqdm
from typing import NamedTuple
from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.locationd.models.pose_kf import EARTH_G
MIN_STEER_SPEED = 0.01
MAX_STEER_SPEED = 11
RLOG_MIN_LAT_ACTIVE = 50
RLOG_MIN_STEERING_UNPRESSED = 50
CACHE_DIR = os.path.expanduser("~/.cache/openpilot_torque_lat_accel")
os.makedirs(CACHE_DIR, exist_ok=True)
QLOG_DECIMATION = 10
class Event(NamedTuple):
lateral_accel: float
out_torque: float # Added torque value
torque_act: float
speed: float
roll: float
timestamp: float # relative to start of route (s)
steeringAngleDeg: float
steeringRateDeg: float
def find_events(lr: LogReader, extrapolate: bool = False, qlog: bool = False) -> list[Event]:
min_lat_active = RLOG_MIN_LAT_ACTIVE // QLOG_DECIMATION if qlog else RLOG_MIN_LAT_ACTIVE
min_steering_unpressed = RLOG_MIN_STEERING_UNPRESSED // QLOG_DECIMATION if qlog else RLOG_MIN_STEERING_UNPRESSED
events = []
# state tracking
steering_unpressed = 0 # frames
lat_active = 0 # frames
# current state
curvature = 0
v_ego = 0
roll = 0
out_torque = 0
torque_act = 0
steeringAngleDeg = 0
steeringRateDeg = 0
start_ts = 0
for msg in lr:
if msg.which() == 'carControl':
if start_ts == 0:
start_ts = msg.logMonoTime
lat_active = lat_active + 1 if msg.carControl.latActive else 0
elif msg.which() == 'carOutput':
out_torque = msg.carOutput.actuatorsOutput.torque
elif msg.which() == 'carState':
steering_unpressed = steering_unpressed + 1 if not msg.carState.steeringPressed else 0
v_ego = msg.carState.vEgo
torque_act = msg.carState.steeringTorqueEps
steeringAngleDeg = msg.carState.steeringAngleDeg
steeringRateDeg = msg.carState.steeringRateDeg
elif msg.which() == 'controlsState':
curvature = msg.controlsState.curvature
elif msg.which() == 'liveParameters':
roll = msg.liveParameters.roll
if lat_active > min_lat_active and steering_unpressed > min_steering_unpressed and v_ego > 5:
current_lateral_accel = (curvature * v_ego ** 2) - roll * EARTH_G
# Store actual output torque in event
events.append(Event(current_lateral_accel, out_torque, torque_act, v_ego, roll, round((msg.logMonoTime - start_ts) * 1e-9, 2),
steeringAngleDeg, steeringRateDeg))
# print(events[-1])
return events
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Find max lateral acceleration events",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("route", nargs='+', help="One or more route names. If not specified, a default route is used.")
parser.add_argument("-e", "--extrapolate", action="store_true", help="Extrapolates max lateral acceleration events linearly. " +
"This option can be far less accurate.")
args = parser.parse_args()
all_events = []
fingerprints = []
for route in tqdm(args.route):
# Generate a cache filename based on the route name
cache_filename = os.path.join(CACHE_DIR, route.replace('/', '_') + ".pkl")
try:
if os.path.exists(cache_filename):
print(f"Loading cached events for {route}...")
with open(cache_filename, 'rb') as f:
fingerprint, events = pickle.load(f)
else:
print(f"Processing route {route}...")
lr = LogReader(route, sort_by_time=True)
fingerprint = lr.first('carParams').carFingerprint
qlog = route.endswith('/q')
if qlog:
print('WARNING: Treating route as qlog!')
print('Finding events...')
events = lr.run_across_segments(8, partial(find_events, extrapolate=args.extrapolate, qlog=qlog), disable_tqdm=True)
with open(cache_filename, 'wb') as f:
pickle.dump((fingerprint, events), f)
all_events.extend(events)
fingerprints.append(fingerprint)
except Exception as e:
print(f'Skipping {route} due to error: {e}')
if os.path.exists(cache_filename): # remove potentially corrupted cache file
os.remove(cache_filename)
if (len(set(fingerprints)) > 1):
print(f"WARNING: Mismatched fingerprints: {', '.join(set(fingerprints))}")
print(f'\nFound {len(all_events)} events in total')
## % Processing
# Extract torque and acceleration values
torques = [ev.out_torque for ev in all_events]
accels = [ev.lateral_accel for ev in all_events]
steeringAngles = [ev.steeringAngleDeg for ev in all_events]
steeringRates = [ev.steeringRateDeg for ev in all_events]
torques_filtered = []
accels_filtered = []
for angle, rate, torque, accel in zip(steeringAngles, steeringRates, torques, accels):
# steering driven slowly:
condition = (MIN_STEER_SPEED < rate < MAX_STEER_SPEED) if angle > 0 else (-MIN_STEER_SPEED > rate > -MAX_STEER_SPEED)
if condition:
torques_filtered.append(torque)
accels_filtered.append(accel)
print(f'Found {len(torques_filtered)} events after filtering')
torques = torques_filtered
accels = accels_filtered
## Plotting
plt.ion()
plt.clf()
print(fingerprints)
plt.suptitle(f'{fingerprints[0]} - Output Torque vs Lateral Acceleration')
plt.title(', '.join(args.route))
# Plot lateral acceleration vs output torque
plt.scatter(torques, accels, label='Events', s=.1, alpha=0.1)
# Fit a line (1st degree polynomial) to the data
if len(torques) > 1 and len(accels) > 1:
coeffs = np.polyfit(torques, accels, 1)
poly = np.poly1d(coeffs)
plt.plot(torques, poly(torques), color='red', label=f'Fit: LAT_ACCEL_FACTOR={coeffs[0]:.2f}, latAccelOffset={coeffs[1]:.2f}')
# Add grid and reference lines
plt.grid(True, linestyle='--', alpha=0.7)
plt.axhline(0, color='black', linewidth=0.5)
plt.axvline(0, color='black', linewidth=0.5)
plt.xlim(min(-1, min(torques)), max(1, max(torques)))
plt.ylim(-3, 3)
plt.xlabel('Output Steer')
plt.ylabel('Lateral Acceleration (m/s²)')
plt.legend(loc='upper left')
plt.tight_layout()
plt.show(block=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment