Source code for agents.leaderboard_agent

  1"""
  2Leaderboard_ 2.0 compatible version of the Lunatic Agent
  3
  4Attention:
  5    Command line overrides are currently not supported for this agent,
    therefore this module defines some global constants that
    adjust the settings when they are not set to None (the default).
  8
  9    These global settings are only used if :py:meth:`LunaticChallenger.setup`
 10    is called with a string pointing to a configuration file. Passing a
 11    :py:class:`.LaunchConfig` directly will skip the Hydra_ setup and the global
 12    values will not be used.
 13"""
 14
 15from ast import literal_eval
 16import contextlib
 17import operator
 18import os
 19from pathlib import Path
 20from typing import TYPE_CHECKING, Any, Dict, Sequence, Tuple, Union, cast
 21
 22import carla
 23import pygame
 24from hydra import compose, initialize_config_dir
 25from hydra.core.utils import configure_log
 26from hydra.core.hydra_config import HydraConfig
 27from omegaconf import OmegaConf
 28from typing_extensions import TypedDict
 29
 30from agents.tools.debug_drawing import draw_route
 31from classes import MockDummy
 32from classes.exceptions import UserInterruption
 33from launch_tools import CarlaDataProvider, GameTime, singledispatchmethod
 34
 35try:
 36    from leaderboard.autoagents.autonomous_agent import AutonomousAgent, Track  # pyright: ignore[reportMissingImports]
 37    from leaderboard.utils.route_manipulation import downsample_route  # pyright: ignore[reportMissingImports]
 38except ModuleNotFoundError:
 39    # Leaderboard is not a submodule, cannot use it on readthedocs
 40    if "READTHEDOCS" in os.environ and not TYPE_CHECKING:
 41
 42        class AutonomousAgent:
 43            pass  # noqa
 44
 45    else:
 46        raise  # noqa: E701
 47
 48from agents.lunatic_agent import LunaticAgent
 49from agents.tools.config_creation import LaunchConfig, LunaticAgentSettings
 50from agents.tools.logs import logger
 51from classes.constants import READTHEDOCS, Phase
 52from classes.ui.keyboard_controls import RSSKeyboardControl
 53from classes.worldmodel import AD_RSS_AVAILABLE, GameFramework, WorldModel
 54
 55if TYPE_CHECKING:
 56    # from scenario_runner.srunner.autoagents.sensor_interface import SensorInterface
 57    from leaderboard.envs.sensor_interface import SensorInterface
 58
 59    from agents.navigation.local_planner import LocalPlanner
 60    from classes.constants import RoadOption
 61
 62import logging
 63
 64
 65def get_entry_point():
 66    """Must return the name of the class to be used"""
 67    return "LunaticChallenger"
 68
 69
 70# --- DEBUG OVERWRITES ---
 71# as the leaderboard agent cannot accept command line overrides these can be set here.
 72DEBUG = False
 73
 74WORLD_MODEL_DESTROY_SENSORS = True
 75
 76ENABLE_RSS = AD_RSS_AVAILABLE and literal_eval(os.environ.get("ENABLE_RSS", "None"))
 77"""If not :code:`None`, overwrites `.LunaticAgentSettings.rss.enabled`:py:attr:"""
 78
 79ENABLE_DETECTION_MATRIX = literal_eval(os.environ.get("ENABLE_DETECTION_MATRIX", "None"))
 80"""If not :code:`None`, overwrites `.LunaticAgentSettings.detection_matrix.enabled`:py:attr:"""
 81
 82DATA_MATRIX_ASYNC = literal_eval(os.environ.get("DATA_MATRIX_ASYNC", "None"))
 83"""Run the DetectionMatrix update in a separate thread; overwrites `.LunaticAgentSettings.detection_matrix.sync`:py:attr:."""
 84
 85DATA_MATRIX_INTERVAL = literal_eval(os.environ.get("DATA_MATRIX_INTERVAL", "None"))
 86"""
 87When running synchronously how many ticks should be between two updates. Overwrites `.LunaticAgentSettings.detection_matrix.sync_interval`:py:attr:
 88"""
 89
 90CAMERA_SPECTATOR = literal_eval(os.environ.get("CAMERA_SPECTATOR", "None"))
 91"""Whether to use the camera spectator; overwrites `camera.spectator<.CameraConfig.spectator>`:py:attr:"""
 92
 93USE_OPEN_DRIVE_DATA = False
 94
 95DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES = 5
 96"""
 97The smaller the the value the more exact will the agent stick to the original route,
 98BUT ONLY IF the route is provided as a fine-grained route.
 99
100NOTE: We should NOT rely on the route to be available in a fine grained manner -> Should work with larger values
101
102Larger values will make the agent cut corners and drive more straight lines.
103Needs extra tools to stick to the road.
104"""
105
106args: LaunchConfig
107"""Global access to the launch config; set in :py:meth:`LunaticChallenger.setup`"""
108
109
class LunaticChallenger(AutonomousAgent, LunaticAgent):
    """
    Variant of the :py:class:`.LunaticAgent` that is compatible with the
    `Leaderboard 2.0 <https://leaderboard.carla.org/>`_ interface.

    Attention:
        If the :py:class:`LunaticChallenger` is used without the Leaderboard 2.0 framework
        the :py:meth:`__call__` method should be used instead of :py:meth:`run_step`
        to acquire the next control.
    """

    sensor_interface: "SensorInterface"  #: :meta private:

    # Route representations; filled by set_global_plan (possibly before setup is called).
    _global_plan: "list[tuple[_GPSDataDict, RoadOption]]" = None  # type: ignore[assignment]
    _global_plan_world_coord: "list[tuple[carla.Transform, RoadOption]]" = None  # type: ignore[assignment]
    _global_plan_waypoints: "list[tuple[carla.Waypoint, RoadOption]]" = None  # type: ignore[assignment]

    def __init__(self, carla_host, carla_port, debug=False):
        """
        Initializes the LunaticChallenger. This does not yet load the config or calls :py:meth:`LunaticAgent.__init__ <agents.lunatic_agent.LunaticAgent.__init__>`.

        Further initialization is done in :py:meth:`LunaticChallenger.setup`.

        Parameters:
            carla_host: Forwarded unchanged to the ``AutonomousAgent`` initializer.
            carla_port: Forwarded unchanged to the ``AutonomousAgent`` initializer.
            debug: Forwarded unchanged to the ``AutonomousAgent`` initializer.
        """
        print("Initializing LunaticChallenger")
        # Correct types are set at setup
        self.world_model: WorldModel = None  # type: ignore[assignment]
        self.game_framework: GameFramework = None  # type: ignore[assignment]
        self._destroyed = False
        # AutonomousAgent init
        super().__init__(carla_host, carla_port, debug)
        self.track = Track.MAP
        self._opendrive_data = None
        self._local_planner: "LocalPlanner" = None  # type: ignore[assignment]

    @staticmethod
    def _overrides_from_globals() -> "list[str]":
        """Translate the module-level override constants into Hydra override strings."""
        overrides = []
        if ENABLE_DETECTION_MATRIX is not None:
            overrides.append("agent.detection_matrix.enabled=" + str(ENABLE_DETECTION_MATRIX).lower())
            # `sync` is the opposite of "async" (see the assignment after compose in setup);
            # skip it when unset so that the literal string "none" is never passed to Hydra.
            if DATA_MATRIX_ASYNC is not None:
                overrides.append("agent.detection_matrix.sync=" + str(not DATA_MATRIX_ASYNC).lower())
        if ENABLE_RSS is not None:
            overrides.append("agent.rss.enabled=" + str(ENABLE_RSS).lower())
        if CAMERA_SPECTATOR is not None:
            overrides.append("camera.spectator=" + str(CAMERA_SPECTATOR).lower())
        return overrides

    def setup(self, path_to_conf_file: Union[str, LaunchConfig]):
        """
        Initializes the underlying :py:class:`.LunaticAgent` as well as instances of :py:class:`.GameFramework` and :py:class:`.WorldModel`.

        To some extent initializes the Hydra_ framework and loads the configuration.

        Parameters:
            path_to_conf_file:
                Can either be a string pointing to a configuration file to load a
                :py:class:`.LaunchConfig` or a :py:class:`.LaunchConfig` to be used directly.

        Note:
            If a :py:class:`.LaunchConfig` is passed directly the Hydra_ setup will be skipped
            and the module-level ``args`` is not (re)assigned; use ``self.args`` instead.
        """
        self._destroyed = False
        self.track = Track.MAP
        if isinstance(path_to_conf_file, str):
            print("Setup with conf file", path_to_conf_file)
            logger.info("Setup with conf file %s", path_to_conf_file)
            config_dir, config_name = os.path.split(path_to_conf_file)
            # TODO: Maybe move to init so its available during set_global_plan
            global args  # assigned below to provide module-level access to the launch config
            overrides = ["agent=leaderboard"]
            if not GameFramework.hydra_initialized():
                initialize_config_dir(
                    version_base=None, config_dir=os.path.abspath(config_dir), job_name="LeaderboardAgent"
                )
                # The global constants are only applied on the first Hydra initialization
                overrides.extend(self._overrides_from_globals())
                args = cast(
                    "LaunchConfig",
                    compose(
                        config_name=config_name, return_hydra_config=True, overrides=overrides
                    ),  # uses conf/agent/leaderboard
                )
                args.debug = DEBUG
                # Let scenario manager decide
                if args.map:
                    logger.warning(
                        "Map should be set by scenario manager and be None in the config file found map is %s.",
                        args.map,
                    )
                if args.handle_ticks:
                    logger.warning("When using the leaderboard agent, `handle_ticks` should be False.")
                    args.handle_ticks = False
                if args.sync is not None:
                    logger.warning("When using the leaderboard agent, `sync` should be None.")
                    args.sync = None

                # Setup Hydra
                if OmegaConf.is_missing(args.hydra.runtime, "output_dir"):
                    args.hydra.runtime.output_dir = args.hydra.run.dir
                HydraConfig.instance().set_config(args)  # type: ignore[arg-type]
                Path(args.hydra.runtime.output_dir).mkdir(parents=True, exist_ok=True)
                # Assure that our logger works
                configure_log(args.hydra.job_logging, logger.name)  # type: ignore[arg-type]

                if DATA_MATRIX_ASYNC is not None:
                    args.agent.detection_matrix.sync = not DATA_MATRIX_ASYNC
                if DATA_MATRIX_INTERVAL is not None:
                    args.agent.detection_matrix.sync_interval = DATA_MATRIX_INTERVAL
                logger.info(OmegaConf.to_yaml(args))
            else:
                # Hydra is already initialized: only compose the config again
                args = cast(
                    "LaunchConfig",
                    compose(
                        config_name=config_name, return_hydra_config=True, overrides=overrides
                    ),  # uses conf/agent/leaderboard
                )
            logger.setLevel(logging.DEBUG)
            self.args = args

            config = LunaticAgentSettings.create(self.args.agent, assure_copy=True, as_dictconfig=True)
        else:
            self.args = path_to_conf_file
            config = self.args.agent
        if OmegaConf.is_missing(config.planner, "dt"):
            config.planner.dt = 1 / 20  # TODO: maybe get from somewhere else

        self.game_framework = GameFramework(self.args, config)
        print("Game framework setup")
        # TODO: How to make args optional
        self.world_model = WorldModel(config, args=self.args)
        self.game_framework.world_model = self.world_model
        print("World Model setup")
        if self.args.pygame:
            self.controller = self.game_framework.make_controller(
                self.world_model, RSSKeyboardControl, start_in_autopilot=False
            )  # Note: stores weakref to controller
        else:
            self.controller = MockDummy.create_dummy(RSSKeyboardControl)
        print("Initializing agent")
        # Explicit call: the cooperative super().__init__ chain was already used for AutonomousAgent
        LunaticAgent.__init__(self=self, settings=config, world_model=self.world_model)
        print("LunaticAgent initialized")

        from agents.rules.lane_changes.random_changes import RandomLaneChangeRule  # noqa: PLC0415

        # Random lane changes are disabled for this agent
        for rules in self.rules.values():
            for rule in rules:
                if isinstance(rule, RandomLaneChangeRule):
                    rule.enabled = False

        # Set plan; a route may have been provided through set_global_plan before setup
        if self._global_plan_waypoints:
            self._local_planner_set_plan(self._global_plan_waypoints)

        self.game_framework.agent = self  # TODO: Remove this circular reference
        self.agent_engaged = False
        # Print controller docs
        with contextlib.suppress(AttributeError):
            if not isinstance(self.controller, MockDummy):
                print(self.controller.get_docstring())

    def sensors(self) -> "list[dict]":
        """
        Define the sensor suite required by the agent

        Returns:
            A list containing the required sensors in the following format

            .. code-block:: python

                [
                    {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': -0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                    'width': 300, 'height': 200, 'fov': 100, 'id': 'Left'},

                    {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': 0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                    'width': 300, 'height': 200, 'fov': 100, 'id': 'Right'},

                    {'type': 'sensor.lidar.ray_cast', 'x': 0.7, 'y': 0.0, 'z': 1.60, 'yaw': 0.0, 'pitch': 0.0, 'roll': 0.0,
                    'id': 'LIDAR'}
                ]

        Note:
            The LunaticChallenger does not use any sensors; the usage of 'sensor.opendrive_map' is experimental, however
            there is yet no parsing done for the data.
        """
        sensors: list = super().sensors()  # This should be empty

        # Read the instance config instead of the module-level ``args``: the global is only
        # bound when setup() was called with a path, which made this method raise a NameError
        # when a LaunchConfig was passed directly.
        launch_config = getattr(self, "args", None)

        # temp; remove
        try:
            i = [x["type"] for x in launch_config.leaderboard.sensors].index("sensor.opendrive_map")
        except (ValueError, AttributeError):
            pass
        else:
            launch_config.leaderboard.sensors[i].use = USE_OPEN_DRIVE_DATA

        # add sensors if they have the use flag in the config; best effort
        with contextlib.suppress(Exception):
            sensors.extend(filter(operator.itemgetter("use"), launch_config.leaderboard.sensors))
        logger.info("Using sensors: %s", sensors)
        return sensors

    @staticmethod
    def _print_input_data(input_data: Dict[str, Tuple[int, Any]]):
        """
        Debug method to view the input data

        Parameters:
            input_data: Maps a sensor id to a ``(frame, data)`` tuple.

        Returns:
            :code:`None` if *input_data* is empty, otherwise :code:`True` after printing one line per entry.
        """
        if not input_data:
            return None
        print("=====================>")
        for key, (frame, data) in input_data.items():
            if hasattr(data, "shape"):
                print(f"[{key} -- {frame:06d}] with shape {data.shape}")
            else:
                print(f"[{key} -- {frame:06d}] ")
        print("<=====================")
        return True

    # This allows BlockingRules to pick up the correct function
    @singledispatchmethod
    def run_step(self, debug: bool = False, second_pass=False) -> carla.VehicleControl:  # noqa: ARG002
        """
        Attention:
            Use :py:meth:`__call__` instead of this method!

        Default dispatch target; skips ``AutonomousAgent`` in the MRO and forwards to the next
        ``run_step`` implementation. The *debug* argument is ignored in favor of ``self.args.debug``.
        """
        # TODO: Possibly singledispatch to __call__ instead
        return super(AutonomousAgent, self).run_step(debug=self.args.debug, second_pass=second_pass)

    @run_step.register(dict)
    def _(self, input_data: Dict[str, Tuple[int, Any]], timestamp: float = -1) -> carla.VehicleControl:  # noqa: ARG002
        """
        Function that is called by leaderboard framework

        Dispatch target of :py:meth:`run_step` for a :code:`dict` argument. On any exception the
        agent is destroyed and the exception is re-raised.
        """
        try:
            if self._print_input_data(input_data) and "OpenDRIVE" in input_data:
                frame, data_value = input_data["OpenDRIVE"]
                data: str = data_value["opendrive"]
                if self._opendrive_data != data:
                    self._opendrive_data = data
                    print(frame, data[:5000])
                    Path("opendrive.xml").write_text(data)
                else:
                    print("OpenDRIVE data unchanged")
            self.agent_engaged = True  # remove this, if not used

            with self.game_framework:
                # A bool argument dispatches to the default run_step, i.e. the Lunatic Agent
                control = self.run_step(self.args.debug)
                # Handle render updates

                self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN, prior_results=control)
                if self.controller.parse_events(self.get_control()):
                    print("Exiting by user input.")
                    raise UserInterruption("Exiting by user input.")  # noqa: TRY301
                self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.END, prior_results=None)

                self.execute_phase(Phase.EXECUTION | Phase.BEGIN, prior_results=control)
                final_controls: carla.VehicleControl = self.get_control()  # type: ignore[assignment]

            # TODO: Update HUD controls info
        except Exception as e:
            # A user interruption is an expected way to exit and is not logged as an error
            if not isinstance(e, UserInterruption):
                logger.exception("Error in LunaticChallenger.run_step:")
            self.destroy()
            raise
        else:
            return final_controls

    # NOTE: to update the doc use
    # run_step.func.__doc__ += "\n" + LunaticAgent.run_step.__doc__

    @staticmethod
    def _transform_to_waypoint(
        transform: "carla.Transform", project_to_road=True, lane_type=carla.LaneType.Driving
    ) -> "carla.Waypoint":
        """Look up the map waypoint for the location of *transform* via the ``CarlaDataProvider`` map."""
        # maybe move to misc / tools
        return CarlaDataProvider.get_map().get_waypoint(
            transform.location, project_to_road=project_to_road, lane_type=lane_type
        )  # pyright: ignore[reportReturnType]

    def _local_planner_set_plan(self, plan):
        """Hand *plan* to the ``set_global_plan`` following ``AutonomousAgent`` in the MRO; draw it in debug mode."""
        super(AutonomousAgent, self).set_global_plan(plan, stop_waypoint_creation=True, clean_queue=True)
        if self.game_framework.launch_config.debug:
            draw_route(
                CarlaDataProvider.get_world(), plan, vertical_shift=0.5, size=0.15, downsample=1, life_time=1000.0
            )

    def set_global_plan(
        self,
        global_plan_gps: Sequence["tuple[_GPSDataDict, RoadOption]"],
        global_plan_world_coord: Sequence["tuple[carla.Transform, RoadOption]"],
    ):
        """
        Set the plan (route) for the agent

        The route is downsampled with ``DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES`` and stored as
        GPS data, world transforms and waypoints. If a local planner already exists the
        waypoint plan is passed on to it immediately.

        Parameters:
            global_plan_gps: Route as GPS data paired with a road option.
            global_plan_world_coord: Same route as transforms paired with a road option.
        """
        ds_ids: "list[int]" = downsample_route(
            global_plan_world_coord, DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES
        )  # Downsample to less distance. TODO: should increase this

        # Reduce the global plan to the downsampled ids
        self._global_plan_world_coord = [(global_plan_world_coord[x][0], global_plan_world_coord[x][1]) for x in ds_ids]
        self._global_plan = [global_plan_gps[x] for x in ds_ids]
        self._global_plan_waypoints = [
            (self._transform_to_waypoint(transform), road_option)
            for transform, road_option in self._global_plan_world_coord
        ]
        if self._local_planner is not None:
            # TODO: maybe waypoints is not necessary as we extract locations
            self._local_planner_set_plan(self._global_plan_waypoints)

    def __call__(self) -> carla.VehicleControl:
        """
        Executes the next step and returns the control for the vehicle.

        Attention:
            Use this function instead of :py:meth:`run_step`!
        """
        input_data = self.sensor_interface.get_data(GameTime.get_frame())

        timestamp = GameTime.get_time()

        # Read the instance config instead of the module-level ``args`` which is unbound
        # when setup() was called with a LaunchConfig instead of a path.
        leaderboard_config = getattr(self.args, "leaderboard", None)
        if getattr(leaderboard_config, "print_time_info", False):
            if not self.wallclock_t0:
                self.wallclock_t0 = GameTime.get_wallclocktime()
            wallclock = GameTime.get_wallclocktime()
            wallclock_diff = (wallclock - self.wallclock_t0).total_seconds()
            sim_ratio = 0 if wallclock_diff == 0 else timestamp / wallclock_diff

            print(
                f"=== [Agent] -- Wallclock = {str(wallclock)[:-3]} -- System time = {wallclock_diff:.3f}"
                f" -- Game time = {timestamp:.3f} -- Ratio = {sim_ratio:.3f}x"
            )

        control = self.run_step(input_data, timestamp)
        control.manual_gear_shift = False

        return control

    def destroy(self):
        """
        Stop the detection matrix, destroy the world model and quit pygame.

        Safe to call on an instance whose :py:meth:`setup` did not run or did not finish.
        """
        self._destroyed = True
        print("Destroying Lunatic Challenger")
        # `_detection_matrix` is not assigned by this class, hence it may not exist yet
        detection_matrix = getattr(self, "_detection_matrix", None)
        if detection_matrix is not None:
            detection_matrix.stop()
            self._detection_matrix = None
        super().destroy()
        if self.world_model:
            if not WORLD_MODEL_DESTROY_SENSORS:
                # Forget the actors so that the world model does not destroy them
                self.world_model.actors.clear()
            self.world_model.destroy()
            self.world_model = None  # type: ignore[assignment]
        if self.game_framework:
            self.game_framework.agent = None
            self.game_framework = None  # type: ignore[assignment]
        pygame.quit()
        print("Destroyed", self)

    def __del__(self):
        """Destroy the agent on garbage collection if :py:meth:`destroy` was not called before."""
        # Default to "already destroyed" for an instance whose __init__ never ran
        if not getattr(self, "_destroyed", True):
            self.destroy()
473 474 475# --- Type hints ---
class _GPSDataDict(TypedDict if (TYPE_CHECKING or not READTHEDOCS) else object):
    """
    Bases: :py:class:`typing.TypedDict`

    Typed dictionary describing one GPS entry of the global plan that is
    passed to :py:meth:`.LunaticChallenger.set_global_plan`.

    :meta public:
    """

    # A plain ``object`` base is used only when READTHEDOCS is set and no type checker is running.
    lat: float
    lon: float
    z: float