Created
February 13, 2024 21:42
-
-
Save marcadams/b6b5ed2378a596f8e5f9168e200f900e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import math | |
| class Reward: | |
| def __init__(self, verbose=False): | |
| self.first_racingpoint_index = 0 # None | |
| self.verbose = verbose | |
| def reward_function(self, params): | |
| # Import package (needed for heading) | |
| # import math | |
| ################## HELPER FUNCTIONS ################### | |
| def dist_2_points(x1, x2, y1, y2): | |
| return abs(abs(x1 - x2) ** 2 + abs(y1 - y2) ** 2) ** 0.5 | |
| def closest_2_racing_points_index(racing_coords, car_coords): | |
| # Calculate all distances to racing points | |
| distances = [] | |
| for i in range(len(racing_coords)): | |
| distance = dist_2_points( | |
| x1=racing_coords[i][0], | |
| x2=car_coords[0], | |
| y1=racing_coords[i][1], | |
| y2=car_coords[1], | |
| ) | |
| distances.append(distance) | |
| # Get index of the closest racing point | |
| closest_index = distances.index(min(distances)) | |
| # Get index of the second closest racing point | |
| distances_no_closest = distances.copy() | |
| distances_no_closest[closest_index] = 999 | |
| second_closest_index = distances_no_closest.index( | |
| min(distances_no_closest)) | |
| return [closest_index, second_closest_index] | |
| def dist_to_racing_line(closest_coords, second_closest_coords, car_coords): | |
| # Calculate the distances between 2 closest racing points | |
| a = abs( | |
| dist_2_points( | |
| x1=closest_coords[0], | |
| x2=second_closest_coords[0], | |
| y1=closest_coords[1], | |
| y2=second_closest_coords[1], | |
| ) | |
| ) | |
| # Distances between car and closest and second closest racing point | |
| b = abs( | |
| dist_2_points( | |
| x1=car_coords[0], | |
| x2=closest_coords[0], | |
| y1=car_coords[1], | |
| y2=closest_coords[1], | |
| ) | |
| ) | |
| c = abs( | |
| dist_2_points( | |
| x1=car_coords[0], | |
| x2=second_closest_coords[0], | |
| y1=car_coords[1], | |
| y2=second_closest_coords[1], | |
| ) | |
| ) | |
| # Calculate distance between car and racing line (goes through 2 closest racing points) | |
| # try-except in case a=0 (rare bug in DeepRacer) | |
| try: | |
| distance = abs( | |
| -(a ** 4) | |
| + 2 * (a ** 2) * (b ** 2) | |
| + 2 * (a ** 2) * (c ** 2) | |
| - (b ** 4) | |
| + 2 * (b ** 2) * (c ** 2) | |
| - (c ** 4) | |
| ) ** 0.5 / (2 * a) | |
| except: | |
| distance = b | |
| return distance | |
| # Calculate which one of the closest racing points is the next one and which one the previous one | |
| def next_prev_racing_point( | |
| closest_coords, second_closest_coords, car_coords, heading | |
| ): | |
| # Virtually set the car more into the heading direction | |
| heading_vector = [ | |
| math.cos(math.radians(heading)), | |
| math.sin(math.radians(heading)), | |
| ] | |
| new_car_coords = [ | |
| car_coords[0] + heading_vector[0], | |
| car_coords[1] + heading_vector[1], | |
| ] | |
| # Calculate distance from new car coords to 2 closest racing points | |
| distance_closest_coords_new = dist_2_points( | |
| x1=new_car_coords[0], | |
| x2=closest_coords[0], | |
| y1=new_car_coords[1], | |
| y2=closest_coords[1], | |
| ) | |
| distance_second_closest_coords_new = dist_2_points( | |
| x1=new_car_coords[0], | |
| x2=second_closest_coords[0], | |
| y1=new_car_coords[1], | |
| y2=second_closest_coords[1], | |
| ) | |
| if distance_closest_coords_new <= distance_second_closest_coords_new: | |
| next_point_coords = closest_coords | |
| prev_point_coords = second_closest_coords | |
| else: | |
| next_point_coords = second_closest_coords | |
| prev_point_coords = closest_coords | |
| return [next_point_coords, prev_point_coords] | |
| def racing_direction_diff( | |
| closest_coords, second_closest_coords, car_coords, heading | |
| ): | |
| # Calculate the direction of the center line based on the closest waypoints | |
| next_point, prev_point = next_prev_racing_point( | |
| closest_coords, second_closest_coords, car_coords, heading | |
| ) | |
| # Calculate the direction in radius, arctan2(dy, dx), the result is (-pi, pi) in radians | |
| track_direction = math.atan2( | |
| next_point[1] - prev_point[1], next_point[0] - prev_point[0] | |
| ) | |
| # Convert to degree | |
| track_direction = math.degrees(track_direction) | |
| # Calculate the difference between the track direction and the heading direction of the car | |
| direction_diff = abs(track_direction - heading) | |
| if direction_diff > 180: | |
| direction_diff = 360 - direction_diff | |
| return direction_diff | |
| # Gives back indexes that lie between start and end index of a cyclical list | |
| # (start index is included, end index is not) | |
| def indexes_cyclical(start, end, array_len): | |
| if end < start: | |
| end += array_len | |
| return [index % array_len for index in range(start, end)] | |
| # Calculate how long car would take for entire lap, if it continued like it did until now | |
| def projected_time(first_index, closest_index, step_count, times_list): | |
| # Calculate how much time has passed since start | |
| current_actual_time = (step_count - 1) / 15 | |
| # Calculate which indexes were already passed | |
| indexes_traveled = indexes_cyclical( | |
| first_index, closest_index, len(times_list) | |
| ) | |
| # Calculate how much time should have passed if car would have followed optimals | |
| current_expected_time = sum( | |
| [times_list[i] for i in indexes_traveled]) | |
| # Calculate how long one entire lap takes if car follows optimals | |
| total_expected_time = sum(times_list) | |
| # Calculate how long car would take for entire lap, if it continued like it did until now | |
| try: | |
| projected_time = ( | |
| current_actual_time / current_expected_time | |
| ) * total_expected_time | |
| except: | |
| projected_time = 9999 | |
| return projected_time | |
| #################### RACING LINE ###################### | |
| # Optimal racing line for the Spain track | |
| # Each row: [x,y,speed,timeFromPreviousPoint] | |
| racing_track = [[2.88706, 0.7258, 3.94468, 0.06892], | |
| [3.16724, 0.70398, 4.0, 0.07026], | |
| [3.45436, 0.69152, 4.0, 0.07185], | |
| [3.75186, 0.68552, 3.81169, 0.07807], | |
| [4.07281, 0.68361, 3.14704, 0.10199], | |
| [4.5, 0.68376, 2.73054, 0.15645], | |
| [4.55, 0.68378, 2.40688, 0.02077], | |
| [5.10947, 0.69128, 2.15399, 0.25976], | |
| [5.44315, 0.71199, 1.92092, 0.17404], | |
| [5.70785, 0.74299, 1.72999, 0.15405], | |
| [5.93753, 0.78554, 1.46949, 0.15896], | |
| [6.14402, 0.84099, 1.46949, 0.1455], | |
| [6.33003, 0.91022, 1.46949, 0.13506], | |
| [6.49528, 0.9932, 1.4179, 0.13042], | |
| [6.63853, 1.08981, 1.3, 0.13291], | |
| [6.75874, 1.19989, 1.3, 0.12538], | |
| [6.85106, 1.3254, 1.3, 0.11985], | |
| [6.91727, 1.46408, 1.3, 0.11821], | |
| [6.95853, 1.61373, 1.3, 0.11941], | |
| [6.96837, 1.7744, 1.3, 0.12382], | |
| [6.9342, 1.94323, 1.35468, 0.12715], | |
| [6.86049, 2.11166, 1.35468, 0.13571], | |
| [6.73339, 2.26589, 1.66739, 0.11986], | |
| [6.57106, 2.39898, 1.90788, 0.11003], | |
| [6.38406, 2.50903, 2.2568, 0.09614], | |
| [6.1818, 2.59894, 2.80925, 0.07879], | |
| [5.97104, 2.67441, 4.0, 0.05597], | |
| [5.75669, 2.74279, 3.4846, 0.06457], | |
| [5.55657, 2.81259, 3.4846, 0.06082], | |
| [5.3581, 2.88749, 3.4846, 0.06088], | |
| [5.16124, 2.96764, 3.4846, 0.061], | |
| [4.96602, 3.05329, 3.4846, 0.06118], | |
| [4.77263, 3.14531, 3.4846, 0.06146], | |
| [4.58194, 3.24724, 4.0, 0.05406], | |
| [4.39309, 3.35621, 4.0, 0.05451], | |
| [4.20542, 3.46985, 3.46882, 0.06325], | |
| [4.01834, 3.58583, 3.02128, 0.07285], | |
| [3.84572, 3.6925, 2.64271, 0.07678], | |
| [3.6784, 3.79359, 2.64271, 0.07397], | |
| [3.51581, 3.8876, 2.64271, 0.07107], | |
| [3.35451, 3.97485, 2.63037, 0.06972], | |
| [3.19148, 4.05479, 2.47383, 0.0734], | |
| [3.02506, 4.12582, 2.29639, 0.0788], | |
| [2.85384, 4.18542, 2.08592, 0.08691], | |
| [2.67783, 4.23432, 1.88131, 0.0971], | |
| [2.49664, 4.27227, 1.69624, 0.10914], | |
| [2.30927, 4.29775, 1.50069, 0.12601], | |
| [2.11408, 4.30795, 1.50069, 0.13024], | |
| [1.90854, 4.29804, 1.50069, 0.13712], | |
| [1.68849, 4.25834, 1.50069, 0.149], | |
| [1.44936, 4.1691, 1.50069, 0.17008], | |
| [1.20319, 3.99596, 1.50069, 0.20055], | |
| [1.01439, 3.70539, 1.77504, 0.19521], | |
| [0.91435, 3.35305, 2.1548, 0.16998], | |
| [0.88845, 3.0257, 2.4887, 0.13195], | |
| [0.90073, 2.76392, 2.48281, 0.10555], | |
| [0.92864, 2.53256, 2.28577, 0.10195], | |
| [0.96859, 2.31404, 2.09133, 0.10622], | |
| [1.01933, 2.11009, 1.88745, 0.11134], | |
| [1.08168, 1.91627, 1.67154, 0.12181], | |
| [1.15576, 1.73305, 1.67154, 0.11823], | |
| [1.2421, 1.56206, 1.67154, 0.1146], | |
| [1.34138, 1.40473, 1.67154, 0.1113], | |
| [1.45505, 1.26244, 1.67154, 0.10895], | |
| [1.58641, 1.13709, 1.67154, 0.10862], | |
| [1.74406, 1.03231, 1.99881, 0.0947], | |
| [1.92588, 0.94287, 2.27931, 0.0889], | |
| [2.13233, 0.86755, 2.56843, 0.08556], | |
| [2.36374, 0.80649, 2.9557, 0.08097], | |
| [2.61732, 0.7596, 3.46381, 0.07445]] | |
| ################## INPUT PARAMETERS ################### | |
| # Read all input parameters | |
| all_wheels_on_track = params["all_wheels_on_track"] | |
| x = params["x"] | |
| y = params["y"] | |
| distance_from_center = params["distance_from_center"] | |
| is_left_of_center = params["is_left_of_center"] | |
| heading = params["heading"] | |
| progress = params["progress"] | |
| steps = params["steps"] | |
| speed = params["speed"] | |
| steering_angle = params["steering_angle"] | |
| track_width = params["track_width"] | |
| waypoints = params["waypoints"] | |
| closest_waypoints = params["closest_waypoints"] | |
| is_offtrack = params["is_offtrack"] | |
| ############### OPTIMAL X,Y,SPEED,TIME ################ | |
| # Get closest indexes for racing line (and distances to all points on racing line) | |
| closest_index, second_closest_index = closest_2_racing_points_index( | |
| racing_track, [x, y] | |
| ) | |
| # Get optimal [x, y, speed, time] for closest and second closest index | |
| optimals = racing_track[closest_index] | |
| optimals_second = racing_track[second_closest_index] | |
| # Save first racingpoint of episode for later | |
| if self.verbose == True: | |
| self.first_racingpoint_index = 0 # this is just for testing purposes | |
| if steps == 1: | |
| self.first_racingpoint_index = closest_index | |
| ################ REWARD AND PUNISHMENT ################ | |
| ## Define the default reward ## | |
| reward = 1 | |
| MIN_REWARD = 1e-2 | |
| ## Reward if car goes close to optimal racing line ## | |
| DISTANCE_MULTIPLE = 2 | |
| dist = dist_to_racing_line(optimals[0:2], optimals_second[0:2], [x, y]) | |
| distance_reward = max(MIN_REWARD, 1 - (dist / (track_width * 0.5))) | |
| reward += distance_reward * DISTANCE_MULTIPLE | |
| ## Reward if speed is close to optimal speed ## | |
| SPEED_DIFF_NO_REWARD = 1 | |
| SPEED_MULTIPLE = 3 | |
| speed_diff = abs(optimals[2] - speed) | |
| if speed_diff <= SPEED_DIFF_NO_REWARD: | |
| # we use quadratic punishment (not linear) bc we're not as confident with the optimal speed | |
| # so, we do not punish small deviations from optimal speed | |
| speed_reward = ( | |
| 1 - (speed_diff / (SPEED_DIFF_NO_REWARD)) ** 2) ** 2 | |
| else: | |
| speed_reward = 0 | |
| reward += speed_reward * SPEED_MULTIPLE | |
| # Reward if less steps | |
| REWARD_PER_STEP_FOR_FASTEST_TIME = 1.5 | |
| STANDARD_TIME = 12 | |
| FASTEST_TIME = 7 | |
| times_list = [row[3] for row in racing_track] | |
| projected_time = projected_time( | |
| self.first_racingpoint_index, closest_index, steps, times_list | |
| ) | |
| try: | |
| steps_prediction = projected_time * 15 + 1 | |
| reward_prediction = max( | |
| MIN_REWARD, | |
| ( | |
| -REWARD_PER_STEP_FOR_FASTEST_TIME | |
| * (FASTEST_TIME) | |
| / (STANDARD_TIME - FASTEST_TIME) | |
| ) | |
| * (steps_prediction - (STANDARD_TIME * 15 + 1)), | |
| ) | |
| steps_reward = min( | |
| REWARD_PER_STEP_FOR_FASTEST_TIME, reward_prediction / steps_prediction | |
| ) | |
| except: | |
| steps_reward = 0 | |
| reward += steps_reward | |
| # Zero reward if obviously wrong direction (e.g. spin) | |
| direction_diff = racing_direction_diff( | |
| optimals[0:2], optimals_second[0:2], [x, y], heading | |
| ) | |
| if direction_diff > 30 or abs(steering_angle) > 20: | |
| reward = MIN_REWARD | |
| else: | |
| reward += 1.1 - (direction_diff / 30) | |
| # Zero reward of obviously too slow | |
| speed_diff_zero = optimals[2] - speed | |
| if speed_diff_zero > 0.5: | |
| reward = MIN_REWARD | |
| ## Incentive for finishing the lap in less steps ## | |
| # should be adapted to track length and other rewards | |
| REWARD_FOR_FASTEST_TIME = 300 | |
| STANDARD_TIME = 12 # seconds (time that is easily done by model) | |
| FASTEST_TIME = 7 # seconds (best time of 1st place on the track) | |
| if progress > 99.5: | |
| finish_reward = max( | |
| MIN_REWARD, | |
| (-REWARD_FOR_FASTEST_TIME / (15 * (STANDARD_TIME - FASTEST_TIME))) | |
| * (steps - STANDARD_TIME * 15), | |
| ) | |
| else: | |
| finish_reward = 0 | |
| reward += finish_reward | |
| ## Zero reward if off track ## | |
| if is_offtrack == True: | |
| reward = MIN_REWARD | |
| ####################### VERBOSE ####################### | |
| if self.verbose == True: | |
| print("Closest index: %i" % closest_index) | |
| print("Distance to racing line: %f" % dist) | |
| print("=== Distance reward (w/out multiple): %f ===" % | |
| (distance_reward)) | |
| print("Optimal speed: %f" % optimals[2]) | |
| print("Speed difference: %f" % speed_diff) | |
| print("=== Speed reward (w/out multiple): %f ===" % speed_reward) | |
| print("Direction difference: %f" % direction_diff) | |
| print("Predicted time: %f" % projected_time) | |
| print("=== Steps reward: %f ===" % steps_reward) | |
| print("=== Finish reward: %f ===" % finish_reward) | |
| #################### RETURN REWARD #################### | |
| # Always return a float value | |
| return float(reward) | |
# Shared Reward instance that keeps per-episode state between calls.
# Add parameter verbose=True to get noisy output for testing.
reward_object = Reward()


def reward_function(params):
    """Module-level entry point: forward ``params`` to the shared instance."""
    result = reward_object.reward_function(params)
    return result
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment