Skip to content

Instantly share code, notes, and snippets.

@AshNguyen
Created May 9, 2020 11:06
Show Gist options
  • Save AshNguyen/1516d417bae3b4e6c12c80a011cc7d0b to your computer and use it in GitHub Desktop.
Save AshNguyen/1516d417bae3b4e6c12c80a011cc7d0b to your computer and use it in GitHub Desktop.
1 Lane Traffic
class TrafficSimulator_1Lane:
    '''
    Cellular-automaton simulator of cars on a single circular (periodic) lane.

    The lane is a 1-D float array of `length` cells. A cell holds -1 when it is
    empty, otherwise the velocity (0..v_max) of the car occupying it. Each time
    step accelerates every car by one, caps its velocity by the free gap ahead,
    randomly slows it down, and finally moves it (the update rule resembles the
    Nagel-Schreckenberg traffic model).
    '''

    def __init__(self, initial_density, length, v_max, p_slow_down, random_v = False):
        '''
        Initialize the 1 lane traffic simulator object, also keep track of the flow as a list
        Args:
            initial_density (float, between 0 and 1): the initial density of cars on the lane
            length (int): the length of the lane
            v_max (int): the maximum velocity of any car on the lane
            p_slow_down (float, between 0 and 1): the probability of randomly decreasing the velocity of a car
            random_v (bool): whether or not the initial velocity of the car is randomly assigned
                             (uniformly in 1..v_max), if False then the initial velocities are set to 0
        Outputs: a traffic simulator object
        '''
        #Simulator parameters
        self.initial_density = initial_density
        self.length = length
        self.v_max = v_max
        self.p_slow_down = p_slow_down
        #Building the initial state: -1 marks an empty cell
        self.current_state = -np.ones(self.length)
        n_cars = int(self.initial_density*self.length)
        occupied = np.random.choice(self.length, size=n_cars, replace=False)
        if random_v:
            #Random velocities, which is like taking a snapshot of a street.
            #The upper bound is inclusive so v_max itself can occur, and the lower
            #bound collapses to v_max when v_max < 1 so the draw never sees an empty range.
            lowest = min(1, self.v_max)
            self.current_state[occupied] = np.random.randint(lowest, self.v_max + 1, size=n_cars)
        else:
            #All cars start at rest
            self.current_state[occupied] = 0
        self.next_state = np.copy(self.current_state)
        #Time-keeping and other logs
        self.time = 0
        self.q = []

    def flow_count(self, state=None):
        '''
        Count the number of cars going through the periodic boundary
        Args:
            state (array, optional): lane state (positions with velocities) to inspect;
                                     defaults to self.current_state
        Outputs: the number of car going through the boundary
        '''
        if state is None:
            state = self.current_state
        #A car sitting i cells before the end of the lane crosses when its velocity is >= i.
        #Never look further back than the lane is long.
        reach = min(self.v_max, len(state))
        return sum(1 for i in range(1, reach + 1) if state[-i] >= i)

    def increment_time(self, actions=None):
        '''
        Simulate the update of velocity and update of position of the cars, for one time step
        Update the current state and the next state attributes, append the flow to self.q
        Args:
            actions (scalar or array with one entry per car in lane order, optional):
                velocity adjustments added to the cars before the update rule; the
                adjusted velocities are clamped to [0, v_max]. None means no adjustment.
        Outputs: tuple (current_state, fraction of cars whose velocity is > 0 after the step);
                 the fraction is 0.0 when the lane has no cars
        '''
        car_positions = np.flatnonzero(self.current_state > -1)
        n_cars = car_positions.size
        if n_cars == 0:
            #Nothing to move: still log the (zero) flow and advance the clock
            self.q.append(0)
            self.time += 1
            return self.current_state, 0.0

        ##Calculating new velocities
        velocities = self.current_state[car_positions].copy()
        if actions is not None:
            velocities = np.clip(velocities + actions, 0, self.v_max)
        #increment all velocity by 1, if < v_max
        velocities = np.minimum(velocities + 1, self.v_max)
        #count the free cells in front of each car; the first car is appended again,
        # shifted by the lane length, because of the periodic boundary
        wrapped_positions = np.append(car_positions, car_positions[0] + self.length)
        gaps = np.ediff1d(wrapped_positions) - 1
        #deterministic new velocities: never drive further than the gap ahead
        velocities = np.minimum(velocities, gaps)
        #randomly decrease velocity, due to driver behaviors (one draw per cell,
        # only the draws at car positions are used)
        slow_down = np.random.uniform(size=self.length)[car_positions] < self.p_slow_down
        velocities[(velocities > 0) & slow_down] -= 1

        ##Counting the flow in this step, after updating the velocity but before moving
        updated_state = -np.ones(shape=self.current_state.shape)
        updated_state[car_positions] = velocities
        self.q.append(self.flow_count(updated_state))

        ##Moving the cars
        next_positions = np.array((car_positions + velocities) % self.length, dtype=int)
        self.current_state = -np.ones(shape=self.current_state.shape)
        self.current_state[next_positions] = velocities
        self.next_state = np.copy(self.current_state)
        self.time += 1
        moving_fraction = np.count_nonzero(self.current_state > 0)/n_cars
        return self.current_state, moving_fraction

    def display(self):
        '''
        Print out the current state of the simulation
        ('-' for an empty cell, otherwise the integer velocity of the car)
        Args: None
        Outputs: None
        '''
        print(''.join('-' if x == -1 else str(int(x)) for x in self.current_state))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment