Last active
July 2, 2022 14:50
-
-
Save kargarisaac/57324b76c57a84cba5c8b9ec7412ba51 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
import numpy as np | |
import random | |
import gym | |
from gym import spaces | |
from gym.utils import seeding | |
import glob | |
import os | |
import torch | |
import sys | |
sys.path.append('/home/isaac/software/CARLA_0.9.9/PythonAPI/carla') | |
try: | |
sys.path.append(glob.glob('/home/isaac/software/CARLA_0.9.9/PythonAPI/carla/dist/carla-*%d.%d-%s.egg' % ( | |
sys.version_info.major, | |
sys.version_info.minor, | |
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0]) | |
except IndexError: | |
pass | |
from agents.navigation.global_route_planner import GlobalRoutePlanner | |
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO | |
from agents.navigation.controller import VehiclePIDController | |
from agents.navigation.agent import Agent | |
from agents.navigation.behavior_agent import BehaviorAgent | |
import matplotlib.pyplot as plt | |
import carla | |
import matplotlib.pyplot as plt | |
class CarlaEnv2(gym.Env): | |
"""An OpenAI gym wrapper for CARLA simulator.""" | |
def __init__(self, params): | |
super(CarlaEnv2, self).__init__() | |
# parameters | |
self.number_of_vehicles = params['number_of_vehicles'] | |
self.dt = params['dt'] | |
self.max_time_episode = params['max_time_episode'] | |
self.out_lane_thres = params['out_lane_thres'] | |
self.ego_autopilot = params['ego_autopilot'] | |
self.clnt_no = params['clnt_no'] | |
self.obs_size = 256 | |
# Connect to carla server and get world object | |
print('connecting to Carla server...') | |
client = carla.Client('localhost', params['port']) | |
client.set_timeout(2.0) | |
try: | |
self.world = client.load_world(params['town']) | |
except: | |
pass | |
self.world = client.get_world() | |
print('Carla server connected!') | |
# Set weather | |
self.world.set_weather(carla.WeatherParameters.ClearNoon) | |
## added for 0.9.9 and tm autopilot | |
settings = self.world.get_settings() | |
settings.synchronous_mode = True # Enables synchronous mode | |
settings.fixed_delta_seconds = params['dt'] | |
self.world.apply_settings(settings) | |
# get all spawn points- used for ego starting point. For others we find nearest points to ego | |
self.all_vehicle_spawn_points = list( | |
self.world.get_map().get_spawn_points()) | |
random.shuffle(self.all_vehicle_spawn_points) | |
# Create the ego vehicle blueprint | |
self.ego_bp = self._create_vehicle_bluepprint( | |
params['ego_vehicle_filter'], color='49,8,8') | |
self.ego_bp.set_attribute('role_name', 'hero') | |
# Collision sensor | |
self.collision_hist = [] # The collision history | |
self.collision_hist_l = 1 # collision history length | |
self.collision_bp = self.world.get_blueprint_library().find('sensor.other.collision') | |
# Camera sensor | |
self.camera_img = np.zeros( | |
(self.obs_size, self.obs_size, 3), dtype=np.uint8) | |
self.camera_trans = carla.Transform(carla.Location(x=0.8, z=1.7)) | |
self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.rgb') | |
# Modify the attributes of the blueprint to set image resolution and field of view. | |
self.camera_bp.set_attribute('image_size_x', str(self.obs_size)) | |
self.camera_bp.set_attribute('image_size_y', str(self.obs_size)) | |
self.camera_bp.set_attribute('fov', '110') | |
# Set the time in seconds between sensor captures | |
self.camera_bp.set_attribute('sensor_tick', '0.02') | |
# Set fixed simulation step for synchronous mode | |
self.settings = self.world.get_settings() | |
self.settings.fixed_delta_seconds = self.dt | |
# Record the time of total steps and resetting steps | |
self.reset_step = 0 | |
self.total_step = 0 | |
# self.dests = None | |
dst = self.all_vehicle_spawn_points[1].location | |
self.dests = [[ | |
dst.x, | |
dst.y, | |
dst.z | |
]] | |
## traffic manager for 0.9.10 | |
self.tm = client.get_trafficmanager(params['tm_port']) | |
self.others_behavior_agents = [] | |
# first reset and plan a global rout(insode it) | |
self.reset() | |
def reset(self): | |
# Clear sensor objects | |
self.collision_sensor = None | |
# self.lidar_sensor = None | |
self.camera_sensor = None | |
# Delete sensors, vehicles and walkers | |
self._clear_all_actors(['sensor.other.collision', 'sensor.camera.rgb', 'vehicle.*', | |
'controller.ai.walker', 'walker.*']) | |
# Disable sync mode | |
self._set_synchronous_mode(False) | |
# Spawn the ego vehicle | |
transform = self.all_vehicle_spawn_points[0] | |
self._try_spawn_ego_vehicle_at(transform) | |
## get nearest spawn locations | |
all_wps = list( | |
self.world.get_map().get_spawn_points()) | |
near = [] | |
origin = self.ego.get_location() | |
for wp in all_wps: | |
dist = np.sqrt((wp.location.x - origin.x)**2 + (wp.location.y - origin.y)**2) | |
if dist < 100: | |
near.append(wp) | |
self.vehicle_spawn_points = near | |
# Spawn surrounding vehicles | |
random.shuffle(self.vehicle_spawn_points) | |
count = self.number_of_vehicles | |
if count > 0: | |
for spawn_point in self.vehicle_spawn_points: | |
# print(count) | |
if self._try_spawn_random_vehicle_at(spawn_point, number_of_wheels=[4]): | |
count -= 1 | |
if count <= 0: | |
break | |
while count > 0: | |
if self._try_spawn_random_vehicle_at(random.choice(self.vehicle_spawn_points), | |
number_of_wheels=[4]): | |
count -= 1 | |
# Add collision sensor | |
self.collision_sensor = self.world.spawn_actor( | |
self.collision_bp, carla.Transform(), attach_to=self.ego) | |
self.collision_sensor.listen(lambda event: get_collision_hist(event)) | |
def get_collision_hist(event): | |
impulse = event.normal_impulse | |
intensity = np.sqrt( | |
impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2) | |
self.collision_hist.append(intensity) | |
if len(self.collision_hist) > self.collision_hist_l: | |
self.collision_hist.pop(0) | |
self.collision_hist = [] | |
# Add camera sensor | |
self.camera_sensor = self.world.spawn_actor( | |
self.camera_bp, self.camera_trans, attach_to=self.ego) | |
self.camera_sensor.listen(lambda data: get_camera_img(data)) | |
def get_camera_img(data): | |
array = np.frombuffer(data.raw_data, dtype=np.dtype("uint8")) | |
array = np.reshape(array, (data.height, data.width, 4)) | |
array = array[:, :, :3] | |
array = array[:, :, ::-1] | |
self.camera_img = array | |
# Update timesteps | |
self.time_step = 0 | |
self.reset_step += 1 | |
# Enable sync mode | |
self.settings.synchronous_mode = True | |
self.world.apply_settings(self.settings) | |
return self._get_obs() | |
def step(self, action): | |
# print(self.ego.get_transform()) | |
self.world.tick() | |
###------- ego controller for wp following -------- | |
# self.ego_behavior_agent.update_information(self.world) | |
# num_min_waypoints = 21 | |
# # Set new destination when target has been reached | |
# if len(self.ego_behavior_agent.get_local_planner().waypoints_queue) < num_min_waypoints: | |
# self.ego_behavior_agent.reroute(self.vehicle_spawn_points) | |
# speed_limit = self.ego.get_speed_limit() | |
# self.ego_behavior_agent.get_local_planner().set_speed(speed_limit) | |
# control = self.ego_behavior_agent.run_step() | |
# self.ego.apply_control(control) | |
###-----others control | |
# for ba in self.others_behavior_agents: | |
# ba.update_information(self.world) | |
# num_min_waypoints = 21 | |
# # Set new destination when target has been reached | |
# if len(ba.get_local_planner().waypoints_queue) < num_min_waypoints: | |
# ba.reroute(self.vehicle_spawn_points) | |
# speed_limit = ba.vehicle.get_speed_limit() | |
# ba.get_local_planner().set_speed(speed_limit) | |
# control = ba.run_step() | |
# ba.vehicle.apply_control(control) | |
return (self._get_obs(), self._terminal()) | |
def _create_vehicle_bluepprint(self, actor_filter, color=None, number_of_wheels=[4]): | |
"""Create the blueprint for a specific actor type. | |
Args: | |
actor_filter: a string indicating the actor type, e.g, 'vehicle.lincoln*'. | |
Returns: | |
bp: the blueprint object of carla. | |
""" | |
blueprints = self.world.get_blueprint_library().filter(actor_filter) | |
blueprint_library = [] | |
for nw in number_of_wheels: | |
blueprint_library = blueprint_library + [x for x in blueprints if | |
int(x.get_attribute('number_of_wheels')) == nw] | |
bp = random.choice(blueprint_library) | |
if bp.has_attribute('color'): | |
if not color: | |
color = random.choice( | |
bp.get_attribute('color').recommended_values) | |
bp.set_attribute('color', color) | |
return bp | |
def _set_synchronous_mode(self, synchronous=True): | |
"""Set whether to use the synchronous mode. | |
""" | |
self.settings.synchronous_mode = synchronous | |
self.world.apply_settings(self.settings) | |
def _try_spawn_random_vehicle_at(self, transform, number_of_wheels=[4]): | |
blueprint = self._create_vehicle_bluepprint( | |
'vehicle.*', number_of_wheels=number_of_wheels) | |
blueprint.set_attribute('role_name', 'others') | |
vehicle = self.world.try_spawn_actor(blueprint, transform) | |
tm_port = self.tm.get_port() | |
if vehicle is not None: | |
# vehicle.set_autopilot(True) | |
vehicle.set_autopilot(True, tm_port) | |
# start_point = transform.location | |
# behaviors = ['cautious', 'normal', 'aggressive'] | |
# behavior_idx = np.random.randint(0, 3) | |
# behavior_agent = BehaviorAgent(vehicle, behavior=behaviors[behavior_idx]) # cautious, normal, aggressive | |
# if self.vehicle_spawn_points[0].location != behavior_agent.vehicle.get_location(): | |
# end_point = self.vehicle_spawn_points[0].location | |
# else: | |
# end_point = self.vehicle_spawn_points[1].location | |
# behavior_agent.set_destination(start_point, end_point, clean=True) | |
# self.others_behavior_agents.append(behavior_agent) | |
return True | |
return False | |
def _try_spawn_ego_vehicle_at(self, transform): | |
"""Try to spawn the ego vehicle at specific transform. | |
Args: | |
transform: the carla transform object. | |
Returns: | |
Bool indicating whether the spawn is successful. | |
""" | |
vehicle = self.world.try_spawn_actor(self.ego_bp, transform) | |
tm_port = self.tm.get_port() | |
if vehicle is not None: | |
self.ego = vehicle | |
# start_point = transform.location | |
# dest_id = 0 | |
# end_point = carla.Location( | |
# x=self.dests[dest_id][0], y=self.dests[dest_id][1], z=self.dests[dest_id][2]) | |
# self.ego_behavior_agent = BehaviorAgent(self.ego, behavior='normal') # cautious, normal, aggressive | |
# if self.all_vehicle_spawn_points[0].location != self.ego_behavior_agent.vehicle.get_location(): | |
# destination = self.all_vehicle_spawn_points[0].location | |
# else: | |
# destination = self.all_vehicle_spawn_points[1].location | |
# # if self.ego_autopilot: | |
# self.ego_behavior_agent.set_destination(start_point, end_point, clean=True) | |
# self.waypoints = self.ego_behavior_agent._local_planner.waypoints_queue | |
self.ego.set_autopilot(True, tm_port) | |
print('ego vehicle is in autolipot mode') | |
return True | |
return False | |
def _set_carla_transform(self, pose): | |
"""Get a carla tranform object given pose. | |
Args: | |
pose: [x, y, yaw]. | |
Returns: | |
transform: the carla transform object | |
""" | |
transform = carla.Transform() | |
transform.location.x = pose[0] | |
transform.location.y = pose[1] | |
transform.rotation.yaw = pose[2] | |
return transform | |
def _get_obs(self): | |
"""Get the observations.""" | |
frontview = self.camera_img | |
return frontview | |
def _get_ego_pos(self): | |
"""Get the ego vehicle pose (x, y).""" | |
ego_trans = self.ego.get_transform() | |
ego_x = ego_trans.location.x | |
ego_y = ego_trans.location.y | |
return ego_x, ego_y | |
def _terminal(self): | |
"""Calculate whether to terminate the current episode.""" | |
# Get ego state | |
ego_x, ego_y = self._get_ego_pos() | |
# If collides | |
if len(self.collision_hist) > 0: | |
return True | |
# If reach maximum timestep | |
if self.time_step > self.max_time_episode: | |
return True | |
# If at destination | |
if self.dests is not None: # If at destination | |
for dest in self.dests: | |
if np.sqrt((ego_x - dest[0]) ** 2 + (ego_y - dest[1]) ** 2) < 6: | |
return True | |
return False | |
def _clear_all_actors(self, actor_filters): | |
"""Clear specific actors.""" | |
for actor_filter in actor_filters: | |
for actor in self.world.get_actors().filter(actor_filter): | |
if actor.is_alive: | |
if actor.type_id == 'controller.ai.walker': | |
actor.stop() | |
actor.destroy() | |
def destroy_sensors(self): | |
try: | |
self.collision_sensor.destroy() | |
self.collision_sensor = None | |
except: | |
print('no collision sensor to destroy') | |
self.collision_sensor = None | |
try: | |
self.camera_sensor.destroy() | |
self.camera_sensor = None | |
except: | |
print('no camera sensor to destroy') | |
self.camera_sensor = None | |
def main(): | |
# parameters for the gym_carla environment | |
params = { | |
'number_of_vehicles': 100, | |
'dt': 0.05, # time interval between two frames | |
'ego_vehicle_filter': 'vehicle.lincoln*', # filter for defining ego vehicle | |
'port': 4000, # connection port | |
'tm_port': 4050, | |
'town': 'Town01', # which town to simulate | |
'max_time_episode': 2000, # maximum timesteps per episode | |
'out_lane_thres': 2.0, # 2 # threshold for out of lane | |
'desired_speed': 8, # desired speed (m/s) | |
'ego_autopilot': False, | |
'clnt_no': 0, | |
} | |
# Set gym-carla environment | |
env = CarlaEnv2(params=params) | |
obs = env.reset() | |
while True: | |
action = [1.0, 0.0] | |
obs,done = env.step(action) | |
print('running ...') | |
if done: | |
obs = env.reset() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
hello