DeepRacer Reward fn
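An AWS DeepRacer reward function for the Summit Speedway 2022 track. It combines zone-based speed and lane targets, heading alignment with a waypoint a few steps ahead, and lap-progress pacing, and collapses to a minimal reward when the car leaves the track or reverses.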
import math
import traceback
class Reward:
    SLOW_SPEED = 2.5
    HIGH_SPEED = 3
    WEIGHT_ON_TRACK = 9
    WEIGHT_SPEED = 6
    WEIGHT_HEADING = 4
    WEIGHT_CORRECT_LANE = 3
    WEIGHT_PROGRESS = 0.5
    WORST_REWARD = 1e-3
    TOTAL_NUM_STEPS = 150  # 15 FPS = 15 steps per second; target is a lap in 10 seconds
    REWARD_STEP = 15
    CENTRE_TOLERANCE = 0.4
    LOOK_AHEAD_COUNT = 3

    # Summit Speedway 2022 waypoints
    WAYPOINTS_KEEP_LEFT = [47, 48, 49, 50, 51, 52, 53,
                           54, 55, 56, 57, 58, 59, 117, 118, 119]
    WAYPOINTS_KEEP_CENTRAL = [34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 45, 46, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 90, 91,
                              92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 120, 121, 122, 123]
    WAYPOINTS_KEEP_RIGHT = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22,
                            23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 82, 83, 84, 85, 86, 87, 88, 89, 124, 125, 126]
    WAYPOINTS_FAST = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 41, 42, 43, 45, 46,
                      47, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 82, 83, 84, 85, 86, 87, 88, 89, 100, 101, 102, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 120, 121, 122, 123, 124, 125, 126]
    WAYPOINTS_SLOW = [35, 36, 37, 38, 39, 40, 48, 49, 50, 51, 52, 90, 91,
                      92, 93, 94, 95, 96, 97, 98, 99, 103, 104, 105, 106, 118, 119, 120]
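    # The zone lists above are waypoint indices where a behaviour is rewarded:
    # the keep-left/central/right lists drive the lane reward, while the
    # fast/slow lists set the target speed band for the speed reward.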
    # Base reward - this is what we accumulate and output
    reward = float(0.001)

    # Params
    all_wheels_on_track = None
    x = None
    y = None
    distance_from_center = None
    is_left_of_center = None
    is_reversed = None
    heading = None
    progress = None
    steps = None
    speed = None
    steering_angle = None
    track_width = None
    waypoints = None
    closest_waypoints = None
    nearest_previous_waypoint_ind = None
    nearest_next_waypoint_ind = None
    centre_variance = None
    params = None
    log_message = ''
    def __init__(self, params):
        self.params = params
        self.log_message = ''
        self.all_wheels_on_track = params['all_wheels_on_track']
        self.x = params['x']
        self.y = params['y']
        self.distance_from_center = params['distance_from_center']
        self.is_left_of_center = params['is_left_of_center']
        self.is_reversed = params['is_reversed']
        self.heading = params['heading']
        self.progress = params['progress']
        self.steps = params['steps']
        self.speed = params['speed']
        self.steering_angle = params['steering_angle']
        self.track_width = params['track_width']
        self.waypoints = params['waypoints']
        self.closest_waypoints = params['closest_waypoints']
        self.nearest_previous_waypoint_ind = self.closest_waypoints[0]
        self.nearest_next_waypoint_ind = self.closest_waypoints[1]
        self.centre_variance = params['distance_from_center'] / params['track_width']
    def status_to_string(self):
        # Copy params so deleting the (long) waypoint list does not mutate
        # the dict the simulator passed in.
        status = dict(self.params)
        if 'waypoints' in status:
            del status['waypoints']
        status['debug_log'] = self.log_message
        print(status)

    # Accumulates all logging messages into one string that is written to the
    # log via self.status_to_string() at the end of evaluate().
    def log_feature(self, message):
        if message is None:
            message = 'NULL'
        self.log_message = self.log_message + str(message) + '|'
    def fatal_reward(self):
        return not self.all_wheels_on_track or self.is_reversed

    def speed_reward(self, reward):
        # closest_waypoints[1] is the next waypoint index; max() would break
        # at the start/finish wrap-around (e.g. [126, 0]).
        next_point = self.nearest_next_waypoint_ind
        if next_point in self.WAYPOINTS_FAST:
            if self.speed >= self.HIGH_SPEED:
                reward += self.WEIGHT_SPEED
            else:
                reward -= self.WEIGHT_SPEED
        elif next_point in self.WAYPOINTS_SLOW:
            if self.speed <= self.SLOW_SPEED:
                reward += self.WEIGHT_SPEED
            else:
                reward -= self.WEIGHT_SPEED
        return reward
    def lane_reward(self, reward):
        next_point = self.nearest_next_waypoint_ind
        if next_point in self.WAYPOINTS_KEEP_LEFT and self.is_left_of_center:
            reward += self.WEIGHT_CORRECT_LANE
        elif next_point in self.WAYPOINTS_KEEP_RIGHT and not self.is_left_of_center:
            reward += self.WEIGHT_CORRECT_LANE
        elif next_point in self.WAYPOINTS_KEEP_CENTRAL and self.centre_variance < self.CENTRE_TOLERANCE:
            reward += self.WEIGHT_CORRECT_LANE
        else:
            reward -= self.WEIGHT_CORRECT_LANE
        return reward
    def incremental_progress(self, reward):
        reward += (self.progress * self.WEIGHT_PROGRESS)
        return reward

    def faster_than_target_time(self, reward):
        # Every REWARD_STEP steps, reward being ahead of the pace needed to
        # finish the lap within TOTAL_NUM_STEPS.
        if (self.steps % self.REWARD_STEP) == 0 and self.progress / 100 > (self.steps / self.TOTAL_NUM_STEPS):
            reward += self.progress - (self.steps / self.TOTAL_NUM_STEPS) * 100
        return reward
    def look_ahead(self, reward):
        next_point = self.nearest_next_waypoint_ind
        n_waypoints_ahead = self.waypoints[
            (next_point + self.LOOK_AHEAD_COUNT) % len(self.waypoints)]
        x2 = n_waypoints_ahead[0]
        y2 = n_waypoints_ahead[1]
        n_waypoints_ahead_direction = math.degrees(
            math.atan2(y2 - self.y, x2 - self.x))
        # Heading is measured CCW from the positive x-axis, in the range
        # -180 to 180 deg, so wrap differences above 180 deg back around.
        # E.g. direction 170 deg vs heading -175 deg is a raw difference of
        # 345 deg, which wraps to 15 deg.
        heading_difference = abs(n_waypoints_ahead_direction - self.heading)
        if heading_difference > 180:
            heading_difference = 360 - heading_difference
        # Penalise poor alignment with the direction of the waypoint
        # LOOK_AHEAD_COUNT steps ahead
        if heading_difference > 45:
            reward -= (self.WEIGHT_HEADING * 2)
        elif heading_difference > 30:
            reward -= self.WEIGHT_HEADING
        elif heading_difference > 15:
            reward -= (self.WEIGHT_HEADING * 0.5)
        return reward
    def evaluate(self):
        reward_output = self.reward
        try:
            if self.fatal_reward():
                reward_output = self.WORST_REWARD
            else:
                speed_component = self.speed_reward(1.0)
                lane_component = self.lane_reward(1.0)
                progress_component = self.incremental_progress(1.0)
                target_component = self.faster_than_target_time(1.0)
                look_ahead_component = self.look_ahead(1.0)
                reward_output = float(speed_component + lane_component +
                                      progress_component + target_component +
                                      look_ahead_component)
        except Exception as e:
            print("Error : " + str(e))
            print(traceback.format_exc())
        self.log_feature(reward_output)
        self.status_to_string()
        return float(reward_output)
def reward_function(params):
    reward = Reward(params)
    return reward.evaluate()
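For a quick local smoke test, something like the following can be appended to the file. The values are made up; only the dict keys match the params dict the DeepRacer simulator actually passes in.

# Hypothetical local smoke test -- the values below are made up; only the
# keys match the simulator's params dict.
if __name__ == '__main__':
    sample_params = {
        'all_wheels_on_track': True,
        'x': 0.0,
        'y': 0.0,
        'distance_from_center': 0.1,
        'is_left_of_center': True,
        'is_reversed': False,
        'heading': 0.0,
        'progress': 10.0,
        'steps': 15,
        'speed': 3.0,
        'steering_angle': 0.0,
        'track_width': 0.76,
        'waypoints': [(i * 0.1, 0.0) for i in range(127)],  # 127 dummy points
        'closest_waypoints': [0, 1],
    }
    print(reward_function(sample_params))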