Source code for agents.lunatic_agent

   1"""
   2This module implements an agent that roams around a track following random
   3waypoints and avoiding other vehicles. The agent also responds to traffic lights,
   4traffic signs, and has different possible configurations.
   5"""
   6
   7# Decide how to handle imports
   8# ruff: noqa: PLC0415
   9
  10from __future__ import annotations
  11
  12import contextlib
  13import sys
  14from copy import deepcopy
  15from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, List, NoReturn, Optional, Set, Union
  16from typing import cast as assure_type
  17
  18import carla  # pyright: ignore[reportMissingTypeStubs]
  19import omegaconf
  20from omegaconf import DictConfig, OmegaConf
  21from typing_extensions import Literal, Self, Unpack
  22
  23from agents import substep_managers
  24from agents.dynamic_planning.dynamic_local_planner import DynamicLocalPlannerWithRss
  25from agents.navigation.behavior_agent import BehaviorAgent
  26from agents.navigation.global_route_planner import GlobalRoutePlanner
  27from agents.rules import rule_from_config
  28from agents.tools.config_creation import (
  29    AgentConfig,
  30    LaunchConfig,
  31    LiveInfo,
  32    LunaticAgentSettings,
  33    RssRoadBoundariesModeAlias,
  34    RuleCreatingParameters,
  35)
  36from agents.tools.logs import logger
  37from agents.tools.lunatic_agent_tools import (
  38    detect_vehicles,
  39    must_clear_hazard,
  40    # Left here as a reminder that this can be used as a wrapper
  41    phase_callback,  # type: ignore[unused-import] # noqa: F401
  42    result_to_context,
  43)
  44from agents.tools.misc import lanes_have_same_direction
  45from classes.constants import AD_RSS_AVAILABLE, READTHEDOCS, AgentState, Hazard, HazardSeverity, Phase, RoadOption
  46from classes.exceptions import (
  47    AgentDoneException,
  48    ContinueLoopException,
  49    EmergencyStopException,
  50    LunaticAgentException,
  51    NoFurtherRulesException,
  52    SkipInnerLoopException,
  53    UpdatedPathException,
  54    UserInterruption,
  55)
  56from classes.rule import BlockingRule, Context, Rule
  57from classes.worldmodel import CarlaDataProvider, WorldModel
  58from classes.detection_matrix import AsyncDetectionMatrix, DetectionMatrix
  59from classes.information_manager import InformationManager
  60
  61if TYPE_CHECKING:
  62    from agents.tools.hints import ObstacleDetectionResult, TrafficLightDetectionResult
  63    import pygame
  64
  65
