Markov Decision Process for Autonomous Driving
import numpy as np


class MDP:
    # Author: Rahul Bhadani
    # Initial Date: Dec 10, 2022
    def __init__(self, velocity_min, velocity_max, acceleration_min, acceleration_max,
                 velocity_step, acceleration_step,
                 acceleration_min_accelerate, acceleration_max_accelerate):
        # Define minimum and maximum values for velocity and acceleration
        self.VELOCITY_MIN = velocity_min
        self.VELOCITY_MAX = velocity_max
        self.ACCELERATION_MIN = acceleration_min
        self.ACCELERATION_MAX = acceleration_max
        # Define quantization step for velocity and acceleration
        self.VELOCITY_STEP = velocity_step
        self.ACCELERATION_STEP = acceleration_step
        # Define minimum and maximum values for acceleration when accelerating or decelerating
        self.ACCELERATION_MIN_ACCELERATE = acceleration_min_accelerate
        self.ACCELERATION_MAX_ACCELERATE = acceleration_max_accelerate
        # Calculate number of possible values for velocity and acceleration
        # (round before truncating so floating-point division does not drop the last value)
        self.num_velocity_values = int(round((self.VELOCITY_MAX - self.VELOCITY_MIN) / self.VELOCITY_STEP)) + 1
        self.num_acceleration_values = int(round((self.ACCELERATION_MAX - self.ACCELERATION_MIN) / self.ACCELERATION_STEP)) + 1
        # Create list of possible values for velocity and acceleration
        self.velocity_values = [self.VELOCITY_MIN + i * self.VELOCITY_STEP for i in range(self.num_velocity_values)]
        self.acceleration_values = [self.ACCELERATION_MIN + i * self.ACCELERATION_STEP for i in range(self.num_acceleration_values)]
    # Function to calculate instantaneous fuel consumption based on velocity and acceleration
    # fuel consumption is the function g
    def fuel_consumption(self, v, a):
        # Example function that calculates fuel consumption based on velocity and acceleration
        return v * a
    # Function to calculate available actions in a given state
    def calculate_actions(self, v, a):
        # Initialize list of available actions
        actions = []
        # If current velocity is less than maximum, add option to accelerate
        if v < self.VELOCITY_MAX:
            for a_new in self.acceleration_values:
                if self.ACCELERATION_MIN_ACCELERATE <= a_new <= self.ACCELERATION_MAX_ACCELERATE:
                    actions.append((v, a_new))
        # If current velocity is greater than minimum, add option to decelerate
        if v > self.VELOCITY_MIN:
            for a_new in self.acceleration_values:
                if -self.ACCELERATION_MAX_ACCELERATE <= a_new <= -self.ACCELERATION_MIN_ACCELERATE:
                    actions.append((v, a_new))
        # Add option to maintain current velocity and acceleration
        actions.append((v, a))
        return actions
    # Function to evaluate the expected fuel consumption for a given state and action
    def evaluate_fuel_consumption(self, v, a, v_new, a_new):
        # Sum fuel consumption of the current state and of the state reached by the action
        fuel_current = self.fuel_consumption(v, a)
        fuel_new = self.fuel_consumption(v_new, a_new)
        return fuel_current + fuel_new
    # Function to find the optimal action in a given state, based on minimizing expected fuel consumption
    def find_optimal_action(self, v, a):
        # Calculate available actions in current state
        actions = self.calculate_actions(v, a)
        # Initialize minimum expected fuel consumption
        min_fuel = float("inf")
        # Initialize optimal action
        optimal_action = None
        # Iterate over available actions and find action with minimum expected fuel consumption
        for v_new, a_new in actions:
            fuel = self.evaluate_fuel_consumption(v, a, v_new, a_new)
            if fuel < min_fuel:
                min_fuel = fuel
                optimal_action = (v_new, a_new)
        return optimal_action
    # Function to calculate the optimal policy for the MDP
    def calculate_optimal_policy(self):
        # Initialize dictionary to store optimal policy
        optimal_policy = {}
        # Iterate over all possible states and calculate optimal action for each state
        for v in self.velocity_values:
            for a in self.acceleration_values:
                optimal_policy[(v, a)] = self.find_optimal_action(v, a)
        return optimal_policy
    # Function to calculate the value function for the MDP
    def calculate_value_function(self):
        # Initialize dictionary to store values of each state
        values = {}
        # Iterate over all possible states and calculate value of each state
        for v in self.velocity_values:
            for a in self.acceleration_values:
                values[(v, a)] = self.evaluate_value(v, a, values)
        return values
    # Function to evaluate the value of a state as the minimum expected fuel consumption
    # over the available actions (consistent with find_optimal_action)
    def evaluate_value(self, v, a, values):
        # Check if value of current state has already been calculated
        if (v, a) in values:
            return values[(v, a)]
        # Calculate available actions in current state
        actions = self.calculate_actions(v, a)
        # Initialize minimum expected fuel consumption
        min_fuel = float("inf")
        # Iterate over available actions and find action with minimum expected fuel consumption
        for v_new, a_new in actions:
            fuel = self.evaluate_fuel_consumption(v, a, v_new, a_new)
            if fuel < min_fuel:
                min_fuel = fuel
        # Return minimum expected fuel consumption
        return min_fuel
    # Function to calculate the Q-function for the MDP
    def calculate_q_function(self):
        # Initialize dictionary to store values of each state-action pair
        q_values = {}
        # Iterate over all possible states and actions
        for v in self.velocity_values:
            for a in self.acceleration_values:
                for v_new, a_new in self.calculate_actions(v, a):
                    q_values[((v, a), (v_new, a_new))] = self.evaluate_q_value(v, a, v_new, a_new, q_values)
        return q_values
    # Function to evaluate the Q-value of a state-action pair as the immediate fuel
    # consumption plus the value of the resulting state (a one-step lookahead, which
    # keeps the computation bounded even though the "maintain" action maps a state to itself)
    def evaluate_q_value(self, v, a, v_new, a_new, q_values):
        # Check if Q-value of current state-action pair has already been calculated
        if ((v, a), (v_new, a_new)) in q_values:
            return q_values[((v, a), (v_new, a_new))]
        # Calculate expected fuel consumption for the current state and action
        fuel = self.evaluate_fuel_consumption(v, a, v_new, a_new)
        # Add the value of the next state, i.e. the minimum expected fuel consumption
        # over the actions available in (v_new, a_new)
        q_value = fuel + self.evaluate_value(v_new, a_new, {})
        # Cache and return the Q-value
        q_values[((v, a), (v_new, a_new))] = q_value
        return q_value

# Define minimum and maximum values for velocity and acceleration
VELOCITY_MIN = 0.0
VELOCITY_MAX = 50.0
ACCELERATION_MIN = -4.5
ACCELERATION_MAX = 3.0
# Define quantization step for velocity and acceleration
VELOCITY_STEP = 0.1
ACCELERATION_STEP = 0.1
# Define minimum and maximum values for acceleration when accelerating or decelerating
ACCELERATION_MIN_ACCELERATE = -2.0
ACCELERATION_MAX_ACCELERATE = 1.5
# Create MDP instance
mdp = MDP(VELOCITY_MIN, VELOCITY_MAX, ACCELERATION_MIN, ACCELERATION_MAX, VELOCITY_STEP, ACCELERATION_STEP, ACCELERATION_MIN_ACCELERATE, ACCELERATION_MAX_ACCELERATE)
# Calculate optimal policy for the MDP
optimal_policy = mdp.calculate_optimal_policy()
# Print optimal policy for the first few states
for i in range(10):
    for j in range(10):
        print(optimal_policy[(mdp.velocity_values[i], mdp.acceleration_values[j])])
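
# Illustrative sketch: the value function and Q-function can be inspected the same way.
# A coarse grid (1.0 m/s and 1.0 m/s^2 steps, small velocity range) keeps the computation small.
small_mdp = MDP(0.0, 5.0, -1.0, 1.0, 1.0, 1.0, -1.0, 1.0)
values = small_mdp.calculate_value_function()
q_values = small_mdp.calculate_q_function()
# Value of the standstill state and Q-value of accelerating at 1.0 m/s^2 from it
print(values[(0.0, 0.0)])
print(q_values[((0.0, 0.0), (0.0, 1.0))])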