Source code for agents.leaderboard_agent

  1"""
  2Leaderboard_ 2.0 compatible version of the Lunatic Agent
  3
  4Attention:
  5    Command line overrides are currently not supported for this agent,
  6    therefore this module allows to define some global constants to
  7    that can adjust settings the settings if they are not set to None (default).
  8    
  9    These global settings are only used if :py:meth:`LunaticChallenger.setup`
 10    is called with a string pointing to a configuration file. Passing a
 11    :py:class:`.LaunchConfig` directly will skip the Hydra_ setup and the global
 12    values will not be used.
 13"""
 14from ast import literal_eval
 15import contextlib
 16import operator
 17import os
 18from pathlib import Path
 19from typing import TYPE_CHECKING, Any, Dict, Sequence, Tuple, Union, cast
 20
 21import carla
 22import pygame
 23from hydra import compose, initialize_config_dir
 24from hydra.core.utils import configure_log
 25from hydra.core.hydra_config import HydraConfig
 26from omegaconf import OmegaConf
 27from typing_extensions import TypedDict
 28
 29from agents.tools.debug_drawing import draw_route
 30from classes.exceptions import UserInterruption
 31from launch_tools import CarlaDataProvider, GameTime, singledispatchmethod
 32
 33try:
 34    from leaderboard.autoagents.autonomous_agent import AutonomousAgent, Track  # pyright: ignore[reportMissingImports]
 35    from leaderboard.utils.route_manipulation import downsample_route  # pyright: ignore[reportMissingImports]
 36except ModuleNotFoundError:
 37    # Leaderboard is not a submodule, cannot use it on readthedocs
 38    if "READTHEDOCS" in os.environ and not TYPE_CHECKING:
 39        class AutonomousAgent: pass # noqa
 40    else: raise  # noqa: E701
 41
 42from agents.lunatic_agent import LunaticAgent
 43from agents.tools.config_creation import LaunchConfig, LunaticAgentSettings
 44from agents.tools.logs import logger
 45from classes.constants import READTHEDOCS, Phase
 46from classes.keyboard_controls import RSSKeyboardControl
 47from classes.worldmodel import AD_RSS_AVAILABLE, GameFramework, WorldModel
 48
 49if TYPE_CHECKING:
 50    #from scenario_runner.srunner.autoagents.sensor_interface import SensorInterface
 51    from leaderboard.envs.sensor_interface import SensorInterface
 52
 53    from agents.navigation.local_planner import LocalPlanner
 54    from classes.constants import RoadOption
 55
 56import logging
 57
 58
 59def get_entry_point():
 60    """Must return the name of the class to be used"""
 61    return "LunaticChallenger"
 62
 63
 64# --- DEBUG OVERWRITES ---
 65# as the leaderboard agent cannot accept command line overrides these can be set here.
 66DEBUG = False
 67
 68WORLD_MODEL_DESTROY_SENSORS = True
 69
 70ENABLE_RSS = AD_RSS_AVAILABLE and literal_eval(os.environ.get("ENABLE_RSS", "None"))
 71"""If not :code:`None`, overwrites `.LunaticAgentSettings.rss.enabled`:py:attr:"""
 72
 73ENABLE_DETECTION_MATRIX = literal_eval(os.environ.get("ENABLE_DETECTION_MATRIX", "None"))
 74"""If not :code:`None`, overwrites `.LunaticAgentSettings.detection_matrix.enabled`:py:attr:"""
 75
 76DATA_MATRIX_ASYNC = literal_eval(os.environ.get("DATA_MATRIX_ASYNC", "None"))
 77"""Run the DetectionMatrix update in a separate thread; overwrites `.LunaticAgentSettings.detection_matrix.sync`:py:attr:."""
 78
 79DATA_MATRIX_INTERVAL = literal_eval(os.environ.get("DATA_MATRIX_INTERVAL", "None"))
 80"""
 81When running synchronously how many ticks should be between two updates. Overwrites `.LunaticAgentSettings.detection_matrix.sync_interval`:py:attr:
 82"""
 83
 84CAMERA_SPECTATOR = literal_eval(os.environ.get("CAMERA_SPECTATOR", "None"))
 85"""Whether to use the camera spectator; overwrites `camera.spectator<.CameraConfig.spectator>`:py:attr:"""
 86
 87USE_OPEN_DRIVE_DATA = False
 88
 89DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES = 5
 90"""
 91The smaller the the value the more exact will the agent stick to the original route,
 92BUT ONLY IF the route is provided as a fine-grained route.
 93
 94NOTE: We should NOT rely on the route to be available in a fine grained manner -> Should work with larger values
 95
 96Larger values will make the agent cut corners and drive more straight lines.
 97Needs extra tools to stick to the road.
 98"""
 99
100args: LaunchConfig
101"""Global access to the launch config; set in :py:meth:`LunaticChallenger.setup`"""
102
103
[docs] 104class LunaticChallenger(AutonomousAgent, LunaticAgent): 105 """ 106 Variant of the :py:class:`.LunaticAgent` that is compatible with the 107 `Leaderboard 2.0 <https://leaderboard.carla.org/>`_ interface. 108 109 Attention: 110 If the :py:class:`LunaticChallenger` is used without the Leaderboard 2.0 framework 111 the :py:meth:`__call__` method should be used instead of :py:meth:`run_step` 112 to acquire the next control. 113 """ 114 115 sensor_interface: "SensorInterface" #: :meta private: 116 117 _global_plan: "list[tuple[_GPSDataDict, RoadOption]]" = None # type: ignore[assignment] 118 _global_plan_world_coord: "list[tuple[carla.Transform, RoadOption]]" = None # type: ignore[assignment] 119 _global_plan_waypoints: "list[tuple[carla.Waypoint, RoadOption]]" = None # type: ignore[assignment] 120 121 def __init__(self, carla_host, carla_port, debug=False): 122 """ 123 Initializes the LunaticChallenger. This does not yet load the config or calls :py:meth:`LunaticAgent.__init__ <agents.lunatic_agent.LunaticAgent.__init__>`. 124 125 Further initialization is done in :py:meth:`LunaticChallenger.setup`. 126 """ 127 print("Initializing LunaticChallenger") 128 # Correct types are set at setup 129 self.world_model: WorldModel = None # type: ignore[assignment] 130 self.game_framework: GameFramework = None # type: ignore[assignment] 131 self._destroyed = False 132 # AutonomousAgent init 133 super().__init__(carla_host, carla_port, debug) 134 self.track = Track.MAP 135 self._opendrive_data = None 136 self._local_planner: "LocalPlanner" = None # type: ignore[assignment] 137
[docs] 138 def setup(self, path_to_conf_file: Union[str, LaunchConfig]): 139 """ 140 Initializes the underlying :py:class:`.LunaticAgent` as well as instances of :py:class:`.GameFramework` and :py:class:`.WorldModel`. 141 142 To some extends initializes the Hydra_ framework and load the configuration. 143 144 Parameters: 145 path_to_conf_file: 146 Can either be a string pointing to a configuration file to load a 147 :py:class:`.LaunchConfig` or a :py:class:`.LaunchConfig` to be used directly. 148 149 Note: 150 If a :py:class:`.LaunchConfig` is passed directly the Hydra_ setup will be skipped. 151 """ 152 self._destroyed = False 153 self.track = Track.MAP 154 if isinstance(path_to_conf_file, str): 155 print("Setup with conf file", path_to_conf_file) 156 logger.info("Setup with conf file %s", path_to_conf_file) 157 config_dir, config_name = os.path.split(path_to_conf_file) 158 # TODO: Maybe move to init so its available during set_global_plan 159 global args # Unused 160 overrides = ["agent=leaderboard"] 161 if not GameFramework.hydra_initialized(): 162 initialize_config_dir(version_base=None, 163 config_dir=os.path.abspath(config_dir), 164 job_name="LeaderboardAgent") 165 if ENABLE_DETECTION_MATRIX is not None: 166 overrides.append("agent.detection_matrix.enabled=" + str(ENABLE_DETECTION_MATRIX).lower()) 167 overrides.append("agent.detection_matrix.sync=" + str(DATA_MATRIX_ASYNC).lower()) 168 if ENABLE_RSS is not None: 169 overrides.append("agent.rss.enabled=" + str(ENABLE_RSS).lower()) 170 if CAMERA_SPECTATOR is not None: 171 overrides.append("camera.spectator=" + str(CAMERA_SPECTATOR).lower()) 172 args = cast(LaunchConfig, 173 compose(config_name=config_name, 174 return_hydra_config=True, 175 overrides=overrides) # uses conf/agent/leaderboard 176 ) 177 args.debug = DEBUG 178 # Let scenario manager decide 179 if args.map: 180 logger.warning(f"Map should be set by scenario manager and be None in the config file found map is {args.map}.") 181 if args.handle_ticks: 182 logger.warning("When using the leaderboard agent, `handle_ticks` should be False.") 183 args.handle_ticks = False 184 if args.sync is not None: 185 logger.warning("When using the leaderboard agent, `sync` should be None.") 186 args.sync = None 187 188 # Setup Hydra 189 if OmegaConf.is_missing(args.hydra.runtime, "output_dir"): 190 args.hydra.runtime.output_dir = args.hydra.run.dir 191 HydraConfig.instance().set_config(args) # type: ignore[arg-type] 192 Path(args.hydra.runtime.output_dir).mkdir(parents=True, exist_ok=True) 193 # Assure that our logger works 194 configure_log(args.hydra.job_logging, logger.name) # type: ignore[arg-type] 195 196 if DATA_MATRIX_ASYNC is not None: 197 args.agent.detection_matrix.sync = not DATA_MATRIX_ASYNC 198 if DATA_MATRIX_INTERVAL is not None: 199 args.agent.detection_matrix.sync_interval = DATA_MATRIX_INTERVAL 200 logger.info(OmegaConf.to_yaml(args)) 201 else: 202 args = cast(LaunchConfig, 203 compose(config_name=config_name, 204 return_hydra_config=True, 205 overrides=overrides) # uses conf/agent/leaderboard 206 ) 207 logger.setLevel(logging.DEBUG) 208 self.args = args 209 210 config = LunaticAgentSettings.create(self.args.agent, assure_copy=True, as_dictconfig=True) 211 else: 212 self.args = path_to_conf_file 213 config = self.args.agent 214 if OmegaConf.is_missing(config.planner, "dt"): 215 config.planner.dt = 1 / 20 # TODO: maybe get from somewhere else 216 217 self.game_framework = GameFramework(self.args, config) 218 print("Game framework setup") 219 # TODO: How to make args optional 220 self.world_model = WorldModel(config, args=self.args) 221 self.game_framework.world_model = self.world_model 222 print("World Model setup") 223 self.controller = self.game_framework.make_controller(self.world_model, RSSKeyboardControl, start_in_autopilot=False) # Note: stores weakref to controller 224 print("Initializing agent") 225 LunaticAgent.__init__(self=self, settings=config, world_model=self.world_model) 226 # super(AutonomousAgent, self).__init__(self, config, self.world_model) 227 print("LunaticAgent initialized") 228 229 from agents.rules.lane_changes.random_changes import RandomLaneChangeRule # noqa: PLC0415 230 for rules in self.rules.values(): 231 for rule in rules: 232 if isinstance(rule, RandomLaneChangeRule): 233 rule.enabled = False 234 235 # Set plan 236 if self._global_plan_waypoints: 237 self._local_planner_set_plan(self._global_plan_waypoints) 238 239 self.game_framework.agent = self # TODO: Remove this circular reference 240 self.agent_engaged = False 241 # Print controller docs 242 with contextlib.suppress(AttributeError): 243 print(self.controller.get_docstring())
244
[docs] 245 def sensors(self) -> "list[dict]": 246 """ 247 Define the sensor suite required by the agent 248 249 Returns: 250 A list containing the required sensors in the following format 251 252 .. code-block:: python 253 254 [ 255 {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': -0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0, 256 'width': 300, 'height': 200, 'fov': 100, 'id': 'Left'}, 257 258 {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': 0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0, 259 'width': 300, 'height': 200, 'fov': 100, 'id': 'Right'}, 260 261 {'type': 'sensor.lidar.ray_cast', 'x': 0.7, 'y': 0.0, 'z': 1.60, 'yaw': 0.0, 'pitch': 0.0, 'roll': 0.0, 262 'id': 'LIDAR'} 263 ] 264 265 Note: 266 The LunaticChallenger does not use any sensors; the usage of 'sensor.opendrive_map' is experimental, however 267 there is yet no parsing done for the data. 268 """ 269 sensors: list = super().sensors() # This should be empty 270 271 # temp; remove 272 try: 273 i = [x['type'] for x in args.leaderboard.sensors].index('sensor.opendrive_map') 274 except (ValueError, AttributeError): 275 pass 276 else: 277 args.leaderboard.sensors[i].use = USE_OPEN_DRIVE_DATA 278 279 # add sensors if they have the use flag in the config 280 with contextlib.suppress(Exception): 281 sensors.extend(filter(operator.itemgetter('use'), args.leaderboard.sensors)) 282 logger.info("Using sensors: %s", sensors) 283 return sensors
284 285 @staticmethod 286 def _print_input_data(input_data: Dict[str, Tuple[int, Any]]): 287 """ 288 Debug method to view the input data 289 """ 290 if not input_data: 291 return None 292 print("=====================>") 293 for key, val in input_data.items(): 294 if hasattr(val[1], 'shape'): 295 shape = val[1].shape 296 print(f"[{key} -- {val[0]:06d}] with shape {shape}") 297 else: 298 print(f"[{key} -- {val[0]:06d}] ") 299 print("<=====================") 300 return True 301 302 # This allows BlockingRules to pick up the coorect function
[docs] 303 @singledispatchmethod 304 def run_step(self, debug: bool = False, second_pass=False) -> carla.VehicleControl: # noqa: ARG002 305 """ 306 Attention: 307 Use :py:meth:`__call__` instead of this method! 308 """ 309 # TODO: Possibly singledispatch to __call__ instead 310 return super(AutonomousAgent, self).run_step(debug=self.args.debug, second_pass=second_pass)
311 312 @run_step.register(dict) 313 def _(self, input_data: Dict[str, Tuple[int, Any]], timestamp: float = -1) -> carla.VehicleControl: # noqa: ARG002 314 """Function that is called by leaderboard framework""" 315 try: 316 if self._print_input_data(input_data) and "OpenDRIVE" in input_data: 317 frame, data_value = input_data["OpenDRIVE"] 318 data: str = data_value["opendrive"] 319 if self._opendrive_data != data: 320 if self._opendrive_data is not None: 321 #breakpoint() 322 pass 323 self._opendrive_data = data 324 print(frame, data[:5000]) 325 Path("opendrive.xml").write_text(data) 326 else: 327 print("OpenDRIVE data unchanged") 328 self.agent_engaged = True # remove this, if not used 329 330 with self.game_framework: 331 control = self.run_step(self.args.debug) # Call Lunatic Agent run_step 332 # Handle render updates 333 334 self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN, prior_results=control) 335 if self.controller.parse_events(self.get_control()): 336 print("Exiting by user input.") 337 raise UserInterruption("Exiting by user input.") # noqa: TRY301 338 self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.END, prior_results=None) 339 340 self.execute_phase(Phase.EXECUTION | Phase.BEGIN, prior_results=control) 341 final_controls: carla.VehicleControl = self.get_control() # type: ignore[assignment] 342 343 # TODO: Update HUD controls info 344 except Exception as e: 345 if not isinstance(e, UserInterruption): 346 logger.exception("Error in LunaticChallenger.run_step:") 347 self.destroy() 348 raise 349 else: 350 return final_controls 351 352 # NOTE: to update the doc use 353 #run_step.func.__doc__ += "\n" + LunaticAgent.run_step.__doc__ 354 355 @staticmethod 356 def _transform_to_waypoint(transform: "carla.Transform", project_to_road=True, lane_type=carla.LaneType.Driving) -> "carla.Waypoint": 357 # maybe move to misc / tools 358 return CarlaDataProvider.get_map().get_waypoint(transform.location, project_to_road=project_to_road, lane_type=lane_type) # pyright: ignore[reportReturnType] 359 360 def _local_planner_set_plan(self, plan): 361 super(AutonomousAgent, self).set_global_plan(plan, stop_waypoint_creation=True, clean_queue=True) 362 if self.game_framework._args.debug: 363 draw_route(CarlaDataProvider.get_world(), plan, vertical_shift=0.5, size=0.15, downsample=1, life_time=1000.0) 364
[docs] 365 def set_global_plan(self, global_plan_gps: Sequence["tuple[_GPSDataDict, RoadOption]"], global_plan_world_coord: Sequence["tuple[carla.Transform, RoadOption]"]): 366 """ 367 Set the plan (route) for the agent 368 """ 369 # old: 370 # super().set_global_plan(global_plan_gps, global_plan_world_coord) 371 # new: 372 #print("==============Road updated============") 373 #print("Plan GPS", global_plan_gps[:10]) 374 #print("Plan World Coord", global_plan_world_coord[:10]) 375 376 ds_ids: "list[int]" = downsample_route(global_plan_world_coord, DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES) # Downsample to less distance. TODO: should increase this 377 #print("Downsampled ids", ds_ids) 378 379 # Reduce the global plan to the downsampled ids 380 self._global_plan_world_coord = [(global_plan_world_coord[x][0], global_plan_world_coord[x][1]) for x in ds_ids] 381 assert self._global_plan_world_coord == [global_plan_world_coord[x] for x in ds_ids] 382 self._global_plan = [global_plan_gps[x] for x in ds_ids] 383 self._global_plan_waypoints = [(self._transform_to_waypoint(transform), road_option) for transform, road_option in self._global_plan_world_coord] 384 if self._local_planner is not None: 385 # TODO: maybe waypoints is not necessary as we extract locations 386 self._local_planner_set_plan(self._global_plan_waypoints)
387
[docs] 388 def __call__(self) -> carla.VehicleControl: 389 """ 390 Executes the next step and returns the control for the vehicle. 391 392 Attention: 393 Use this function instead of :py:meth:`run_step`! 394 """ 395 input_data = self.sensor_interface.get_data(GameTime.get_frame()) 396 397 timestamp = GameTime.get_time() 398 399 if args.leaderboard.print_time_info: 400 if not self.wallclock_t0: 401 self.wallclock_t0 = GameTime.get_wallclocktime() 402 wallclock = GameTime.get_wallclocktime() 403 wallclock_diff = (wallclock - self.wallclock_t0).total_seconds() 404 sim_ratio = 0 if wallclock_diff == 0 else timestamp / wallclock_diff 405 406 print('=== [Agent] -- Wallclock = {} -- System time = {} -- Game time = {} -- Ratio = {}x'.format( 407 str(wallclock)[:-3], format(wallclock_diff, '.3f'), format(timestamp, '.3f'), format(sim_ratio, '.3f'))) 408 409 control = self.run_step(input_data, timestamp) 410 control.manual_gear_shift = False 411 412 return control
413
[docs] 414 def destroy(self): 415 self._destroyed = True 416 print("Destroying Lunatic Challenger") 417 if self._detection_matrix is not None: 418 self._detection_matrix.stop() 419 self._detection_matrix = None 420 super().destroy() 421 if self.world_model: 422 if not WORLD_MODEL_DESTROY_SENSORS: 423 self.world_model.actors.clear() 424 self.world_model.destroy() 425 self.world_model = None # type: ignore[assignment] 426 if self.game_framework: 427 self.game_framework.agent = None 428 self.game_framework = None # type: ignore[assignment] 429 pygame.quit() 430 print("Destroyed", self)
431 432 def __del__(self): 433 if not self._destroyed: 434 self.destroy()
435 436 437# --- Type hints ---
[docs] 438class _GPSDataDict(TypedDict if not READTHEDOCS or TYPE_CHECKING else object): 439 """ 440 Bases: :py:class:`typing.TypedDict` 441 442 Type hint for the global plan in GPS coordinates, 443 used in :py:meth:`.LunaticChallenger.set_global_plan`. 444 445 :meta public: 446 """ 447 lat: float 448 lon: float 449 z: float