Skip to content

Instantly share code, notes, and snippets.

@marcadams
Created February 13, 2024 21:42
Show Gist options
  • Select an option

  • Save marcadams/b6b5ed2378a596f8e5f9168e200f900e to your computer and use it in GitHub Desktop.

Select an option

Save marcadams/b6b5ed2378a596f8e5f9168e200f900e to your computer and use it in GitHub Desktop.
import math
class Reward:
    """Racing-line reward for a DeepRacer-style ``params`` dictionary.

    The reward combines five components:

    * closeness to a precomputed optimal racing line,
    * closeness to the optimal speed at the nearest racing-line point,
    * a per-step bonus based on the projected lap time,
    * alignment of the heading with the racing-line direction,
    * a one-off bonus when the lap is (almost) finished.

    The instance remembers the racing-line index at which the episode
    started (``first_racingpoint_index``), so one instance should be reused
    for every step of an episode.
    """

    # Optimal racing line for the Spain track.
    # Each row: (x, y, speed, timeFromPreviousPoint)
    RACING_TRACK = (
        (2.88706, 0.7258, 3.94468, 0.06892),
        (3.16724, 0.70398, 4.0, 0.07026),
        (3.45436, 0.69152, 4.0, 0.07185),
        (3.75186, 0.68552, 3.81169, 0.07807),
        (4.07281, 0.68361, 3.14704, 0.10199),
        (4.5, 0.68376, 2.73054, 0.15645),
        (4.55, 0.68378, 2.40688, 0.02077),
        (5.10947, 0.69128, 2.15399, 0.25976),
        (5.44315, 0.71199, 1.92092, 0.17404),
        (5.70785, 0.74299, 1.72999, 0.15405),
        (5.93753, 0.78554, 1.46949, 0.15896),
        (6.14402, 0.84099, 1.46949, 0.1455),
        (6.33003, 0.91022, 1.46949, 0.13506),
        (6.49528, 0.9932, 1.4179, 0.13042),
        (6.63853, 1.08981, 1.3, 0.13291),
        (6.75874, 1.19989, 1.3, 0.12538),
        (6.85106, 1.3254, 1.3, 0.11985),
        (6.91727, 1.46408, 1.3, 0.11821),
        (6.95853, 1.61373, 1.3, 0.11941),
        (6.96837, 1.7744, 1.3, 0.12382),
        (6.9342, 1.94323, 1.35468, 0.12715),
        (6.86049, 2.11166, 1.35468, 0.13571),
        (6.73339, 2.26589, 1.66739, 0.11986),
        (6.57106, 2.39898, 1.90788, 0.11003),
        (6.38406, 2.50903, 2.2568, 0.09614),
        (6.1818, 2.59894, 2.80925, 0.07879),
        (5.97104, 2.67441, 4.0, 0.05597),
        (5.75669, 2.74279, 3.4846, 0.06457),
        (5.55657, 2.81259, 3.4846, 0.06082),
        (5.3581, 2.88749, 3.4846, 0.06088),
        (5.16124, 2.96764, 3.4846, 0.061),
        (4.96602, 3.05329, 3.4846, 0.06118),
        (4.77263, 3.14531, 3.4846, 0.06146),
        (4.58194, 3.24724, 4.0, 0.05406),
        (4.39309, 3.35621, 4.0, 0.05451),
        (4.20542, 3.46985, 3.46882, 0.06325),
        (4.01834, 3.58583, 3.02128, 0.07285),
        (3.84572, 3.6925, 2.64271, 0.07678),
        (3.6784, 3.79359, 2.64271, 0.07397),
        (3.51581, 3.8876, 2.64271, 0.07107),
        (3.35451, 3.97485, 2.63037, 0.06972),
        (3.19148, 4.05479, 2.47383, 0.0734),
        (3.02506, 4.12582, 2.29639, 0.0788),
        (2.85384, 4.18542, 2.08592, 0.08691),
        (2.67783, 4.23432, 1.88131, 0.0971),
        (2.49664, 4.27227, 1.69624, 0.10914),
        (2.30927, 4.29775, 1.50069, 0.12601),
        (2.11408, 4.30795, 1.50069, 0.13024),
        (1.90854, 4.29804, 1.50069, 0.13712),
        (1.68849, 4.25834, 1.50069, 0.149),
        (1.44936, 4.1691, 1.50069, 0.17008),
        (1.20319, 3.99596, 1.50069, 0.20055),
        (1.01439, 3.70539, 1.77504, 0.19521),
        (0.91435, 3.35305, 2.1548, 0.16998),
        (0.88845, 3.0257, 2.4887, 0.13195),
        (0.90073, 2.76392, 2.48281, 0.10555),
        (0.92864, 2.53256, 2.28577, 0.10195),
        (0.96859, 2.31404, 2.09133, 0.10622),
        (1.01933, 2.11009, 1.88745, 0.11134),
        (1.08168, 1.91627, 1.67154, 0.12181),
        (1.15576, 1.73305, 1.67154, 0.11823),
        (1.2421, 1.56206, 1.67154, 0.1146),
        (1.34138, 1.40473, 1.67154, 0.1113),
        (1.45505, 1.26244, 1.67154, 0.10895),
        (1.58641, 1.13709, 1.67154, 0.10862),
        (1.74406, 1.03231, 1.99881, 0.0947),
        (1.92588, 0.94287, 2.27931, 0.0889),
        (2.13233, 0.86755, 2.56843, 0.08556),
        (2.36374, 0.80649, 2.9557, 0.08097),
        (2.61732, 0.7596, 3.46381, 0.07445),
    )
    # Time column of the racing line, extracted once instead of on every step.
    SEGMENT_TIMES = tuple(row[3] for row in RACING_TRACK)

    # Steps per second used to convert between step counts and seconds.
    STEPS_PER_SECOND = 15
    # Floor used instead of a true zero reward.
    MIN_REWARD = 1e-2
    # Weight of the racing-line distance component.
    DISTANCE_MULTIPLE = 2
    # Speed deviation at (and beyond) which the speed component is zero.
    SPEED_DIFF_NO_REWARD = 1
    # Weight of the speed component.
    SPEED_MULTIPLE = 3
    # Upper bound of the per-step projected-lap-time component.
    REWARD_PER_STEP_FOR_FASTEST_TIME = 1.5
    # Finish bonus scale; should be adapted to track length and other rewards.
    REWARD_FOR_FASTEST_TIME = 300
    # Seconds: lap time that is easily done by the model.
    STANDARD_TIME = 12
    # Seconds: best time of 1st place on the track.
    FASTEST_TIME = 7
    # Heading error (degrees) above which the step gets MIN_REWARD.
    MAX_DIRECTION_DIFF = 30
    # Absolute steering angle above which the step gets MIN_REWARD.
    MAX_STEERING_ANGLE = 20
    # Shortfall versus optimal speed above which the step gets MIN_REWARD.
    MAX_SPEED_DEFICIT = 0.5

    def __init__(self, verbose=False):
        """Create a reward calculator.

        Args:
            verbose: When true, print the reward components on every call and
                pin the episode start index to 0 (testing aid).
        """
        self.first_racingpoint_index = 0  # None
        self.verbose = verbose

    ################## HELPER FUNCTIONS ###################

    @staticmethod
    def _dist_2_points(x1, x2, y1, y2):
        """Return the Euclidean distance between (x1, y1) and (x2, y2)."""
        return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5

    @staticmethod
    def _closest_2_racing_points_index(racing_coords, car_coords):
        """Return ``[closest, second_closest]`` racing-point indexes."""
        distances = [
            Reward._dist_2_points(
                x1=point[0], x2=car_coords[0], y1=point[1], y2=car_coords[1]
            )
            for point in racing_coords
        ]
        closest_index = distances.index(min(distances))
        # Mask the winner with infinity (not a finite magic number) so the
        # second search can never pick the same index again just because all
        # remaining points are far away.
        distances_no_closest = distances.copy()
        distances_no_closest[closest_index] = math.inf
        second_closest_index = distances_no_closest.index(
            min(distances_no_closest)
        )
        return [closest_index, second_closest_index]

    @staticmethod
    def _dist_to_racing_line(closest_coords, second_closest_coords, car_coords):
        """Return the distance from the car to the line through two points.

        The three pairwise distances form a triangle; the result is the
        triangle's height over the side joining the two racing points,
        derived from Heron's formula.
        """
        # Side between the two closest racing points
        a = Reward._dist_2_points(
            x1=closest_coords[0],
            x2=second_closest_coords[0],
            y1=closest_coords[1],
            y2=second_closest_coords[1],
        )
        # Sides between the car and each racing point
        b = Reward._dist_2_points(
            x1=car_coords[0],
            x2=closest_coords[0],
            y1=car_coords[1],
            y2=closest_coords[1],
        )
        c = Reward._dist_2_points(
            x1=car_coords[0],
            x2=second_closest_coords[0],
            y1=car_coords[1],
            y2=second_closest_coords[1],
        )
        try:
            return abs(
                -(a ** 4)
                + 2 * (a ** 2) * (b ** 2)
                + 2 * (a ** 2) * (c ** 2)
                - (b ** 4)
                + 2 * (b ** 2) * (c ** 2)
                - (c ** 4)
            ) ** 0.5 / (2 * a)
        except ZeroDivisionError:
            # a == 0 (rare bug in DeepRacer): both racing points coincide,
            # so fall back to the distance to the closest point.
            return b

    @staticmethod
    def _next_prev_racing_point(
        closest_coords, second_closest_coords, car_coords, heading
    ):
        """Order the two closest racing points as ``[next, previous]``.

        The car is virtually moved one unit along its heading (degrees);
        whichever racing point is nearer to that shifted position is taken
        as the next one.
        """
        heading_rad = math.radians(heading)
        new_car_coords = [
            car_coords[0] + math.cos(heading_rad),
            car_coords[1] + math.sin(heading_rad),
        ]
        distance_closest_coords_new = Reward._dist_2_points(
            x1=new_car_coords[0],
            x2=closest_coords[0],
            y1=new_car_coords[1],
            y2=closest_coords[1],
        )
        distance_second_closest_coords_new = Reward._dist_2_points(
            x1=new_car_coords[0],
            x2=second_closest_coords[0],
            y1=new_car_coords[1],
            y2=second_closest_coords[1],
        )
        if distance_closest_coords_new <= distance_second_closest_coords_new:
            return [closest_coords, second_closest_coords]
        return [second_closest_coords, closest_coords]

    @staticmethod
    def _racing_direction_diff(
        closest_coords, second_closest_coords, car_coords, heading
    ):
        """Return the absolute angle in degrees (0..180) between the heading
        and the direction of the racing line at the car's position."""
        next_point, prev_point = Reward._next_prev_racing_point(
            closest_coords, second_closest_coords, car_coords, heading
        )
        # atan2(dy, dx) yields radians in (-pi, pi]; convert to degrees
        track_direction = math.degrees(
            math.atan2(
                next_point[1] - prev_point[1], next_point[0] - prev_point[0]
            )
        )
        direction_diff = abs(track_direction - heading)
        if direction_diff > 180:
            direction_diff = 360 - direction_diff
        return direction_diff

    @staticmethod
    def _indexes_cyclical(start, end, array_len):
        """Return the indexes between start and end of a cyclical list
        (start index is included, end index is not)."""
        if end < start:
            end += array_len
        return [index % array_len for index in range(start, end)]

    @staticmethod
    def _projected_time(first_index, closest_index, step_count, times_list):
        """Estimate the full lap time if the car continues as it did so far.

        Returns 9999 when no racing-line segment has been covered yet (the
        expected elapsed time is zero, so no ratio can be formed).
        """
        # Time that has actually passed since the start
        current_actual_time = (step_count - 1) / Reward.STEPS_PER_SECOND
        # Time that should have passed had the car followed the optimals
        indexes_traveled = Reward._indexes_cyclical(
            first_index, closest_index, len(times_list)
        )
        current_expected_time = sum(times_list[i] for i in indexes_traveled)
        # Time one entire lap takes when following the optimals
        total_expected_time = sum(times_list)
        try:
            return (
                current_actual_time / current_expected_time
            ) * total_expected_time
        except ZeroDivisionError:
            return 9999

    def reward_function(self, params):
        """Compute the reward for one simulation step.

        Args:
            params: Mapping providing ``x``, ``y``, ``heading``, ``progress``,
                ``steps``, ``speed``, ``steering_angle``, ``track_width`` and
                ``is_offtrack``.

        Returns:
            The reward as a float. It is ``MIN_REWARD`` when the car is off
            track; otherwise the sum of the components described on the
            class, with gross heading, steering or speed errors resetting
            the running total to ``MIN_REWARD`` before the finish bonus.

        Raises:
            KeyError: If a required key is missing from ``params``.
        """
        ################## INPUT PARAMETERS ###################
        x = params["x"]
        y = params["y"]
        heading = params["heading"]
        progress = params["progress"]
        steps = params["steps"]
        speed = params["speed"]
        steering_angle = params["steering_angle"]
        track_width = params["track_width"]
        is_offtrack = params["is_offtrack"]
        car_coords = [x, y]

        ############### OPTIMAL X,Y,SPEED,TIME ################
        closest_index, second_closest_index = (
            self._closest_2_racing_points_index(self.RACING_TRACK, car_coords)
        )
        # Optimal (x, y, speed, time) for closest and second closest index
        optimals = self.RACING_TRACK[closest_index]
        optimals_second = self.RACING_TRACK[second_closest_index]

        # Save first racingpoint of episode for later
        if self.verbose:
            self.first_racingpoint_index = 0  # this is just for testing purposes
        if steps == 1:
            self.first_racingpoint_index = closest_index

        ################ REWARD AND PUNISHMENT ################
        ## Define the default reward ##
        reward = 1

        ## Reward if car goes close to optimal racing line ##
        dist = self._dist_to_racing_line(
            optimals[0:2], optimals_second[0:2], car_coords
        )
        distance_reward = max(
            self.MIN_REWARD, 1 - (dist / (track_width * 0.5))
        )
        reward += distance_reward * self.DISTANCE_MULTIPLE

        ## Reward if speed is close to optimal speed ##
        speed_diff = abs(optimals[2] - speed)
        if speed_diff <= self.SPEED_DIFF_NO_REWARD:
            # we use quadratic punishment (not linear) bc we're not as
            # confident with the optimal speed, so we do not punish small
            # deviations from optimal speed
            speed_reward = (
                1 - (speed_diff / self.SPEED_DIFF_NO_REWARD) ** 2
            ) ** 2
        else:
            speed_reward = 0
        reward += speed_reward * self.SPEED_MULTIPLE

        ## Reward if less steps ##
        projected_lap_time = self._projected_time(
            self.first_racingpoint_index,
            closest_index,
            steps,
            self.SEGMENT_TIMES,
        )
        try:
            steps_prediction = projected_lap_time * self.STEPS_PER_SECOND + 1
            reward_prediction = max(
                self.MIN_REWARD,
                (
                    -self.REWARD_PER_STEP_FOR_FASTEST_TIME
                    * self.FASTEST_TIME
                    / (self.STANDARD_TIME - self.FASTEST_TIME)
                )
                * (
                    steps_prediction
                    - (self.STANDARD_TIME * self.STEPS_PER_SECOND + 1)
                ),
            )
            steps_reward = min(
                self.REWARD_PER_STEP_FOR_FASTEST_TIME,
                reward_prediction / steps_prediction,
            )
        except ZeroDivisionError:
            steps_reward = 0
        reward += steps_reward

        ## Zero reward if obviously wrong direction (e.g. spin) ##
        direction_diff = self._racing_direction_diff(
            optimals[0:2], optimals_second[0:2], car_coords, heading
        )
        if (
            direction_diff > self.MAX_DIRECTION_DIFF
            or abs(steering_angle) > self.MAX_STEERING_ANGLE
        ):
            reward = self.MIN_REWARD
        else:
            reward += 1.1 - (direction_diff / self.MAX_DIRECTION_DIFF)

        ## Zero reward if obviously too slow ##
        speed_diff_zero = optimals[2] - speed
        if speed_diff_zero > self.MAX_SPEED_DEFICIT:
            reward = self.MIN_REWARD

        ## Incentive for finishing the lap in less steps ##
        if progress > 99.5:
            finish_reward = max(
                self.MIN_REWARD,
                (
                    -self.REWARD_FOR_FASTEST_TIME
                    / (
                        self.STEPS_PER_SECOND
                        * (self.STANDARD_TIME - self.FASTEST_TIME)
                    )
                )
                * (steps - self.STANDARD_TIME * self.STEPS_PER_SECOND),
            )
        else:
            finish_reward = 0
        reward += finish_reward

        ## Zero reward if off track ##
        if is_offtrack:
            reward = self.MIN_REWARD

        ####################### VERBOSE #######################
        if self.verbose:
            print("Closest index: %i" % closest_index)
            print("Distance to racing line: %f" % dist)
            print("=== Distance reward (w/out multiple): %f ===" %
                  (distance_reward))
            print("Optimal speed: %f" % optimals[2])
            print("Speed difference: %f" % speed_diff)
            print("=== Speed reward (w/out multiple): %f ===" % speed_reward)
            print("Direction difference: %f" % direction_diff)
            print("Predicted time: %f" % projected_lap_time)
            print("=== Steps reward: %f ===" % steps_reward)
            print("=== Finish reward: %f ===" % finish_reward)

        #################### RETURN REWARD ####################
        # Always return a float value
        return float(reward)
# Single shared Reward instance: it stores the episode's first racing-line
# index on itself, so the same object is reused for every call.
reward_object = Reward() # add parameter verbose=True to get noisy output for testing
def reward_function(params):
    """Module-level entry point: delegate to the shared ``reward_object``.

    Args:
        params: Step parameters, passed unchanged to
            ``Reward.reward_function``.

    Returns:
        Whatever ``reward_object.reward_function(params)`` returns.
    """
    return reward_object.reward_function(params)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment