Last active
December 11, 2022 09:36
-
-
Save rahulbhadani/92d3be52529a64372c796ca5e7cb3770 to your computer and use it in GitHub Desktop.
Markov Decision Process for Autonomous Driving
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
class MDP:
    # Author: Rahul Bhadani
    # Initial Date: Dec 10, 2022
    """Markov Decision Process for fuel-efficient autonomous driving.

    A state is a (velocity, acceleration) pair on a quantized grid.  An
    action selects a successor (velocity, acceleration) pair — note the
    velocity component of an action is kept equal to the current velocity;
    only the acceleration component varies.  Transition cost is the
    instantaneous fuel consumption g(v, a) = v * a.
    """

    def __init__(self, velocity_min, velocity_max, acceleration_min,
                 acceleration_max, velocity_step, acceleration_step,
                 acceleration_min_accelerate, acceleration_max_accelerate):
        """Build the quantized velocity and acceleration grids.

        Parameters
        ----------
        velocity_min, velocity_max : float
            Inclusive bounds of the velocity grid.
        acceleration_min, acceleration_max : float
            Inclusive bounds of the acceleration grid.
        velocity_step, acceleration_step : float
            Quantization step of each grid.
        acceleration_min_accelerate, acceleration_max_accelerate : float
            Admissible acceleration band when speeding up; the negated
            band is used when slowing down.
        """
        # Define minimum and maximum values for velocity and acceleration
        self.VELOCITY_MIN = velocity_min
        self.VELOCITY_MAX = velocity_max
        self.ACCELERATION_MIN = acceleration_min
        self.ACCELERATION_MAX = acceleration_max
        # Define quantization step for velocity and acceleration
        self.VELOCITY_STEP = velocity_step
        self.ACCELERATION_STEP = acceleration_step
        # Define minimum and maximum values for acceleration when
        # accelerating or decelerating
        self.ACCELERATION_MIN_ACCELERATE = acceleration_min_accelerate
        self.ACCELERATION_MAX_ACCELERATE = acceleration_max_accelerate
        # BUGFIX: the original used int(...), which truncates toward zero.
        # Float division such as 50.0 / 0.1 evaluates to 499.999..., so the
        # grids silently excluded their maximum value (50.0 never appeared
        # in velocity_values).  round() makes both grids inclusive of the
        # upper bound when (max - min) is a multiple of the step.
        self.num_velocity_values = round(
            (self.VELOCITY_MAX - self.VELOCITY_MIN) / self.VELOCITY_STEP) + 1
        self.num_acceleration_values = round(
            (self.ACCELERATION_MAX - self.ACCELERATION_MIN) / self.ACCELERATION_STEP) + 1
        # Enumerate every grid point.
        self.velocity_values = [self.VELOCITY_MIN + i * self.VELOCITY_STEP
                                for i in range(self.num_velocity_values)]
        self.acceleration_values = [self.ACCELERATION_MIN + i * self.ACCELERATION_STEP
                                    for i in range(self.num_acceleration_values)]

    # Function to calculate instantaneous fuel consumption based on
    # velocity and acceleration (the cost function g).
    def fuel_consumption(self, v, a):
        """Instantaneous fuel consumption g(v, a).

        Example placeholder model: fuel proportional to v * a.
        """
        return v * a

    def calculate_actions(self, v, a):
        """Return the list of actions available in state (v, a).

        Each action is a (velocity, acceleration) pair with the velocity
        component fixed at v.  The "maintain" action (v, a) is always
        available, so the returned list is never empty.
        """
        actions = []
        # Below the maximum velocity: may pick any acceleration inside
        # the admissible acceleration band.
        if v < self.VELOCITY_MAX:
            for a_new in self.acceleration_values:
                if self.ACCELERATION_MIN_ACCELERATE <= a_new <= self.ACCELERATION_MAX_ACCELERATE:
                    actions.append((v, a_new))
        # Above the minimum velocity: may pick any acceleration inside
        # the negated (deceleration) band.
        if v > self.VELOCITY_MIN:
            for a_new in self.acceleration_values:
                if -self.ACCELERATION_MAX_ACCELERATE <= a_new <= -self.ACCELERATION_MIN_ACCELERATE:
                    actions.append((v, a_new))
        # Always allowed: maintain the current velocity and acceleration.
        actions.append((v, a))
        return actions

    def evaluate_fuel_consumption(self, v, a, v_new, a_new):
        """Expected fuel for being in (v, a) and moving to (v_new, a_new)."""
        fuel_current = self.fuel_consumption(v, a)
        fuel_new = self.fuel_consumption(v_new, a_new)
        return fuel_current + fuel_new

    def find_optimal_action(self, v, a):
        """Return the action in state (v, a) with minimum expected fuel.

        Ties are broken in favor of the first action enumerated by
        calculate_actions (strict < comparison).
        """
        actions = self.calculate_actions(v, a)
        min_fuel = float("inf")
        optimal_action = None
        for v_new, a_new in actions:
            fuel = self.evaluate_fuel_consumption(v, a, v_new, a_new)
            if fuel < min_fuel:
                min_fuel = fuel
                optimal_action = (v_new, a_new)
        return optimal_action

    def calculate_optimal_policy(self):
        """Return {state: optimal action} for every state on the grid."""
        optimal_policy = {}
        for v in self.velocity_values:
            for a in self.acceleration_values:
                optimal_policy[(v, a)] = self.find_optimal_action(v, a)
        return optimal_policy

    def calculate_value_function(self):
        """Return {state: value} for every state on the grid."""
        values = {}
        for v in self.velocity_values:
            for a in self.acceleration_values:
                values[(v, a)] = self.evaluate_value(v, a, values)
        return values

    def evaluate_value(self, v, a, values):
        """One-step value of state (v, a): the maximum expected fuel over
        its available actions.

        `values` is a memo table of already-computed states; a hit returns
        the cached value without recomputation.
        """
        if (v, a) in values:
            return values[(v, a)]
        actions = self.calculate_actions(v, a)
        max_fuel = float("-inf")
        for v_new, a_new in actions:
            fuel = self.evaluate_fuel_consumption(v, a, v_new, a_new)
            if fuel > max_fuel:
                max_fuel = fuel
        return max_fuel

    def calculate_q_function(self):
        """Return {(state, action): Q-value} for every pair on the grid."""
        q_values = {}
        for v in self.velocity_values:
            for a in self.acceleration_values:
                for v_new, a_new in self.calculate_actions(v, a):
                    q_values[((v, a), (v_new, a_new))] = self.evaluate_q_value(
                        v, a, v_new, a_new, q_values)
        return q_values

    def evaluate_q_value(self, v, a, v_new, a_new, q_values):
        """Q-value of taking action (v_new, a_new) from state (v, a):
        immediate fuel cost plus the maximum one-step fuel cost from the
        successor state.

        BUGFIX: the original recursed into evaluate_q_value for the
        successor state.  Because every state's action set contains the
        state itself (the "maintain" action) and nothing was memoized
        before recursing, the recursion never terminated and
        calculate_q_function always raised RecursionError.  The successor
        term is now a one-step lookahead, mirroring evaluate_value.
        `q_values` is a memo table of already-computed pairs.
        """
        if ((v, a), (v_new, a_new)) in q_values:
            return q_values[((v, a), (v_new, a_new))]
        # Immediate cost of the transition.
        fuel = self.evaluate_fuel_consumption(v, a, v_new, a_new)
        # Maximum one-step fuel achievable from the successor state.
        max_fuel = float("-inf")
        for v_next, a_next in self.calculate_actions(v_new, a_new):
            fuel_next = self.evaluate_fuel_consumption(v_new, a_new, v_next, a_next)
            if fuel_next > max_fuel:
                max_fuel = fuel_next
        return fuel + max_fuel
