Last active
June 5, 2024 05:22
-
-
Save cmpute/df3d6325f3781191304279be11976502 to your computer and use it in GitHub Desktop.
Data collector in Carla with autopilot ego and random generated vehicle and walkers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""This script is modified from PythonAPI/examples/generate_traffic.py and PythonAPI/examples/automatic_control.py""" | |
from __future__ import print_function | |
import argparse | |
import collections | |
import datetime | |
import glob | |
import logging | |
import math | |
import os | |
import numpy.random as random | |
import re | |
import sys | |
import time | |
import weakref | |
try: | |
import pygame | |
from pygame.locals import KMOD_CTRL | |
from pygame.locals import K_ESCAPE | |
from pygame.locals import K_q | |
except ImportError: | |
raise RuntimeError('cannot import pygame, make sure pygame package is installed') | |
try: | |
import numpy as np | |
except ImportError: | |
raise RuntimeError( | |
'cannot import numpy, make sure numpy package is installed') | |
import carla | |
from carla import ColorConverter as cc | |
from agents.navigation.behavior_agent import BehaviorAgent # pylint: disable=import-error | |
from agents.navigation.basic_agent import BasicAgent # pylint: disable=import-error | |
from agents.navigation.constant_velocity_agent import ConstantVelocityAgent # pylint: disable=import-error | |
# ============================================================================== | |
# -- Global functions ---------------------------------------------------------- | |
# ============================================================================== | |
def find_weather_presets(): | |
"""Method to find weather presets""" | |
rgx = re.compile('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)') | |
def name(x): return ' '.join(m.group(0) for m in rgx.finditer(x)) | |
presets = [x for x in dir(carla.WeatherParameters) if re.match('[A-Z].+', x)] | |
return [(getattr(carla.WeatherParameters, x), name(x)) for x in presets] | |
def get_actor_display_name(actor, truncate=250): | |
"""Method to get actor display name""" | |
name = ' '.join(actor.type_id.replace('_', '.').title().split('.')[1:]) | |
return (name[:truncate - 1] + u'\u2026') if len(name) > truncate else name | |
def get_actor_blueprints(world, filter, generation): | |
bps = world.get_blueprint_library().filter(filter) | |
if generation.lower() == "all": | |
return bps | |
# If the filter returns only one bp, we assume that this one needed | |
# and therefore, we ignore the generation | |
if len(bps) == 1: | |
return bps | |
try: | |
int_generation = int(generation) | |
# Check if generation is in available generations | |
if int_generation in [1, 2, 3]: | |
bps = [x for x in bps if int(x.get_attribute('generation')) == int_generation] | |
return bps | |
else: | |
print(" Warning! Actor Generation is not valid. No actor will be spawned.") | |
return [] | |
except: | |
print(" Warning! Actor Generation is not valid. No actor will be spawned.") | |
return [] | |
# ============================================================================== | |
# -- Traffic Manager --------------------------------------------------------------- | |
# ============================================================================== | |
class TrafficManager(object): | |
def __init__(self, world, traffic_manager, args): | |
self.vehicles_list = [] | |
self.walkers_list = [] | |
self.all_id = [] | |
self.client = carla.Client(args.host, args.port) | |
self.client.set_timeout(10.0) | |
self.world = world | |
self.traffic_manager = traffic_manager | |
traffic_manager.set_global_distance_to_leading_vehicle(2.5) | |
if args.respawn: | |
traffic_manager.set_respawn_dormant_vehicles(True) | |
if args.hybrid: | |
traffic_manager.set_hybrid_physics_mode(True) | |
traffic_manager.set_hybrid_physics_radius(70.0) | |
if args.seed is not None: | |
traffic_manager.set_random_device_seed(args.seed) | |
settings = world.get_settings() | |
if not args.sync: | |
print("You are currently in asynchronous mode. If this is a traffic simulation, " | |
"you could experience some issues. If it's not working correctly, switch to synchronous " | |
"mode by using traffic_manager.set_synchronous_mode(True)") | |
# if args.no_rendering: | |
# settings.no_rendering_mode = True | |
world.apply_settings(settings) | |
blueprints = get_actor_blueprints(world, args.filterv, args.generationv) | |
if not blueprints: | |
raise ValueError("Couldn't find any vehicles with the specified filters") | |
blueprintsWalkers = get_actor_blueprints(world, args.filterw, args.generationw) | |
if not blueprintsWalkers: | |
raise ValueError("Couldn't find any walkers with the specified filters") | |
if args.safe: | |
blueprints = [x for x in blueprints if x.get_attribute('base_type') == 'car'] | |
blueprints = sorted(blueprints, key=lambda bp: bp.id) | |
spawn_points = world.get_map().get_spawn_points() | |
number_of_spawn_points = len(spawn_points) | |
if args.number_of_vehicles < number_of_spawn_points: | |
random.shuffle(spawn_points) | |
elif args.number_of_vehicles > number_of_spawn_points: | |
msg = 'requested %d vehicles, but could only find %d spawn points' | |
logging.warning(msg, args.number_of_vehicles, number_of_spawn_points) | |
args.number_of_vehicles = number_of_spawn_points | |
# @todo cannot import these directly. | |
SpawnActor = carla.command.SpawnActor | |
SetAutopilot = carla.command.SetAutopilot | |
FutureActor = carla.command.FutureActor | |
# -------------- | |
# Spawn vehicles | |
# -------------- | |
batch = [] | |
for n, transform in enumerate(spawn_points): | |
if n >= args.number_of_vehicles: | |
break | |
blueprint = random.choice(blueprints) | |
if blueprint.has_attribute('color'): | |
color = random.choice(blueprint.get_attribute('color').recommended_values) | |
blueprint.set_attribute('color', color) | |
if blueprint.has_attribute('driver_id'): | |
driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values) | |
blueprint.set_attribute('driver_id', driver_id) | |
else: | |
blueprint.set_attribute('role_name', 'autopilot') | |
# spawn the cars and set their autopilot and light state all together | |
batch.append(SpawnActor(blueprint, transform) | |
.then(SetAutopilot(FutureActor, True, traffic_manager.get_port()))) | |
for response in self.client.apply_batch_sync(batch, False): | |
if response.error: | |
logging.error(response.error) | |
else: | |
self.vehicles_list.append(response.actor_id) | |
# Set automatic vehicle lights update if specified | |
if args.car_lights_on: | |
all_vehicle_actors = world.get_actors(self.vehicles_list) | |
for actor in all_vehicle_actors: | |
traffic_manager.update_vehicle_lights(actor, True) | |
# ------------- | |
# Spawn Walkers | |
# ------------- | |
# some settings | |
percentagePedestriansRunning = 0.0 # how many pedestrians will run | |
percentagePedestriansCrossing = 0.0 # how many pedestrians will walk through the road | |
if args.seedw: | |
world.set_pedestrians_seed(args.seedw) | |
random.seed(args.seedw) | |
# 1. take all the random locations to spawn | |
spawn_points = [] | |
for i in range(args.number_of_walkers): | |
spawn_point = carla.Transform() | |
loc = world.get_random_location_from_navigation() | |
if (loc != None): | |
spawn_point.location = loc | |
spawn_points.append(spawn_point) | |
# 2. we spawn the walker object | |
batch = [] | |
walker_speed = [] | |
for spawn_point in spawn_points: | |
walker_bp = random.choice(blueprintsWalkers) | |
# set as not invincible | |
if walker_bp.has_attribute('is_invincible'): | |
walker_bp.set_attribute('is_invincible', 'false') | |
# set the max speed | |
if walker_bp.has_attribute('speed'): | |
if (random.random() > percentagePedestriansRunning): | |
# walking | |
walker_speed.append(walker_bp.get_attribute('speed').recommended_values[1]) | |
else: | |
# running | |
walker_speed.append(walker_bp.get_attribute('speed').recommended_values[2]) | |
else: | |
print("Walker has no speed") | |
walker_speed.append(0.0) | |
batch.append(SpawnActor(walker_bp, spawn_point)) | |
results = self.client.apply_batch_sync(batch, True) | |
walker_speed2 = [] | |
for i in range(len(results)): | |
if results[i].error: | |
logging.error(results[i].error) | |
else: | |
self.walkers_list.append({"id": results[i].actor_id}) | |
walker_speed2.append(walker_speed[i]) | |
walker_speed = walker_speed2 | |
# 3. we spawn the walker controller | |
batch = [] | |
walker_controller_bp = world.get_blueprint_library().find('controller.ai.walker') | |
for i in range(len(self.walkers_list)): | |
batch.append(SpawnActor(walker_controller_bp, carla.Transform(), self.walkers_list[i]["id"])) | |
results = self.client.apply_batch_sync(batch, True) | |
for i in range(len(results)): | |
if results[i].error: | |
logging.error(results[i].error) | |
else: | |
self.walkers_list[i]["con"] = results[i].actor_id | |
# 4. we put together the walkers and controllers id to get the objects from their id | |
for i in range(len(self.walkers_list)): | |
self.all_id.append(self.walkers_list[i]["con"]) | |
self.all_id.append(self.walkers_list[i]["id"]) | |
all_actors = world.get_actors(self.all_id) | |
# wait for a tick to ensure client receives the last transform of the walkers we have just created | |
if not args.sync: | |
world.wait_for_tick() | |
else: | |
world.tick() | |
# 5. initialize each controller and set target to walk to (list is [controler, actor, controller, actor ...]) | |
# set how many pedestrians can cross the road | |
world.set_pedestrians_cross_factor(percentagePedestriansCrossing) | |
for i in range(0, len(self.all_id), 2): | |
# start walker | |
all_actors[i].start() | |
# set walk to random point | |
all_actors[i].go_to_location(world.get_random_location_from_navigation()) | |
# max speed | |
all_actors[i].set_max_speed(float(walker_speed[int(i/2)])) | |
print('spawned %d vehicles and %d walkers, press Ctrl+C to exit.' % (len(self.vehicles_list), len(self.walkers_list))) | |
# Example of how to use Traffic Manager parameters | |
traffic_manager.global_percentage_speed_difference(30.0) | |
def destroy(self): | |
print('\ndestroying %d vehicles' % len(self.vehicles_list)) | |
self.client.apply_batch([carla.command.DestroyActor(x) for x in self.vehicles_list]) | |
# stop walker controllers (list is [controller, actor, controller, actor ...]) | |
all_actors = self.world.get_actors(self.all_id) | |
for i in range(0, len(self.all_id), 2): | |
all_actors[i].stop() | |
print('\ndestroying %d walkers' % len(self.walkers_list)) | |
self.client.apply_batch([carla.command.DestroyActor(x) for x in self.all_id]) | |
time.sleep(0.5) | |
# ============================================================================== | |
# -- World --------------------------------------------------------------- | |
# ============================================================================== | |
class World(object): | |
""" Class representing the surrounding environment """ | |
def __init__(self, carla_world, hud, args): | |
"""Constructor method""" | |
self._args = args | |
self.world = carla_world | |
try: | |
self.map = self.world.get_map() | |
except RuntimeError as error: | |
print('RuntimeError: {}'.format(error)) | |
print(' The server could not send the OpenDRIVE (.xodr) file:') | |
print(' Make sure it exists, has the same name of your town, and is correct.') | |
sys.exit(1) | |
self.player = None | |
self.collision_sensor = None | |
self.lane_invasion_sensor = None | |
self.gnss_sensor = None | |
self.camera_manager = None | |
self._weather_presets = find_weather_presets() | |
self._weather_index = 0 | |
self._actor_filter = args.filter | |
self._actor_generation = args.generation | |
self.hud = hud | |
self.headless = args.headless | |
if not self.headless: | |
self.restart(args) | |
self.world.on_tick(hud.on_world_tick) | |
else: | |
self.restart_headless(args) | |
self.recording_enabled = False | |
self.recording_start = 0 | |
def spawn_player(self): | |
# Get a random blueprint. | |
blueprint_list = get_actor_blueprints(self.world, self._actor_filter, self._actor_generation) | |
if not blueprint_list: | |
raise ValueError("Couldn't find any blueprints with the specified filters") | |
blueprint = random.choice(blueprint_list) | |
blueprint.set_attribute('role_name', 'hero') | |
if blueprint.has_attribute('color'): | |
color = random.choice(blueprint.get_attribute('color').recommended_values) | |
blueprint.set_attribute('color', color) | |
# Spawn the player. | |
if self.player is not None: | |
spawn_point = self.player.get_transform() | |
spawn_point.location.z += 2.0 | |
spawn_point.rotation.roll = 0.0 | |
spawn_point.rotation.pitch = 0.0 | |
self.destroy() | |
self.player = self.world.try_spawn_actor(blueprint, spawn_point) | |
self.modify_vehicle_physics(self.player) | |
while self.player is None: | |
if not self.map.get_spawn_points(): | |
print('There are no spawn points available in your map/town.') | |
print('Please add some Vehicle Spawn Point to your UE4 scene.') | |
sys.exit(1) | |
spawn_points = self.map.get_spawn_points() | |
spawn_point = random.choice(spawn_points) if spawn_points else carla.Transform() | |
self.player = self.world.try_spawn_actor(blueprint, spawn_point) | |
self.modify_vehicle_physics(self.player) | |
def restart_headless(self, args): | |
"""Restart the world without HUD""" | |
# spawn the ego vehicle | |
self.spawn_player() | |
if self._args.sync: | |
self.world.tick() | |
else: | |
self.world.wait_for_tick() | |
# spawn necessary sensors | |
self.collision_sensor = CollisionSensor(self.player, self.hud) | |
def restart(self, args): | |
"""Restart the world""" | |
# spawn the ego vehicle | |
self.spawn_player() | |
if self._args.sync: | |
self.world.tick() | |
else: | |
self.world.wait_for_tick() | |
# Keep same camera config if the camera manager exists. | |
cam_index = self.camera_manager.index if self.camera_manager is not None else 0 | |
cam_pos_id = self.camera_manager.transform_index if self.camera_manager is not None else 0 | |
# spawn necessary sensors | |
self.collision_sensor = CollisionSensor(self.player, self.hud) | |
self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud) | |
self.gnss_sensor = GnssSensor(self.player) | |
self.camera_manager = CameraManager(self.player, self.hud) | |
self.camera_manager.transform_index = cam_pos_id | |
self.camera_manager.set_sensor(cam_index, notify=False) | |
actor_type = get_actor_display_name(self.player) | |
self.hud.notification(actor_type) | |
def next_weather(self, reverse=False): | |
"""Get next weather setting""" | |
self._weather_index += -1 if reverse else 1 | |
self._weather_index %= len(self._weather_presets) | |
preset = self._weather_presets[self._weather_index] | |
self.hud.notification('Weather: %s' % preset[1]) | |
self.player.get_world().set_weather(preset[0]) | |
def modify_vehicle_physics(self, actor): | |
#If actor is not a vehicle, we cannot use the physics control | |
try: | |
physics_control = actor.get_physics_control() | |
physics_control.use_sweep_wheel_collision = True | |
actor.apply_physics_control(physics_control) | |
except Exception: | |
pass | |
def tick(self, clock): | |
"""Method for every tick""" | |
if not self.headless: | |
self.hud.tick(self, clock) | |
def render(self, display): | |
"""Render world""" | |
self.camera_manager.render(display) | |
self.hud.render(display) | |
def destroy_sensors(self): | |
"""Destroy sensors""" | |
self.camera_manager.sensor.destroy() | |
self.camera_manager.sensor = None | |
self.camera_manager.index = None | |
def destroy(self): | |
"""Destroys all actors""" | |
actors = [ | |
getattr(self.camera_manager, "sensor", None), | |
getattr(self.collision_sensor, "sensor", None), | |
getattr(self.lane_invasion_sensor, "sensor", None), | |
getattr(self.gnss_sensor, "sensor", None), | |
self.player, | |
] | |
for actor in actors: | |
if actor is not None: | |
actor.destroy() | |
# ============================================================================== | |
# -- DataStorage --------------------------------------------------------------- | |
# ============================================================================== | |
class DataStorage(object): | |
def __init__(self, world): | |
self.world = world | |
self.timers = [] | |
self.log_interval = 1 | |
def tick(self): | |
self.timers.append(time.time()) | |
if self.timers[-1] - self.timers[0] > self.log_interval: | |
fps = (len(self.timers) - 1) / (self.timers[-1] - self.timers[0]) | |
ego_location = self.world.player.get_location() | |
colhist = self.world.collision_sensor.get_collision_history() | |
print("FPS: {:.2f}, ego location: {:.2f}, {:.2f}, collision frames: {}".format(fps, ego_location.x, ego_location.y, len(colhist))) | |
self.timers.clear() | |
# ============================================================================== | |
# -- KeyboardControl ----------------------------------------------------------- | |
# ============================================================================== | |
class KeyboardControl(object): | |
def __init__(self, world): | |
world.hud.notification("Press 'H' or '?' for help.", seconds=4.0) | |
def parse_events(self): | |
for event in pygame.event.get(): | |
if event.type == pygame.QUIT: | |
return True | |
if event.type == pygame.KEYUP: | |
if self._is_quit_shortcut(event.key): | |
return True | |
@staticmethod | |
def _is_quit_shortcut(key): | |
"""Shortcut for quitting""" | |
return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL) | |
# ============================================================================== | |
# -- HUD ----------------------------------------------------------------------- | |
# ============================================================================== | |
class HUD(object): | |
"""Class for HUD text""" | |
def __init__(self, width, height): | |
"""Constructor method""" | |
self.dim = (width, height) | |
font = pygame.font.Font(pygame.font.get_default_font(), 20) | |
font_name = 'courier' if os.name == 'nt' else 'mono' | |
fonts = [x for x in pygame.font.get_fonts() if font_name in x] | |
default_font = 'ubuntumono' | |
mono = default_font if default_font in fonts else fonts[0] | |
mono = pygame.font.match_font(mono) | |
self._font_mono = pygame.font.Font(mono, 12 if os.name == 'nt' else 14) | |
self._notifications = FadingText(font, (width, 40), (0, height - 40)) | |
self.help = HelpText(pygame.font.Font(mono, 24), width, height) | |
self.server_fps = 0 | |
self.frame = 0 | |
self.simulation_time = 0 | |
self._show_info = True | |
self._info_text = [] | |
self._server_clock = pygame.time.Clock() | |
def on_world_tick(self, timestamp): | |
"""Gets informations from the world at every tick""" | |
self._server_clock.tick() | |
self.server_fps = self._server_clock.get_fps() | |
self.frame = timestamp.frame_count | |
self.simulation_time = timestamp.elapsed_seconds | |
def tick(self, world, clock): | |
"""HUD method for every tick""" | |
self._notifications.tick(world, clock) | |
if not self._show_info: | |
return | |
transform = world.player.get_transform() | |
vel = world.player.get_velocity() | |
control = world.player.get_control() | |
heading = 'N' if abs(transform.rotation.yaw) < 89.5 else '' | |
heading += 'S' if abs(transform.rotation.yaw) > 90.5 else '' | |
heading += 'E' if 179.5 > transform.rotation.yaw > 0.5 else '' | |
heading += 'W' if -0.5 > transform.rotation.yaw > -179.5 else '' | |
colhist = world.collision_sensor.get_collision_history() | |
collision = [colhist[x + self.frame - 200] for x in range(0, 200)] | |
max_col = max(1.0, max(collision)) | |
collision = [x / max_col for x in collision] | |
vehicles = world.world.get_actors().filter('vehicle.*') | |
self._info_text = [ | |
'Server: % 16.0f FPS' % self.server_fps, | |
'Client: % 16.0f FPS' % clock.get_fps(), | |
'', | |
'Vehicle: % 20s' % get_actor_display_name(world.player, truncate=20), | |
'Map: % 20s' % world.map.name.split('/')[-1], | |
'Simulation time: % 12s' % datetime.timedelta(seconds=int(self.simulation_time)), | |
'', | |
'Speed: % 15.0f km/h' % (3.6 * math.sqrt(vel.x**2 + vel.y**2 + vel.z**2)), | |
u'Heading:% 16.0f\N{DEGREE SIGN} % 2s' % (transform.rotation.yaw, heading), | |
'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (transform.location.x, transform.location.y)), | |
'GNSS:% 24s' % ('(% 2.6f, % 3.6f)' % (world.gnss_sensor.lat, world.gnss_sensor.lon)), | |
'Height: % 18.0f m' % transform.location.z, | |
''] | |
if isinstance(control, carla.VehicleControl): | |
self._info_text += [ | |
('Throttle:', control.throttle, 0.0, 1.0), | |
('Steer:', control.steer, -1.0, 1.0), | |
('Brake:', control.brake, 0.0, 1.0), | |
('Reverse:', control.reverse), | |
('Hand brake:', control.hand_brake), | |
('Manual:', control.manual_gear_shift), | |
'Gear: %s' % {-1: 'R', 0: 'N'}.get(control.gear, control.gear)] | |
elif isinstance(control, carla.WalkerControl): | |
self._info_text += [ | |
('Speed:', control.speed, 0.0, 5.556), | |
('Jump:', control.jump)] | |
self._info_text += [ | |
'', | |
'Collision:', | |
collision, | |
'', | |
'Number of vehicles: % 8d' % len(vehicles)] | |
if len(vehicles) > 1: | |
self._info_text += ['Nearby vehicles:'] | |
def dist(l): | |
return math.sqrt((l.x - transform.location.x)**2 + (l.y - transform.location.y) | |
** 2 + (l.z - transform.location.z)**2) | |
vehicles = [(dist(x.get_location()), x) for x in vehicles if x.id != world.player.id] | |
for dist, vehicle in sorted(vehicles): | |
if dist > 200.0: | |
break | |
vehicle_type = get_actor_display_name(vehicle, truncate=22) | |
self._info_text.append('% 4dm %s' % (dist, vehicle_type)) | |
def toggle_info(self): | |
"""Toggle info on or off""" | |
self._show_info = not self._show_info | |
def notification(self, text, seconds=2.0): | |
"""Notification text""" | |
self._notifications.set_text(text, seconds=seconds) | |
def error(self, text): | |
"""Error text""" | |
self._notifications.set_text('Error: %s' % text, (255, 0, 0)) | |
def render(self, display): | |
"""Render for HUD class""" | |
if self._show_info: | |
info_surface = pygame.Surface((220, self.dim[1])) | |
info_surface.set_alpha(100) | |
display.blit(info_surface, (0, 0)) | |
v_offset = 4 | |
bar_h_offset = 100 | |
bar_width = 106 | |
for item in self._info_text: | |
if v_offset + 18 > self.dim[1]: | |
break | |
if isinstance(item, list): | |
if len(item) > 1: | |
points = [(x + 8, v_offset + 8 + (1 - y) * 30) for x, y in enumerate(item)] | |
pygame.draw.lines(display, (255, 136, 0), False, points, 2) | |
item = None | |
v_offset += 18 | |
elif isinstance(item, tuple): | |
if isinstance(item[1], bool): | |
rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6)) | |
pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1) | |
else: | |
rect_border = pygame.Rect((bar_h_offset, v_offset + 8), (bar_width, 6)) | |
pygame.draw.rect(display, (255, 255, 255), rect_border, 1) | |
fig = (item[1] - item[2]) / (item[3] - item[2]) | |
if item[2] < 0.0: | |
rect = pygame.Rect( | |
(bar_h_offset + fig * (bar_width - 6), v_offset + 8), (6, 6)) | |
else: | |
rect = pygame.Rect((bar_h_offset, v_offset + 8), (fig * bar_width, 6)) | |
pygame.draw.rect(display, (255, 255, 255), rect) | |
item = item[0] | |
if item: # At this point has to be a str. | |
surface = self._font_mono.render(item, True, (255, 255, 255)) | |
display.blit(surface, (8, v_offset)) | |
v_offset += 18 | |
self._notifications.render(display) | |
self.help.render(display) | |
# ============================================================================== | |
# -- FadingText ---------------------------------------------------------------- | |
# ============================================================================== | |
class FadingText(object): | |
""" Class for fading text """ | |
def __init__(self, font, dim, pos): | |
"""Constructor method""" | |
self.font = font | |
self.dim = dim | |
self.pos = pos | |
self.seconds_left = 0 | |
self.surface = pygame.Surface(self.dim) | |
def set_text(self, text, color=(255, 255, 255), seconds=2.0): | |
"""Set fading text""" | |
text_texture = self.font.render(text, True, color) | |
self.surface = pygame.Surface(self.dim) | |
self.seconds_left = seconds | |
self.surface.fill((0, 0, 0, 0)) | |
self.surface.blit(text_texture, (10, 11)) | |
def tick(self, _, clock): | |
"""Fading text method for every tick""" | |
delta_seconds = 1e-3 * clock.get_time() | |
self.seconds_left = max(0.0, self.seconds_left - delta_seconds) | |
self.surface.set_alpha(500.0 * self.seconds_left) | |
def render(self, display): | |
"""Render fading text method""" | |
display.blit(self.surface, self.pos) | |
# ============================================================================== | |
# -- HelpText ------------------------------------------------------------------ | |
# ============================================================================== | |
class HelpText(object): | |
""" Helper class for text render""" | |
def __init__(self, font, width, height): | |
"""Constructor method""" | |
lines = __doc__.split('\n') | |
self.font = font | |
self.dim = (680, len(lines) * 22 + 12) | |
self.pos = (0.5 * width - 0.5 * self.dim[0], 0.5 * height - 0.5 * self.dim[1]) | |
self.seconds_left = 0 | |
self.surface = pygame.Surface(self.dim) | |
self.surface.fill((0, 0, 0, 0)) | |
for i, line in enumerate(lines): | |
text_texture = self.font.render(line, True, (255, 255, 255)) | |
self.surface.blit(text_texture, (22, i * 22)) | |
self._render = False | |
self.surface.set_alpha(220) | |
def toggle(self): | |
"""Toggle on or off the render help""" | |
self._render = not self._render | |
def render(self, display): | |
"""Render help text method""" | |
if self._render: | |
display.blit(self.surface, self.pos) | |
# ============================================================================== | |
# -- CollisionSensor ----------------------------------------------------------- | |
# ============================================================================== | |
class CollisionSensor(object): | |
""" Class for collision sensors""" | |
def __init__(self, parent_actor, hud): | |
"""Constructor method""" | |
self.sensor = None | |
self.history = [] | |
self._parent = parent_actor | |
self.hud = hud | |
world = self._parent.get_world() | |
blueprint = world.get_blueprint_library().find('sensor.other.collision') | |
self.sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self._parent) | |
# We need to pass the lambda a weak reference to | |
# self to avoid circular reference. | |
weak_self = weakref.ref(self) | |
self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event)) | |
def get_collision_history(self): | |
"""Gets the history of collisions""" | |
history = collections.defaultdict(int) | |
for frame, intensity in self.history: | |
history[frame] += intensity | |
return history | |
@staticmethod | |
def _on_collision(weak_self, event): | |
"""On collision method""" | |
self = weak_self() | |
if not self: | |
return | |
actor_type = get_actor_display_name(event.other_actor) | |
self.hud.notification('Collision with %r' % actor_type) | |
impulse = event.normal_impulse | |
intensity = math.sqrt(impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2) | |
self.history.append((event.frame, intensity)) | |
if len(self.history) > 4000: | |
self.history.pop(0) | |
# ============================================================================== | |
# -- LaneInvasionSensor -------------------------------------------------------- | |
# ============================================================================== | |
class LaneInvasionSensor(object): | |
"""Class for lane invasion sensors""" | |
def __init__(self, parent_actor, hud): | |
"""Constructor method""" | |
self.sensor = None | |
self._parent = parent_actor | |
self.hud = hud | |
world = self._parent.get_world() | |
bp = world.get_blueprint_library().find('sensor.other.lane_invasion') | |
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) | |
# We need to pass the lambda a weak reference to self to avoid circular | |
# reference. | |
weak_self = weakref.ref(self) | |
self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event)) | |
@staticmethod | |
def _on_invasion(weak_self, event): | |
"""On invasion method""" | |
self = weak_self() | |
if not self: | |
return | |
lane_types = set(x.type for x in event.crossed_lane_markings) | |
text = ['%r' % str(x).split()[-1] for x in lane_types] | |
self.hud.notification('Crossed line %s' % ' and '.join(text)) | |
# ============================================================================== | |
# -- GnssSensor -------------------------------------------------------- | |
# ============================================================================== | |
class GnssSensor(object): | |
""" Class for GNSS sensors""" | |
def __init__(self, parent_actor): | |
"""Constructor method""" | |
self.sensor = None | |
self._parent = parent_actor | |
self.lat = 0.0 | |
self.lon = 0.0 | |
world = self._parent.get_world() | |
blueprint = world.get_blueprint_library().find('sensor.other.gnss') | |
self.sensor = world.spawn_actor(blueprint, carla.Transform(carla.Location(x=1.0, z=2.8)), | |
attach_to=self._parent) | |
# We need to pass the lambda a weak reference to | |
# self to avoid circular reference. | |
weak_self = weakref.ref(self) | |
self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event)) | |
@staticmethod | |
def _on_gnss_event(weak_self, event): | |
"""GNSS method""" | |
self = weak_self() | |
if not self: | |
return | |
self.lat = event.latitude | |
self.lon = event.longitude | |
# ============================================================================== | |
# -- CameraManager ------------------------------------------------------------- | |
# ============================================================================== | |
class CameraManager(object): | |
""" Class for camera management""" | |
def __init__(self, parent_actor, hud): | |
"""Constructor method""" | |
self.sensor = None | |
self.surface = None | |
self._parent = parent_actor | |
self.hud = hud | |
self.recording = False | |
bound_x = 0.5 + self._parent.bounding_box.extent.x | |
bound_y = 0.5 + self._parent.bounding_box.extent.y | |
bound_z = 0.5 + self._parent.bounding_box.extent.z | |
attachment = carla.AttachmentType | |
self._camera_transforms = [ | |
(carla.Transform(carla.Location(x=-2.0*bound_x, y=+0.0*bound_y, z=2.0*bound_z), carla.Rotation(pitch=8.0)), attachment.SpringArmGhost), | |
(carla.Transform(carla.Location(x=+0.8*bound_x, y=+0.0*bound_y, z=1.3*bound_z)), attachment.Rigid), | |
(carla.Transform(carla.Location(x=+1.9*bound_x, y=+1.0*bound_y, z=1.2*bound_z)), attachment.SpringArmGhost), | |
(carla.Transform(carla.Location(x=-2.8*bound_x, y=+0.0*bound_y, z=4.6*bound_z), carla.Rotation(pitch=6.0)), attachment.SpringArmGhost), | |
(carla.Transform(carla.Location(x=-1.0, y=-1.0*bound_y, z=0.4*bound_z)), attachment.Rigid)] | |
self.transform_index = 1 | |
self.sensors = [ | |
['sensor.camera.rgb', cc.Raw, 'Camera RGB'], | |
['sensor.camera.depth', cc.Raw, 'Camera Depth (Raw)'], | |
['sensor.camera.depth', cc.Depth, 'Camera Depth (Gray Scale)'], | |
['sensor.camera.depth', cc.LogarithmicDepth, 'Camera Depth (Logarithmic Gray Scale)'], | |
['sensor.camera.semantic_segmentation', cc.Raw, 'Camera Semantic Segmentation (Raw)'], | |
['sensor.camera.semantic_segmentation', cc.CityScapesPalette, | |
'Camera Semantic Segmentation (CityScapes Palette)'], | |
['sensor.lidar.ray_cast', None, 'Lidar (Ray-Cast)']] | |
world = self._parent.get_world() | |
bp_library = world.get_blueprint_library() | |
for item in self.sensors: | |
blp = bp_library.find(item[0]) | |
if item[0].startswith('sensor.camera'): | |
blp.set_attribute('image_size_x', str(hud.dim[0])) | |
blp.set_attribute('image_size_y', str(hud.dim[1])) | |
elif item[0].startswith('sensor.lidar'): | |
blp.set_attribute('range', '50') | |
item.append(blp) | |
self.index = None | |
def toggle_camera(self): | |
"""Activate a camera""" | |
self.transform_index = (self.transform_index + 1) % len(self._camera_transforms) | |
self.set_sensor(self.index, notify=False, force_respawn=True) | |
def set_sensor(self, index, notify=True, force_respawn=False): | |
"""Set a sensor""" | |
index = index % len(self.sensors) | |
needs_respawn = True if self.index is None else ( | |
force_respawn or (self.sensors[index][0] != self.sensors[self.index][0])) | |
if needs_respawn: | |
if self.sensor is not None: | |
self.sensor.destroy() | |
self.surface = None | |
self.sensor = self._parent.get_world().spawn_actor( | |
self.sensors[index][-1], | |
self._camera_transforms[self.transform_index][0], | |
attach_to=self._parent, | |
attachment_type=self._camera_transforms[self.transform_index][1]) | |
# We need to pass the lambda a weak reference to | |
# self to avoid circular reference. | |
weak_self = weakref.ref(self) | |
self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image)) | |
if notify: | |
self.hud.notification(self.sensors[index][2]) | |
self.index = index | |
def next_sensor(self): | |
"""Get the next sensor""" | |
self.set_sensor(self.index + 1) | |
def toggle_recording(self): | |
"""Toggle recording on or off""" | |
self.recording = not self.recording | |
self.hud.notification('Recording %s' % ('On' if self.recording else 'Off')) | |
def render(self, display): | |
"""Render method""" | |
if self.surface is not None: | |
display.blit(self.surface, (0, 0)) | |
@staticmethod | |
def _parse_image(weak_self, image): | |
self = weak_self() | |
if not self: | |
return | |
if self.sensors[self.index][0].startswith('sensor.lidar'): | |
points = np.frombuffer(image.raw_data, dtype=np.dtype('f4')) | |
points = np.reshape(points, (int(points.shape[0] / 4), 4)) | |
lidar_data = np.array(points[:, :2]) | |
lidar_data *= min(self.hud.dim) / 100.0 | |
lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1]) | |
lidar_data = np.fabs(lidar_data) # pylint: disable=assignment-from-no-return | |
lidar_data = lidar_data.astype(np.int32) | |
lidar_data = np.reshape(lidar_data, (-1, 2)) | |
lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3) | |
lidar_img = np.zeros(lidar_img_size) | |
lidar_img[tuple(lidar_data.T)] = (255, 255, 255) | |
self.surface = pygame.surfarray.make_surface(lidar_img) | |
else: | |
image.convert(self.sensors[self.index][1]) | |
array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) | |
array = np.reshape(array, (image.height, image.width, 4)) | |
array = array[:, :, :3] | |
array = array[:, :, ::-1] | |
self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) | |
if self.recording: | |
image.save_to_disk('_out/%08d' % image.frame) | |
# ============================================================================== | |
# -- Game Loop --------------------------------------------------------- | |
# ============================================================================== | |
def carla_setup(args): | |
client = carla.Client(args.host, args.port) | |
client.set_timeout(10.0) | |
traffic_manager = client.get_trafficmanager(args.tm_port) | |
world = client.get_world() | |
if args.sync: | |
settings = world.get_settings() | |
settings.synchronous_mode = True | |
settings.fixed_delta_seconds = 0.05 | |
world.apply_settings(settings) | |
traffic_manager.set_synchronous_mode(True) | |
return client, traffic_manager, world | |
def agent_setup(args, world): | |
if args.agent == "Basic": | |
return BasicAgent(world.player, 30) | |
agent.follow_speed_limits(True) | |
elif args.agent == "Constant": | |
return ConstantVelocityAgent(world.player, 30) | |
ground_loc = world.world.ground_projection(world.player.get_location(), 5) | |
if ground_loc: | |
world.player.set_location(ground_loc.location + carla.Location(z=0.01)) | |
agent.follow_speed_limits(True) | |
elif args.agent == "Behavior": | |
return BehaviorAgent(world.player, behavior=args.behavior) | |
def game_loop(args): | |
""" | |
Main loop of the simulation. It handles updating all the HUD information, | |
ticking the agent and, if needed, the world. | |
""" | |
pygame.init() | |
pygame.font.init() | |
if args.seed: | |
random.seed(args.seed) | |
client, sim_trafficman, sim_world = carla_setup(args) | |
world = trafficman = None | |
try: | |
display = pygame.display.set_mode( | |
(args.width, args.height), | |
pygame.HWSURFACE | pygame.DOUBLEBUF) | |
hud = HUD(args.width, args.height) | |
world = World(sim_world, hud, args) | |
controller = KeyboardControl(world) | |
trafficman = TrafficManager(sim_world, sim_trafficman, args) | |
datastore = DataStorage(world) | |
# Set the agent destination | |
agent = agent_setup(args, world) | |
spawn_points = world.map.get_spawn_points() | |
destination = random.choice(spawn_points).location | |
agent.set_destination(destination) | |
clock = pygame.time.Clock() | |
while True: | |
clock.tick() | |
if args.sync: | |
sim_world.tick() | |
else: | |
sim_world.wait_for_tick() | |
if controller.parse_events(): | |
return | |
world.tick(clock) | |
world.render(display) | |
pygame.display.flip() | |
datastore.tick() | |
if agent.done(): | |
if args.loop: | |
agent.set_destination(random.choice(spawn_points).location) | |
world.hud.notification("Target reached", seconds=4.0) | |
print("The target has been reached, searching for another target") | |
else: | |
print("The target has been reached, stopping the simulation") | |
break | |
control = agent.run_step() | |
control.manual_gear_shift = False | |
world.player.apply_control(control) | |
finally: | |
settings = sim_world.get_settings() | |
settings.synchronous_mode = False | |
settings.fixed_delta_seconds = None | |
sim_world.apply_settings(settings) | |
sim_trafficman.set_synchronous_mode(True) | |
if world is not None: | |
world.destroy() | |
if trafficman is not None: | |
trafficman.destroy() | |
pygame.quit() | |
def headless_loop(args): | |
""" | |
Main loop of the simulation in headless mode. | |
""" | |
sim_world = sim_trafficman = None | |
world = trafficman = None | |
try: | |
if args.seed: | |
random.seed(args.seed) | |
client, sim_trafficman, sim_world = carla_setup(args) | |
world = World(sim_world, None, args) | |
trafficman = TrafficManager(sim_world, sim_trafficman, args) | |
datastore = DataStorage(world) | |
# Set the agent destination | |
agent = agent_setup(args, world) | |
spawn_points = world.map.get_spawn_points() | |
destination = random.choice(spawn_points).location | |
agent.set_destination(destination) | |
while True: | |
if args.sync: | |
sim_world.tick() | |
else: | |
sim_world.wait_for_tick() | |
world.tick(None) | |
datastore.tick() | |
if agent.done(): | |
if args.loop: | |
agent.set_destination(random.choice(spawn_points).location) | |
print("The target has been reached, searching for another target") | |
else: | |
print("The target has been reached, stopping the simulation") | |
break | |
control = agent.run_step() | |
control.manual_gear_shift = False | |
world.player.apply_control(control) | |
finally: | |
settings = sim_world.get_settings() | |
settings.synchronous_mode = False | |
settings.fixed_delta_seconds = None | |
sim_world.apply_settings(settings) | |
sim_trafficman.set_synchronous_mode(True) | |
if world is not None: | |
world.destroy() | |
if trafficman is not None: | |
trafficman.destroy() | |
# ============================================================================== | |
# -- main() -------------------------------------------------------------- | |
# ============================================================================== | |
def main(): | |
"""Main method""" | |
argparser = argparse.ArgumentParser(description='CARLA Automatic Control Client') | |
common_group = argparser.add_argument_group('common', "Common settings related to Carla") | |
common_group.add_argument( | |
'-v', '--verbose', | |
action='store_true', | |
dest='debug', | |
help='Print debug information') | |
common_group.add_argument( | |
'--host', | |
metavar='H', | |
default='127.0.0.1', | |
help='IP of the host server (default: 127.0.0.1)') | |
common_group.add_argument( | |
'-p', '--port', | |
metavar='P', | |
default=2000, | |
type=int, | |
help='TCP port to listen to (default: 2000)') | |
common_group.add_argument( | |
'-m', '--tm-port', | |
metavar='P', | |
default=6000, | |
type=int, | |
help='Port to communicate with TM (default: 6000)') | |
common_group.add_argument( | |
'--res', | |
metavar='WIDTHxHEIGHT', | |
default='1280x720', | |
help='Window resolution (default: 1280x720)') | |
common_group.add_argument( | |
'--sync', | |
action='store_true', | |
help='Synchronous mode execution') | |
common_group.add_argument( | |
'-x', '--headless', | |
help='Run the collection script without Pygame window', | |
action="store_true" | |
) | |
common_group.add_argument( | |
'-s', '--seed', | |
help='Set seed for repeating executions (default: None)', | |
default=None, | |
type=int) | |
agent_group = argparser.add_argument_group('agent', "Agent settings") | |
agent_group.add_argument( | |
'--filter', | |
metavar='PATTERN', | |
default='vehicle.*', | |
help='Actor filter (default: "vehicle.*")') | |
agent_group.add_argument( | |
'--generation', | |
metavar='G', | |
default='2', | |
help='restrict to certain actor generation (values: "1","2","All" - default: "2")') | |
agent_group.add_argument( | |
'-l', '--loop', | |
action='store_true', | |
help='Sets a new random destination upon reaching the previous one (default: False)') | |
agent_group.add_argument( | |
"-a", "--agent", type=str, | |
choices=["Behavior", "Basic", "Constant"], | |
help="select which agent to run", | |
default="Behavior") | |
agent_group.add_argument( | |
'-b', '--behavior', type=str, | |
choices=["cautious", "normal", "aggressive"], | |
help='Choose one of the possible agent behaviors (default: normal) ', | |
default='normal') | |
traffic_group = argparser.add_argument_group('traffic', "Traffic manager settings") | |
traffic_group.add_argument( | |
'--seedw', | |
metavar='S', | |
default=0, | |
type=int, | |
help='Set the seed for pedestrians module') | |
traffic_group.add_argument( | |
'--car-lights-on', | |
action='store_true', | |
default=False, | |
help='Enable automatic car light management') | |
traffic_group.add_argument( | |
'--respawn', | |
action='store_true', | |
default=False, | |
help='Automatically respawn dormant vehicles (only in large maps)') | |
traffic_group.add_argument( | |
'-n', '--number-of-vehicles', | |
metavar='N', | |
default=30, | |
type=int, | |
help='Number of vehicles (default: 30)') | |
traffic_group.add_argument( | |
'-w', '--number-of-walkers', | |
metavar='W', | |
default=10, | |
type=int, | |
help='Number of walkers (default: 10)') | |
traffic_group.add_argument( | |
'--safe', | |
action='store_true', | |
help='Avoid spawning vehicles prone to accidents') | |
traffic_group.add_argument( | |
'--filterv', | |
metavar='PATTERN', | |
default='vehicle.*', | |
help='Filter vehicle model (default: "vehicle.*")') | |
traffic_group.add_argument( | |
'--generationv', | |
metavar='G', | |
default='All', | |
help='restrict to certain vehicle generation (values: "1","2","All" - default: "All")') | |
traffic_group.add_argument( | |
'--filterw', | |
metavar='PATTERN', | |
default='walker.pedestrian.*', | |
help='Filter pedestrian type (default: "walker.pedestrian.*")') | |
traffic_group.add_argument( | |
'--generationw', | |
metavar='G', | |
default='2', | |
help='restrict to certain pedestrian generation (values: "1","2","All" - default: "2")') | |
traffic_group.add_argument( | |
'--hybrid', | |
action='store_true', | |
help='Activate hybrid mode for Traffic Manager') | |
args = argparser.parse_args() | |
args.width, args.height = [int(x) for x in args.res.split('x')] | |
log_level = logging.DEBUG if args.debug else logging.INFO | |
logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) | |
logging.info('listening to server %s:%s', args.host, args.port) | |
print(__doc__) | |
try: | |
if args.headless: | |
headless_loop(args) | |
else: | |
game_loop(args) | |
except KeyboardInterrupt: | |
print('\nCancelled by user. Bye!') | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import curses | |
import os | |
import argparse | |
import carla | |
ACTION_MAP = { | |
"w": lambda world, ego: carla.command.ApplyImpulse(ego.id, ego.get_transform().get_forward_vector() * ego.get_physics_control().mass * 10), | |
"s": lambda world, ego: carla.command.ApplyImpulse(ego.id, ego.get_transform().get_forward_vector() * ego.get_physics_control().mass * -10), | |
"d": lambda world, ego: carla.command.ApplyImpulse(ego.id, ego.get_transform().get_right_vector() * ego.get_physics_control().mass * 10), | |
"a": lambda world, ego: carla.command.ApplyImpulse(ego.id, ego.get_transform().get_right_vector() * ego.get_physics_control().mass * -10), | |
"q": lambda world, ego: carla.command.ApplyAngularImpulse(ego.id, ego.get_transform().get_up_vector() * ego.get_physics_control().mass * -2e6), | |
"e": lambda world, ego: carla.command.ApplyAngularImpulse(ego.id, ego.get_transform().get_up_vector() * ego.get_physics_control().mass * 2e6), | |
"1": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Position), | |
"2": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.LowBeam), | |
"3": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.HighBeam), | |
"4": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Brake), | |
"5": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.RightBlinker), | |
"6": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.LeftBlinker), | |
"7": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Reverse), | |
"8": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Fog), | |
"9": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Interior), | |
"0": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ (carla.VehicleLightState.Special1 | carla.VehicleLightState.Special2)), | |
"-": lambda world, ego: carla.command.SetVehicleLightState(ego.id, carla.VehicleLightState.NONE), | |
"+": lambda world, ego: carla.command.SetVehicleLightState(ego.id, carla.VehicleLightState.All), | |
# TODO: support open door, hand brake | |
} | |
def window_main(win, client, world, player): | |
win.nodelay(True) | |
win.clear() | |
inst = """Instructions: | |
[awds]: Apply force pulse in certain direction relative to the hero object. | |
[qe]: Apply torque pulse in z-axis of the hero object. | |
[0-9|-+]: Switch vehicle light state""" | |
inst_lines = inst.count('\n') + 1 | |
win.addstr(0, 0, inst) | |
win.hline(inst_lines, 0, '-', 30) | |
win.addstr(inst_lines + 1, 0, "Press key to trigger actions:") | |
pad_lines = inst_lines + 2 | |
pending_keys = [] | |
last_snapshot = None | |
def print_pending(): | |
for i, key in enumerate(pending_keys): | |
win.addstr(pad_lines + i, 0, str(key)) | |
win.clrtobot() | |
while True: | |
# process key events | |
try: | |
key = win.getkey() | |
except Exception as e: | |
key = None | |
if key: | |
pending_keys.append(key) | |
# apply actions | |
snapshot = world.wait_for_tick() # TODO: change to on_tick with queue | |
if last_snapshot and last_snapshot.frame < snapshot.frame: | |
actions = [ACTION_MAP[key](world, player) for key in pending_keys] | |
client.apply_batch(actions) | |
pending_keys.clear() | |
last_snapshot = snapshot | |
# print status | |
print_pending() | |
# TODO: check ego status | |
def main(): | |
argparser = argparse.ArgumentParser( | |
description='CARLA Manual Control Client') | |
argparser.add_argument( | |
'--host', | |
metavar='H', | |
default='127.0.0.1', | |
help='IP of the host server (default: 127.0.0.1)') | |
argparser.add_argument( | |
'-p', '--port', | |
metavar='P', | |
default=2000, | |
type=int, | |
help='TCP port to listen to (default: 2000)') | |
argparser.add_argument( | |
'--rolename', | |
metavar='NAME', | |
default='hero', | |
help='actor role name (default: "hero")') | |
args = argparser.parse_args() | |
client = carla.Client(args.host, args.port) | |
client.set_timeout(60.0) | |
world = client.get_world() | |
for actor in world.get_actors(): | |
if actor.attributes.get('role_name') == args.rolename: | |
player = actor | |
break | |
else: | |
# player = None | |
raise RuntimeError("can't find an actor with the given role name in the simulation") | |
curses.wrapper(window_main, client, world, player) | |
if __name__ == "__main__": | |
main() | |
# curses.wrapper(window_main, None, None) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, argparse | |
import carla | |
import curses | |
import math | |
def vec_to_rot(vec: carla.Vector3D): | |
pitch = math.atan2(vec.z, math.hypot(vec.x, vec.y)) | |
yaw = math.atan2(vec.y, vec.x) | |
roll = 0 | |
return carla.Rotation(pitch, yaw, roll) | |
def window_main(win, client, world, args): | |
world_map = world.get_map() | |
spectator = world.get_spectator() | |
win.nodelay(True) | |
win.clear() | |
inst = """Instructions: | |
[a]: Save the current casted point (the yellow point) on the ground | |
[s]: Save the current nearest waypoint (the cyan point) on the ground | |
[z]: Add the current casted point to the polyline | |
[x]: Add the current nearest waypoint to the polyline | |
[c]: Save and clear the current polyline""" | |
inst_lines = inst.count('\n') + 1 | |
win.addstr(0, 0, inst) | |
win.hline(inst_lines, 0, '-', 30) | |
win.addstr(inst_lines + 1, 0, "Press key to trigger actions:") | |
saved_points = [] | |
saved_polylines = [] | |
current_polyline = [] | |
def get_marker_size(distance, max_size = 1.0): | |
return max_size / math.exp(distance / args.ray_length) | |
def format_location(location: carla.Location): | |
return f"({location.x:.3f}, {location.y:.3f}, {location.z:.3f})" | |
def print_total(): | |
return f" ({len(saved_points)} points & {len(saved_polylines)} lines saved in total)" | |
def print_poly_total(): | |
return f" ({len(current_polyline)} points saved in current polyline)" | |
while True: | |
# find casted points from spectator | |
pose = spectator.get_transform() | |
direction = pose.get_forward_vector() | |
casted = world.cast_ray(pose.location, pose.location + direction * args.ray_length) | |
# draw polyline | |
if len(current_polyline) > 1: | |
for a, b in zip(current_polyline[:-1], current_polyline[1:]): | |
world.debug.draw_line(a, b, color=carla.Color(0, 200, 0), life_time=0.1) | |
# draw the casted points | |
if len(casted) > 0: | |
first = casted[0] | |
cast_dists = [(p.location - pose.location).length() for p in casted] | |
hit_text = ["%dhits" % len(casted)] + ["%.2f" % d for d in cast_dists] | |
text = "{} @ {} | {}".format(first.label, format_location(first.location), ", ".join(hit_text)) | |
if len(current_polyline) > 0: | |
# print the direction | |
# TODO: change to use a key to activate the rotation display from last saved point (and draw line) | |
direction = first.location - current_polyline[-1] | |
text += f"\n{vec_to_rot(direction)}" | |
world.debug.draw_point(first.location, color=carla.Color(200, 200, 0), size=get_marker_size(cast_dists[0]), life_time=0.1) | |
world.debug.draw_string(first.location, text, color=carla.Color(255, 255, 255), draw_shadow=True) | |
# draw nearby waypoints | |
waypoint = world_map.get_waypoint(first.location, lane_type=carla.LaneType.Driving) | |
if waypoint is not None: | |
world.debug.draw_point(waypoint.transform.location, color=carla.Color(0, 200, 200), size=get_marker_size(cast_dists[0]), life_time=0.1) | |
world.debug.draw_string( | |
waypoint.transform.location, | |
"{} @ {} | ({}, {}, {:.3f})".format(waypoint.lane_type, format_location(waypoint.transform.location), waypoint.road_id, waypoint.lane_id, waypoint.s), | |
color=carla.Color(255, 255, 255), draw_shadow=True | |
) | |
# draw polyline to the current point | |
if len(current_polyline) > 0: | |
world.debug.draw_line(current_polyline[-1], first.location, color=carla.Color(0, 200, 0), life_time=0.1) | |
else: | |
first = waypoint = None | |
# process key events | |
try: | |
key = win.getkey() | |
except Exception as e: | |
key = None | |
if key == 'a' or key == 'z': | |
if first is not None: | |
loc = first.location | |
if key == 'a': | |
saved_points.append(loc) | |
win.addstr(inst_lines + 2, 0, f"Saved {format_location(loc)}." + print_total()) | |
else: # key == 'z' | |
current_polyline.append(loc) | |
win.addstr(inst_lines + 2, 0, f"Saved {format_location(loc)} to polyline." + print_poly_total()) | |
else: | |
win.addstr(inst_lines + 2, 0, f"No point selected") | |
elif key == 's' or key == 'x': | |
if waypoint is not None: | |
loc = waypoint.transform.location | |
if key == 's': | |
saved_points.append(loc) | |
win.addstr(inst_lines + 2, 0, f"Saved {format_location(loc)}." + print_total()) | |
else: # key == 'x' | |
current_polyline.append(loc) | |
win.addstr(inst_lines + 2, 0, f"Saved {format_location(loc)} to polyline." + print_poly_total()) | |
else: | |
win.addstr(inst_lines + 2, 0, f"No waypoint selected") | |
elif key == 'c': | |
saved_polylines.append(current_polyline) | |
win.addstr(inst_lines + 2, 0, f"Saved polyline with {len(current_polyline)} points") | |
current_polyline = [] | |
else: | |
pass | |
win.clrtobot() | |
world.wait_for_tick() | |
def main(): | |
argparser = argparse.ArgumentParser( | |
description='Viewpoint visualization') | |
argparser.add_argument( | |
'--host', | |
metavar='H', | |
default='127.0.0.1', | |
help='IP of the host server (default: 127.0.0.1)') | |
argparser.add_argument( | |
'-p', '--port', | |
metavar='P', | |
default=2000, | |
type=int, | |
help='TCP port to listen to (default: 2000)') | |
argparser.add_argument( | |
'-l', '--ray-length', | |
metavar='L', | |
default=10.0, | |
type=float, | |
help='Length of ray-casting' | |
) | |
args = argparser.parse_args() | |
client = carla.Client(args.host, args.port) | |
client.set_timeout(60.0) | |
world = client.get_world() | |
curses.wrapper(window_main, client, world, args) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment