Last active
June 17, 2025 16:09
-
-
Save dzid26/52c7c88e0dc46c6e6e2d6bc3003aff9f to your computer and use it in GitHub Desktop.
Moved event filters. Added curve fitting.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import os | |
import pickle | |
from aiohttp import Fingerprint | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from functools import partial | |
from tqdm import tqdm | |
from typing import NamedTuple | |
from openpilot.tools.lib.logreader import LogReader | |
from openpilot.selfdrive.locationd.models.pose_kf import EARTH_G | |
# Window applied to carState.steeringRateDeg by the plotting filter (presumably
# deg/s -- confirm against the car interface). Despite the "SPEED" in the names,
# these bound the steering *rate*, not the vehicle speed.
MIN_STEER_SPEED = 0.01
MAX_STEER_SPEED = 11

# Number of consecutive frames a condition must hold before samples are recorded.
# The values are for rlogs; find_events divides them by QLOG_DECIMATION for qlogs.
RLOG_MIN_LAT_ACTIVE = 50
RLOG_MIN_STEERING_UNPRESSED = 50
QLOG_DECIMATION = 10

# Extracted events are pickled here, one file per route, so that re-running the
# script on the same route skips log processing. Created eagerly at import time.
CACHE_DIR = os.path.expanduser("~/.cache/openpilot_torque_lat_accel")
os.makedirs(CACHE_DIR, exist_ok=True)
class Event(NamedTuple):
    """One vehicle-state sample recorded by find_events.

    Field order is part of the on-disk cache format (instances are pickled),
    so new fields must only ever be appended.
    """

    # curvature * vEgo**2 - roll * EARTH_G, as computed in find_events
    lateral_accel: float
    # latest carOutput.actuatorsOutput.torque
    out_torque: float
    # latest carState.steeringTorqueEps
    torque_act: float
    # latest carState.vEgo
    speed: float
    # latest liveParameters.roll
    roll: float
    # seconds since the first carControl message seen by find_events, rounded to 10 ms
    timestamp: float
    # latest carState.steeringAngleDeg
    steeringAngleDeg: float
    # latest carState.steeringRateDeg
    steeringRateDeg: float
def find_events(lr: LogReader, extrapolate: bool = False, qlog: bool = False) -> list[Event]:
    """Collect state samples while lateral control is active and the wheel is untouched.

    The log is walked once. The most recent value of every signal of interest is
    remembered, and an Event is emitted whenever all of the following hold:

    * carControl.latActive has been true for more than the minimum number of
      consecutive carControl frames,
    * carState.steeringPressed has been false for more than the minimum number
      of consecutive carState frames,
    * carState.vEgo is above 5.

    Args:
        lr: Iterable of log messages exposing ``which()``, ``logMonoTime`` and
            the per-service sub-messages read below.
        extrapolate: Accepted for interface compatibility; not used here.
        qlog: When true, the frame-count thresholds are divided by
            QLOG_DECIMATION.

    Returns:
        Events in log order. Timestamps are seconds since the first carControl
        message of *this* reader. NOTE(review): if the caller hands in one
        reader per segment, timestamps restart with every segment -- confirm
        against LogReader.run_across_segments.
    """
    if qlog:
        min_lat_active = RLOG_MIN_LAT_ACTIVE // QLOG_DECIMATION
        min_steering_unpressed = RLOG_MIN_STEERING_UNPRESSED // QLOG_DECIMATION
    else:
        min_lat_active = RLOG_MIN_LAT_ACTIVE
        min_steering_unpressed = RLOG_MIN_STEERING_UNPRESSED
    min_speed = 5  # compared against carState.vEgo

    events = []

    # Consecutive-frame counters; reset to 0 as soon as the condition breaks.
    steering_unpressed = 0
    lat_active = 0

    # Last seen value of every signal.
    curvature = 0
    v_ego = 0
    roll = 0
    out_torque = 0
    torque_act = 0
    steeringAngleDeg = 0
    steeringRateDeg = 0
    start_ts = None  # logMonoTime of the first carControl message

    for msg in lr:
        # Resolve the union tag once per message instead of once per branch.
        which = msg.which()

        if which == 'carControl':
            if start_ts is None:
                start_ts = msg.logMonoTime
            lat_active = lat_active + 1 if msg.carControl.latActive else 0
        elif which == 'carOutput':
            out_torque = msg.carOutput.actuatorsOutput.torque
        elif which == 'carState':
            cs = msg.carState
            steering_unpressed = steering_unpressed + 1 if not cs.steeringPressed else 0
            v_ego = cs.vEgo
            torque_act = cs.steeringTorqueEps
            steeringAngleDeg = cs.steeringAngleDeg
            steeringRateDeg = cs.steeringRateDeg
        elif which == 'controlsState':
            curvature = msg.controlsState.curvature
        elif which == 'liveParameters':
            roll = msg.liveParameters.roll

        # NOTE(review): the source listing lost its indentation; this check is
        # taken to run once per message of any type (loop level), so a held
        # state is sampled once for every message in the log -- confirm.
        # lat_active can only exceed the threshold after a carControl message
        # has been seen, so start_ts is always set by the time we get here.
        if lat_active > min_lat_active and steering_unpressed > min_steering_unpressed and v_ego > min_speed:
            events.append(Event(
                lateral_accel=(curvature * v_ego ** 2) - roll * EARTH_G,
                out_torque=out_torque,
                torque_act=torque_act,
                speed=v_ego,
                roll=roll,
                timestamp=round((msg.logMonoTime - start_ts) * 1e-9, 2),  # ns -> s
                steeringAngleDeg=steeringAngleDeg,
                steeringRateDeg=steeringRateDeg,
            ))

    return events
def select_slow_steering(events, min_rate, max_rate):
    """Keep events whose steering wheel is moving slowly away from center.

    An event passes when its steering rate, taken in the direction of the
    steering angle, lies strictly between ``min_rate`` and ``max_rate``:
    for a positive angle the rate itself must be in the window, otherwise the
    negated rate must be.

    Args:
        events: Iterable of objects with ``steeringAngleDeg``,
            ``steeringRateDeg``, ``out_torque`` and ``lateral_accel``.
        min_rate: Exclusive lower bound on the directed steering rate.
        max_rate: Exclusive upper bound on the directed steering rate.

    Returns:
        ``(torques, accels)``: two equally long lists holding ``out_torque``
        and ``lateral_accel`` of the surviving events, in input order.
    """
    torques = []
    accels = []
    for ev in events:
        directed_rate = ev.steeringRateDeg if ev.steeringAngleDeg > 0 else -ev.steeringRateDeg
        if min_rate < directed_rate < max_rate:
            torques.append(ev.out_torque)
            accels.append(ev.lateral_accel)
    return torques, accels


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Plot output torque vs. lateral acceleration and fit a line to it",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # nargs='+' makes at least one route mandatory; there is no default route.
    parser.add_argument("route", nargs='+', help="One or more route names.")
    parser.add_argument("-e", "--extrapolate", action="store_true",
                        help="Forwarded to find_events, which currently ignores it; kept for compatibility.")
    args = parser.parse_args()

    all_events = []
    fingerprints = []
    for route in tqdm(args.route):
        # One cache file per route; '/' is not allowed in a file name.
        cache_filename = os.path.join(CACHE_DIR, route.replace('/', '_') + ".pkl")
        try:
            if os.path.exists(cache_filename):
                print(f"Loading cached events for {route}...")
                # NOTE: pickle executes arbitrary code on load. Only files this
                # script wrote into CACHE_DIR itself should ever end up here.
                with open(cache_filename, 'rb') as f:
                    fingerprint, events = pickle.load(f)
            else:
                print(f"Processing route {route}...")
                lr = LogReader(route, sort_by_time=True)
                fingerprint = lr.first('carParams').carFingerprint
                qlog = route.endswith('/q')
                if qlog:
                    print('WARNING: Treating route as qlog!')
                print('Finding events...')
                events = lr.run_across_segments(8, partial(find_events, extrapolate=args.extrapolate, qlog=qlog),
                                                disable_tqdm=True)
                with open(cache_filename, 'wb') as f:
                    pickle.dump((fingerprint, events), f)
            all_events.extend(events)
            fingerprints.append(fingerprint)
        except Exception as e:
            # Best effort: one bad route must not abort the others.
            print(f'Skipping {route} due to error: {e}')
            if os.path.exists(cache_filename):  # remove potentially corrupted cache file
                os.remove(cache_filename)

    unique_fingerprints = sorted(set(fingerprints))
    if len(unique_fingerprints) > 1:
        print(f"WARNING: Mismatched fingerprints: {', '.join(unique_fingerprints)}")
    print(f'\nFound {len(all_events)} events in total')

    ## % Processing
    torques, accels = select_slow_steering(all_events, MIN_STEER_SPEED, MAX_STEER_SPEED)
    print(f'Found {len(torques)} events after filtering')
    if not torques:
        # Without this guard the plotting code fails on fingerprints[0] / min([]).
        raise SystemExit('No events left to plot.')

    ## Plotting
    plt.ion()
    plt.clf()
    print(fingerprints)
    # A surviving event implies at least one route loaded, so fingerprints is non-empty.
    plt.suptitle(f'{fingerprints[0]} - Output Torque vs Lateral Acceleration')
    plt.title(', '.join(args.route))

    plt.scatter(torques, accels, label='Events', s=.1, alpha=0.1)

    torque_lo, torque_hi = min(torques), max(torques)

    # First-degree least-squares fit: accel = slope * torque + offset.
    if len(torques) > 1:
        coeffs = np.polyfit(torques, accels, 1)
        poly = np.poly1d(coeffs)
        # A straight line only needs its two end points.
        fit_x = np.array([torque_lo, torque_hi])
        plt.plot(fit_x, poly(fit_x), color='red',
                 label=f'Fit: LAT_ACCEL_FACTOR={coeffs[0]:.2f}, latAccelOffset={coeffs[1]:.2f}')

    # Grid and zero-reference lines.
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.axhline(0, color='black', linewidth=0.5)
    plt.axvline(0, color='black', linewidth=0.5)
    plt.xlim(min(-1, torque_lo), max(1, torque_hi))
    plt.ylim(-3, 3)
    plt.xlabel('Output Steer')
    plt.ylabel('Lateral Acceleration (m/s²)')
    plt.legend(loc='upper left')
    plt.tight_layout()
    plt.show(block=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment