Created
May 9, 2020 11:06
-
-
Save AshNguyen/1516d417bae3b4e6c12c80a011cc7d0b to your computer and use it in GitHub Desktop.
1 Lane Traffic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TrafficSimulator_1Lane:
    '''
    Single-lane traffic cellular automaton on a ring road (periodic boundary).
    The lane is a 1-D array with one entry per cell: -1 marks an empty cell and
    any value >= 0 is the velocity of the car occupying that cell.
    Each time step follows the Nagel-Schreckenberg update order: accelerate,
    brake to the gap ahead, randomly slow down, then move.
    Attributes:
        current_state (np.ndarray): the lane at the current time
        next_state (np.ndarray): a copy of current_state, refreshed after every step
        time (int): number of steps simulated so far
        q (list of int): per-step count of cars that crossed the periodic boundary
    '''

    def __init__(self, initial_density, length, v_max, p_slow_down, random_v = False):
        '''
        Initialize the 1 lane traffic simulator object, also keep track of the flow density as a list
        Args:
            initial_density (float, between 0 and 1): the initial density of cars on the lane
            length (int): the length of the lane
            v_max (int): the maximum velocity of any car on the lane
            p_slow_down (float, between 0 and 1): the probability of randomly decreasing the velocity of a car
            random_v (bool): whether or not the initial velocity of the car is randomly assigned, if False then
                the initial velocities of the car are set to 0
        Outputs: a traffic simulator object
        '''
        # Simulator parameters
        self.initial_density = initial_density
        self.length = length
        self.v_max = v_max
        self.p_slow_down = p_slow_down
        # Building the initial state: every cell starts empty (-1), then cars
        # are dropped on distinct, randomly chosen cells
        self.current_state = -np.ones(self.length)
        n_cars = int(self.initial_density*self.length)
        occupied = np.random.choice(self.length, size=n_cars, replace=False)
        if random_v:
            # Random velocities, like taking a snapshot of a street. The upper
            # bound is inclusive so v_max itself can be drawn, and the lower
            # bound collapses to v_max when v_max < 1 so the range is never empty.
            lowest = min(1, self.v_max)
            self.current_state[occupied] = np.random.randint(lowest, self.v_max + 1, size=n_cars)
        else:
            # All cars start at rest
            self.current_state[occupied] = 0
        self.next_state = np.copy(self.current_state)
        # Time-keeping and other logs
        self.time = 0
        self.q = []

    def flow_count(self, state=None):
        '''
        Count the number of cars going through the periodic boundary
        A car sitting i cells before the end of the lane crosses the boundary
        when its velocity is at least i.
        Args:
            state (array-like, optional): lane to inspect, encoded like
                current_state; defaults to self.current_state
        Outputs: the number of cars going through the boundary
        '''
        lane = np.asarray(self.current_state if state is None else state)
        n_cells = lane.shape[0]
        # Only the last v_max cells can hold a crossing car; cap at the lane
        # size so a lane shorter than v_max is not indexed out of bounds
        reach = min(int(self.v_max), n_cells)
        if reach <= 0:
            return 0
        # distance_to_end[k] is the number of cells the car in lane[n_cells - reach + k]
        # must travel to wrap around
        distance_to_end = np.arange(reach, 0, -1)
        return int(np.count_nonzero(lane[n_cells - reach:] >= distance_to_end))

    def increment_time(self, actions=None):
        '''
        Simulate the update of velocity and update of position of the cars, for one time step
        Update the current state and the next state attributes, append the
        boundary flow of this step to self.q and advance self.time by one
        Args:
            actions (number or array-like, optional): velocity change applied to
                each car before the update rules run, listed in order of
                increasing cell index (a single number applies to every car).
                The result is clamped to [0, v_max]. None applies no change.
        Outputs: tuple (current_state, moving_fraction) where moving_fraction is
            the share of cars with velocity > 0 after the step (0.0 on an empty lane)
        '''
        car_positions = np.flatnonzero(self.current_state > -1)
        n_cars = car_positions.size
        if n_cars == 0:
            # Empty lane: nothing moves and nothing crosses the boundary
            self.q.append(0)
            self.next_state = np.copy(self.current_state)
            self.time += 1
            return self.current_state, 0.0

        ## Calculating new velocities
        velocities = self.current_state[car_positions].astype(float)
        if actions is not None:
            # broadcast_to rejects action arrays that do not match the car count
            requested = np.broadcast_to(np.asarray(actions, dtype=float), velocities.shape)
            velocities = np.clip(velocities + requested, 0, self.v_max)
        # increment all velocity by 1, if < v_max
        velocities[velocities < self.v_max] += 1
        # count the empty cells in front of each car; the first car is appended
        # again, shifted by the lane length, to close the ring
        ring = np.append(car_positions, car_positions[0] + self.length)
        gaps = np.diff(ring) - 1
        # deterministic new velocities: never drive further than the gap ahead
        velocities = np.minimum(velocities, gaps)
        # randomly decrease velocity, due to driver behaviors; one draw per
        # cell is taken so the random stream matches the lane size
        slows_down = np.random.uniform(size=self.length)[car_positions] < self.p_slow_down
        velocities[(velocities > 0) & slows_down] -= 1

        ## Counting the flow in this step, after updating the velocity
        updated_lane = -np.ones(shape=self.current_state.shape)
        updated_lane[car_positions] = velocities
        self.q.append(self.flow_count(updated_lane))

        ## Moving the cars
        new_positions = (car_positions + velocities.astype(int)) % self.length
        self.current_state = -np.ones(shape=self.current_state.shape)
        self.current_state[new_positions] = velocities
        self.next_state = np.copy(self.current_state)
        self.time += 1
        return self.current_state, np.count_nonzero(velocities > 0)/n_cars

    def display(self):
        '''
        Print out the current state of the simulation
        Empty cells are shown as '-', cars as their integer velocity
        Args: None
        Outputs: None
        '''
        print(''.join('-' if x == -1 else str(int(x)) for x in self.current_state))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment