Source code for agents.lunatic_agent

   1"""
   2This module implements an agent that roams around a track following random
   3waypoints and avoiding other vehicles. The agent also responds to traffic lights,
   4traffic signs, and has different possible configurations.
   5"""
   6
   7# Decide how to handle imports
   8# ruff: noqa: PLC0415
   9
  10from __future__ import annotations
  11
  12import contextlib
  13import sys
  14from copy import deepcopy
  15from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, List, NoReturn, Optional, Sequence, Set, Union
  16from typing import cast as assure_type
  17
  18import carla  # pyright: ignore[reportMissingTypeStubs]
  19import omegaconf
  20from omegaconf import DictConfig, OmegaConf
  21from typing_extensions import Literal, Self, Unpack
  22
  23from agents import substep_managers
  24from agents.dynamic_planning.dynamic_local_planner import DynamicLocalPlannerWithRss
  25from agents.navigation.behavior_agent import BehaviorAgent
  26from agents.navigation.global_route_planner import GlobalRoutePlanner
  27from agents.rules import rule_from_config
  28from agents.tools.config_creation import (
  29    AgentConfig,
  30    LaunchConfig,
  31    LiveInfo,
  32    LunaticAgentSettings,
  33    RssRoadBoundariesModeAlias,
  34    RuleCreatingParameters,
  35)
  36from agents.tools.logs import logger
  37from agents.tools.lunatic_agent_tools import (
  38    detect_vehicles,
  39    must_clear_hazard,
  40    phase_callback,  # type: ignore[unused-import] # noqa: F401
  41    result_to_context,
  42)
  43from agents.tools.misc import lanes_have_same_direction
  44from classes.constants import AD_RSS_AVAILABLE, READTHEDOCS, AgentState, Hazard, HazardSeverity, Phase, RoadOption
  45from classes.exceptions import (
  46    AgentDoneException,
  47    ContinueLoopException,
  48    EmergencyStopException,
  49    LunaticAgentException,
  50    NoFurtherRulesException,
  51    SkipInnerLoopException,
  52    UpdatedPathException,
  53    UserInterruption,
  54)
  55from classes.rule import BlockingRule, Context, Rule
  56from classes.worldmodel import CarlaDataProvider, WorldModel
  57from classes.detection_matrix import AsyncDetectionMatrix, DetectionMatrix
  58from classes.information_manager import InformationManager
  59
  60if TYPE_CHECKING:
  61    from agents.tools.hints import ObstacleDetectionResult, TrafficLightDetectionResult
  62    import pygame
  63
  64