[docs] 66class LunaticAgent(BehaviorAgent): 67 """ 68 BasicAgent implements an agent that navigates the scene. 69 This agent respects traffic lights and other vehicles, but ignores stop signs. 70 It has several functions available to specify the route that the agent must follow, 71 as well as to change its parameters in case a different driving mode is desired. 72 """ 73 74 BASE_SETTINGS: "type[LunaticAgentSettings]" = LunaticAgentSettings 75 """ 76 Base AgentConfig class for this agent. This is used to create the default settings for the agent 77 if none are provided. 78 """ 79 80 DEFAULT_RULES: ClassVar[Dict[Phase, List[Rule]]] = {k: [] for k in Phase.get_phases()} 81 """ 82 Default rules of this agent class when initialized. 83 84 :meta hide-value: 85 """ 86 87 rules: Dict[Phase, List[Rule]] 88 """ 89 The rules of the this agent. 90 When initialized the rules are deep copied from :py:attr:`DEFAULT_RULES`. 91 """ 92 93 ctx: Context 94 """The context object of the current step""" 95 96 # Information from the InformationManager 97 walkers_nearby: List[carla.Walker] 98 vehicles_nearby: List[carla.Vehicle] 99 static_obstacles_nearby: List[carla.Actor] 100 """Static obstacles detected by the :py:class:`.InformationManager`""" 101 102 obstacles_nearby: List[carla.Actor] 103 """ 104 Combination of :py:attr:`vehicles_nearby`, :py:attr:`walkers_nearby` 105 and :py:attr:`static_obstacles_nearby`. 106 """ 107 108 traffic_lights_nearby: List[carla.TrafficLight] 109 traffic_signs_nearby: List[carla.TrafficSign] = NotImplemented 110 """ 111 Not yet implemented 112 113 :meta private: 114 """ 115 116 current_states: Dict[AgentState, int] 117 """The current states of the agent. The count of the steps being each state is stored as value.""" 118 119 # 120 121 _world_model: WorldModel 122 """Reference to the attached :py:class:`WorldModel`.""" 123 124 _validate_phases = False 125 """ 126 Debugging flag to sanity check if the agent passes trough the phases in the correct order. 
127 Attention: Currently straight forward anymore. 128 """ 129 130 _active_blocking_rules: Set[BlockingRule] 131 """Blocking rules that are currently active and have taken over the agents loop.""" 132 133 # ------------------ Initialization ------------------ # 134 135 from agents.tools.lunatic_agent_tools import create_agent_config as _create_agent_config 136
@classmethod
def create_world_and_agent(
    cls,
    args: LaunchConfig,
    *,
    vehicle: Optional[carla.Vehicle] = None,
    sim_world: carla.World,
    settings_archetype: "Optional[type[AgentConfig]]" = None,
    agent_config: Optional["LunaticAgentSettings"] = None,
    overwrites: Optional[Dict[str, Any]] = None,
) -> tuple[Self, WorldModel, GlobalRoutePlanner]:
    """
    Setup function to create the agent from the :py:class:`LaunchConfig` settings.

    The agent configuration is resolved in this order:

    1. ``agent_config`` if it is passed; it is used as is.
    2. ``args.agent`` if ``args`` has such an attribute (``settings_archetype`` is then ignored).
    3. ``settings_archetype``: an instance is used directly (with a warning), a class is
       instantiated with ``overwrites`` and cast to :py:attr:`BASE_SETTINGS`.
    4. The defaults of :py:attr:`BASE_SETTINGS`.

    Args:
        args : The launch configuration, passed on to the :py:class:`WorldModel`.
        vehicle : Optional vehicle that is passed to the :py:class:`WorldModel` as ``player``.
        sim_world : The carla world that is passed to the :py:class:`WorldModel`.
        settings_archetype : Config *class* used to create the settings if no other source is available.
        agent_config : A ready-to-use config instance.
        overwrites : Passed to ``settings_archetype`` when it is instantiated.

    Returns:
        The created agent, its world model and the agent's global route planner.

    Note:
        - :py:meth:`.GameFramework.init_agent_and_interface` is the preferred way to
          instantiate the agent, only use this method if you do not want to create a
          :py:class:`GameFramework` object.
    """
    if overwrites is None:
        overwrites = {}

    if agent_config is None:
        if hasattr(args, "agent"):
            if settings_archetype is not None:
                logger.warning("settings_archetype was passed but using args.agent. Ignoring settings_archetype.")
            agent_config = args.agent
        # Everything is an instance of `object`; to tell a config *instance* from a
        # config *class* we have to check that the archetype is not a type.
        elif settings_archetype is not None and not isinstance(settings_archetype, type):
            logger.warning("settings_archetype is an instance. To pass an instance use agent_config instead.")
            agent_config = settings_archetype  # type: ignore
        elif settings_archetype is not None:
            logger.debug("Creating config from settings_archetype")
            behavior = settings_archetype(overwrites)
            agent_config = cls.BASE_SETTINGS.cast(behavior.to_dict_config())
        else:
            logger.debug("Using %s._base_settings %s to create config.", cls.__name__, cls.BASE_SETTINGS)
            agent_config = cls.BASE_SETTINGS.cast(cls.BASE_SETTINGS.to_dict_config())
        assert agent_config is not None
    else:
        logger.debug("A config was passed, using it as is.")

    world_model = WorldModel(agent_config, args=args, carla_world=sim_world, player=vehicle)  # TEST: without args
    # Use the fixed simulation step if one is set, otherwise derive the step from the configured fps.
    agent_config.planner.dt = world_model.world_settings.fixed_delta_seconds or 1 / world_model._args.fps  # pyright: ignore[reportPrivateUsage]

    agent = cls(agent_config, world_model)
    return agent, world_model, agent.get_global_planner()
def __init__(
    self,
    settings: Union[str, LunaticAgentSettings],
    world_model: Optional[WorldModel] = None,
    *,
    vehicle: Optional[carla.Vehicle] = None,
    overwrite_options: Optional[dict[str, Any]] = None,
    debug: bool = True,
):
    """
    Initialize the LunaticAgent.

    Args:
        settings :
            The settings of the agent to construct the :py:attr:`LunaticAgent.config`.
            It can be either a string pointing to a yaml file or a LunaticAgentSettings.
        world_model : The world model to use. If None, a new WorldModel will be created.
        vehicle : The vehicle controlled by the agent. Can be None then the :code:`world_model.player` will be used.
        overwrite_options : Additional options to overwrite the default agent configuration.
        debug : Whether to enable debug mode.
    """
    self._debug = debug
    """Enables additional debug output."""
    # Command line arguments are present but no WorldModel (and therefore no LaunchConfig) was given.
    if world_model is None and len(sys.argv) > 1:
        logger.error(
            "BUG: Beware when not passing a WorldModel, the WorldModel currently ignores command line overrides, "
            "i.e. will only use the config file.\n> Use `%s.create_world_and_agent` and provide the LaunchConfig instead.",
            self.__class__.__name__,
        )

    # -------------------- Settings ------------------------
    # Depending on the input type, the settings are created differently.

    self.config = self._create_agent_config(settings, world_model, overwrite_options)

    # -------------------------------

    logger.info("\n\nAgent config is %s", OmegaConf.to_yaml(self.config))

    # World Model
    if world_model is None:
        world_model = WorldModel(self.config, player=vehicle)
        # Use the fixed simulation step if one is set, otherwise derive the step from the configured fps.
        self.config.planner.dt = world_model.world_settings.fixed_delta_seconds or 1 / world_model._args.fps  # pyright: ignore[reportPrivateUsage]

    self._world_model: WorldModel = world_model
    self._world: carla.World = world_model.world

    # Register Vehicle
    assert world_model.player is not None
    self.set_vehicle(world_model.player)

    self.current_phase: Phase = Phase.NONE
    """current phase of the agent inside the loop"""

    # No context exists before the first step; `_collision_event` tolerates this state.
    self.ctx = None  # type: ignore

    self._last_traffic_light: Optional[carla.TrafficLight] = None
    """Current red traffic light"""

    # TODO: No more hardcoded defaults / set them from opt_dict which must have all parameters; check which are parameters and which are set by other functions (e.g. _look_ahead_steps)

    # Parameters from BehaviorAgent ------------------------------------------
    # todo: check redefinitions

    self._look_ahead_steps: int = 0
    """
    updated in _update_information used for local_planner.get_incoming_waypoint_and_direction
    """

    # Initialize the planners
    self._global_planner = (
        CarlaDataProvider.get_global_route_planner()
    )  # NOTE: THIS does not use self.config.planner.sampling_resolution
    if not self._global_planner:
        logger.error(
            "Global Route Planner not set - This should not happen, if the CarlaDataProvider has been initialized."
        )
        # Fallback: create a planner and register it on the CarlaDataProvider.
        self._global_planner = GlobalRoutePlanner(
            CarlaDataProvider.get_map(), self.config.planner.sampling_resolution
        )
        CarlaDataProvider._grp = self._global_planner  # pyright: ignore[reportPrivateUsage]

    # Get the static elements of the scene
    self._traffic_light_map: Dict[carla.TrafficLight, carla.Transform] = CarlaDataProvider._traffic_light_map  # pyright: ignore[reportPrivateUsage]
    self._lights_list = CarlaDataProvider._traffic_light_map.keys()  # pyright: ignore[reportPrivateUsage]
    self._lights_map: Dict[int, carla.Waypoint] = {}
    """Dictionary mapping a traffic light to a wp corresponding to its trigger volume location"""

    # Vehicle Lights
    self._vehicle_lights = carla.VehicleLightState.NONE

    # Collision Sensor # NOTE: another in the WorldModel for the HUD
    self._set_collision_sensor()

    # Rule Framework
    # 1. add all rules from the class, if any - else this is a dict with empty lists
    self.rules = deepcopy(self.__class__.DEFAULT_RULES)  # Copies the ClassVar to the instance

    # 2. add rules from the config
    self.add_config_rules()

    # Information Manager
    self.information_manager = InformationManager(self)
    self.current_states = self.information_manager.state_counter  # share the dict

    # Blocking Rules
    self._active_blocking_rules = set()

# --------------------- Collision Callback ------------------------

def _set_collision_sensor(self):
    """
    Spawn a ``sensor.other.collision`` actor attached to ``self._vehicle`` and
    register :py:meth:`_collision_event` as its listener.
    """
    # see: https://carla.readthedocs.io/en/latest/ref_sensors/#collision-detector
    # and https://carla.readthedocs.io/en/latest/python_api/#carla.Sensor.listen
    blueprint = CarlaDataProvider._blueprint_library.find("sensor.other.collision")  # pyright: ignore[reportPrivateUsage]
    self._collision_sensor: carla.Sensor = assure_type(
        "carla.Sensor",
        CarlaDataProvider.get_world().spawn_actor(blueprint, carla.Transform(), attach_to=self._vehicle),
    )

    self._collision_sensor.listen(self._collision_event)

# Imported in the class body so that it is available as `self._collision_manager`.
from agents.substep_managers import collision_manager as _collision_manager

def _collision_event(self, event: carla.CollisionEvent):
    """
    Callback function for the collision sensor.

    By default uses :py:func:`agents.substep_managers.collision_manager`.

    An ``AttributeError`` raised by :py:meth:`execute_phase` because ``self.ctx`` is still
    ``None`` is swallowed; any other ``AttributeError`` is re-raised.

    Executes Phases:
        - :py:class:`Phase.COLLISION | Phase.BEGIN<.Phase>`
        - :py:class:`Phase.COLLISION | Phase.END<.Phase>`
    """
    # https://carla.readthedocs.io/en/latest/python_api/#carla.CollisionEvent
    # e.g. setting ignore_vehicles to False, if it was True before.
    # do an emergency stop (in certain situations)
    try:
        self.execute_phase(Phase.COLLISION | Phase.BEGIN, prior_results=event)
    except AttributeError as e:
        # Only the "ctx is None" case is tolerated; it is identified by the error message.
        if "'NoneType' object has no attribute 'prior_result'" not in str(e):
            raise
        # context not yet set up, very early collision
    result = self._collision_manager(event)
    try:
        self.execute_phase(Phase.COLLISION | Phase.END, prior_results=result)
    except AttributeError as e:
        if "'NoneType' object has no attribute 'prior_result'" not in str(e):
            raise
        # context not yet set up, very early collision

# --------------------- Adding rules ------------------------
def add_rule(self, rule: Rule, position: Union[int, None] = None):
    """
    Register a single rule for every phase it declares.

    Args:
        rule : The rule to register.
        position :
            Index at which the rule is inserted into each phase's list.
            With :python:`None` (default) the rule is appended and the
            phase's list is re-sorted by descending priority.
    """
    for phase in rule.phases:
        if phase not in self.rules:
            logger.warning("Phase %s from Rule %s is not a default phase. Adding a new phase.", phase, rule)
            self.rules[phase] = []
        phase_rules = self.rules[phase]
        if position is not None:
            # Explicit placement: the caller decides the order, no sorting.
            phase_rules.insert(position, rule)
            continue
        phase_rules.append(rule)
        phase_rules.sort(key=lambda r: r.priority, reverse=True)
356
def add_rules(self, rules: "Rule | Iterable[Rule]"):
    """
    Add one or more rules and sort the agent's rules by priority.

    Args:
        rules : A single :py:class:`Rule` or an iterable of rules.

    A rule that declares a phase which is not yet a key of :py:attr:`rules` gets a new
    entry for that phase (with a warning), matching the behavior of :py:meth:`add_rule`,
    instead of failing with a ``KeyError``.
    """
    if isinstance(rules, Rule):
        rules = [rules]
    for rule in rules:
        for phase in rule.phases:
            if phase not in self.rules:
                logger.warning("Phase %s from Rule %s is not a default phase. Adding a new phase.", phase, rule)
                self.rules[phase] = []
            self.rules[phase].append(rule)
    # Sort once after all rules were added; highest priority first.
    for phase_rules in self.rules.values():
        phase_rules.sort(key=lambda r: r.priority, reverse=True)
366
def add_config_rules(self, config: Optional[Union[LunaticAgentSettings, List[RuleCreatingParameters]]] = None):
    """
    Create rules from a configuration and add them to the agent.

    Args:
        config :
            Either a settings object whose ``rules`` entry is used, or directly a list of
            rule-creating parameters. Defaults to :py:attr:`config` of the agent.
    """
    source = self.config if config is None else config
    rule_list: List[RuleCreatingParameters]
    # `.keys()` is used on purpose: "rules" in config is also false for rules == MISSING
    has_rules_key = isinstance(source, (AgentConfig, DictConfig)) and "rules" in source.keys()  # noqa: SIM118, RUF100
    if not has_rules_key:
        # Anything else is treated as the list of rule parameters itself.
        rule_list = source  # type: ignore[assignment]
    else:
        try:
            rule_list = source.rules
        except omegaconf.MissingMandatoryValue:
            logger.debug("`rules` key was missing, skipping rule addition.")
            return
    logger.debug("Adding rules from config:\n%s", OmegaConf.to_yaml(rule_list))
    for rule_parameters in rule_list:
        created = rule_from_config(rule_parameters)  # each call could produce one or more rules
        self.add_rules(created)
# --------------------- Properties ------------------------

@property
def live_info(self) -> LiveInfo:
    """Shortcut to ``self.config.live_info``."""
    return self.config.live_info

@property
def detection_matrix(self):
    """
    Returns :any:`DetectionMatrix.getMatrix` if the matrix is set, else :python:`None`.
    """
    if self._detection_matrix:
        return self._detection_matrix.getMatrix()
    return None

# ------------------ Hazard ------------------ #

@property
def detected_hazards(self) -> Set[Hazard]:
    """The hazards stored on the current context (``self.ctx.detected_hazards``)."""
    return self.ctx.detected_hazards

@property
def detected_hazards_info(self) -> Dict[Hazard, Any]:
    """
    Information about the detected hazards, e.g. severity.
    """
    return self.ctx.detected_hazards_info

# The setter is attached to the `detected_hazards` property defined above;
# it replaces the hazard set stored on the current context.
@detected_hazards.setter
def detected_hazards(self, hazards: Set[Hazard]):
    if not isinstance(hazards, set):  # pyright: ignore[reportUnnecessaryIsInstance]
        raise TypeError("detected_hazards must be a set of Hazards.")
    self.ctx._detected_hazards = hazards  # pyright: ignore[reportPrivateUsage]

# These have the same signature and can be used interchangeably
discard_hazard = Context.discard_hazard
add_hazard = Context.add_hazard
has_hazard = Context.has_hazard

#
@property
def current_traffic_light(self) -> "carla.TrafficLight | None":
    """Alias to :py:attr:`self._last_traffic_light <_last_traffic_light>`."""
    return self._last_traffic_light

@current_traffic_light.setter
def current_traffic_light(self, traffic_light: carla.TrafficLight):
    self._last_traffic_light = traffic_light

@property
def phase_results(self) -> Dict[Phase, Any]:
    """
    Retrieves :py:attr:`agent.ctx.phase_results <classes.rule.Context.phase_results>`

    Stores the results of the phases the agent has been in.
    By default the keys are set to :any:`Context.PHASE_NOT_EXECUTED`.
    """
    return self.ctx.phase_results

@property
def active_blocking_rules(self) -> Set[BlockingRule]:
    """
    Blocking rules that are currently active and have taken over the agents loop.
    """
    return self._active_blocking_rules

@property
def _map(self) -> carla.Map:
    """Get the current map of the world."""  # Needed *only* for set_destination
    return CarlaDataProvider.get_map()

# ------------------

# @property
# def ctx(self) -> Union[Context, None]:
#    print("Getting Context", self._ctx())
#    return self._ctx() # might be None

# ------------------ Information functions ------------------ #
def update_information(self, second_pass: bool = False):
    """
    Refresh the agent's knowledge about the ego vehicle and its surroundings.

    The actual update is done by :py:meth:`_update_information`; this method wraps it
    in the corresponding phases so that rules can hook in before and after.

    Parameters:
        second_pass : *Internal usage* set to :python:`True` if this function is called a second
            time in the same tick, e.g. after a route update.

    See Also:
        - :py:attr:`information_manager`

    Executes the phases:
        - :py:class:`Phase.UPDATE_INFORMATION | Phase.BEGIN<.Phase>`
        - :py:class:`Phase.UPDATE_INFORMATION | Phase.END<.Phase>`
    """
    begin_phase = Phase.UPDATE_INFORMATION | Phase.BEGIN
    end_phase = Phase.UPDATE_INFORMATION | Phase.END

    self.execute_phase(begin_phase, prior_results=None)
    self._update_information(second_pass=second_pass)
    self.execute_phase(end_phase, prior_results=None)
def _update_information(self, *, second_pass: bool = False):
    """
    This method updates the information regarding the ego
    vehicle based on the surrounding world.

    The first part (tick-constant, route-independent information) runs only when
    ``second_pass`` is False; the second part (route-dependent information) always runs.

    second_pass = True will skip some calculations,
    especially useful if a second call to _update_information is necessary in the same tick.

    Assumes: second_pass == True => agent.done() == False

    Note:
        Does not execute any Phase.
    """
    # --------------------------------------------------------------------------
    # Information that is CONSTANT DURING THIS TICK and INDEPENDENT OF THE ROUTE
    # --------------------------------------------------------------------------
    if not second_pass:
        if self._debug:  # noqa: SIM102
            # Sanity check: the cached key view is empty although the provider knows traffic lights.
            if not self._lights_list and len(CarlaDataProvider._traffic_light_map.keys()):  # pyright: ignore[reportPrivateUsage]
                logger.error("Traffic light list is empty, but map is not.")

        # ----------------------------
        # First Pass for expensive and tick-constant information

        # --- InformationManager ---
        information: InformationManager.Information = (
            self.information_manager.tick()
        )  # NOTE: # Warning: Currently not route-dependant, might need to be changed later
        self.tick_information = information

        self._current_waypoint = information.current_waypoint
        self.current_states = information.current_states

        # Maybe access over subattribute, or properties
        # NOTE: If other scripts, e.g. the scenario_runner, are used in parallel or sync=False
        # the stored actors might have been destroyed later on.

        # Global Information
        self.all_vehicles = information.vehicles
        self.all_walkers = information.walkers
        self.all_static_obstacles = information.static_obstacles
        # Combination of the three lists
        self.all_obstacles = information.obstacles

        # Nearby subsets as provided by the InformationManager
        # (presumably filtered by the `config.obstacles` max-distance settings - verify there).
        self.vehicles_nearby = information.vehicles_nearby
        self.walkers_nearby = information.walkers_nearby
        self.static_obstacles_nearby = information.static_obstacles_nearby

        # Combination of the three lists
        # NOTE(review): stored as `all_obstacles_nearby`; nothing in this method assigns
        # an attribute called `obstacles_nearby` - confirm which name users rely on.
        self.all_obstacles_nearby = information.obstacles_nearby

        self.traffic_lights_nearby = information.traffic_lights_nearby

        # Find vehicles and walkers nearby; could be moved to the information manager

        # ----------------------------

        # Data Matrix
        # update not every frame to save performance
        if self._detection_matrix and self._detection_matrix.sync:
            self._road_matrix_counter += 1
            if (self._road_matrix_counter % self.config.detection_matrix.sync_interval) == 0:
                # logger.debug("Updating Road Matrix")
                # TODO: Still prevent async mode from using too much resources and slowing fps down too much.
                self._detection_matrix.update()  # NOTE: Does nothing if in async mode. self.road_matrix is updated by another thread.
            # else:
            #    pass

        # used for self._local_planner.get_incoming_waypoint_and_direction
        self._look_ahead_steps = int(
            (self.live_info.current_speed_limit) / 10
        )  # TODO: Maybe make this an interpolation and make more use of it

        # ----- Follow speed limits -------
        # NOTE: This is once again set in the local planner, but only on the Context config!
        if self.config.speed.follow_speed_limits:
            self.config.speed.target_speed = self.config.live_info.current_speed_limit

        # NOTE: This is the direction used by the planner in the *last* step.
        self.live_info.executed_direction = self._local_planner.target_road_option

    # On a second pass the value stored during the first pass must still match the planner.
    assert self.live_info.executed_direction == self._local_planner.target_road_option, (
        "Executed direction should not change."
    )

    # -------------------------------------------------------------------
    # Information that NEEDS TO BE UPDATED AFTER a plan / ROUTE CHANGE.
    # -------------------------------------------------------------------
    if not self.done():
        # NOTE: This should be called after
        self.live_info.incoming_waypoint, self.live_info.incoming_direction = (
            self._local_planner.get_incoming_waypoint_and_direction(  # pyright: ignore[reportAttributeAccessIssue]
                steps=self._look_ahead_steps
            )
        )
    else:
        assert second_pass is False, (
            "In the second pass the agent should have replanned and agent.done() should be False"
        )
        # Assumes second_pass is False
        # Queue is empty
        self.live_info.incoming_waypoint = None
        self.live_info.incoming_direction = RoadOption.VOID

    # Information that requires updated waypoint and route information:
    self.live_info.is_taking_turn = self.is_taking_turn()
    self.live_info.is_changing_lane = self.is_changing_lane()

    # logger.debug(f"Incoming Direction: {str(self.live_info.incoming_direction):<20} - Second Pass: {second_pass}")

    # RSS
    # todo uncomment if agent is created after world model
    # self.rss_set_road_boundaries_mode() # in case this was adjusted during runtime. # TODO: maybe implement this update differently. As here it is called unnecessarily often.

    if self._debug:
        # Result is discarded; with throw_on_missing=True this call fails if live_info
        # still contains missing values, i.e. it acts as a validation.
        OmegaConf.to_container(self.live_info, resolve=True, throw_on_missing=True)

def is_taking_turn(self) -> bool:
    """Tell whether the incoming direction of the agent is a left or right turn."""
    turn_options = (RoadOption.LEFT, RoadOption.RIGHT)
    upcoming = self.live_info.incoming_direction
    return upcoming in turn_options
607
def is_changing_lane(self) -> bool:
    """Tell whether the incoming direction of the agent is a lane change to the left or right."""
    lane_change_options = (RoadOption.CHANGELANELEFT, RoadOption.CHANGELANERIGHT)
    upcoming = self.live_info.incoming_direction
    return upcoming in lane_change_options
611 612 # ------------------ Step & Loop Logic ------------------ # 613
def execute_phase(
    self, phase: Phase, *, prior_results: Any, update_controls: Optional[carla.VehicleControl] = None
) -> Context:
    """
    Switch the agent to ``phase`` and evaluate every rule registered for it.

    Parameters:
        phase : The phase to execute.
        prior_results : The results of the previous phase, e.g. :py:attr:`detected_hazards`.
            Stored on the context as ``prior_result`` and in ``phase_results[phase]``.
        update_controls : Optionally controls that should be used from now onward.

    Returns:
        The agent's current context.

    Raises:
        omegaconf.ReadonlyConfigError: Re-raised after printing a hint when a rule
            tries to write to a read-only config.
    """
    # Determined unconditionally; only used for the optional sanity check below.
    expected_phase = self.current_phase.next_phase()
    if self._validate_phases:
        assert (
            expected_phase in {phase, Phase.USER_CONTROLLED}
            or phase & Phase.EXCEPTIONS
            or phase & Phase.USER_CONTROLLED
        ), (
            f"Phase {phase} is not the next phase of {self.current_phase} or an exception phase. Expected {expected_phase}"
        )

    self.current_phase = phase

    if update_controls is not None:
        self.set_control(update_controls)
    self.ctx.prior_result = prior_results
    self.ctx.phase_results[phase] = prior_results

    # .get: a custom phase might have been added without any rule
    phase_rules = self.rules.get(phase, ())
    try:
        for rule in phase_rules:
            assert self.current_phase in rule.phases, (
                f"Current phase {self.current_phase} not in Rule {rule.phases}"
            )
            rule(self.ctx)
            if phase == self.current_phase:
                continue
            # A (blocking) rule left the agent in another phase; restore it so that
            # the assertion above holds for the remaining rules.
            logger.warning(
                "Phase was changed by rule %s to %s. "
                "Resting self.current_phase to %s. "
                "To prevent his raise an exception in the rule or adjust the phase.",
                rule,
                self.current_phase,
                phase,
            )
            self.current_phase = phase
    except NoFurtherRulesException:
        # Raised by a rule to skip the remaining rules of this phase.
        pass
    except omegaconf.ReadonlyConfigError:
        print(
            "WARNING: A action likely tried to change `ctx.config` which is non-permanent. Use `ctx.agent.config.` instead."
        )
        raise
    return self.ctx
def _plan_path_phase(self, *, second_pass: bool, debug: bool = False):
    """
    Execute the begin and end phase of ``Phase.PLAN_PATH``.

    If a rule raises an :py:exc:`UpdatedPathException` the step is restarted via
    :py:meth:`run_step` with ``second_pass=True`` and its result is returned;
    otherwise :python:`None` is returned.
    """
    try:
        self.execute_phase(Phase.PLAN_PATH | Phase.BEGIN, prior_results=None)
        # User defined action
        # TODO: when going around corners / junctions and the distance between waypoints is too big,
        # We should replan and and make a more fine grained plan, to stay on the road.
        self.execute_phase(Phase.PLAN_PATH | Phase.END, prior_results=None)
    except UpdatedPathException as path_update:
        if second_pass:
            logger.warning(
                "UpdatedPathException was raised in the second pass. This should not happen: %s. Restrict your rule on ctx.second_pass.",
                path_update,
            )
        return self.run_step(debug, second_pass=True)
    else:
        return None

def _make_context(self, last_context: Union[Context, None], **kwargs: Any) -> Context:
    """
    Build the context for a new step, store it as ``self.ctx`` and return it.

    The ``last_context`` attribute of the passed context is deleted first, so the
    previous context no longer references its own predecessor.
    """
    if last_context is not None:
        del last_context.last_context
    self.ctx = Context(agent=self, last_context=last_context, **kwargs)
    return self.ctx
def __call__(self, debug: bool = False) -> carla.VehicleControl:
    """Compute the control for the next tick by running a fresh (first-pass) step."""
    # debug is handed over positionally on purpose!
    control = self.run_step(debug, second_pass=False)
    return control
697 698 # Python 3.8+ add / for positional only arguments
def run_step(self, debug: bool = False, second_pass: bool = False) -> carla.VehicleControl:
    """
    Calculates the next vehicle control object.

    Arguments:
        debug : Whether to enable debug mode.
            This prints some more information and debug drawings.
        second_pass : **Internal usage** set to :python:`True`
            if this function is called a second time, e.g. after a route update.
            When the step is restarted because of an :py:exc:`UpdatedPathException`
            raised in the inner step, this is an increasing integer counter.

    Returns:
        The control object that is set on the agent at the end of the step.

    Raises:
        AgentDoneException: If the agent is done and no rule of ``Phase.DONE | Phase.BEGIN``
            provided a new destination.
        ValueError: If a :py:exc:`LunaticAgentException` or :py:exc:`ContinueLoopException`
            is raised while no control is set on the context.
        RecursionError: If the inner step raised :py:exc:`UpdatedPathException`
            more than 50 times in a row.

    Warning:
        To be compatible with the :py:class:`.LunaticChallenger`, **always pass** :code:`debug` **as a positional argument**,
        or use the :py:meth:`__call__` method.
    """

    # A new context is only created for the first pass of a tick.
    if not second_pass:
        ctx = self._make_context(last_context=self.ctx)
    else:
        ctx = self.ctx
    ctx.second_pass = second_pass
    try:
        # ----------------------------
        # Phase 0 - Update Information
        # ----------------------------
        # > Phase.UPDATE_INFORMATION | Phase.BEGIN
        self.update_information(second_pass=second_pass)
        # > Phase.UPDATE_INFORMATION | Phase.END

        # ----------------------------
        # Phase 1 - Plan Path
        # ----------------------------

        # Question: What TODO if the last phase was COLLISION (async), EMERGENCY
        # Some information to PLAN_PATH should reflect this

        # NOTE: Currently no option to diverge from existing path here, or plan a new path
        # NOTE: Currently done in the local planner and behavior functions
        self._plan_path_phase(second_pass=second_pass, debug=debug)

        # Check whether the agent has reached its destination.
        if self.done():
            # NOTE: Might be in NONE phase here.
            self.execute_phase(Phase.DONE | Phase.BEGIN, prior_results=None)
            if self.done():
                # No Rule set a new destination
                logger.info("The target has been reached, stopping the simulation")
                self.execute_phase(Phase.TERMINATING | Phase.BEGIN, prior_results=None)
                raise AgentDoneException
            self.execute_phase(Phase.DONE | Phase.END, prior_results=None)
            return self.run_step(
                debug, second_pass=True
            )  # NOTE! For child classes like the leaderboard agent this calls the higher level run_step.

        # ----------------------------
        # Phase NONE - Before Running step
        # ----------------------------
        try:
            planned_control = self._inner_step(debug=debug)  # debug=True draws waypoints
            if self.detected_hazards:
                # The must_clear_hazard decorator will raise this, but in case the _inner_step is overwritten
                raise EmergencyStopException(self.detected_hazards)  # noqa: TRY301
        # Reraise Exceptions
        except UserInterruption:
            raise

        # Handled Exceptions
        except EmergencyStopException as emergency:
            # ----------------------------
            # Phase Emergency
            # no Rule with Phase.EMERGENCY | BEGIN cleared the provided hazards in ctx.prior_results
            # ----------------------------

            emergency_controls = self.emergency_manager(reasons=emergency.hazards_detected)

            # TODO: somehow backup the control defined before.
            self.execute_phase(
                Phase.EMERGENCY | Phase.END,
                update_controls=emergency_controls,
                prior_results=emergency.hazards_detected,
            )
            planned_control = self.get_control()  # type: carla.VehicleControl # type: ignore[assignment]

        except SkipInnerLoopException as skip:
            self.set_control(skip.planned_control)
            self.current_phase = Phase.USER_CONTROLLED
            planned_control = skip.planned_control
        except UpdatedPathException as update_exception:
            # `second_pass` doubles as a restart counter here. The hard limit has to be
            # tested before the softer warning threshold, otherwise it can never trigger.
            if second_pass > 50:
                raise RecursionError(
                    "UpdatedPathException was raised more than 50 times. Assuming an infinite loop and terminating"
                ) from update_exception
            if second_pass > 5:
                logger.warning(
                    "UpdatedPathException was raised more than %s times. Warning: This might be an infinite loop.",
                    second_pass,
                )
            else:
                logger.warning(
                    "%s was raised in the inner step, this should be done in Phase.PLAN_PATH instead.",
                    update_exception,
                )
            # Pass debug on (positionally) so the restarted step keeps the caller's debug mode.
            return self.run_step(debug, second_pass=int(second_pass) + 1)  # type: ignore
        except LunaticAgentException as lae:
            if self.ctx.control is None:
                msg = (
                    f"A VehicleControl object must be set on the agent when {type(lae).__name__} is "
                    "raised during `._inner_step`"
                )
                raise ValueError(msg) from lae
            planned_control = self.get_control()  # type: ignore[assignment]
            # assert ctx.control

        # ----------------------------
        # No known Phase multiple exit points
        # ----------------------------

        # ----------------------------
        # Phase RSS - Check RSS
        # ----------------------------

        ctx = self.execute_phase(
            Phase.RSS_EVALUATION | Phase.BEGIN, prior_results=None, update_controls=planned_control
        )
        if AD_RSS_AVAILABLE and self.config.rss and self.config.rss.enabled:
            rss_updated_controls = self._world_model.rss_check_control(ctx.control)  # type: ignore[arg-type]
        else:
            rss_updated_controls = None
        # NOTE: rss_updated_controls could be None.
        ctx = self.execute_phase(Phase.RSS_EVALUATION | Phase.END, prior_results=rss_updated_controls)

        # if ctx.control is not planned_control:
        #    logger.debug("RSS updated control accepted.")

    except ContinueLoopException as e:
        logger.debug("ContinueLoopException skipping rest of loop.")
        if self.ctx.control is None:
            msg = f"A VehicleControl object must be set on the agent when {type(e).__name__} is raised during `._inner_step`"
            raise ValueError(msg) from e

    planned_control = self.ctx.control  # type: carla.VehicleControl # type: ignore[assignment]
    planned_control.manual_gear_shift = False
    return self.get_control()  # type: ignore[return-value]
841
@must_clear_hazard
@result_to_context("control")
def _inner_step(self, debug: bool = False) -> carla.VehicleControl:
    """
    This is the internal function to provide the next control object for
    the agent; it should run every tick.

    The detection phases run in a fixed order (traffic lights and pedestrians,
    static obstacles, vehicles). The function returns the result of
    :py:meth:`get_control` as soon as one of the behaviors (car following,
    turning at a junction, normal step) has finished its ``END`` phase.

    Args:
        debug: Stored on :py:attr:`debug` and forwarded to :py:meth:`_calculate_control`.

    Raises:
        EmergencyStopException: If :py:attr:`detected_hazards` is not empty when the
            function returns.

    :meta public:
    """
    self.debug = debug

    # ----------------------------
    # Phase 2 - Detection of Pedestrians and Traffic Lights
    # ----------------------------

    # Detect hazards
    # phases are executed in detect_hazard
    # > Phase.DETECT_TRAFFIC_LIGHTS | Phase.BEGIN # phases executed inside
    pedestrians_and_tlight_hazard = self.detect_hazard()
    # > Phase.DETECT_PEDESTRIANS | Phase.END

    # Pedestrian avoidance behaviors
    # currently doing either emergency (detect_hazard) stop or nothing
    if self.detected_hazards:
        # ----------------------------
        # Phase Hazard Detected (traffic light or pedestrian)
        # If no Rule with Phase.EMERGENCY | BEGIN clears pedestrians_or_traffic_light
        # An EmergencyStopException is raised
        # ----------------------------
        self.react_to_hazard(pedestrians_and_tlight_hazard)  # Optional[NoReturn]

    # -----------------------------
    # Phase Detect Static Obstacles
    # -----------------------------

    self.execute_phase(Phase.DETECT_STATIC_OBSTACLES | Phase.BEGIN, prior_results=None)
    static_obstacle_detection_result = self.detect_obstacles_in_path(self.static_obstacles_nearby)
    if static_obstacle_detection_result.obstacle_was_found:
        # Counts consecutive steps in which a static obstacle blocked the path
        self.current_states[AgentState.BLOCKED_BY_STATIC] += 1
        # Must plan around it
        self.add_hazard(Hazard.STATIC_OBSTACLE)
    else:
        self.current_states[AgentState.BLOCKED_BY_STATIC] = 0
    # TODO: add a basic rule for circumventing static obstacles
    self.execute_phase(Phase.DETECT_STATIC_OBSTACLES | Phase.END, prior_results=static_obstacle_detection_result)
    # Not throwing an error here yet

    # ----------------------------
    # Phase 3 - Detection of Cars
    # ----------------------------

    self.execute_phase(Phase.DETECT_CARS | Phase.BEGIN, prior_results=None)  # TODO: Maybe add some prio result
    vehicle_detection_result = self.detect_obstacles_in_path(self.vehicles_nearby)

    # TODO: add a way to let the execution overwrite
    if vehicle_detection_result.obstacle_was_found:
        # ----------------------------
        # Phase 2.A - React to cars in front
        # TODO: turn this into a rule.
        #       remove CAR_DETECTED -> pass detection_result to rules
        # TODO some way to circumvent returning control here, like above.
        # TODO: Needs refinement with the car_following_behavior
        # ----------------------------

        self.execute_phase(Phase.CAR_DETECTED | Phase.BEGIN, prior_results=vehicle_detection_result)
        # The detection result is unpacked into (found, vehicle, distance)
        control = self.car_following_behavior(*vehicle_detection_result)  # type: ignore[arg-type]
        # NOTE: might throw EmergencyStopException
        self.execute_phase(
            Phase.CAR_DETECTED | Phase.END, update_controls=control, prior_results=vehicle_detection_result
        )
        return self.get_control()  # type: ignore[return-value]

    # TODO: maybe new phase instead of END or remove CAR_DETECTED and handle as rules (maybe better)
    self.execute_phase(Phase.DETECT_CARS | Phase.END, prior_results=None)  # NOTE: avoiding tailgate here

    # Intersection behavior
    # NOTE: is_taking_turn <- incoming_direction in (RoadOption.LEFT, RoadOption.RIGHT)
    if self.live_info.incoming_waypoint.is_junction and self.is_taking_turn():  # pyright: ignore[reportOptionalMemberAccess]
        # ----------------------------
        # Phase Turning at Junction
        # ----------------------------

        self.execute_phase(Phase.TURNING_AT_JUNCTION | Phase.BEGIN, prior_results=None)
        control = self._calculate_control(debug=debug)
        self.execute_phase(Phase.TURNING_AT_JUNCTION | Phase.END, update_controls=control, prior_results=None)
        return self.get_control()  # type: ignore[return-value]

    # ----------------------------
    # Phase 4 - Plan Path normally
    # ----------------------------

    # Normal behavior
    self.execute_phase(Phase.TAKE_NORMAL_STEP | Phase.BEGIN, prior_results=None)
    control = self._calculate_control(debug=debug)
    self.execute_phase(Phase.TAKE_NORMAL_STEP | Phase.END, prior_results=None, update_controls=control)

    # Leave loop and apply controls outside
    return self.get_control()  # type: ignore[return-value]
def _calculate_control(self, debug: bool = False):
    """
    Ask the local planner for the control of the current step.

    Note:
        This is the innermost function of the agent's ``run_step``.
        Rules that need a control object can call it, however
        ``Context.get_or_calculate_control`` is described as the safer alternative.

    Warning:
        When called outside of a ``BlockingRule`` the caller should raise a
        ``SkipInnerLoopException`` or ``ContinueLoopException`` afterwards,
        else the planned path will skip a waypoint.

    Warning:
        The control is only calculated and returned.
        **It is not stored in** :py:attr:`ctx.control <ctx>`, which is the attribute
        that :py:meth:`apply_control` reads the final controls from.

    Args:
        debug: Forwarded to the local planner's ``run_step``.

    Returns:
        Whatever the local planner's ``run_step`` returns.
    """
    control_already_present = self.ctx.control is not None
    if control_already_present:
        # Only reported, not treated as an error condition that stops the step.
        logger.error("Control was set before calling _calculate_control. This might lead to unexpected behavior.")

    planner = self._local_planner
    return planner.run_step(debug)
def parse_keyboard_input(
    self, allow_user_updates: bool = True, *, control: Optional[carla.VehicleControl] = None
) -> None:
    """
    Parse the current user input and allow manual updates of the controls.

    Args:
        allow_user_updates: If :python:`True`, the user can update the controls manually.
            Otherwise only the normal hotkeys do work.
        control: The control object that the user input is applied to.
            If :python:`None`, the result of :py:meth:`get_control` is used.

    Raises:
        UserInterruption: If the controller's ``parse_events`` returns a truthy value,
            i.e. the user requested to exit.

    Executes Phases:
        - :py:class:`Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN <classes.constants.Phase>`
        - :py:class:`Phase.APPLY_MANUAL_CONTROLS | Phase.END <classes.constants.Phase>`
    """
    # Explicit None check: an optional object must not be selected by its truthiness.
    planned_control = self.get_control() if control is None else control
    self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN, prior_results=planned_control)

    # Controls can be updated inplace by the user. When manual updates are not allowed
    # a throwaway control object is passed, so the planned control stays untouched.
    event_target = planned_control if allow_user_updates else carla.VehicleControl()
    if self._world_model.controller.parse_events(event_target):  # pyright: ignore[reportOptionalMemberAccess]
        print("Exiting by user input.")
        raise UserInterruption("Exiting by user input.")

    self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.END, prior_results=None)
996
def apply_control(self, control: Optional[carla.VehicleControl] = None):
    """
    Applies the control to the agent's actor.

    Will execute the :py:class:`Phase.EXECUTION | Phase.BEGIN <classes.constants.Phase>`
    phase, unless the agent is already in it, and afterwards the
    :py:class:`Phase.EXECUTION | Phase.END <classes.constants.Phase>` phase.

    Args:
        control: The control to apply. If :python:`None`, the result of
            :py:meth:`get_control` is used.

    Note:
        The control that is finally applied to the actor is the one returned by
        :py:meth:`get_control` after the ``BEGIN`` phase, i.e. the one stored in
        the :py:attr:`ctx.control <ctx>` attribute.
        If the agent is already in the ``EXECUTION | BEGIN`` phase, a passed
        **control** is not forwarded to the phase.

    Raises:
        ValueError: If no **control** was passed and :py:meth:`get_control`
            returns :python:`None`.
    """
    if control is None:
        control = self.get_control()
        if control is None:
            raise ValueError(
                "The agent has not yet performed a step this tick and no control object was passed."
            )
    if self.current_phase != Phase.EXECUTION | Phase.BEGIN:
        self.execute_phase(Phase.EXECUTION | Phase.BEGIN, prior_results=control, update_controls=control)
    else:
        logger.debug("Agent is already in execution phase.")
    # Set automatic control-related vehicle lights
    final_control: carla.VehicleControl = self.get_control()  # type: ignore[assignment]
    self._update_lights(final_control)
    self._vehicle.apply_control(final_control)
    self.execute_phase(Phase.EXECUTION | Phase.END, prior_results=final_control)
1025 1026 # ------------------ Hazard Detection & Reaction ------------------ # 1027 1028 from agents.substep_managers import detect_traffic_light # -> TrafficLightDetectionResult 1029 1030 traffic_light_manager = detect_traffic_light # pyright: ignore[reportAssignmentType] 1031 """Alias of :py:meth:`detect_traffic_light`""" 1032
def detect_hazard(self) -> Set[Hazard]:
    """
    Look for traffic lights and pedestrians in the agent's path and register them as hazards.

    A found traffic light in the red state is added as ``Hazard.TRAFFIC_LIGHT_RED``;
    any other found traffic light is added as ``Hazard.TRAFFIC_LIGHT_YELLOW`` with
    ``HazardSeverity.WARNING``. Presumably the latter only happens when
    :py:attr:`.LunaticAgentSettings.obstacles.detect_yellow_tlights` is enabled.
    A found pedestrian is added with ``EMERGENCY`` or ``WARNING`` severity.

    The hazards can trigger an :py:exc:`EmergencyStopException` in
    :py:meth:`react_to_hazard`, which is executed after this function.

    Executes Phases:
        - ``Phase.DETECT_TRAFFIC_LIGHTS | Phase.BEGIN`` and ``| Phase.END``
        - ``Phase.DETECT_PEDESTRIANS | Phase.BEGIN`` and ``| Phase.END``

    Returns:
        The agent's :py:attr:`detected_hazards`.
    """
    # ---- Traffic lights: red lights and stops behavior ----
    self.execute_phase(Phase.DETECT_TRAFFIC_LIGHTS | Phase.BEGIN, prior_results=None)
    tlight_result: TrafficLightDetectionResult = self.detect_traffic_light()
    if tlight_result.traffic_light_was_found:
        light_is_red = tlight_result.traffic_light.state == carla.TrafficLightState.Red  # pyright: ignore[reportOptionalMemberAccess]
        if light_is_red:
            self.add_hazard(Hazard.TRAFFIC_LIGHT_RED)
        else:
            self.add_hazard(Hazard.TRAFFIC_LIGHT_YELLOW, HazardSeverity.WARNING)

    # NOTE: live_info.next_traffic_light is the next bounding box and
    # might not be the same light as the detected one.
    self.execute_phase(Phase.DETECT_TRAFFIC_LIGHTS | Phase.END, prior_results=tlight_result)

    # ---- Pedestrians ----
    self.execute_phase(Phase.DETECT_PEDESTRIANS | Phase.BEGIN, prior_results=None)
    is_dangerous, detection_result = self.pedestrian_avoidance_behavior()
    if detection_result.obstacle_was_found:
        severity = HazardSeverity.EMERGENCY if is_dangerous else HazardSeverity.WARNING
        self.add_hazard(Hazard.PEDESTRIAN, hazard_level=severity)
    self.execute_phase(Phase.DETECT_PEDESTRIANS | Phase.END, prior_results=(is_dangerous, detection_result))

    return self.detected_hazards
1074
def react_to_hazard(self, hazard_detected: Union[Hazard, Iterable[Hazard], None]) -> Optional[NoReturn]:
    """
    Called when a hazard was detected.

    The passed hazards are merged into :py:attr:`detected_hazards` and handed to the
    ``Phase.EMERGENCY | Phase.BEGIN`` phase as ``prior_results``.
    If :py:attr:`detected_hazards` is still non-empty after that phase,
    an :py:exc:`EmergencyStopException` is raised.

    Args:
        hazard_detected: A single hazard, an iterable of hazards, or :python:`None`.

    Raises:
        EmergencyStopException: If a hazard was detected and no rule cleared it.
    """
    if hazard_detected:
        if isinstance(hazard_detected, Hazard):
            self.detected_hazards.add(hazard_detected)
        else:
            self.detected_hazards.update(hazard_detected)

    if not self.detected_hazards:
        # Nothing to react to. Return here so that the log does not additionally
        # claim that hazards "have been cleared" when there were none to begin with.
        logger.info("react_to_hazard was called without any detected hazards.")
        return None

    self.execute_phase(Phase.EMERGENCY | Phase.BEGIN, prior_results=hazard_detected)
    if self.detected_hazards:
        raise EmergencyStopException(self.detected_hazards)
    logger.info("Hazards have been cleared.")
    return None
1101 1102 # ------------------ Behaviors ------------------ # 1103 # TODO: Section needs overhaul -> turn into rules 1104
def pedestrian_avoidance_behavior(self) -> tuple[bool, ObstacleDetectionResult]:
    """
    Look for pedestrians in the agent's path and judge whether one is dangerously close.

    A pedestrian is dangerous when the detected distance, reduced by the larger
    bounding box extent (x or y) of the pedestrian and of the ego vehicle, is below
    ``config.distance.emergency_braking_distance``.

    Returns:
        A tuple ``(is_dangerous, detection_result)``.
    """
    # note ego_vehicle_wp is the current waypoint self._current_waypoint
    result = self.detect_obstacles_in_path(self.walkers_nearby)
    if not result.obstacle_was_found:
        return False, result

    walker_extent = result.obstacle.bounding_box.extent  # pyright: ignore[reportOptionalMemberAccess]
    ego_extent = self._vehicle.bounding_box.extent
    gap = result.distance - max(walker_extent.y, walker_extent.x) - max(ego_extent.y, ego_extent.x)
    if gap < self.config.distance.emergency_braking_distance:
        # NOTE: should slow down here
        return True, result

    # NOTE detected but not stopping -> ADD avoidance behavior
    logger.debug("Detected a pedestrian but determined no intervention necessary (too far away).")
    return False, result
1131
def car_following_behavior(
    self,
    vehicle_detected: bool,  # noqa: ARG002,RUF100 # pylint: disable=unused-argument
    vehicle: carla.Actor,
    distance: float,
) -> carla.VehicleControl:
    """
    Calculate the control for following a detected vehicle.

    The parameters mirror the fields of :py:class:`.ObstacleDetectionResult`,
    so that a detection result can be unpacked into this function.

    Parameters:
        vehicle_detected: Unused; corresponds to
            :py:attr:`.ObstacleDetectionResult.obstacle_was_found`.
        vehicle: The detected vehicle, see :py:attr:`.ObstacleDetectionResult.obstacle`.
        distance: The detected distance to **vehicle**.

    Assumes:
        - That an obstacle was detected, i.e. **vehicle_detected** is :python:`True`
          and **vehicle** is the detected vehicle.

    Raises:
        EmergencyStopException: Possibly, through :py:meth:`react_to_hazard`, when the
            remaining distance is below ``config.distance.emergency_braking_distance``.
    """
    lead_extent = vehicle.bounding_box.extent
    ego_extent = self._vehicle.bounding_box.extent
    # Distance reduced by the larger extent (x or y) of both bounding boxes.
    exact_distance = distance - max(lead_extent.y, lead_extent.x) - max(ego_extent.y, ego_extent.x)

    too_close = exact_distance < self.config.distance.emergency_braking_distance
    if too_close:
        # Note: If the passed set is not cleared by a Phase.EMERGENCY | Phase.BEGIN rule,
        # an EmergencyStopException is raised.
        self.add_hazard(Hazard.CAR)
        self.react_to_hazard(hazard_detected={Hazard.CAR})  # Optional[NoReturn]
    return self.car_following_manager(vehicle, exact_distance)
1160 1161 # ------------------ External Helpers ------------------ # 1162 1163 # Moved outside of the class for organization 1164 from agents.substep_managers import ( 1165 car_following_manager, # pyright: ignore[reportAssignmentType] 1166 emergency_manager, 1167 ) 1168 1169 # Subfunction of traffic_light_manager. In traffic_light_manager the parameters are chosen automatically 1170 # which is why traffic_light_manager should be used instead. 1171 # Kept for backwards compatibility and possible future use. 1172 from agents.substep_managers.traffic_light import affected_by_traffic_light 1173 from agents.tools.lunatic_agent_tools import detect_obstacles_in_path 1174 1175 # ---- 1176 1177 # @override
def add_emergency_stop(
    self, control: carla.VehicleControl, reasons: "Optional[set[str]]" = None
) -> carla.VehicleControl:
    """
    Delegate to :py:meth:`emergency_manager` to obtain emergency stop controls.

    :param control: (carla.VehicleControl) control that is passed on to the manager
    :param reasons: (set, optional) reasons for the stop that are passed on to the manager
    :return: The result of :py:meth:`emergency_manager`.
    """
    manager = self.emergency_manager
    return manager(reasons=reasons, control=control)  # type: ignore[arg-type]
1189
def lane_change(
    self,
    direction: Literal["left", "right"],
    same_lane_time: float = 0,
    other_lane_time: float = 0,
    lane_change_time: float = 2,
    *,
    check: bool = False,
):
    """
    Changes the path so that the vehicle performs a lane change.
    Use 'direction' to specify either a 'left' or 'right' lane change,
    and the time parameters to fine tune the maneuver.

    Steps for the lane change:
        1. **same_lane_time** seconds in the same lane.
        2. **lane_change_time** seconds to reach the other lane.
        3. **other_lane_time** seconds to stay in the other lane.

    The path starts at the agent's current waypoint. The times are converted to
    distances using the current speed. If no path could be generated, the lane
    change is ignored and the current plan is left untouched.

    Parameters:
        direction: The direction of the lane change, either 'left' or 'right'.
        same_lane_time: The time to follow the same lane before the lane change.
        other_lane_time: The time to follow the other lane after the lane change.
        lane_change_time: The time to reach the center of the last lane.
            A low value will make a fast lane change, while a high value will make slow lane change.
        check: If :python:`True`, the function will check if the lane change is possible, i.e.
            if there is a lane of :py:class:`carla.LaneType.Driving <carla.LaneType>` in
            the desired direction. Otherwise it can change to other lane types as well.
            Defaults to :code:`False`.

    See Also:
        - :py:func:`agents.tools.lunatic_agent_tools.generate_lane_change_path`
        - :py:meth:`set_global_plan`
    """
    speed = self.live_info.current_speed / 3.6  # m/s
    # This is a staticfunction from BasicAgent function
    path: list[tuple[carla.Waypoint, RoadOption]] = self._generate_lane_change_path(
        self._current_waypoint,  # NOTE: Assuming exact_waypoint
        direction,
        same_lane_time * speed,  # get direction in meters t*V
        other_lane_time * speed,
        lane_change_time * speed,
        check=check,
        lane_changes=1,  # changes only one lane
        step_distance=self.config.planner.sampling_resolution,
    )
    if not path:
        logger.info("Ignoring the lane change as no path was found")
        # Actually ignore it: do not overwrite the current plan with an empty one.
        return

    # Change path to take now
    # NOTE: use super with arguments here.
    super(LunaticAgent, self).set_global_plan(path)  # noqa: UP008
1244 # TODO: # CRITICAL: Keep old global plan if it is some end goal -> Restore it. 1245 1246 # TODO: Use generate_lane_change_path to finetune
def make_lane_change(
    self,
    order: Iterable[Literal["left", "right"]] = ("left", "right"),
    up_angle_th: int = 180,
    low_angle_th: int = 0,
) -> "Literal[True] | None":
    """
    Move to the left/right lane if possible

    For every direction in **order** the lane marking of the current waypoint must
    allow the change, the neighboring lane must exist, have the same direction and
    be of type ``carla.LaneType.Driving``, and no vehicle may be detected on it.
    The first direction that fulfills this is taken via :py:meth:`set_destination`.

    Args:
        order : The order in
            which the agent should try to change lanes. If a single string is given, the agent
            will try to change to that lane. Default is :python:`("left", "right")`.
        up_angle_th : The angle threshold for the upper limit of obstacle detection in the other lane.
            Default is 180 degrees.
        low_angle_th : The angle threshold for the lower limit of obstacle detection in the other lane.
            Default is 0 degrees.

    Returns:
        :python:`True` if a lane change was initiated, otherwise :python:`None`.

    Raises:
        ValueError: If a direction is neither ``"left"`` nor ``"right"``.

    Assumes:
        - :python:`(self.config.live_info.incoming_direction == RoadOption.LANEFOLLOW \
            and not waypoint.is_junction and self.config.live_info.current_speed > 10)`
        - :python:`check_behind.obstacle_was_found and self.config.live_info.current_speed < get_speed(check_behind.obstacle)`
    """
    vehicle_list = self.vehicles_nearby
    waypoint = self._current_waypoint  # todo use a getter

    # A single direction may be given as a plain string. The iterable is materialized
    # once, so one-shot iterables are not consumed by the inspection for the log message.
    directions = (order,) if isinstance(order, str) else tuple(order)

    for direction in directions:
        if direction == "right":
            right_turn = waypoint.right_lane_marking.lane_change
            can_change = right_turn in {carla.LaneChange.Right, carla.LaneChange.Both}
            other_wpt = waypoint.get_right_lane()
            lane_offset = 1
        elif direction == "left":
            left_turn = waypoint.left_lane_marking.lane_change
            can_change = left_turn in {carla.LaneChange.Left, carla.LaneChange.Both}
            other_wpt = waypoint.get_left_lane()
            lane_offset = -1
        else:
            msg = f"Direction must be 'left' or 'right', was {direction}"
            raise ValueError(msg)
        if (
            can_change
            and other_wpt is not None  # there might be no lane on that side
            and lanes_have_same_direction(waypoint, other_wpt)
            and other_wpt.lane_type == carla.LaneType.Driving
        ):
            # Detect if the other lane is free
            detection_result = detect_vehicles(
                self,
                vehicle_list,
                self.max_detection_distance("other_lane"),
                up_angle_th=up_angle_th,
                low_angle_th=low_angle_th,
                lane_offset=lane_offset,
            )
            if not detection_result.obstacle_was_found:
                logger.debug(
                    "Change Lane, moving to the %s! Reason: %s",
                    direction,
                    "Overtaking" if directions == ("left", "right") else "Tailgating",
                )

                end_waypoint = self._local_planner.target_waypoint
                # TODO: How to set waypoint order? Or better use generate_lane_change_path!
                self.set_destination(
                    end_location=other_wpt.transform.location,
                    start_location=end_waypoint.transform.location,
                    clean_queue=True,
                )
                return True
    return None
# ------------------ Other Function ------------------ #

def _update_lights(self, vehicle_control: carla.VehicleControl):
    """
    Synchronize the brake and reverse lights of the vehicle with the given control.

    The light state is only sent to the vehicle if it differs from the stored one.
    """
    lights: carla.VehicleLightState = self._vehicle_lights
    flag_requests = (
        (vehicle_control.brake, carla.VehicleLightState.Brake),
        (vehicle_control.reverse, carla.VehicleLightState.Reverse),
    )
    for requested, flag in flag_requests:
        if requested:
            lights |= flag
        else:
            # Keep every flag except this one
            lights &= carla.VehicleLightState.All ^ flag
    if lights == self._vehicle_lights:
        return  # Change the light state only if necessary
    self._vehicle_lights = lights
    self._vehicle.set_light_state(carla.VehicleLightState(self._vehicle_lights))

def render_detection_matrix(self, display: "pygame.Surface", **options: Unpack["DetectionMatrix.RenderOptions"]):
    """
    Render the detection matrix onto **display**, if the agent has one.

    See Also:
        - :py:meth:`DetectionMatrix.render`

    Attention:
        **options** must match the arguments of :py:meth:`DetectionMatrix.render`.

    :meta private:
    """
    matrix = self._detection_matrix
    if not matrix:
        return
    # options should align with CameraConfig.DetectionMatrixHudConfig
    matrix.render(display, **options)
def verify_settings(
    self,
    config: Optional[LunaticAgentSettings] = None,
    *,
    verify_dataclass: Union["type[AgentConfig]", bool] = True,
    strictness: Literal[-1, 0, 1, 2, 3, 4] = 0,
):
    """
    Verifies the settings of the LunaticAgent.
    Foremost this checks if the :py:obj:`.planner.dt` value has been set
    when the world runs in synchronous mode.
    Secondly if :python:`verify_dataclass=True` or a different AgentConfig class is provided,
    it will check for correct type usage.

    Args:
        config: The configuration to verify.
            If not provided, the agent's :py:attr:`config` will be used.
            Defaults to :code:`None`.
        verify_dataclass:
            Determines the dataclass to use for verification.
            If :python:`True`, the :py:attr:`.BASE_SETTINGS` dataclass will be used.
            If falsy, the dataclass verification is skipped.
            See :py:meth:`AgentConfig.check_config` for more details.
            Defaults to True.
        strictness:
            The fallback strictness level for :py:meth:`AgentConfig.check_config`;
            used when the dataclass provides no ``strict_config`` value.
            A negative value skips the dataclass verification.
            Defaults to :python:`0`.

    Raises:
        TypeError: If **verify_dataclass** is not a valid AgentConfig subclass or True.
        MissingMandatoryValue: If :python:`config.planner.dt` is not present
            or not a :python:`float`
    """
    config = config or self.config

    if self._world_model.world_settings.synchronous_mode:
        # Assure that dt is set
        if isinstance(config, DictConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
            OmegaConf.select(config, "planner.dt", throw_on_missing=True)
        elif not isinstance(config.planner.dt, float):
            from omegaconf import MissingMandatoryValue

            raise MissingMandatoryValue(
                "`config.planner.dt` needs to be set to a value. Cannot be:",
                type(config.planner.dt),
                config.planner.dt,
            )

    if strictness < 0:
        return

    # NOTE: Below is experimental and might fail as the config at this point has already been setup

    from agents.tools import config_creation

    # The module-level flag is switched off during the check. The finally clause
    # guarantees that it is restored even if the verification raises.
    old_value = config_creation._WARN_LIVE_INFO  # pyright: ignore[reportPrivateUsage]
    config_creation._WARN_LIVE_INFO = False  # pyright: ignore[reportPrivateUsage]
    try:
        if verify_dataclass:
            if verify_dataclass is True:
                dataclass = self.BASE_SETTINGS
            elif issubclass(verify_dataclass, AgentConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
                dataclass = verify_dataclass
            else:
                raise TypeError("`verify_dataclass` must be a `AgentConfig` or `True` to select the BASE_SETTINGS ")
            dataclass.check_config(config, dataclass.get("strict_config", strictness), as_dict_config=True)
    finally:
        config_creation._WARN_LIVE_INFO = old_value  # pyright: ignore[reportPrivateUsage]
1420 1421 # ------------------ Getter Function ------------------ # 1422
def get_control(self):
    """
    Return the control that is currently stored in the agent's context.

    This is :python:`None` as long as no control has been set on the context,
    e.g. when retrieved before the local planner has been run.
    """
    context = self.ctx
    return context.control
1430 1431 # ------------------ Setter Function ------------------ # 1432
def set_control(self, control: carla.VehicleControl):
    """
    Store new controls on the agent's context. Must be called before apply_control.

    Note:
        The value is stored as-is; no validation is performed.
    """
    context = self.ctx
    context.control = control
def _init_detection_matrix(self) -> None:
    """
    Create the detection matrix according to ``config.detection_matrix``.

    A :py:class:`DetectionMatrix` is used if both ``config.detection_matrix.sync`` and
    the world's synchronous mode are enabled, otherwise an :py:class:`AsyncDetectionMatrix`.
    If the detection matrix is disabled, :py:attr:`_detection_matrix` is set to :python:`None`.
    """
    if self.config.detection_matrix and self.config.detection_matrix.enabled:
        if self.config.detection_matrix.sync and self._world_model.world_settings.synchronous_mode:
            self._detection_matrix = DetectionMatrix(self._vehicle)
        else:
            self._detection_matrix = AsyncDetectionMatrix(self._vehicle)
        # NOTE(review): assumed to be called for both matrix variants - confirm nesting
        self._detection_matrix.start()
    else:
        self._detection_matrix = None
    # NOTE(review): assumed to be reset in every case - confirm nesting
    self._road_matrix_counter = 0

if READTHEDOCS or TYPE_CHECKING:
    # From the parent class, but without type-hints
    # Declaration only: provides type-hints for documentation and type checkers.
    def set_destination(
        self,
        end_location: carla.Location,
        start_location: Optional[carla.Location] = None,
        clean_queue: bool = True,
    ) -> None: ...

def set_vehicle(self, vehicle: carla.Actor) -> None:
    """
    Set the vehicle for the agent (experimental if applied a second time)

    The vehicle is registered at the :py:class:`CarlaDataProvider` unless an actor with
    the same id is already present; in that case the existing entries are re-keyed to
    **vehicle**. Afterwards the detection matrix is re-initialized and a new
    :py:class:`DynamicLocalPlannerWithRss` is created.

    :meta private:
    """
    self._vehicle = assure_type("carla.Vehicle", vehicle)
    # do not register same id twice
    if all(actor.id != vehicle.id for actor in CarlaDataProvider._actor_velocity_map):  # pyright: ignore[reportPrivateUsage]
        with contextlib.suppress(KeyError):
            CarlaDataProvider.register_actor(vehicle, vehicle.get_transform())  # pyright: ignore[reportUnknownMemberType]
    else:
        # Exchange the key for a faster lookup
        # Find the registered actor object that has the same id as the new vehicle
        for key in CarlaDataProvider._actor_velocity_map:  # pyright: ignore[reportPrivateUsage]
            if key.id == vehicle.id:
                break
        else:
            # This does not happen because of the first if condition
            raise ValueError("The vehicle was not registered in the actor map.")
        # Move the stored values from the old key object to the new vehicle object
        CarlaDataProvider._actor_velocity_map[vehicle] = CarlaDataProvider._actor_velocity_map.pop(key)  # pyright: ignore[reportPrivateUsage]
        CarlaDataProvider._actor_transform_map[vehicle] = CarlaDataProvider._actor_transform_map.pop(key)  # pyright: ignore[reportPrivateUsage]
        CarlaDataProvider._actor_location_map[vehicle] = CarlaDataProvider._actor_location_map.pop(key)  # pyright: ignore[reportPrivateUsage]

    self._init_detection_matrix()
    self._local_planner = DynamicLocalPlannerWithRss(
        self,
        map_inst=CarlaDataProvider.get_map(),
        world=CarlaDataProvider.get_world(),
        rss_sensor=self._world_model.rss_sensor,
    )

# @override
def set_target_speed(self, speed: float) -> None:
    """
    Set a new target speed in the agent's configuration.

    A warning is printed if the configuration is currently set to follow the speed limits.

    :param speed (float): target speed in Km/h
    """
    speed_settings = self.config.speed
    if speed_settings.follow_speed_limits:
        print(
            "WARNING: The max speed is currently set to follow the speed limits. "
            "Use 'follow_speed_limits' to deactivate this"
        )
    speed_settings.target_speed = speed  # shared with planner
1505
def follow_speed_limits(self, value: bool = True) -> None:
    """
    Switch the ``follow_speed_limits`` option of the speed settings.

    If active, the agent will dynamically change the target speed according to the speed limits.

    Arguments:
        value: Whether to activate this behavior
    """
    speed_settings = self.config.speed
    speed_settings.follow_speed_limits = value
1514
def ignore_traffic_lights(self, active: bool = True) -> None:
    """
    Set the ``ignore_traffic_lights`` option of the obstacle settings.

    Arguments:
        active: :python:`True` deactivates the checks for traffic lights,
            :python:`False` activates them.
    """
    obstacle_settings = self.config.obstacles
    obstacle_settings.ignore_traffic_lights = active
1518
def ignore_stop_signs(self, active: bool = True) -> None:
    """
    Set the ``ignore_stop_signs`` option of the obstacle settings.

    Arguments:
        active: :python:`True` deactivates the checks for stop signs,
            :python:`False` activates them.
    """
    obstacle_settings = self.config.obstacles
    obstacle_settings.ignore_stop_signs = active
1522
def ignore_vehicles(self, active: bool = True) -> None:
    """
    Set the ``ignore_vehicles`` option of the obstacle settings.

    Arguments:
        active: :python:`True` deactivates the checks for vehicles,
            :python:`False` activates them.
    """
    obstacle_settings = self.config.obstacles
    obstacle_settings.ignore_vehicles = active
1526
def rss_set_road_boundaries_mode(
    self, road_boundaries_mode: Optional[Union[bool, RssRoadBoundariesModeAlias]] = None
) -> None:
    """
    Forward the road boundaries mode to the world model.

    Arguments:
        road_boundaries_mode: The mode to set. If :python:`None`, the value of
            ``config.rss.use_stay_on_road_feature`` is used.
    """
    mode = self.config.rss.use_stay_on_road_feature if road_boundaries_mode is None else road_boundaries_mode
    self._world_model.rss_set_road_boundaries_mode(mode)
# ------------------ Overwritten & Outsourced functions ------------------ #

from agents.tools.lunatic_agent_tools import detect_vehicles, max_detection_distance

# signature of parent is str and not Literal["left", "right"]
from agents.tools.lunatic_agent_tools import (
    generate_lane_change_path as _generate_lane_change_path,  # pyright: ignore[reportAssignmentType]
)

# NOTE: the original pedestrian_avoid_manager is still usable
def pedestrian_avoid_manager(self, waypoint) -> NoReturn:  # type: ignore
    """
    This function was replaced by ``substep_managers.pedestrian_detection_manager``.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    raise NotImplementedError("This function was replaced by ", substep_managers.pedestrian_detection_manager)

