@adover
Created June 5, 2022 04:30
Deepracer Reward fn
import math
import traceback
class Reward:
    SLOW_SPEED = 2.5
    HIGH_SPEED = 3
    WEIGHT_ON_TRACK = 9
    WEIGHT_SPEED = 6
    WEIGHT_HEADING = 4
    WEIGHT_CORRECT_LANE = 3
    WEIGHT_PROGRESS = 0.5
    WORST_REWARD = 1e-3
    TOTAL_NUM_STEPS = 150  # 15 steps per second (15 FPS); target is a 10-second lap
    REWARD_STEP = 15
    CENTRE_TOLERANCE = 0.4
    LOOK_AHEAD_COUNT = 3

    # Summit Speedway 2022 waypoints
    WAYPOINTS_KEEP_LEFT = [47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
                           117, 118, 119]
    WAYPOINTS_KEEP_CENTRAL = [34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 45, 46,
                              60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
                              72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 90, 91,
                              92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102,
                              103, 104, 105, 106, 107, 108, 109, 110, 111,
                              112, 113, 114, 115, 116, 120, 121, 122, 123]
    WAYPOINTS_KEEP_RIGHT = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
                            15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26,
                            27, 28, 29, 30, 31, 32, 33, 34, 82, 83, 84, 85,
                            86, 87, 88, 89, 124, 125, 126]
    WAYPOINTS_FAST = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
                      16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
                      30, 31, 32, 33, 34, 41, 42, 43, 45, 46, 47, 53, 54, 55,
                      56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69,
                      70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 82, 83, 84,
                      85, 86, 87, 88, 89, 100, 101, 102, 107, 108, 109, 110,
                      111, 112, 113, 114, 115, 116, 120, 121, 122, 123, 124,
                      125, 126]
    WAYPOINTS_SLOW = [35, 36, 37, 38, 39, 40, 48, 49, 50, 51, 52, 90, 91, 92,
                      93, 94, 95, 96, 97, 98, 99, 103, 104, 105, 106, 118,
                      119, 120]
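    # NOTE: the index lists above are hand-picked for the 2022 Summit
    # Speedway layout. For another track, log the 'waypoints' input param
    # (an ordered list of [x, y] points along the track centre line) and
    # re-map the indices to that track's corners and straights.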
    # Base reward - this is what we accumulate and output
    reward = float(0.001)

    # Input params, populated from the params dict in __init__
    all_wheels_on_track = None
    x = None
    y = None
    distance_from_center = None
    is_left_of_center = None
    is_reversed = None
    heading = None
    progress = None
    steps = None
    speed = None
    steering_angle = None
    track_width = None
    waypoints = None
    closest_waypoints = None
    nearest_previous_waypoint_ind = None
    nearest_next_waypoint_ind = None
    centre_variance = None
    params = None
    log_message = ''
    def __init__(self, params):
        self.params = dict(params)
        self.log_message = ''
        self.all_wheels_on_track = params['all_wheels_on_track']
        self.x = params['x']
        self.y = params['y']
        self.distance_from_center = params['distance_from_center']
        self.is_left_of_center = params['is_left_of_center']
        self.is_reversed = params['is_reversed']
        self.heading = params['heading']
        self.progress = params['progress']
        self.steps = params['steps']
        self.speed = params['speed']
        self.steering_angle = params['steering_angle']
        self.track_width = params['track_width']
        self.waypoints = params['waypoints']
        self.closest_waypoints = params['closest_waypoints']
        self.nearest_previous_waypoint_ind = self.closest_waypoints[0]
        self.nearest_next_waypoint_ind = self.closest_waypoints[1]
        self.centre_variance = self.distance_from_center / self.track_width
    def status_to_string(self):
        # Work on a copy so dropping the (very long) waypoint list does not
        # mutate the stored params
        status = dict(self.params)
        status.pop('waypoints', None)
        status['debug_log'] = self.log_message
        print(status)

    # Accumulates all logging messages into one string; evaluate() calls
    # status_to_string() to write the status and calculation outputs to
    # the log.
    def log_feature(self, message):
        if message is None:
            message = 'NULL'
        self.log_message = self.log_message + str(message) + '|'

    def fatal_reward(self):
        # Off the track or driving in reverse voids all other rewards
        return not self.all_wheels_on_track or self.is_reversed
    def speed_reward(self, reward):
        next_point = max(self.closest_waypoints)
        if next_point in self.WAYPOINTS_FAST:
            if self.speed >= self.HIGH_SPEED:
                reward += self.WEIGHT_SPEED
            else:
                reward -= self.WEIGHT_SPEED
        elif next_point in self.WAYPOINTS_SLOW:
            if self.speed <= self.SLOW_SPEED:
                reward += self.WEIGHT_SPEED
            else:
                reward -= self.WEIGHT_SPEED
        return reward
    def lane_reward(self, reward):
        next_point = max(self.closest_waypoints)
        if next_point in self.WAYPOINTS_KEEP_LEFT and self.is_left_of_center:
            reward += self.WEIGHT_CORRECT_LANE
        elif next_point in self.WAYPOINTS_KEEP_RIGHT and not self.is_left_of_center:
            reward += self.WEIGHT_CORRECT_LANE
        elif next_point in self.WAYPOINTS_KEEP_CENTRAL and self.centre_variance < self.CENTRE_TOLERANCE:
            reward += self.WEIGHT_CORRECT_LANE
        else:
            reward -= self.WEIGHT_CORRECT_LANE
        return reward
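    # Worked example: at a WAYPOINTS_KEEP_CENTRAL waypoint with
    # distance_from_center = 0.2 and track_width = 0.76, centre_variance is
    # about 0.26 < CENTRE_TOLERANCE (0.4), so WEIGHT_CORRECT_LANE is added.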
    def incremental_progress(self, reward):
        reward += (self.progress * self.WEIGHT_PROGRESS)
        return reward

    def faster_than_target_time(self, reward):
        # Every REWARD_STEP steps, add a bonus for being ahead of the pace
        # needed to finish within TOTAL_NUM_STEPS
        if (self.steps % self.REWARD_STEP) == 0 and self.progress / 100 > (self.steps / self.TOTAL_NUM_STEPS):
            reward += self.progress - (self.steps / self.TOTAL_NUM_STEPS) * 100
        return reward
    def look_ahead(self, reward):
        next_point = max(self.closest_waypoints)
        n_waypoints_ahead = self.waypoints[
            (next_point + self.LOOK_AHEAD_COUNT) % len(self.waypoints)]
        x2 = n_waypoints_ahead[0]
        y2 = n_waypoints_ahead[1]
        n_waypoints_ahead_direction = math.degrees(
            math.atan2(y2 - self.y, x2 - self.x))
        # Heading is measured from the positive x axis, CCW positive,
        # in the range -180 to 180 deg
        heading_difference = abs(n_waypoints_ahead_direction - self.heading)
        if heading_difference > 180:
            heading_difference = 360 - heading_difference
        # Penalise poor alignment with the direction to the waypoint
        # LOOK_AHEAD_COUNT indices ahead
        if heading_difference > 45:
            reward -= (self.WEIGHT_HEADING * 2)
        elif heading_difference > 30:
            reward -= self.WEIGHT_HEADING
        elif heading_difference > 15:
            reward -= (self.WEIGHT_HEADING * 0.5)
        return reward
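    # Worked example for the wrap-around fold above: heading = 170 and a
    # target direction of -170 give |(-170) - 170| = 340, folded to
    # 360 - 340 = 20 degrees, so only the mild 15-30 degree penalty applies.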
    def evaluate(self):
        reward_output = self.reward
        try:
            speed_component = self.speed_reward(1.0)
            lane_component = self.lane_reward(1.0)
            progress_component = self.incremental_progress(1.0)
            target_component = self.faster_than_target_time(1.0)
            look_ahead_component = self.look_ahead(1.0)
            if self.fatal_reward():
                reward_output = self.WORST_REWARD
            else:
                reward_output = float(speed_component + lane_component +
                                      progress_component + target_component +
                                      look_ahead_component)
        except Exception as e:
            print("Error : " + str(e))
            print(traceback.format_exc())
        self.log_feature(reward_output)
        self.status_to_string()
        return float(reward_output)
def reward_function(params):
    reward = Reward(params)
    return reward.evaluate()
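# A minimal local smoke test, not part of the DeepRacer entry point: the
# params dict below is a hypothetical sketch of the simulator's input schema,
# and the straight-line 'waypoints' are stand-in values, not a real track.
if __name__ == '__main__':
    sample_params = {
        'all_wheels_on_track': True,
        'x': 0.0,
        'y': 0.0,
        'distance_from_center': 0.1,
        'is_left_of_center': False,
        'is_reversed': False,
        'heading': 0.0,
        'progress': 10.0,
        'steps': 15,
        'speed': 3.0,
        'steering_angle': 0.0,
        'track_width': 0.76,
        'waypoints': [[i * 0.1, 0.0] for i in range(127)],
        'closest_waypoints': [10, 11],
    }
    print(reward_function(sample_params))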