[docs] 65class LunaticAgent(BehaviorAgent): 66 """ 67 BasicAgent implements an agent that navigates the scene. 68 This agent respects traffic lights and other vehicles, but ignores stop signs. 69 It has several functions available to specify the route that the agent must follow, 70 as well as to change its parameters in case a different driving mode is desired. 71 """ 72 73 BASE_SETTINGS: "type[LunaticAgentSettings]" = LunaticAgentSettings 74 """ 75 Base AgentConfig class for this agent. This is used to create the default settings for the agent 76 if none are provided. 77 """ 78 79 DEFAULT_RULES: ClassVar[Dict[Phase, List[Rule]]] = {k: [] for k in Phase.get_phases()} 80 """ 81 Default rules of this agent class when initialized. 82 83 :meta hide-value: 84 """ 85 86 rules: Dict[Phase, List[Rule]] 87 """ 88 The rules of the this agent. 89 When initialized the rules are deep copied from :py:attr:`DEFAULT_RULES`. 90 """ 91 92 ctx: Context 93 """The context object of the current step""" 94 95 # Information from the InformationManager 96 walkers_nearby: List[carla.Walker] 97 vehicles_nearby: List[carla.Vehicle] 98 static_obstacles_nearby: List[carla.Actor] 99 """Static obstacles detected by the :py:class:`.InformationManager`""" 100 101 obstacles_nearby: List[carla.Actor] 102 """ 103 Combination of :py:attr:`vehicles_nearby`, :py:attr:`walkers_nearby` 104 and :py:attr:`static_obstacles_nearby`. 105 """ 106 107 traffic_lights_nearby: List[carla.TrafficLight] 108 traffic_signs_nearby: List[carla.TrafficSign] = NotImplemented 109 """ 110 Not yet implemented 111 112 :meta private: 113 """ 114 115 current_states: Dict[AgentState, int] 116 """The current states of the agent. The count of the steps being each state is stored as value.""" 117 118 # 119 120 _world_model: WorldModel 121 """Reference to the attached :py:class:`WorldModel`.""" 122 123 _validate_phases = False 124 """ 125 Debugging flag to sanity check if the agent passes trough the phases in the correct order. 
126 Attention: Currently straight forward anymore. 127 """ 128 129 _active_blocking_rules: Set[BlockingRule] 130 """Blocking rules that are currently active and have taken over the agents loop.""" 131 132 # ------------------ Initialization ------------------ # 133 134 from agents.tools.lunatic_agent_tools import create_agent_config as _create_agent_config 135
@classmethod
def create_world_and_agent(cls, args: LaunchConfig, *,
                           vehicle: Optional[carla.Vehicle] = None,
                           sim_world: carla.World,
                           settings_archetype: "Optional[type[AgentConfig]]" = None,
                           agent_config: Optional["LunaticAgentSettings"] = None,
                           overwrites: Optional[Dict[str, Any]] = None
                           ) -> tuple[Self, WorldModel, GlobalRoutePlanner]:
    """
    Setup function to create the agent from the :py:class:`LaunchConfig` settings.

    The agent configuration is chosen with the following precedence:

    1. ``agent_config`` if it is passed; it is used as is.
    2. ``args.agent`` if ``args`` has such an attribute.
    3. ``settings_archetype``; a class is instantiated with ``overwrites``,
       an instance (discouraged) is used directly.
    4. The defaults of :py:attr:`BASE_SETTINGS`.

    Args:
        args : The launch configuration, forwarded to the :py:class:`WorldModel`.
        vehicle : Optional vehicle passed to the :py:class:`WorldModel` as ``player``.
        sim_world : The carla world passed to the :py:class:`WorldModel`.
        settings_archetype : Config *class* used to build the settings
            if neither ``agent_config`` nor ``args.agent`` is available.
        agent_config : A ready-to-use configuration.
        overwrites : Passed to ``settings_archetype`` when it is instantiated.

    Returns:
        The created agent, its world model and the agent's global planner.

    Note:
        - :py:meth:`.GameFramework.init_agent_and_interface` is the preferred way to
          instantiate the agent; only use this method if you do not want to create a
          :py:class:`GameFramework` object.
    """
    if overwrites is None:
        overwrites = {}

    if agent_config is None:
        if hasattr(args, "agent"):
            if settings_archetype is not None:
                logger.warning("settings_archetype was passed but using args.agent. "
                               "Ignoring settings_archetype.")
            agent_config = args.agent
        elif settings_archetype is not None and not isinstance(settings_archetype, type):
            # Fix: the former check `isinstance(settings_archetype, object)` is true for
            # every value (classes included) and made the next branch unreachable.
            logger.warning("settings_archetype is an instance. "
                           "To pass an instance use agent_config instead.")
            agent_config = settings_archetype  # type: ignore
        elif settings_archetype is not None:
            logger.debug("Creating config from settings_archetype")
            behavior = settings_archetype(overwrites)
            agent_config = cls.BASE_SETTINGS.cast(behavior.to_dict_config())
        else:
            logger.debug("Using %s._base_settings %s to create config.",
                         cls.__name__, cls.BASE_SETTINGS)
            agent_config = cls.BASE_SETTINGS.cast(cls.BASE_SETTINGS.to_dict_config())
        assert agent_config is not None
    else:
        logger.debug("A config was passed, using it as is.")

    world_model = WorldModel(agent_config, args=args, carla_world=sim_world, player=vehicle)  # TEST: without args
    # Planner step size: the fixed simulation delta if one is set, else derived from the fps.
    agent_config.planner.dt = world_model.world_settings.fixed_delta_seconds or 1 / world_model._args.fps  # pyright: ignore[reportPrivateUsage]

    agent = cls(agent_config, world_model)
    return agent, world_model, agent.get_global_planner()
# NOTE(review): this listing was recovered from a whitespace-collapsed page; the nesting
# below (e.g. which statements belong to `if world_model is None:`) is reconstructed
# from the statement order - confirm against the original file.
def __init__(self,
             settings: Union[str, LunaticAgentSettings],
             world_model: Optional[WorldModel] = None,
             *,
             vehicle: Optional[carla.Vehicle] = None,
             overwrite_options: Optional[dict[str, Any]] = None,
             debug: bool = True):
    """
    Initialize the LunaticAgent.

    Builds the config, creates a :py:class:`WorldModel` if none is passed, registers the
    vehicle, sets up the global planner, traffic light references and the collision
    sensor, copies the class rules, adds the rules from the config and finally creates
    the :py:class:`InformationManager`.

    Args:
        settings :
            The settings of the agent to construct the :py:attr:`LunaticAgent.config`.
            It can be either a string pointing to a yaml file or a LunaticAgentSettings.
        world_model : The world model to use. If None, a new WorldModel will be created.
        vehicle : The vehicle controlled by the agent. Can be None then the :code:`world_model.player` will be used.
        overwrite_options : Additional options to overwrite the default agent configuration.
        debug : Whether to enable debug mode.
    """
    self._debug = debug
    """Enables additional debug output."""
    # Only warns; the agent is still created without a WorldModel.
    if world_model is None and len(sys.argv) > 1:
        logger.error("BUG: Beware when not passing a WorldModel, the WorldModel currently ignores command line overrides, "
                     "i.e. will only use the config file.\n> Use `%s.create_world_and_agent` and provide the LaunchConfig instead.", self.__class__.__name__)

    # -------------------- Settings ------------------------
    # Depending on the input type, the settings are created differently.

    self.config = self._create_agent_config(settings, world_model, overwrite_options)

    # -------------------------------

    logger.info("\n\nAgent config is %s", OmegaConf.to_yaml(self.config))

    # World Model
    if world_model is None:
        world_model = WorldModel(self.config, player=vehicle)
        # Planner step size: the fixed simulation delta if one is set, else derived from the fps.
        self.config.planner.dt = world_model.world_settings.fixed_delta_seconds or 1 / world_model._args.fps  # pyright: ignore[reportPrivateUsage]

    self._world_model: WorldModel = world_model
    self._world: carla.World = world_model.world

    # Register Vehicle
    assert world_model.player is not None
    self.set_vehicle(world_model.player)

    self.current_phase: Phase = Phase.NONE
    """current phase of the agent inside the loop"""

    # No context exists before the first step; `_collision_event` tolerates this state.
    self.ctx = None  # type: ignore

    self._last_traffic_light: Optional[carla.TrafficLight] = None
    """Current red traffic light"""

    # TODO: No more hardcoded defaults / set them from opt_dict which must have all parameters; check which are parameters and which are set by other functions (e.g. _look_ahead_steps)

    # Parameters from BehaviorAgent ------------------------------------------
    # todo: check redefinitions

    self._look_ahead_steps: int = 0
    """
    updated in _update_information used for local_planner.get_incoming_waypoint_and_direction
    """

    # Initialize the planners
    self._global_planner = CarlaDataProvider.get_global_route_planner()  # NOTE: THIS does not use self.config.planner.sampling_resolution
    if not self._global_planner:
        # Fallback: create a planner and store it on the CarlaDataProvider.
        logger.error("Global Route Planner not set - This should not happen, if the CarlaDataProvider has been initialized.")
        self._global_planner = GlobalRoutePlanner(CarlaDataProvider.get_map(), self.config.planner.sampling_resolution)
        CarlaDataProvider._grp = self._global_planner  # pyright: ignore[reportPrivateUsage]

    # Get the static elements of the scene
    self._traffic_light_map: Dict[carla.TrafficLight, carla.Transform] = CarlaDataProvider._traffic_light_map  # pyright: ignore[reportPrivateUsage]
    # A keys view of the provider's traffic light map, not a copy.
    self._lights_list = CarlaDataProvider._traffic_light_map.keys()  # pyright: ignore[reportPrivateUsage]
    self._lights_map: Dict[int, carla.Waypoint] = {}
    """Dictionary mapping a traffic light to a wp corresponding to its trigger volume location"""

    # Vehicle Lights
    self._vehicle_lights = carla.VehicleLightState.NONE

    # Collision Sensor  # NOTE: another in the WorldModel for the HUD
    self._set_collision_sensor()

    # Rule Framework
    # 1. add all rules from the class, if any - else this is a dict with empty lists
    self.rules = deepcopy(self.__class__.DEFAULT_RULES)  # Copies the ClassVar to the instance

    # 2. add rules from the config
    self.add_config_rules()

    # Information Manager
    self.information_manager = InformationManager(self)
    self.current_states = self.information_manager.state_counter  # share the dict

    # Blocking Rules
    self._active_blocking_rules = set()

# --------------------- Collision Callback ------------------------

def _set_collision_sensor(self):
    """
    Spawn a collision sensor attached to :code:`self._vehicle` and register
    :py:meth:`_collision_event` as its listener.
    """
    # see: https://carla.readthedocs.io/en/latest/ref_sensors/#collision-detector
    # and https://carla.readthedocs.io/en/latest/python_api/#carla.Sensor.listen
    blueprint = CarlaDataProvider._blueprint_library.find('sensor.other.collision')  # pyright: ignore[reportPrivateUsage]
    self._collision_sensor: carla.Sensor = assure_type(carla.Sensor, CarlaDataProvider.get_world().spawn_actor(
        blueprint, carla.Transform(), attach_to=self._vehicle))

    self._collision_sensor.listen(self._collision_event)

# Class-level import: binds `collision_manager` as the `_collision_manager` attribute
# that `_collision_event` calls via `self`.
from agents.substep_managers import collision_manager as _collision_manager

def _collision_event(self, event: carla.CollisionEvent):
    """
    Callback function for the collision sensor.

    By default uses :py:func:`agents.substep_managers.collision_manager`.

    An :py:exc:`AttributeError` whose message shows that the context is still
    :python:`None` is swallowed for both phases; any other :py:exc:`AttributeError`
    is re-raised.

    Executes Phases:
        - :py:class:`Phase.COLLISION | Phase.BEGIN<.Phase>`
        - :py:class:`Phase.COLLISION | Phase.END<.Phase>`
    """
    # https://carla.readthedocs.io/en/latest/python_api/#carla.CollisionEvent
    # e.g. setting ignore_vehicles to False, if it was True before.
    # do an emergency stop (in certain situations)
    try:
        self.execute_phase(Phase.COLLISION | Phase.BEGIN, prior_results=event)
    except AttributeError as e:
        # Matches the error raised by `self.ctx.prior_result = ...` while `self.ctx` is None.
        if "'NoneType' object has no attribute 'prior_result'" not in str(e):
            raise e
        # context not yet set up, very early collision
    result = self._collision_manager(event)
    try:
        self.execute_phase(Phase.COLLISION | Phase.END, prior_results=result)
    except AttributeError as e:
        if "'NoneType' object has no attribute 'prior_result'" not in str(e):
            raise e
        # context not yet set up, very early collision

# --------------------- Adding rules ------------------------
def add_rule(self, rule: Rule, position: Union[int, None] = None):
    """
    Register a single rule for every phase it is associated with.

    Phases that are not yet a key of :py:attr:`rules` are created and a warning is logged.

    Args:
        rule : The rule to add
        position :
            Index at which the rule is inserted into each phase's list.
            If :python:`None` the rule is appended and the list is
            re-sorted by descending priority.
            Defaults to :python:`None`.
    """
    keep_sorted = position is None
    for phase in rule.phases:
        if phase not in self.rules:
            logger.warning("Phase %s from Rule %s is not a default phase. Adding a new phase.", phase, rule)
            self.rules[phase] = []
        phase_rules = self.rules[phase]
        if not keep_sorted:
            # An explicit position wins over the priority order.
            phase_rules.insert(position, rule)
        else:
            phase_rules.append(rule)
            phase_rules.sort(key=lambda entry: entry.priority, reverse=True)
342
def add_rules(self, rules: "Rule | Iterable[Rule]"):
    """
    Add one or several rules and sort the agent's rules by priority.

    Phases that are not yet a key of :py:attr:`rules` are created and a warning
    is logged, consistent with :py:meth:`add_rule`; previously such a rule
    raised a :py:exc:`KeyError`.

    Args:
        rules : A single rule or an iterable of rules.
    """
    if isinstance(rules, Rule):
        rules = [rules]
    for rule in rules:
        for phase in rule.phases:
            if phase not in self.rules:
                logger.warning("Phase %s from Rule %s is not a default phase. Adding a new phase.", phase, rule)
                self.rules[phase] = []
            self.rules[phase].append(rule)
    # Sort once after all rules are added; highest priority first.
    for phase_rules in self.rules.values():
        phase_rules.sort(key=lambda r: r.priority, reverse=True)
352
def add_config_rules(self, config: Optional[Union[LunaticAgentSettings, List[RuleCreatingParameters]]] = None):
    """
    Create rules from configuration entries and add them via :py:meth:`add_rules`.

    Args:
        config : Either a config object with a ``rules`` key or directly the
            rule-creating parameters. If :python:`None` the agent's
            :py:attr:`config` is used.
    """
    source = self.config if config is None else config
    rule_list: List[RuleCreatingParameters]
    # "rules" in config is also false for rules == MISSING, hence the lookup on the keys.
    carries_rules_key = isinstance(source, (AgentConfig, DictConfig)) and "rules" in source.keys()  # noqa: SIM118, RUF100
    if not carries_rules_key:
        # Anything else is iterated directly as the rule parameters.
        rule_list = source  # type: ignore[assignment]
    else:
        try:
            rule_list = source.rules
        except omegaconf.MissingMandatoryValue:
            logger.debug("`rules` key was missing, skipping rule addition.")
            return
    logger.debug("Adding rules from config:\n%s", OmegaConf.to_yaml(rule_list))
    for rule_parameters in rule_list:
        # each call could produce one or more rules
        created = rule_from_config(rule_parameters)
        self.add_rules(created)
# --------------------- Properties ------------------------

@property
def live_info(self) -> LiveInfo:
    """Shortcut for :code:`self.config.live_info`."""
    return self.config.live_info

@property
def detection_matrix(self):
    """
    Returns :any:`DetectionMatrix.getMatrix` if the matrix is set.

    Returns :python:`None` if :code:`self._detection_matrix` is falsy.
    """
    if self._detection_matrix:
        return self._detection_matrix.getMatrix()
    return None

# ------------------ Hazard ------------------ #

@property
def detected_hazards(self) -> Set[Hazard]:
    """The :code:`detected_hazards` of the current context :py:attr:`ctx`."""
    return self.ctx.detected_hazards

@property
def detected_hazards_info(self) -> Dict[Hazard, Any]:
    """
    Information about the detected hazards, e.g. severity.
    """
    return self.ctx.detected_hazards_info

@detected_hazards.setter
def detected_hazards(self, hazards: Set[Hazard]):
    # Rejects anything that is not a set before writing to the context's private attribute.
    if not isinstance(hazards, set):  # pyright: ignore[reportUnnecessaryIsInstance]
        raise TypeError("detected_hazards must be a set of Hazards.")
    self.ctx._detected_hazards = hazards  # pyright: ignore[reportPrivateUsage]

# These have the same signature and can be used interchangeably
discard_hazard = Context.discard_hazard
add_hazard = Context.add_hazard
has_hazard = Context.has_hazard

#
@property
def current_traffic_light(self) -> "carla.TrafficLight | None":
    """Alias to :py:attr:`self._last_traffic_light <_last_traffic_light>`."""
    return self._last_traffic_light

@current_traffic_light.setter
def current_traffic_light(self, traffic_light: carla.TrafficLight):
    self._last_traffic_light = traffic_light

@property
def phase_results(self) -> Dict[Phase, Any]:
    """
    Retrieves :py:attr:`agent.ctx.phase_results <classes.rule.Context.phase_results>`

    Stores the results of the phases the agent has been in.
    By default the keys are set to :any:`Context.PHASE_NOT_EXECUTED`.
    """
    return self.ctx.phase_results

@property
def active_blocking_rules(self) -> Set[BlockingRule]:
    """
    Blocking rules that are currently active and have taken over the agents loop.
    """
    return self._active_blocking_rules

@property
def _map(self) -> carla.Map:
    """Get the current map of the world."""  # Needed *only* for set_destination
    return CarlaDataProvider.get_map()

# ------------------

#@property
#def ctx(self) -> Union[Context, None]:
#    print("Getting Context", self._ctx())
#    return self._ctx() # might be None

# ------------------ Information functions ------------------ #
def update_information(self, second_pass: bool = False):
    """
    Refresh the agent's knowledge about the ego vehicle and its surroundings.

    Wraps :py:meth:`_update_information` in the two phases listed below.

    Parameters:
        second_pass : *Internal usage* set to :python:`True` if this function is called a second
            time in the same tick, e.g. after a route update.

    See Also:
        - :py:attr:`information_manager`

    Executes the phases:
        - :py:class:`Phase.UPDATE_INFORMATION | Phase.BEGIN<.Phase>`
        - :py:class:`Phase.UPDATE_INFORMATION | Phase.END<.Phase>`
    """
    opening_phase = Phase.UPDATE_INFORMATION | Phase.BEGIN
    self.execute_phase(opening_phase, prior_results=None)

    self._update_information(second_pass=second_pass)

    closing_phase = Phase.UPDATE_INFORMATION | Phase.END
    self.execute_phase(closing_phase, prior_results=None)
# NOTE(review): this listing was recovered from a whitespace-collapsed page; the extent
# of the `if not second_pass:` block below is reconstructed from the section comments -
# confirm against the original file.
def _update_information(self, *, second_pass: bool = False):
    """
    This method updates the information regarding the ego
    vehicle based on the surrounding world.

    second_pass = True will skip some calculations,
    especially useful if a second call to _update_information is necessary in the same tick.

    Assumes: second_pass == True => agent.done() == False

    Parameters:
        second_pass : If :python:`True` only the route-dependent information is refreshed.

    Note:
        Does not execute any Phase.
    """
    # --------------------------------------------------------------------------
    # Information that is CONSTANT DURING THIS TICK and INDEPENDENT OF THE ROUTE
    # --------------------------------------------------------------------------
    if not second_pass:
        if self._debug:  # noqa: SIM102
            # Sanity check: the agent's view of the lights should not be empty while the provider's map has entries.
            if not self._lights_list and len(CarlaDataProvider._traffic_light_map.keys()):  # pyright: ignore[reportPrivateUsage]
                logger.error("Traffic light list is empty, but map is not.")

        # ----------------------------
        # First Pass for expensive and tick-constant information

        # --- InformationManager ---
        information: InformationManager.Information = self.information_manager.tick()  # NOTE: # Warning: Currently not route-dependant, might need to be changed later
        self.tick_information = information

        self._current_waypoint = information.current_waypoint
        self.current_states = information.current_states

        # Maybe access over subattribute, or properties
        # NOTE: If other scripts, e.g. the scenario_runner, are used in parallel or sync=False
        # the stored actors might have been destroyed later on.

        # Global Information
        self.all_vehicles = information.vehicles
        self.all_walkers = information.walkers
        self.all_static_obstacles = information.static_obstacles
        # Combination of the three lists
        self.all_obstacles = information.obstacles

        # Filtered by config.obstacles.nearby_vehicles_max_distance and nearby_vehicles_max_distance
        self.vehicles_nearby = information.vehicles_nearby
        self.walkers_nearby = information.walkers_nearby
        self.static_obstacles_nearby = information.static_obstacles_nearby

        # Combination of the three lists
        # NOTE(review): the class annotates `obstacles_nearby`, but this assigns
        # `all_obstacles_nearby` - confirm which name is intended.
        self.all_obstacles_nearby = information.obstacles_nearby

        self.traffic_lights_nearby = information.traffic_lights_nearby

        # Find vehicles and walkers nearby; could be moved to the information manager

        # ----------------------------

        # Data Matrix
        # update not every frame to save performance
        if self._detection_matrix and self._detection_matrix.sync:
            self._road_matrix_counter += 1
            # Only every `sync_interval`-th call triggers an update.
            if (self._road_matrix_counter % self.config.detection_matrix.sync_interval) == 0:
                #logger.debug("Updating Road Matrix")
                # TODO: Still prevent async mode from using too much resources and slowing fps down too much.
                self._detection_matrix.update()  # NOTE: Does nothing if in async mode. self.road_matrix is updated by another thread.
            else:
                pass

        # used for self._local_planner.get_incoming_waypoint_and_direction
        self._look_ahead_steps = int((self.live_info.current_speed_limit) / 10)  # TODO: Maybe make this an interpolation and make more use of it

        # ----- Follow speed limits -------
        # NOTE: This is once again set in the local planner, but only on the Context config!
        if self.config.speed.follow_speed_limits:
            self.config.speed.target_speed = self.config.live_info.current_speed_limit

        # NOTE: This is the direction used by the planner in the *last* step.
        self.live_info.executed_direction = self._local_planner.target_road_option

    # Checked in both passes: the stored direction must still match the planner's.
    assert self.live_info.executed_direction == self._local_planner.target_road_option, "Executed direction should not change."

    # -------------------------------------------------------------------
    # Information that NEEDS TO BE UPDATED AFTER a plan / ROUTE CHANGE.
    # -------------------------------------------------------------------
    if not self.done():
        # NOTE: This should be called after
        self.live_info.incoming_waypoint, self.live_info.incoming_direction = self._local_planner.get_incoming_waypoint_and_direction(  # pyright: ignore[reportAttributeAccessIssue]
            steps=self._look_ahead_steps)
    else:
        assert second_pass is False, "In the second pass the agent should have replanned and agent.done() should be False"
        # Assumes second_pass is False
        # Queue is empty
        self.live_info.incoming_waypoint = None
        self.live_info.incoming_direction = RoadOption.VOID

    # Information that requires updated waypoint and route information:
    self.live_info.is_taking_turn = self.is_taking_turn()
    self.live_info.is_changing_lane = self.is_changing_lane()

    #logger.debug(f"Incoming Direction: {str(self.live_info.incoming_direction):<20} - Second Pass: {second_pass}")

    # RSS
    # todo uncomment if agent is created after world model
    #self.rss_set_road_boundaries_mode() # in case this was adjusted during runtime. # TODO: maybe implement this update differently. As here it is called unnecessarily often.

    if self._debug:
        # The result is discarded; with `throw_on_missing=True` the call serves as a
        # check that `live_info` can be resolved without missing values.
        OmegaConf.to_container(self.live_info, resolve=True, throw_on_missing=True)

def is_taking_turn(self) -> bool:
    """Tell whether the incoming direction is a left or a right turn."""
    upcoming_direction = self.live_info.incoming_direction
    turn_options = (RoadOption.LEFT, RoadOption.RIGHT)
    return upcoming_direction in turn_options
582
def is_changing_lane(self) -> bool:
    """Tell whether the incoming direction is a lane change to the left or to the right."""
    upcoming_direction = self.live_info.incoming_direction
    lane_change_options = (RoadOption.CHANGELANELEFT, RoadOption.CHANGELANERIGHT)
    return upcoming_direction in lane_change_options
586 587 # ------------------ Step & Loop Logic ------------------ # 588
def execute_phase(self, phase: Phase, *, prior_results: Any, update_controls: Optional[carla.VehicleControl] = None) -> Context:
    """
    Make ``phase`` the current phase and run every rule registered for it.

    The rules see ``prior_results`` as :code:`ctx.prior_result`; it is also stored in
    :code:`ctx.phase_results[phase]`. A :py:exc:`NoFurtherRulesException` raised by a
    rule stops the evaluation of the remaining rules of this phase.

    Parameters:
        phase : The phase to execute.
        prior_results : The results of the previous phase, e.g. :py:attr:`detected_hazards`.
        update_controls : Optionally controls that should be used from now onward.

    Returns:
        The agent's current context.

    Raises:
        omegaconf.ReadonlyConfigError: Re-raised after printing a hint if a rule raised it.
    """
    expected_phase = self.current_phase.next_phase()  # sanity checking if everything is correct
    if self._validate_phases:
        assert (expected_phase in {phase, Phase.USER_CONTROLLED}
                or phase & Phase.EXCEPTIONS
                or phase & Phase.USER_CONTROLLED), \
            f"Phase {phase} is not the next phase of {self.current_phase} or an exception phase. Expected {expected_phase}"

    self.current_phase = phase  # set next phase

    if update_controls is not None:
        self.set_control(update_controls)

    # `prior_result` is written first; `_collision_event` relies on the error this
    # raises while no context exists yet.
    context = self.ctx
    context.prior_result = prior_results
    context.phase_results[phase] = prior_results

    try:
        # use get if a custom phase is added, without a rule
        for rule in self.rules.get(phase, ()):
            assert self.current_phase in rule.phases, f"Current phase {self.current_phase} not in Rule {rule.phases}"  # TODO remove:
            rule(self.ctx)
            # NOTE: Blocking rules can change the phase; restore it so the assertion above holds.
            if phase != self.current_phase:
                logger.warning("Phase was changed by rule %s to %s. "
                               "Resting self.current_phase to %s. "
                               "To prevent his raise an exception in the rule or adjust the phase.",
                               rule, self.current_phase, phase)
                self.current_phase = phase
    except NoFurtherRulesException:
        # A rule requested to skip the remaining rules of this phase.
        pass
    except omegaconf.ReadonlyConfigError:
        print("WARNING: A action likely tried to change `ctx.config` which is non-permanent. Use `ctx.agent.config.` instead.")
        raise
    return self.ctx

def _plan_path_phase(self, *, second_pass: bool, debug: bool = False):
    """
    Execute the begin and end phase of :code:`Phase.PLAN_PATH`.

    If a rule raises an :py:exc:`UpdatedPathException`, :py:meth:`run_step` is called
    again as a second pass and its result is returned; otherwise :python:`None`.
    """
    try:
        for boundary in (Phase.BEGIN, Phase.END):
            # User defined action
            # TODO: when going around corners / junctions and the distance between waypoints is too big,
            # We should replan and make a more fine grained plan, to stay on the road.
            self.execute_phase(Phase.PLAN_PATH | boundary, prior_results=None)
    except UpdatedPathException as path_update:
        if second_pass:
            logger.warning("UpdatedPathException was raised in the second pass. This should not happen: %s. Restrict your rule on ctx.second_pass.", path_update)
        return self.run_step(debug, second_pass=True)
    else:
        return None

def _make_context(self, last_context: Union[Context, None], **kwargs: Any) -> Context:
    """
    Create the context object for a new step and store it as :py:attr:`ctx`.

    Before the new context is created, the ``last_context`` attribute of the
    passed context is deleted.
    """
    if last_context is not None:
        del last_context.last_context
    fresh_context = Context(agent=self, last_context=last_context, **kwargs)
    self.ctx = fresh_context
    return fresh_context

def __call__(self, debug: bool = False) -> carla.VehicleControl:
    """Run one first-pass step and return the resulting vehicle control."""
    # debug should be positional!
    control = self.run_step(debug, second_pass=False)
    return control
656 657 # Python 3.8+ add / for positional only arguments
def run_step(self, debug: bool = False, second_pass: bool = False) -> carla.VehicleControl:
    """
    Calculates the next vehicle control object.

    Order of execution: update information, plan path, handle a finished route,
    run :py:meth:`_inner_step` (with handling of the agent exceptions), and finally
    the RSS evaluation phases.

    Arguments:
        debug : Whether to enable debug mode.
            This prints some more information and debug drawings.
        second_pass : **Internal usage** set to :python:`True`
            if this function is called a second time, e.g. after a route update.
            When an :py:exc:`UpdatedPathException` is raised in the inner step, the
            value is additionally used as a counter for the repeated calls.

    Raises:
        AgentDoneException: If the route is finished and no rule set a new destination.
        RecursionError: If the inner step raised :py:exc:`UpdatedPathException` while
            ``second_pass`` already counted more than 50 passes.
        ValueError: If an exception that skips the rest of the step is raised
            while no control is set on the context.

    Warning:
        To be compatible with the :py:class:`.LunaticChallenger`, **always pass** :code:`debug` **as a positional argument**,
        or use the :py:meth:`__call__` method.
    """
    if not second_pass:
        ctx = self._make_context(last_context=self.ctx)
    else:
        ctx = self.ctx
    ctx.second_pass = second_pass
    try:
        # ----------------------------
        # Phase 0 - Update Information
        # ----------------------------
        # > Phase.UPDATE_INFORMATION | Phase.BEGIN
        self.update_information(second_pass=second_pass)
        # > Phase.UPDATE_INFORMATION | Phase.END

        # ----------------------------
        # Phase 1 - Plan Path
        # ----------------------------

        # Question: What TODO if the last phase was COLLISION (async), EMERGENCY
        # Some information to PLAN_PATH should reflect this

        # NOTE: Currently no option to diverge from existing path here, or plan a new path
        # NOTE: Currently done in the local planner and behavior functions
        # NOTE(review): `_plan_path_phase` returns the control of a nested `run_step`
        # after an UpdatedPathException; that return value is discarded here - confirm
        # this is intended.
        self._plan_path_phase(second_pass=second_pass, debug=debug)

        # Check whether the agent has reached its destination.
        if self.done():
            # NOTE: Might be in NONE phase here.
            self.execute_phase(Phase.DONE | Phase.BEGIN, prior_results=None)
            if self.done():
                # No Rule set a new destination
                logger.info("The target has been reached, stopping the simulation")
                self.execute_phase(Phase.TERMINATING | Phase.BEGIN, prior_results=None)
                raise AgentDoneException
            self.execute_phase(Phase.DONE | Phase.END, prior_results=None)
            return self.run_step(debug, second_pass=True)  # NOTE! For child classes like the leaderboard agent this calls the higher level run_step.

        # ----------------------------
        # Phase NONE - Before Running step
        # ----------------------------
        try:
            planned_control = self._inner_step(debug=debug)  # debug=True draws waypoints
            if self.detected_hazards:
                # The must_clear_hazard decorator will raise this, but in case the _inner_step is overwritten
                raise EmergencyStopException(self.detected_hazards)  # noqa: TRY301
        # Reraise Exceptions
        except UserInterruption:
            raise

        # Handled Exceptions
        except EmergencyStopException as emergency:
            # ----------------------------
            # Phase Emergency
            # no Rule with Phase.EMERGENCY | BEGIN cleared the provided hazards in ctx.prior_results
            # ----------------------------

            emergency_controls = self.emergency_manager(reasons=emergency.hazards_detected)

            # TODO: somehow backup the control defined before.
            self.execute_phase(Phase.EMERGENCY | Phase.END,
                               update_controls=emergency_controls,
                               prior_results=emergency.hazards_detected)
            planned_control = self.get_control()  # type: carla.VehicleControl # type: ignore[assignment]

        except SkipInnerLoopException as skip:
            self.set_control(skip.planned_control)
            self.current_phase = Phase.USER_CONTROLLED
            planned_control = skip.planned_control
        except UpdatedPathException as update_exception:
            # Fix: the hard limit is tested first. Previously it was an `elif` behind
            # the `> 5` check and therefore could never be reached.
            if second_pass > 50:
                raise RecursionError("UpdatedPathException was raised more than 50 times. "
                                     "Assuming an infinite loop and terminating") from update_exception
            if second_pass > 5:
                logger.warning("UpdatedPathException was raised more than %s times. "
                               "Warning: This might be an infinite loop.", second_pass)
            else:
                logger.warning("%s was raised in the inner step, this should be done in "
                               "Phase.PLAN_PATH instead.", update_exception)
            # Fix: forward `debug` positionally like the other recursive calls;
            # it was dropped before, so the retry always ran with the default.
            return self.run_step(debug, second_pass=int(second_pass) + 1)  # type: ignore
        except LunaticAgentException as lae:
            if self.ctx.control is None:
                raise ValueError(f"A VehicleControl object must be set on the agent when {type(lae).__name__} is "
                                 "raised during `._inner_step`") from lae
            planned_control = self.get_control()  # type: ignore[assignment]
            # assert ctx.control

        # ----------------------------
        # No known Phase multiple exit points
        # ----------------------------

        # ----------------------------
        # Phase RSS - Check RSS
        # ----------------------------

        ctx = self.execute_phase(Phase.RSS_EVALUATION | Phase.BEGIN,
                                 prior_results=None,
                                 update_controls=planned_control)
        if AD_RSS_AVAILABLE and self.config.rss and self.config.rss.enabled:
            rss_updated_controls = self._world_model.rss_check_control(ctx.control)  # type: ignore[arg-type]
        else:
            rss_updated_controls = None
        # NOTE: rss_updated_controls could be None.
        ctx = self.execute_phase(Phase.RSS_EVALUATION | Phase.END, prior_results=rss_updated_controls)

        #if ctx.control is not planned_control:
        #    logger.debug("RSS updated control accepted.")

    except ContinueLoopException as e:
        logger.debug("ContinueLoopException skipping rest of loop.")
        if self.ctx.control is None:
            raise ValueError(f"A VehicleControl object must be set on the agent when {type(e).__name__} is raised during `._inner_step`") from e

    planned_control = self.ctx.control  # type: carla.VehicleControl # type: ignore[assignment]
    planned_control.manual_gear_shift = False
    return self.get_control()  # type: ignore[return-value]
787
@must_clear_hazard
@result_to_context("control")
def _inner_step(self, debug: bool = False) -> carla.VehicleControl:
    """
    This is the internal function to provide the next control object for
    the agent; it should run every tick.

    The step runs the detection phases in a fixed order (traffic lights and
    pedestrians, static obstacles, vehicles) and then plans a control for
    one of three situations: following a detected vehicle, turning at a
    junction, or taking a normal step.

    Args:
        debug: Stored on the agent as ``self.debug`` and forwarded to
            :py:meth:`_calculate_control`.

    Returns:
        The control returned by :py:meth:`get_control` after the matching
        ``Phase.END`` phase was executed with the planned control.

    Raises:
        EmergencyStopException: If :py:attr:`detected_hazards` is not empty when the
            function returns.

    :meta public:
    """
    self.debug = debug

    # ----------------------------
    # Phase 2 - Detection of Pedestrians and Traffic Lights
    # ----------------------------

    # Detect hazards
    # phases are executed in detect_hazard
    # > Phase.DETECT_TRAFFIC_LIGHTS | Phase.BEGIN # phases executed inside
    pedestrians_and_tlight_hazard = self.detect_hazard()
    # > Phase.DETECT_PEDESTRIANS | Phase.END

    # Pedestrian avoidance behaviors
    # currently doing either emergency (detect_hazard) stop or nothing
    if self.detected_hazards:

        # ----------------------------
        # Phase Hazard Detected (traffic light or pedestrian)
        # If no Rule with Phase.EMERGENCY | BEGIN clears pedestrians_or_traffic_light
        # An EmergencyStopException is raised
        # ----------------------------
        self.react_to_hazard(pedestrians_and_tlight_hazard)  # Optional[NoReturn]

    # -----------------------------
    # Phase Detect Static Obstacles
    # -----------------------------

    self.execute_phase(Phase.DETECT_STATIC_OBSTACLES | Phase.BEGIN, prior_results=None)
    static_obstacle_detection_result = self.detect_obstacles_in_path(self.static_obstacles_nearby)
    if static_obstacle_detection_result.obstacle_was_found:
        # Count for how many consecutive steps the agent has been blocked.
        self.current_states[AgentState.BLOCKED_BY_STATIC] += 1
        # Must plan around it
        self.add_hazard(Hazard.STATIC_OBSTACLE)
    else:
        self.current_states[AgentState.BLOCKED_BY_STATIC] = 0
    # TODO: add a basic rule for circumventing static obstacles
    self.execute_phase(Phase.DETECT_STATIC_OBSTACLES | Phase.END,
                       prior_results=static_obstacle_detection_result)
    # Not throwing an error here yet

    # ----------------------------
    # Phase 3 - Detection of Cars
    # ----------------------------

    self.execute_phase(Phase.DETECT_CARS | Phase.BEGIN, prior_results=None)  # TODO: Maybe add some prio result
    vehicle_detection_result = self.detect_obstacles_in_path(self.vehicles_nearby)

    # TODO: add a way to let the execution overwrite
    if vehicle_detection_result.obstacle_was_found:

        # ----------------------------
        # Phase 2.A - React to cars in front
        # TODO: turn this into a rule.
        #       remove CAR_DETECTED -> pass detection_result to rules
        # TODO some way to circumvent returning control here, like above.
        # TODO: Needs refinement with the car_following_behavior
        # ----------------------------

        self.execute_phase(Phase.CAR_DETECTED | Phase.BEGIN, prior_results=vehicle_detection_result)
        # The detection result is unpacked into the positional parameters of car_following_behavior.
        control = self.car_following_behavior(*vehicle_detection_result)  # type: ignore[arg-type]
        # NOTE: might throw EmergencyStopException
        self.execute_phase(Phase.CAR_DETECTED | Phase.END, update_controls=control, prior_results=vehicle_detection_result)
        return self.get_control()  # type: ignore[return-value]

    #TODO: maybe new phase instead of END or remove CAR_DETECTED and handle as rules (maybe better)
    self.execute_phase(Phase.DETECT_CARS | Phase.END, prior_results=None)  # NOTE: avoiding tailgate here

    # Intersection behavior
    # NOTE: is_taking_turn <- incoming_direction in (RoadOption.LEFT, RoadOption.RIGHT)
    if self.live_info.incoming_waypoint.is_junction and self.is_taking_turn():  # pyright: ignore[reportOptionalMemberAccess]

        # ----------------------------
        # Phase Turning at Junction
        # ----------------------------

        self.execute_phase(Phase.TURNING_AT_JUNCTION | Phase.BEGIN, prior_results=None)
        control = self._calculate_control(debug=debug)
        self.execute_phase(Phase.TURNING_AT_JUNCTION | Phase.END, update_controls=control, prior_results=None)
        return self.get_control()  # type: ignore[return-value]

    # ----------------------------
    # Phase 4 - Plan Path normally
    # ----------------------------

    # Normal behavior
    self.execute_phase(Phase.TAKE_NORMAL_STEP | Phase.BEGIN, prior_results=None)
    control = self._calculate_control(debug=debug)
    self.execute_phase(Phase.TAKE_NORMAL_STEP | Phase.END, prior_results=None, update_controls=control)

    # Leave loop and apply controls outside
    return self.get_control()  # type: ignore[return-value]
def _calculate_control(self, debug: bool = False):
    """
    Run the local planner once and return the control it computed.

    Note:
        This is the innermost function of the agent's run_step function.
        It should be called each step to acquire a desired control object.
        Use this function inside rules if a control object is desired.

        **[Context.get_or_calculate_control](#Context.get_or_calculate_control)
        is a safer alternative to this function**

    Warning:
        If you do not use this function in a [`BlockingRule`](#BlockingRule)
        you should raise a `SkipInnerLoopException` or `ContinueLoopException`
        else the planned path will skip a waypoint.

    Warning:
        This function only calculates and returns the control object directly.
        **It does not set the :py:attr:`agent/ctx.control <control>` attribute which is the one
        the agent uses in [`apply_control`](#apply_control) to apply the final controls.**

    Args:
        debug: Forwarded unchanged to the local planner's ``run_step``.

    Returns:
        Whatever the local planner's ``run_step`` returns.
    """
    control_already_set = self.ctx.control is not None
    if control_already_set:
        # Only reported, not prevented: the planner still runs below.
        logger.error("Control was set before calling _calculate_control. This might lead to unexpected behavior.")

    planned_control = self._local_planner.run_step(debug)
    return planned_control
def parse_keyboard_input(self, allow_user_updates: bool = True,
                         *, control: Optional[carla.VehicleControl] = None) -> None:
    """
    Parse the current user input and allow manual updates of the controls.

    Args:
        allow_user_updates: If :python:`True`, the user can update the controls manually.
            Otherwise only the normal hotkeys do work.
        control: The control object to expose to the user. If it is falsy,
            the result of :py:meth:`get_control` is used instead.

    Raises:
        UserInterruption: If the controller's ``parse_events`` returns a truthy value.

    Executes Phases:
        - :py:class:`Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN <classes.constants.Phase>`
        - :py:class:`Phase.APPLY_MANUAL_CONTROLS | Phase.END <classes.constants.Phase>`
    """
    planned_control = control or self.get_control()
    self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN, prior_results=planned_control)

    parse_events = self._world_model.controller.parse_events  # pyright: ignore[reportOptionalMemberAccess]
    # Controls can be updated inplace by the user. When manual updates are not
    # allowed the controller receives a fresh control object instead of the planned one.
    if allow_user_updates:
        editable_control = planned_control
    else:
        editable_control = carla.VehicleControl()

    exit_requested = parse_events(editable_control)
    if exit_requested:
        print("Exiting by user input.")
        raise UserInterruption("Exiting by user input.")

    self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.END, prior_results=None)
942
def apply_control(self, control: Optional[carla.VehicleControl] = None):
    """
    Applies the control to the agent's actor.
    Will execute the :py:class:`Phase.EXECUTION | Phase.BEGIN <classes.constants.Phase>`
    and :py:class:`Phase.EXECUTION | Phase.END <classes.constants.Phase>` phases.

    Args:
        control: The control to apply. If :python:`None`, the control returned by
            :py:meth:`get_control` is used.

    Note:
        The final control object that is applied to the agent's actor
        is stored in the :py:attr:`ctx.control <ctx>` attribute.

    Raises:
        ValueError: If no control object was passed and :py:meth:`get_control`
            returns :python:`None`.
    """
    if control is None:
        control = self.get_control()
        if control is None:
            raise ValueError("The agent has not yet performed a step this tick "
                             "and no control object was passed.")
    if self.current_phase != Phase.EXECUTION | Phase.BEGIN:
        self.execute_phase(Phase.EXECUTION | Phase.BEGIN, prior_results=control, update_controls=control)
    else:
        logger.debug("Agent is already in execution phase.")
    # Re-read the control: the EXECUTION | BEGIN phase receives it via update_controls
    # and the value read back here is the one that gets applied.
    final_control: carla.VehicleControl = self.get_control()  # type: ignore[assignment]
    # Set automatic control-related vehicle lights
    self._update_lights(final_control)
    self._vehicle.apply_control(final_control)
    self.execute_phase(Phase.EXECUTION | Phase.END, prior_results=final_control)
970 971 # ------------------ Hazard Detection & Reaction ------------------ # 972 973 from agents.substep_managers import detect_traffic_light # -> TrafficLightDetectionResult 974 traffic_light_manager = detect_traffic_light # pyright: ignore[reportAssignmentType] 975 """Alias of :py:meth:`detect_traffic_light`""" 976
def detect_hazard(self) -> Set[Hazard]:
    """
    Checks for red traffic lights and pedestrians in the agents path.

    If :py:attr:`.LunaticAgentSettings.obstacles.detect_yellow_tlights` is set to :python:`True`,
    then yellow traffic lights will also be regarded as a hazard that can trigger an
    :py:exc:`EmergencyStopException` in :py:meth:`react_to_hazard` that is executed after this
    function.

    Executes the ``DETECT_TRAFFIC_LIGHTS`` and ``DETECT_PEDESTRIANS`` phases
    (``BEGIN`` and ``END`` each) around the respective detection.

    Returns:
        The agent's :py:attr:`detected_hazards` set.
    """
    # ---- Red lights and stops behavior ----
    self.execute_phase(Phase.DETECT_TRAFFIC_LIGHTS | Phase.BEGIN, prior_results=None)
    tlight_result: TrafficLightDetectionResult = self.detect_traffic_light()
    if tlight_result.traffic_light_was_found:
        light_is_red = tlight_result.traffic_light.state == carla.TrafficLightState.Red  # pyright: ignore[reportOptionalMemberAccess]
        if light_is_red:
            self.add_hazard(Hazard.TRAFFIC_LIGHT_RED)
        else:
            # NOTE: self.config.obstacles.detect_yellow_tlights must be True
            self.add_hazard(Hazard.TRAFFIC_LIGHT_YELLOW, HazardSeverity.WARNING)

    # NOTE: live_info.next_traffic_light is the next bounding box and might not be
    # the "correct" next light, i.e. its id can differ from the detected one.
    self.execute_phase(Phase.DETECT_TRAFFIC_LIGHTS | Phase.END, prior_results=tlight_result)

    # ---- Pedestrian avoidance behaviors ----
    self.execute_phase(Phase.DETECT_PEDESTRIANS | Phase.BEGIN, prior_results=None)
    is_dangerous, pedestrian_result = self.pedestrian_avoidance_behavior()
    if pedestrian_result.obstacle_was_found:
        # A dangerous pedestrian is an emergency, any other detected one only a warning.
        severity = HazardSeverity.EMERGENCY if is_dangerous else HazardSeverity.WARNING
        self.add_hazard(Hazard.PEDESTRIAN, hazard_level=severity)
    # NOTE: Hazard is a flag, all could be packed into one value.
    #   Pro: easier to check
    #   Con: when to remove other states like warning
    #   Idea: make it a dict with the state as key and the detection result as value.
    self.execute_phase(Phase.DETECT_PEDESTRIANS | Phase.END, prior_results=(is_dangerous, pedestrian_result))

    return self.detected_hazards
1018
def react_to_hazard(self, hazard_detected: Union[Hazard, Iterable[Hazard], None]) -> Optional[NoReturn]:
    """
    Called when a hazard was detected.

    The passed hazards are merged into :py:attr:`detected_hazards` and handed to the
    ``Phase.EMERGENCY | Phase.BEGIN`` phase as ``prior_results``.
    If no rule clears :py:attr:`detected_hazards` during that phase, the agent will
    throw an EmergencyStopException.

    Args:
        hazard_detected: A single hazard, an iterable of hazards, or a falsy value
            if only the already stored hazards should be handled.

    Raises:
        EmergencyStopException: If a hazard was detected and no rule cleared it.
    """
    # NOTE: not logging the detected hazards here to prevent flooding of the log.
    if hazard_detected:
        if isinstance(hazard_detected, Hazard):
            self.detected_hazards.add(hazard_detected)
        else:
            self.detected_hazards.update(hazard_detected)

    if not self.detected_hazards:
        logger.info("react_to_hazard was called without any detected hazards.")
    else:
        self.execute_phase(Phase.EMERGENCY | Phase.BEGIN, prior_results=hazard_detected)

    # Checked again: the phase above gives rules the chance to clear the hazards.
    if self.detected_hazards:
        raise EmergencyStopException(self.detected_hazards)
    logger.info("Hazards have been cleared.")
1045 1046 # ------------------ Behaviors ------------------ # 1047 # TODO: Section needs overhaul -> turn into rules 1048
def pedestrian_avoidance_behavior(self) -> tuple[bool, ObstacleDetectionResult]:
    """
    Detects pedestrians in the agents path.

    A detected pedestrian counts as dangerous when the reported distance, reduced by
    the larger horizontal bounding-box extent of both the pedestrian and the ego
    vehicle, is below ``config.distance.emergency_braking_distance``.

    Returns:
        A tuple containing a boolean indicating if the detected pedestrian is dangerous
        and the detection result.
    """
    # note ego_vehicle_wp is the current waypoint self._current_waypoint
    detection_result = self.detect_obstacles_in_path(self.walkers_nearby)
    if not detection_result.obstacle_was_found:
        return False, detection_result

    walker_extent = detection_result.obstacle.bounding_box.extent  # pyright: ignore[reportOptionalMemberAccess]
    ego_extent = self._vehicle.bounding_box.extent
    remaining_gap = (detection_result.distance
                     - max(walker_extent.y, walker_extent.x)
                     - max(ego_extent.y, ego_extent.x))
    if remaining_gap < self.config.distance.emergency_braking_distance:
        # NOTE: should slow down here
        return True, detection_result

    # NOTE detected but not stopping -> ADD avoidance behavior
    logger.debug("Detected a pedestrian but determined no intervention necessary (too far away).")
    return False, detection_result
1072
def car_following_behavior(self,
                           vehicle_detected: bool,  # noqa: ARG002,RUF100 # pylint: disable=unused-argument
                           vehicle: carla.Actor,
                           distance: float) -> carla.VehicleControl:
    """
    Plan a control for following the detected vehicle.

    Parameters:
        Must match :py:class:`.ObstacleDetectionResult`

    Assumes:
        - That an obstacle was detected:
            :py:attr:`vehicle_detected<.ObstacleDetectionResult.obstacle_was_found>` is True and
            :py:attr:`vehicle<.ObstacleDetectionResult.obstacle>` is the detected vehicle.

    Returns:
        The result of :py:meth:`car_following_manager` for the vehicle and the
        distance reduced by the bounding-box extents of both vehicles.

    Raises:
        EmergencyStopException: Possibly, through :py:meth:`react_to_hazard`, when the
            reduced distance is below ``config.distance.emergency_braking_distance``.
    """
    lead_extent = vehicle.bounding_box.extent
    ego_extent = self._vehicle.bounding_box.extent
    gap = distance - max(lead_extent.y, lead_extent.x) - max(ego_extent.y, ego_extent.x)

    too_close = gap < self.config.distance.emergency_braking_distance
    if too_close:
        # Note: If the passed set is not cleared by a Phase.EMERGENCY | Phase.BEGIN rule,
        # an EmergencyStopException is raised.
        self.add_hazard(Hazard.CAR)
        self.react_to_hazard(hazard_detected={Hazard.CAR})  # Optional[NoReturn]
    return self.car_following_manager(vehicle, gap)
1096 1097 # ------------------ External Helpers ------------------ # 1098 1099 # Moved outside of the class for organization 1100 from agents.substep_managers import ( 1101 car_following_manager, # pyright: ignore[reportAssignmentType] 1102 emergency_manager, 1103 ) 1104 1105 # Subfunction of traffic_light_manager. In traffic_light_manager the parameters are chosen automatically 1106 # which is why traffic_light_manager should be used instead. 1107 # Kept for backwards compatibility and possible future use. 1108 from agents.substep_managers.traffic_light import affected_by_traffic_light 1109 from agents.tools.lunatic_agent_tools import detect_obstacles_in_path 1110 1111 # ---- 1112 1113 #@override
def add_emergency_stop(self, control: carla.VehicleControl, reasons: "Optional[set[str]]" = None) -> carla.VehicleControl:
    """
    Modifies the control values to perform an emergency stop.

    This is a thin wrapper that forwards both arguments to :py:meth:`emergency_manager`.
    According to the original documentation the steering remains unchanged to avoid
    going out of the lane during turns.

    :param control: (carla.VehicleControl) control to be modified
    :param reasons: (set, optional) reasons for the emergency stop, forwarded unchanged
    :return: the result of :py:meth:`emergency_manager`
    """
    stop_control = self.emergency_manager(reasons=reasons, control=control)  # type: ignore[arg-type]
    return stop_control
1123
def lane_change(self,
                direction: Literal['left', 'right'],
                same_lane_time: float = 0,
                other_lane_time: float = 0,
                lane_change_time: float = 2,
                *,
                check: bool = False):
    """
    Changes the path so that the vehicle performs a lane change.
    Use 'direction' to specify either a 'left' or 'right' lane change,
    and the time parameters to fine tune the maneuver.

    The path starts at the agent's current waypoint. If no path could be
    generated the lane change is ignored and the current plan is kept.

    Steps for the lane change:
        1. **same_lane_time** seconds in the same lane.
        2. **lane_change_time** seconds to reach the other lane.
        3. **other_lane_time** seconds to stay in the other lane.

    Parameters:
        direction: The direction of the lane change, either 'left' or 'right'.
        same_lane_time: The time to follow the same lane before the lane change.
        other_lane_time: The time to follow the other lane after the lane change.
        lane_change_time: The time to reach the center of the last lane.
            A low value will make a fast lane change, while a high value will make slow lane change.
        check: If :python:`True`, the function will check if the lane change is possible, i.e.
            if there is a lane of :py:class:`carla.LaneType.Driving <carla.LaneType>` in
            the desired direction. Otherwise it can change to other lane types as well.
            Defaults to :code:`False`.

    See Also:
        - :py:func:`agents.tools.lunatic_agent_tools.generate_lane_change_path`
        - :py:meth:`set_global_plan`
    """
    speed = self.live_info.current_speed / 3.6  # m/s
    # This is a staticfunction from BasicAgent function
    path: list[tuple[carla.Waypoint, RoadOption]] = self._generate_lane_change_path(
        self._current_waypoint,  # NOTE: Assuming exact_waypoint
        direction,
        same_lane_time * speed,  # times are converted to distances in meters: t * v
        other_lane_time * speed,
        lane_change_time * speed,
        check=check,
        lane_changes=1,  # changes only one lane
        step_distance=self.config.planner.sampling_resolution
    )
    if not path:
        # Really ignore the request: installing the empty path as the new
        # global plan would replace the current route with nothing.
        logger.info("Ignoring the lane change as no path was found")
        return

    # Change path to take now
    # NOTE: use super with arguments here.
    super(LunaticAgent, self).set_global_plan(path)  # noqa: UP008
1176 # TODO: # CRITICAL: Keep old global plan if it is some end goal -> Restore it. 1177 1178 # TODO: Use generate_lane_change_path to finetune
def make_lane_change(self,
                     order: Sequence[Literal["left", "right"]] = ("left", "right"),
                     up_angle_th: int = 180,
                     low_angle_th: int = 0) -> "None | Literal[True]":
    """
    Move to the left/right lane if possible

    Args:
        order : The order in
            which the agent should try to change lanes. If a single string is given, the agent
            will try to change to that lane.
        up_angle_th : The angle threshold for the upper limit of obstacle detection in the other lane.
            Default is 180 degrees, meaning that the agent will detect obstacles ahead.
        low_angle_th : The angle threshold for the lower limit of obstacle detection in the other lane.
            Default is 0 degrees, meaning that the agent will detect obstacles behind.

    Returns:
        :python:`True` if a new destination on a neighbouring lane was set,
        otherwise :python:`None`.

    Raises:
        ValueError: If an entry of **order** is neither 'left' nor 'right'.

    Assumes:
        - :python:`(self.config.live_info.incoming_direction == RoadOption.LANEFOLLOW \
            and not waypoint.is_junction and self.config.live_info.current_speed > 10)`
        - :python:`check_behind.obstacle_was_found and self.config.live_info.current_speed < get_speed(check_behind.obstacle)`
    """
    vehicle_list = self.vehicles_nearby
    waypoint = self._current_waypoint  # todo use a getter

    # A single direction may be passed as a plain string.
    if isinstance(order, str):
        order = [order]

    for direction in order:
        if direction == "right":
            right_turn = waypoint.right_lane_marking.lane_change
            can_change = (right_turn in {carla.LaneChange.Right, carla.LaneChange.Both})
            other_wpt = waypoint.get_right_lane()
            lane_offset = 1
        elif direction == "left":
            left_turn = waypoint.left_lane_marking.lane_change
            can_change = (left_turn in {carla.LaneChange.Left, carla.LaneChange.Both})
            other_wpt = waypoint.get_left_lane()
            lane_offset = -1
        else:
            raise ValueError(f"Direction must be 'left' or 'right', was {direction}")
        if (can_change
                # Guard before dereferencing: there might be no lane on that side.
                and other_wpt is not None
                and lanes_have_same_direction(waypoint, other_wpt)
                and other_wpt.lane_type == carla.LaneType.Driving):
            # Detect if the target lane is free
            detection_result = detect_vehicles(self, vehicle_list,
                                               self.max_detection_distance("other_lane"),
                                               up_angle_th=up_angle_th,
                                               low_angle_th=low_angle_th,
                                               lane_offset=lane_offset)
            if not detection_result.obstacle_was_found:
                logger.debug("Change Lane, moving to the %s! Reason: %s", direction, "Overtaking" if tuple(order) == ("left", "right") else "Tailgating")

                end_waypoint = self._local_planner.target_waypoint
                # TODO: How to set waypoint order? Or better use generate_lane_change_path!
                self.set_destination(end_location=other_wpt.transform.location,
                                     start_location=end_waypoint.transform.location, clean_queue=True)
                return True
    return None
# ------------------ Other Function ------------------ #

def _update_lights(self, vehicle_control: carla.VehicleControl):
    """
    Updates the light of the vehicle in the simulation.

    The Brake and Reverse light flags follow ``vehicle_control.brake`` and
    ``vehicle_control.reverse``. The simulator is only contacted when the
    resulting light state differs from the stored one.
    """
    light_state = self._vehicle_lights
    flag_states = (
        (vehicle_control.brake, carla.VehicleLightState.Brake),
        (vehicle_control.reverse, carla.VehicleLightState.Reverse),
    )
    for engaged, flag in flag_states:
        if engaged:
            light_state |= flag
        else:
            # Remove the flag by masking with every other light bit.
            light_state &= carla.VehicleLightState.All ^ flag

    if light_state == self._vehicle_lights:
        return  # Change the light state only if necessary
    self._vehicle_lights = light_state
    self._vehicle.set_light_state(carla.VehicleLightState(self._vehicle_lights))

def render_detection_matrix(self, display: "pygame.Surface", **options: Unpack["DetectionMatrix.RenderOptions"]):
    """
    Render the detection matrix onto **display**; does nothing if the agent has none.

    See Also:
        - :py:meth:`DetectionMatrix.render`

    Attention:
        **options** must match the arguments of :py:meth:`DetectionMatrix.render`.

    :meta private:
    """
    matrix = self._detection_matrix
    if not matrix:
        return
    # options should align with CameraConfig.DetectionMatrixHudConfig
    matrix.render(display, **options)
def verify_settings(self, config: Optional[LunaticAgentSettings] = None, *,
                    verify_dataclass: Union["type[AgentConfig]", bool] = True,
                    strictness: Literal[-1, 0, 1, 2, 3, 4] = 0):
    """
    Verifies the settings of the LunaticAgent.
    Foremost this checks if the :py:obj:`.planner.dt` value has been set
    to the speed of the world ticks in synchronous mode.
    Secondly if :python:`verify_dataclass=True` or a different AgentConfig class is provided,
    it will check for correct type usage.

    Args:
        config: The configuration to verify.
            If not provided, the agent's default configuration will be used.
            Defaults to :code:`None`.
        verify_dataclass:
            Determines the dataclass to use for verification.
            If :python:`True`, the :py:attr:`.BASE_SETTINGS` dataclass will be used.
            A falsy value skips the dataclass verification.
            See :py:meth:`AgentConfig.check_config` for more details.
            Defaults to True.
        strictness:
            The strictness level for :py:meth:`AgentConfig.check_config`.
            A negative value skips everything but the :python:`planner.dt` check.
            Defaults to :python:`0`.

    Raises:
        TypeError: If **verify_dataclass** is not a valid AgentConfig subclass or True.
        MissingMandatoryValue: In synchronous mode, if :python:`config.planner.dt`
            is not present or not a :python:`float`
    """
    config = config or self.config

    if self._world_model.world_settings.synchronous_mode:
        # Assure that dt is set
        if isinstance(config, DictConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
            OmegaConf.select(config,
                             "planner.dt",
                             throw_on_missing=True
                             )
        elif not isinstance(config.planner.dt, float):
            from omegaconf import MissingMandatoryValue
            # One formatted message instead of several positional arguments,
            # which would render as a tuple.
            raise MissingMandatoryValue(
                "`config.planner.dt` needs to be set to a value. "
                f"Cannot be: {type(config.planner.dt)} {config.planner.dt!r}")

    if strictness < 0:
        return

    # NOTE: Below is experimental and might fail as the config at this point has already been setup

    from agents.tools import config_creation
    _old = config_creation._WARN_LIVE_INFO  # pyright: ignore[reportPrivateUsage]
    config_creation._WARN_LIVE_INFO = False  # pyright: ignore[reportPrivateUsage]
    try:
        if verify_dataclass:
            if verify_dataclass is True:
                dataclass = self.BASE_SETTINGS
            elif issubclass(verify_dataclass, AgentConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
                dataclass = verify_dataclass
            else:
                raise TypeError("`verify_dataclass` must be a `AgentConfig` or `True` to select the BASE_SETTINGS ")
            dataclass.check_config(config, dataclass.get("strict_config", strictness), as_dict_config=True)
    finally:
        # Restore the module flag even if the verification raised.
        config_creation._WARN_LIVE_INFO = _old  # pyright: ignore[reportPrivateUsage]
1331 1332 # ------------------ Getter Function ------------------ # 1333
def get_control(self):
    """
    Returns the currently planned control of the agent, i.e. ``self.ctx.control``.

    If retrieved before the local planner has been run, it will return None.
    """
    context = self.ctx
    return context.control
1341 1342 # ------------------ Setter Function ------------------ # 1343
def set_control(self, control: carla.VehicleControl):
    """
    Set new controls for the agent. Must be called before apply_control.

    The value is stored as ``self.ctx.control`` without any validation.
    """
    context = self.ctx
    context.control = control
def _init_detection_matrix(self) -> None:
    """
    Create the detection matrix helper according to ``config.detection_matrix``.

    A :py:class:`DetectionMatrix` is used when both ``config.detection_matrix.sync``
    and the world's synchronous mode are enabled, otherwise an
    :py:class:`AsyncDetectionMatrix`. If the detection matrix is not enabled
    ``self._detection_matrix`` is set to :python:`None`.
    """
    if self.config.detection_matrix and self.config.detection_matrix.enabled:
        if self.config.detection_matrix.sync and self._world_model.world_settings.synchronous_mode:
            self._detection_matrix = DetectionMatrix(self._vehicle)
        else:
            self._detection_matrix = AsyncDetectionMatrix(self._vehicle)
        self._detection_matrix.start()
    else:
        self._detection_matrix = None
    self._road_matrix_counter = 0

if READTHEDOCS or TYPE_CHECKING:
    # From the parent class, but without type-hints
    # Only defined for documentation builds and type checkers.
    def set_destination(self,
                        end_location: carla.Location,
                        start_location: Optional[carla.Location] = None,
                        clean_queue: bool = True) -> None:
        ...
def set_vehicle(self, vehicle: carla.Actor) -> None:
    """
    Set the vehicle for the agent (experimental if applied a second time)

    The vehicle is registered with the ``CarlaDataProvider`` unless an actor with
    the same id is already tracked; in that case the tracked entries are re-keyed
    to **vehicle**. Afterwards the detection matrix and the local planner are
    created anew.

    :meta private:
    """
    self._vehicle = assure_type(carla.Vehicle, vehicle)

    velocity_map = CarlaDataProvider._actor_velocity_map  # pyright: ignore[reportPrivateUsage]
    tracked_actor = next((actor for actor in velocity_map if actor.id == vehicle.id), None)
    if tracked_actor is None:
        # do not register same id twice
        with contextlib.suppress(KeyError):
            CarlaDataProvider.register_actor(vehicle, vehicle.get_transform())  # pyright: ignore[reportUnknownMemberType]
    else:
        # Exchange the key for a faster lookup
        velocity_map[vehicle] = velocity_map.pop(tracked_actor)
        CarlaDataProvider._actor_transform_map[vehicle] = CarlaDataProvider._actor_transform_map.pop(tracked_actor)  # pyright: ignore[reportPrivateUsage]
        CarlaDataProvider._actor_location_map[vehicle] = CarlaDataProvider._actor_location_map.pop(tracked_actor)  # pyright: ignore[reportPrivateUsage]

    self._init_detection_matrix()
    self._local_planner = DynamicLocalPlannerWithRss(self,
                                                     map_inst=CarlaDataProvider.get_map(),
                                                     world=CarlaDataProvider.get_world(),
                                                     rss_sensor=self._world_model.rss_sensor)

#@override
def set_target_speed(self, speed: float) -> None:
    """
    Changes the target speed of the agent.

    A warning is printed if the agent currently follows the speed limits;
    the target speed is stored in the configuration either way.

    :param speed (float): target speed in Km/h
    """
    speed_settings = self.config.speed
    if speed_settings.follow_speed_limits:
        print("WARNING: The max speed is currently set to follow the speed limits. "
              "Use 'follow_speed_limits' to deactivate this")
    speed_settings.target_speed = speed  # shared with planner
1411
def follow_speed_limits(self, value: bool = True) -> None:
    """
    If active, the agent will dynamically change the target speed according to the speed limits.

    The flag is stored as ``config.speed.follow_speed_limits``.

    Arguments:
        value: Whether to activate this behavior
    """
    speed_settings = self.config.speed
    speed_settings.follow_speed_limits = value
1420
def ignore_traffic_lights(self, active: bool = True) -> None:
    """
    (De)activates the checks for traffic lights.

    Arguments:
        active: Stored as ``config.obstacles.ignore_traffic_lights``.
    """
    obstacle_settings = self.config.obstacles
    obstacle_settings.ignore_traffic_lights = active
1424
def ignore_stop_signs(self, active: bool = True) -> None:
    """
    (De)activates the checks for stop signs.

    Arguments:
        active: Stored as ``config.obstacles.ignore_stop_signs``.
    """
    obstacle_settings = self.config.obstacles
    obstacle_settings.ignore_stop_signs = active
1428
def ignore_vehicles(self, active: bool = True) -> None:
    """
    (De)activates the checks for vehicles.

    Arguments:
        active: Stored as ``config.obstacles.ignore_vehicles``.
    """
    obstacle_settings = self.config.obstacles
    obstacle_settings.ignore_vehicles = active
1432
def rss_set_road_boundaries_mode(self,
                                 road_boundaries_mode: Optional[
                                     Union[bool, RssRoadBoundariesModeAlias]] = None) -> None:
    """
    Forward the RSS road boundaries mode to the world model.

    Arguments:
        road_boundaries_mode: The mode to set. If :python:`None`, the value of
            ``config.rss.use_stay_on_road_feature`` is used instead.
    """
    if road_boundaries_mode is not None:
        selected_mode = road_boundaries_mode
    else:
        selected_mode = self.config.rss.use_stay_on_road_feature
    self._world_model.rss_set_road_boundaries_mode(selected_mode)
# ------------------ Overwritten & Outsourced functions ------------------ #

from agents.tools.lunatic_agent_tools import detect_vehicles, max_detection_distance

# signature of parent is str and not Literal["left", "right"]
from agents.tools.lunatic_agent_tools import (
    generate_lane_change_path as _generate_lane_change_path,  # pyright: ignore[reportAssignmentType]
)

# NOTE: the original pedestrian_avoid_manager is still usable
def pedestrian_avoid_manager(self, waypoint) -> NoReturn:  # type: ignore
    """
    This function was replaced by ``substep_managers.pedestrian_detection_manager``.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    # A single message string: several positional arguments would be rendered as a tuple.
    raise NotImplementedError("This function was replaced by substep_managers.pedestrian_detection_manager")

#@override
def collision_and_car_avoid_manager(self, waypoint) -> NoReturn:  # type: ignore
    """
    This function was split into detect_obstacles_in_path and car_following_manager

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    raise NotImplementedError("This function was split into detect_obstacles_in_path and car_following_manager")

#@override
def _tailgating(self, waypoint, vehicle_list) -> NoReturn:  # type: ignore
    """
    Tailgating has been implemented as a rule.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError("Tailgating has been implemented as a rule")

#@override
def emergency_stop(self) -> NoReturn:
    """
    This function was overwritten, use `add_emergency_stop` instead.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    raise NotImplementedError("This function was overwritten use `add_emergency_stop` instead")

# ------------------------------------ #
# As reference Parent Functions
# ------------------------------------ #
#def get_local_planner(self):
#def get_global_planner(self):

#def done(self): # from base class self._local_planner.done()

#def set_global_plan(self, plan, stop_waypoint_creation=True, clean_queue=True):
"""
Adds a specific plan to the agent.

    :param plan: list of [carla.Waypoint, RoadOption] representing the route to be followed
    :param stop_waypoint_creation: stops the automatic random creation of waypoints
    :param clean_queue: resets the current agent's plan
"""

#def trace_route(self, start_waypoint, end_waypoint):
"""
Calculates the shortest route between a starting and ending waypoint.

    :param start_waypoint (carla.Waypoint): initial waypoint
    :param end_waypoint (carla.Waypoint): final waypoint
"""

# ---------------- Cleanup ------------------- #

def _destroy_sensor(self):
    """Stop the detection matrix and destroy the collision sensor, if they are set."""
    if self._detection_matrix:
        self._detection_matrix.stop()
        self._detection_matrix = None
    if self._collision_sensor:
        self._collision_sensor.destroy()
        self._collision_sensor = None  # type: ignore[assignment]
def destroy(self):
    """
    Resets attributes and destroys helpers like the :py:class:`.DetectionMatrix`.

    The world references and the context are dropped and the actor lists are
    emptied. Clearing stops silently at the first list attribute that is missing.
    """
    self._destroy_sensor()
    self._world_model = self._world = None  # type: ignore[assignment]

    context = self.ctx
    if context:
        # Break the reference from the context back to this agent.
        context.agent = None  # type: ignore[assignment]
    self.ctx = None  # type: ignore[assignment]

    with contextlib.suppress(AttributeError):
        for list_name in ("all_vehicles", "vehicles_nearby", "all_walkers", "walkers_nearby"):
            getattr(self, list_name).clear()
def __del__(self):
    """Best-effort cleanup on garbage collection; any error raised by :py:meth:`destroy` is ignored."""
    ignore_cleanup_errors = contextlib.suppress(Exception)
    with ignore_cleanup_errors:
        self.destroy()