# @override
def collision_and_car_avoid_manager(self, waypoint) -> NoReturn:  # type: ignore
    """
    This function was split into detect_obstacles_in_path and car_following_manager

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    raise NotImplementedError("This function was split into detect_obstacles_in_path and car_following_manager")

# @override
def _tailgating(self, waypoint, vehicle_list) -> NoReturn:  # type: ignore
    # Disabled override: always raises NotImplementedError.
    raise NotImplementedError("Tailgating has been implemented as a rule")

# @override
def emergency_stop(self) -> NoReturn:
    """
    This function was overwritten; use ``add_emergency_stop`` instead.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    raise NotImplementedError("This function was overwritten use `add_emergency_stop` instead")

# ------------------------------------ #
# As reference Parent Functions
# ------------------------------------ #
# def get_local_planner(self):
# def get_global_planner(self):

# def done(self): # from base class self._local_planner.done()

# def set_global_plan(self, plan, stop_waypoint_creation=True, clean_queue=True):
"""
Adds a specific plan to the agent.

    :param plan: list of [carla.Waypoint, RoadOption] representing the route to be followed
    :param stop_waypoint_creation: stops the automatic random creation of waypoints
    :param clean_queue: resets the current agent's plan
"""

# def trace_route(self, start_waypoint, end_waypoint):
"""
Calculates the shortest route between a starting and ending waypoint.

    :param start_waypoint (carla.Waypoint): initial waypoint
    :param end_waypoint (carla.Waypoint): final waypoint
"""

# ---------------- Cleanup ------------------- #

def _destroy_sensor(self):
    # Stops the detection matrix and destroys the collision sensor, if present,
    # and resets both attributes to None.
    if self._detection_matrix:
        self._detection_matrix.stop()
        self._detection_matrix = None
    if self._collision_sensor:
        self._collision_sensor.destroy()
        self._collision_sensor = None  # type: ignore[assignment]
def destroy(self):
    """
    Release the agent's helpers and references.

    Destroys the sensors (including the :py:class:`.DetectionMatrix`), drops the
    references to the world model, world and context, and empties the actor lists.
    Actor lists that do not exist on the agent are skipped silently.
    """
    self._destroy_sensor()
    self._world_model = None  # type: ignore[assignment]
    self._world = None  # type: ignore[assignment]
    context = self.ctx
    if context:
        # Break the reference from the context back to the agent
        context.agent = None  # type: ignore[assignment]
    self.ctx = None  # type: ignore[assignment]
    # Stops at the first list that is missing on the agent
    with contextlib.suppress(AttributeError):
        self.all_vehicles.clear()
        self.vehicles_nearby.clear()
        self.all_walkers.clear()
        self.walkers_nearby.clear()

def __del__(self):
    """Best-effort cleanup on garbage collection; exceptions from destroy are ignored."""
    with contextlib.suppress(Exception):
        self.destroy()