# ---------------------------------------------------------------------------
# Example usage: build an MDP over a realistic driving envelope and inspect
# the optimal policy on a corner of the state grid.
# ---------------------------------------------------------------------------

# Velocity grid bounds and quantization step.
VELOCITY_MIN = 0.0
VELOCITY_MAX = 50.0
VELOCITY_STEP = 0.1
# Acceleration grid bounds and quantization step.
ACCELERATION_MIN = -4.5
ACCELERATION_MAX = 3.0
ACCELERATION_STEP = 0.1
# Admissible acceleration band when speeding up (negated when braking).
ACCELERATION_MIN_ACCELERATE = -2.0
ACCELERATION_MAX_ACCELERATE = 1.5

# Instantiate the decision process over the envelope above.
mdp = MDP(
    VELOCITY_MIN,
    VELOCITY_MAX,
    ACCELERATION_MIN,
    ACCELERATION_MAX,
    VELOCITY_STEP,
    ACCELERATION_STEP,
    ACCELERATION_MIN_ACCELERATE,
    ACCELERATION_MAX_ACCELERATE,
)

# Solve for the optimal action in every state.
optimal_policy = mdp.calculate_optimal_policy()

# Show the policy for the 10x10 lowest-velocity / lowest-acceleration states.
for velocity in mdp.velocity_values[:10]:
    for acceleration in mdp.acceleration_values[:10]:
        print(optimal_policy[(velocity, acceleration)])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment