1"""
2This module implements an agent that roams around a track following random
3waypoints and avoiding other vehicles. The agent also responds to traffic lights,
4traffic signs, and has different possible configurations.
5"""
6
7# Decide how to handle imports
8# ruff: noqa: PLC0415
9
10from __future__ import annotations
11
12import contextlib
13import sys
14from copy import deepcopy
15from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, List, NoReturn, Optional, Set, Union
16from typing import cast as assure_type
17
18import carla # pyright: ignore[reportMissingTypeStubs]
19import omegaconf
20from omegaconf import DictConfig, OmegaConf
21from typing_extensions import Literal, Self, Unpack
22
23from agents import substep_managers
24from agents.dynamic_planning.dynamic_local_planner import DynamicLocalPlannerWithRss
25from agents.navigation.behavior_agent import BehaviorAgent
26from agents.navigation.global_route_planner import GlobalRoutePlanner
27from agents.rules import rule_from_config
28from agents.tools.config_creation import (
29 AgentConfig,
30 LaunchConfig,
31 LiveInfo,
32 LunaticAgentSettings,
33 RssRoadBoundariesModeAlias,
34 RuleCreatingParameters,
35)
36from agents.tools.logs import logger
37from agents.tools.lunatic_agent_tools import (
38 detect_vehicles,
39 must_clear_hazard,
40 # Left here as a reminder that this can be used as a wrapper
41 phase_callback, # type: ignore[unused-import] # noqa: F401
42 result_to_context,
43)
44from agents.tools.misc import lanes_have_same_direction
45from classes.constants import AD_RSS_AVAILABLE, READTHEDOCS, AgentState, Hazard, HazardSeverity, Phase, RoadOption
46from classes.exceptions import (
47 AgentDoneException,
48 ContinueLoopException,
49 EmergencyStopException,
50 LunaticAgentException,
51 NoFurtherRulesException,
52 SkipInnerLoopException,
53 UpdatedPathException,
54 UserInterruption,
55)
56from classes.rule import BlockingRule, Context, Rule
57from classes.worldmodel import CarlaDataProvider, WorldModel
58from classes.detection_matrix import AsyncDetectionMatrix, DetectionMatrix
59from classes.information_manager import InformationManager
60
61if TYPE_CHECKING:
62 from agents.tools.hints import ObstacleDetectionResult, TrafficLightDetectionResult
63 import pygame
64
65
[docs]
66class LunaticAgent(BehaviorAgent):
67 """
68 BasicAgent implements an agent that navigates the scene.
69 This agent respects traffic lights and other vehicles, but ignores stop signs.
70 It has several functions available to specify the route that the agent must follow,
71 as well as to change its parameters in case a different driving mode is desired.
72 """
73
74 BASE_SETTINGS: "type[LunaticAgentSettings]" = LunaticAgentSettings
75 """
76 Base AgentConfig class for this agent. This is used to create the default settings for the agent
77 if none are provided.
78 """
79
80 DEFAULT_RULES: ClassVar[Dict[Phase, List[Rule]]] = {k: [] for k in Phase.get_phases()}
81 """
82 Default rules of this agent class when initialized.
83
84 :meta hide-value:
85 """
86
87 rules: Dict[Phase, List[Rule]]
88 """
    The rules of this agent.
90 When initialized the rules are deep copied from :py:attr:`DEFAULT_RULES`.
91 """
92
93 ctx: Context
94 """The context object of the current step"""
95
96 # Information from the InformationManager
97 walkers_nearby: List[carla.Walker]
98 vehicles_nearby: List[carla.Vehicle]
99 static_obstacles_nearby: List[carla.Actor]
100 """Static obstacles detected by the :py:class:`.InformationManager`"""
101
102 obstacles_nearby: List[carla.Actor]
103 """
104 Combination of :py:attr:`vehicles_nearby`, :py:attr:`walkers_nearby`
105 and :py:attr:`static_obstacles_nearby`.
106 """
107
108 traffic_lights_nearby: List[carla.TrafficLight]
109 traffic_signs_nearby: List[carla.TrafficSign] = NotImplemented
110 """
111 Not yet implemented
112
113 :meta private:
114 """
115
116 current_states: Dict[AgentState, int]
117 """The current states of the agent. The count of the steps being each state is stored as value."""
118
119 #
120
121 _world_model: WorldModel
122 """Reference to the attached :py:class:`WorldModel`."""
123
124 _validate_phases = False
125 """
    Debugging flag to sanity check if the agent passes through the phases in the correct order.
    Attention: Currently not straightforward anymore.
128 """
129
130 _active_blocking_rules: Set[BlockingRule]
131 """Blocking rules that are currently active and have taken over the agents loop."""
132
133 # ------------------ Initialization ------------------ #
134
135 from agents.tools.lunatic_agent_tools import create_agent_config as _create_agent_config
136
[docs]
@classmethod
def create_world_and_agent(
    cls,
    args: LaunchConfig,
    *,
    vehicle: Optional[carla.Vehicle] = None,
    sim_world: carla.World,
    settings_archetype: "Optional[type[AgentConfig]]" = None,
    agent_config: Optional["LunaticAgentSettings"] = None,
    overwrites: Optional[Dict[str, Any]] = None,
) -> tuple[Self, WorldModel, GlobalRoutePlanner]:
    """
    Setup function to create the agent from the :py:class:`LaunchConfig` settings.

    The agent configuration is resolved in the following order:

    1. ``agent_config`` if it is passed; it is used as is.
    2. ``args.agent`` if ``args`` has such an attribute.
    3. ``settings_archetype``: an *instance* is used directly (a warning is logged),
       a *class* is instantiated with ``overwrites``.
    4. :py:attr:`BASE_SETTINGS` of this class.

    Args:
        args : The launch configuration, forwarded to the :py:class:`WorldModel`.
        vehicle : The vehicle passed to the :py:class:`WorldModel` as ``player``.
        sim_world : The carla world passed to the :py:class:`WorldModel`.
        settings_archetype : Config class used to build the agent config (see above).
        agent_config : A ready-to-use agent configuration.
        overwrites : Passed to ``settings_archetype`` when it is instantiated;
            not used by the other resolution paths.

    Returns:
        The created agent, its world model and the agent's global route planner.

    Note:
        - :py:meth:`.GameFramework.init_agent_and_interface` is the preferred way to
          instantiate the agent; only use this method if you do not want to create a
          :py:class:`GameFramework` object.
    """
    if overwrites is None:
        overwrites = {}

    if agent_config is None:
        if hasattr(args, "agent"):
            if settings_archetype is not None:
                logger.warning("settings_archetype was passed but using args.agent. Ignoring settings_archetype.")
            agent_config = args.agent
        # Everything is an instance of `object`; test against `type` to tell
        # config instances apart from config classes.
        elif settings_archetype is not None and not isinstance(settings_archetype, type):
            logger.warning("settings_archetype is an instance. To pass an instance use agent_config instead.")
            agent_config = settings_archetype  # type: ignore
        elif settings_archetype is not None:
            logger.debug("Creating config from settings_archetype")
            behavior = settings_archetype(overwrites)
            agent_config = cls.BASE_SETTINGS.cast(behavior.to_dict_config())
        else:
            logger.debug("Using %s.BASE_SETTINGS %s to create config.", cls.__name__, cls.BASE_SETTINGS)
            agent_config = cls.BASE_SETTINGS.cast(cls.BASE_SETTINGS.to_dict_config())
        assert agent_config is not None
    else:
        logger.debug("A config was passed, using it as is.")

    world_model = WorldModel(agent_config, args=args, carla_world=sim_world, player=vehicle)  # TEST: without args
    # Planner step size: the fixed simulation step if set, else derived from the fps.
    agent_config.planner.dt = world_model.world_settings.fixed_delta_seconds or 1 / world_model._args.fps  # pyright: ignore[reportPrivateUsage]

    agent = cls(agent_config, world_model)
    return agent, world_model, agent.get_global_planner()
183
def __init__(
    self,
    settings: Union[str, LunaticAgentSettings],
    world_model: Optional[WorldModel] = None,
    *,
    vehicle: Optional[carla.Vehicle] = None,
    overwrite_options: Optional[dict[str, Any]] = None,
    debug: bool = True,
) -> None:
    """
    Initialize the LunaticAgent.

    Builds the agent config, attaches (or creates) the world model, registers the
    vehicle, sets up the global planner, the collision sensor, the rules and the
    information manager.

    Args:
        settings :
            The settings of the agent to construct the :py:attr:`LunaticAgent.config`.
            It can be either a string pointing to a yaml file or a LunaticAgentSettings.
        world_model : The world model to use. If None, a new WorldModel will be created.
        vehicle : The vehicle controlled by the agent. Can be None then the :code:`world_model.player` will be used.
        overwrite_options : Additional options to overwrite the default agent configuration.
        debug : Whether to enable debug mode.
    """
    # NOTE(review): no parent initializer is called in this method - confirm this is intended.
    self._debug = debug
    """Enables additional debug output."""
    if world_model is None and len(sys.argv) > 1:
        # Command line arguments are present but cannot reach a WorldModel created here.
        logger.error(
            "BUG: Beware when not passing a WorldModel, the WorldModel currently ignores command line overrides, "
            "i.e. will only use the config file.\n> Use `%s.create_world_and_agent` and provide the LaunchConfig instead.",
            self.__class__.__name__,
        )

    # -------------------- Settings ------------------------
    # Depending on the input type, the settings are created differently.

    self.config = self._create_agent_config(settings, world_model, overwrite_options)

    # -------------------------------

    logger.info("\n\nAgent config is %s", OmegaConf.to_yaml(self.config))

    # World Model
    if world_model is None:
        world_model = WorldModel(self.config, player=vehicle)
        # Planner step size: the fixed simulation step if set, else derived from the fps.
        self.config.planner.dt = world_model.world_settings.fixed_delta_seconds or 1 / world_model._args.fps  # pyright: ignore[reportPrivateUsage]

    self._world_model: WorldModel = world_model
    self._world: carla.World = world_model.world

    # Register Vehicle
    assert world_model.player is not None
    self.set_vehicle(world_model.player)

    self.current_phase: Phase = Phase.NONE
    """current phase of the agent inside the loop"""

    # No context exists before the first step; `run_step` creates it.
    self.ctx = None  # type: ignore

    self._last_traffic_light: Optional[carla.TrafficLight] = None
    """Current red traffic light"""

    # TODO: No more hardcoded defaults / set them from opt_dict which must have all parameters; check which are parameters and which are set by other functions (e.g. _look_ahead_steps)

    # Parameters from BehaviorAgent ------------------------------------------
    # todo: check redefinitions

    self._look_ahead_steps: int = 0
    """
    updated in _update_information used for local_planner.get_incoming_waypoint_and_direction
    """

    # Initialize the planners
    self._global_planner = (
        CarlaDataProvider.get_global_route_planner()
    )  # NOTE: THIS does not use self.config.planner.sampling_resolution
    if not self._global_planner:
        # Fallback: create a planner and store it on the CarlaDataProvider as well.
        logger.error(
            "Global Route Planner not set - This should not happen, if the CarlaDataProvider has been initialized."
        )
        self._global_planner = GlobalRoutePlanner(
            CarlaDataProvider.get_map(), self.config.planner.sampling_resolution
        )
        CarlaDataProvider._grp = self._global_planner  # pyright: ignore[reportPrivateUsage]

    # Get the static elements of the scene
    self._traffic_light_map: Dict[carla.TrafficLight, carla.Transform] = CarlaDataProvider._traffic_light_map  # pyright: ignore[reportPrivateUsage]
    # A keys view of the provider's dict, not a copy.
    self._lights_list = CarlaDataProvider._traffic_light_map.keys()  # pyright: ignore[reportPrivateUsage]
    self._lights_map: Dict[int, carla.Waypoint] = {}
    """Dictionary mapping a traffic light to a wp corresponding to its trigger volume location"""

    # Vehicle Lights
    self._vehicle_lights = carla.VehicleLightState.NONE

    # Collision Sensor # NOTE: another in the WorldModel for the HUD
    self._set_collision_sensor()

    # Rule Framework
    # 1. add all rules from the class, if any - else this is a dict with empty lists
    self.rules = deepcopy(self.__class__.DEFAULT_RULES)  # Copies the ClassVar to the instance

    # 2. add rules from the config
    self.add_config_rules()

    # Information Manager
    self.information_manager = InformationManager(self)
    self.current_states = self.information_manager.state_counter  # share the dict

    # Blocking Rules
    self._active_blocking_rules = set()
291
292 # --------------------- Collision Callback ------------------------
293
def _set_collision_sensor(self):
    """
    Spawn a collision sensor attached to the agent's vehicle and register
    :py:meth:`_collision_event` as its callback.
    """
    # see: https://carla.readthedocs.io/en/latest/ref_sensors/#collision-detector
    # and https://carla.readthedocs.io/en/latest/python_api/#carla.Sensor.listen
    sensor_blueprint = CarlaDataProvider._blueprint_library.find("sensor.other.collision")  # pyright: ignore[reportPrivateUsage]
    carla_world = CarlaDataProvider.get_world()
    spawned_actor = carla_world.spawn_actor(sensor_blueprint, carla.Transform(), attach_to=self._vehicle)
    # `assure_type` is `typing.cast`; it only informs the type checker.
    self._collision_sensor: carla.Sensor = assure_type("carla.Sensor", spawned_actor)

    self._collision_sensor.listen(self._collision_event)
304
305 from agents.substep_managers import collision_manager as _collision_manager
306
def _collision_event(self, event: carla.CollisionEvent):
    """
    Callback function for the collision sensor.

    By default uses :py:func:`agents.substep_managers.collision_manager`.

    Executes Phases:
        - :py:class:`Phase.COLLISION | Phase.BEGIN<.Phase>`
        - :py:class:`Phase.COLLISION | Phase.END<.Phase>`
    """
    # https://carla.readthedocs.io/en/latest/python_api/#carla.CollisionEvent
    # e.g. setting ignore_vehicles to False, if it was True before.
    # do an emergency stop (in certain situations)

    # Error text raised when the context is still None, i.e. a very early collision.
    missing_context_message = "'NoneType' object has no attribute 'prior_result'"

    def run_collision_phase(stage, payload):
        """Execute one collision phase, tolerating a context that is not yet set up."""
        try:
            self.execute_phase(Phase.COLLISION | stage, prior_results=payload)
        except AttributeError as error:
            if missing_context_message not in str(error):
                raise
            # context not yet set up, very early collision

    run_collision_phase(Phase.BEGIN, event)
    manager_result = self._collision_manager(event)
    run_collision_phase(Phase.END, manager_result)
333
334 # --------------------- Adding rules ------------------------
335
[docs]
def add_rule(self, rule: Rule, position: Union[int, None] = None):
    """
    Register a rule for every phase it is associated with.

    Args:
        rule : The rule to add
        position :
            Index at which the rule is inserted into each phase's rule list.
            If :python:`None` the rule is appended and the list is re-sorted
            by descending priority. Defaults to :python:`None`.
    """
    for phase in rule.phases:
        if phase not in self.rules:
            logger.warning("Phase %s from Rule %s is not a default phase. Adding a new phase.", phase, rule)
            self.rules[phase] = []
        phase_rules = self.rules[phase]
        if position is not None:
            # Explicit position requested: keep the caller's ordering, no sorting.
            phase_rules.insert(position, rule)
            continue
        phase_rules.append(rule)
        phase_rules.sort(key=lambda candidate: candidate.priority, reverse=True)
356
[docs]
def add_rules(self, rules: "Rule | Iterable[Rule]"):
    """
    Add one or more rules and sort the agent's rules by priority.

    Args:
        rules : A single rule or an iterable of rules. Each rule is registered
            for all of its phases. Phases that are not yet known to the agent
            are added with a warning, matching :py:meth:`add_rule`.
    """
    if isinstance(rules, Rule):
        rules = [rules]
    for rule in rules:
        for phase in rule.phases:
            if phase not in self.rules:
                # Same handling as in add_rule instead of failing with a KeyError.
                logger.warning("Phase %s from Rule %s is not a default phase. Adding a new phase.", phase, rule)
                self.rules[phase] = []
            self.rules[phase].append(rule)
    # Sort once after all rules were added; highest priority first.
    for phase_rules in self.rules.values():
        phase_rules.sort(key=lambda r: r.priority, reverse=True)
366
[docs]
def add_config_rules(self, config: Optional[Union[LunaticAgentSettings, List[RuleCreatingParameters]]] = None):
    """
    Create rules from a configuration and add them to the agent.

    Args:
        config :
            Either an agent configuration whose ``rules`` entry is used, or
            directly a list of rule creating parameters.
            If :python:`None`, :py:attr:`config` of the agent is used.

    Note:
        Nothing is added if an agent configuration has no ``rules`` key or if
        the key is present but its value is missing.
    """
    if config is None:
        config = self.config
    rule_list: List[RuleCreatingParameters]
    if isinstance(config, (AgentConfig, DictConfig)):
        # `"rules" in config` is also False for rules == MISSING, therefore look at the keys.
        if "rules" not in config.keys():  # noqa: SIM118, RUF100
            # A mapping without rules must not be treated as the rule list itself.
            logger.debug("Config has no `rules` key, skipping rule addition.")
            return
        try:
            rule_list = config.rules
        except omegaconf.MissingMandatoryValue:
            logger.debug("`rules` key was missing, skipping rule addition.")
            return
    else:
        rule_list = config  # type: ignore[assignment]
    logger.debug("Adding rules from config:\n%s", OmegaConf.to_yaml(rule_list))
    for rule in rule_list:
        self.add_rules(rule_from_config(rule))  # each call could produce one or more rules
386
387 # --------------------- Properties ------------------------
388
@property
def live_info(self) -> LiveInfo:
    """Shortcut to the ``live_info`` entry of :py:attr:`config`."""
    agent_config = self.config
    return agent_config.live_info
392
@property
def detection_matrix(self):
    """
    Result of :any:`DetectionMatrix.getMatrix`, or :python:`None` if no
    (truthy) detection matrix object is set on the agent.
    """
    matrix_provider = self._detection_matrix
    if not matrix_provider:
        return None
    return matrix_provider.getMatrix()
401
402 # ------------------ Hazard ------------------ #
403
@property
def detected_hazards(self) -> Set[Hazard]:
    """The hazards stored on the context of the current step."""
    current_context = self.ctx
    return current_context.detected_hazards

@property
def detected_hazards_info(self) -> Dict[Hazard, Any]:
    """
    Additional information about the detected hazards, e.g. severity,
    as stored on the context of the current step.
    """
    current_context = self.ctx
    return current_context.detected_hazards_info

@detected_hazards.setter
def detected_hazards(self, hazards: Set[Hazard]):
    """Replace the hazards of the current context; only :python:`set` objects are accepted."""
    if isinstance(hazards, set):  # pyright: ignore[reportUnnecessaryIsInstance]
        self.ctx._detected_hazards = hazards  # pyright: ignore[reportPrivateUsage]
        return
    raise TypeError("detected_hazards must be a set of Hazards.")
420
# The hazard helpers of Context are reused as methods of the agent.
# These have the same signature and can be used interchangeably.
# NOTE(review): when called on the agent, the agent itself is passed as `self`
# to the Context functions - presumably they only rely on the hazard
# attributes that the agent mirrors; confirm against Context.
discard_hazard = Context.discard_hazard
add_hazard = Context.add_hazard
has_hazard = Context.has_hazard
425
426 #
@property
def current_traffic_light(self) -> "carla.TrafficLight | None":
    """Public alias of :py:attr:`self._last_traffic_light <_last_traffic_light>`."""
    remembered_light = self._last_traffic_light
    return remembered_light

@current_traffic_light.setter
def current_traffic_light(self, traffic_light: carla.TrafficLight):
    """Store ``traffic_light`` in :py:attr:`_last_traffic_light`."""
    setattr(self, "_last_traffic_light", traffic_light)
435
@property
def phase_results(self) -> Dict[Phase, Any]:
    """
    Shortcut to :py:attr:`agent.ctx.phase_results <classes.rule.Context.phase_results>`.

    Holds the results of the phases the agent has been in.
    By default the keys are set to :any:`Context.PHASE_NOT_EXECUTED`.
    """
    current_context = self.ctx
    return current_context.phase_results
445
@property
def active_blocking_rules(self) -> Set[BlockingRule]:
    """
    The set of blocking rules that are currently active, i.e. that have
    taken over the agent's loop. The internal set itself is returned, not a copy.
    """
    blocking_rules = self._active_blocking_rules
    return blocking_rules
452
@property
def _map(self) -> carla.Map:
    """The map currently provided by the :py:class:`CarlaDataProvider`."""
    # Needed *only* for set_destination
    current_map = CarlaDataProvider.get_map()
    return current_map
457
458 # ------------------
459
460 # @property
461 # def ctx(self) -> Union[Context, None]:
462 # print("Getting Context", self._ctx())
463 # return self._ctx() # might be None
464
465 # ------------------ Information functions ------------------ #
466
485
def _update_information(self, *, second_pass: bool = False) -> None:
    """
    This method updates the information regarding the ego
    vehicle based on the surrounding world.

    The first part (InformationManager tick, detection matrix, look-ahead steps,
    speed limit handling) is only executed when ``second_pass`` is False;
    the route dependent part is executed on every call.

    Args:
        second_pass : True will skip some calculations, especially useful if a
            second call to _update_information is necessary in the same tick.

    Assumes: second_pass == True => agent.done() == False

    Note:
        Does not execute any Phase.
    """
    # --------------------------------------------------------------------------
    # Information that is CONSTANT DURING THIS TICK and INDEPENDENT OF THE ROUTE
    # --------------------------------------------------------------------------
    if not second_pass:
        if self._debug:  # noqa: SIM102
            # The local view on the traffic lights is empty although the provider knows some.
            if not self._lights_list and len(CarlaDataProvider._traffic_light_map.keys()):  # pyright: ignore[reportPrivateUsage]
                logger.error("Traffic light list is empty, but map is not.")

        # ----------------------------
        # First Pass for expensive and tick-constant information

        # --- InformationManager ---
        information: InformationManager.Information = (
            self.information_manager.tick()
        )  # NOTE: # Warning: Currently not route-dependant, might need to be changed later
        self.tick_information = information

        self._current_waypoint = information.current_waypoint
        self.current_states = information.current_states

        # Maybe access over subattribute, or properties
        # NOTE: If other scripts, e.g. the scenario_runner, are used in parallel or sync=False
        # the stored actors could have been destroyed later on.

        # Global Information
        self.all_vehicles = information.vehicles
        self.all_walkers = information.walkers
        self.all_static_obstacles = information.static_obstacles
        # Combination of the three lists
        self.all_obstacles = information.obstacles

        # The "nearby" lists as provided by the InformationManager
        # (presumably filtered by the max-distance settings in config.obstacles - verify there).
        self.vehicles_nearby = information.vehicles_nearby
        self.walkers_nearby = information.walkers_nearby
        self.static_obstacles_nearby = information.static_obstacles_nearby

        # Combination of the three lists
        # NOTE(review): stored as `all_obstacles_nearby`; the class also declares an
        # `obstacles_nearby` attribute that is not assigned here - confirm which name is intended.
        self.all_obstacles_nearby = information.obstacles_nearby

        self.traffic_lights_nearby = information.traffic_lights_nearby

        # Find vehicles and walkers nearby; could be moved to the information manager

        # ----------------------------

        # Data Matrix
        # update not every frame to save performance
        if self._detection_matrix and self._detection_matrix.sync:
            self._road_matrix_counter += 1
            if (self._road_matrix_counter % self.config.detection_matrix.sync_interval) == 0:
                # TODO: Still prevent async mode from using too much resources and slowing fps down too much.
                self._detection_matrix.update()  # NOTE: Does nothing if in async mode. self.road_matrix is updated by another thread.

        # used for self._local_planner.get_incoming_waypoint_and_direction
        self._look_ahead_steps = int(
            (self.live_info.current_speed_limit) / 10
        )  # TODO: Maybe make this an interpolation and make more use of it

        # ----- Follow speed limits -------
        # NOTE: This is once again set in the local planner, but only on the Context config!
        if self.config.speed.follow_speed_limits:
            self.config.speed.target_speed = self.config.live_info.current_speed_limit

        # NOTE: This is the direction used by the planner in the *last* step.
        self.live_info.executed_direction = self._local_planner.target_road_option

    # Also checked on a second pass: the planner's road option must still match the stored one.
    assert self.live_info.executed_direction == self._local_planner.target_road_option, (
        "Executed direction should not change."
    )

    # -------------------------------------------------------------------
    # Information that NEEDS TO BE UPDATED AFTER a plan / ROUTE CHANGE.
    # -------------------------------------------------------------------
    if not self.done():
        # NOTE: This should be called after
        self.live_info.incoming_waypoint, self.live_info.incoming_direction = (
            self._local_planner.get_incoming_waypoint_and_direction(  # pyright: ignore[reportAttributeAccessIssue]
                steps=self._look_ahead_steps
            )
        )
    else:
        assert second_pass is False, (
            "In the second pass the agent should have replanned and agent.done() should be False"
        )
        # Assumes second_pass is False
        # Queue is empty
        self.live_info.incoming_waypoint = None
        self.live_info.incoming_direction = RoadOption.VOID

    # Information that requires updated waypoint and route information:
    self.live_info.is_taking_turn = self.is_taking_turn()
    self.live_info.is_changing_lane = self.is_changing_lane()

    # RSS
    # todo uncomment if agent is created after world model
    # self.rss_set_road_boundaries_mode() # in case this was adjusted during runtime. # TODO: maybe implement this update differently. As here it is called unnecessarily often.

    if self._debug:
        # The result is discarded; with throw_on_missing=True this acts as a check
        # that no value in live_info is missing.
        OmegaConf.to_container(self.live_info, resolve=True, throw_on_missing=True)
603
[docs]
def is_taking_turn(self) -> bool:
    """Return whether the incoming direction of the route is a left or right turn."""
    upcoming_direction = self.live_info.incoming_direction
    turn_options = (RoadOption.LEFT, RoadOption.RIGHT)
    return upcoming_direction in turn_options
607
[docs]
def is_changing_lane(self) -> bool:
    """Return whether the incoming direction of the route is a lane change to the left or right."""
    upcoming_direction = self.live_info.incoming_direction
    lane_change_options = (RoadOption.CHANGELANELEFT, RoadOption.CHANGELANERIGHT)
    return upcoming_direction in lane_change_options
611
612 # ------------------ Step & Loop Logic ------------------ #
613
[docs]
def execute_phase(
    self, phase: Phase, *, prior_results: Any, update_controls: Optional[carla.VehicleControl] = None
) -> Context:
    """
    Sets the current phase of the agent and executes all rules that are associated with it.

    Parameters:
        phase : The phase to execute.
        prior_results : The results of the previous phase, e.g. :py:attr:`detected_hazards`.
            Stored on the context as ``prior_result`` and in ``phase_results[phase]``.
        update_controls : Optionally controls that should be used from now onward.

    Returns:
        The current context of the agent.

    Raises:
        omegaconf.ReadonlyConfigError: Re-raised after logging a hint if a rule
            tried to modify a read-only config.

    Note:
        If no context is set yet, assigning ``prior_result`` raises an
        :py:exc:`AttributeError`; :py:meth:`_collision_event` relies on this.
    """
    if self._validate_phases:
        # Sanity check of the phase order; only computed when validation is enabled.
        normal_next = self.current_phase.next_phase()
        assert (
            normal_next in {phase, Phase.USER_CONTROLLED}
            or phase & Phase.EXCEPTIONS
            or phase & Phase.USER_CONTROLLED
        ), (
            f"Phase {phase} is not the next phase of {self.current_phase} or an exception phase. Expected {normal_next}"
        )

    self.current_phase = phase  # set next phase

    if update_controls is not None:
        self.set_control(update_controls)
    self.ctx.prior_result = prior_results
    self.ctx.phase_results[phase] = prior_results

    rules_to_check = self.rules.get(phase, ())  # use get if a custom phase is added, without a rule
    try:
        for rule in rules_to_check:  # todo: maybe dict? grouped by phase?
            assert self.current_phase in rule.phases, (
                f"Current phase {self.current_phase} not in Rule {rule.phases}"
            )  # TODO remove:
            rule(self.ctx)
            # NOTE: Blocking rules can change the phase; reset it so that the
            # assertion above holds for the next rule.
            if phase != self.current_phase:
                logger.warning(
                    "Phase was changed by rule %s to %s. "
                    "Resetting self.current_phase to %s. "
                    "To prevent this raise an exception in the rule or adjust the phase.",
                    rule,
                    self.current_phase,
                    phase,
                )
                self.current_phase = phase

    except NoFurtherRulesException:
        # A rule requested to skip the remaining rules of this phase.
        pass
    except omegaconf.ReadonlyConfigError:
        logger.warning(
            "An action likely tried to change `ctx.config` which is non-permanent. Use `ctx.agent.config.` instead."
        )
        raise
    return self.ctx
669
def _plan_path_phase(self, *, second_pass: bool, debug: bool = False):
    """
    Execute the ``PLAN_PATH`` phases (begin and end).

    If a rule raises :py:exc:`UpdatedPathException`, the step is restarted via
    :py:meth:`run_step` with ``second_pass=True`` and its result is returned;
    otherwise :python:`None` is returned.
    """
    # TODO: when going around corners / junctions and the distance between waypoints is too big,
    # We should replan and and make a more fine grained plan, to stay on the road.
    try:
        # User defined actions can run in between the BEGIN and the END phase.
        for stage in (Phase.BEGIN, Phase.END):
            self.execute_phase(Phase.PLAN_PATH | stage, prior_results=None)
    except UpdatedPathException as path_update:
        if second_pass:
            logger.warning(
                "UpdatedPathException was raised in the second pass. This should not happen: %s. Restrict your rule on ctx.second_pass.",
                path_update,
            )
        return self.run_step(debug, second_pass=True)
    else:
        return None
685
def _make_context(self, last_context: Union[Context, None], **kwargs: Any) -> Context:
    """
    Create the context object for a new step, store it as :py:attr:`ctx` and return it.

    The ``last_context`` attribute of the passed context is deleted first, so the
    new context references only its direct predecessor.
    """
    if last_context is not None:
        delattr(last_context, "last_context")
    fresh_context = Context(agent=self, last_context=last_context, **kwargs)
    self.ctx = fresh_context
    return fresh_context
693
[docs]
def __call__(self, debug: bool = False) -> carla.VehicleControl:
    """Calculate the next vehicle control object by running a first-pass step."""
    # debug should be positional!
    next_control = self.run_step(debug, second_pass=False)
    return next_control
697
698 # Python 3.8+ add / for positional only arguments
[docs]
def run_step(self, debug: bool = False, second_pass: bool = False) -> carla.VehicleControl:
    """
    Calculates the next vehicle control object.

    Arguments:
        debug : Whether to enable debug mode.
            This prints some more information and debug drawings.
        second_pass : **Internal usage** set to :python:`True`
            if this function is called a second time, e.g. after a route update.
            Internally also used as a counter of repeated re-plans.

    Returns:
        The control object that should be applied to the vehicle.

    Raises:
        AgentDoneException: If the destination is reached and no rule set a new one.
        RecursionError: If the inner step requested a path update more than 50 times in a row.
        ValueError: If a handled exception skips the step but no control is set on the context.

    Warning:
        To be compatible with the :py:class:`.LunaticChallenger`, **always pass** :code:`debug` **as a positional argument**,
        or use the :py:meth:`__call__` method.
    """

    if not second_pass:
        ctx = self._make_context(last_context=self.ctx)
    else:
        ctx = self.ctx
    ctx.second_pass = second_pass
    try:
        # ----------------------------
        # Phase 0 - Update Information
        # ----------------------------
        # > Phase.UPDATE_INFORMATION | Phase.BEGIN
        self.update_information(second_pass=second_pass)
        # > Phase.UPDATE_INFORMATION | Phase.END

        # ----------------------------
        # Phase 1 - Plan Path
        # ----------------------------

        # Question: What TODO if the last phase was COLLISION (async), EMERGENCY
        # Some information to PLAN_PATH should reflect this

        # NOTE: Currently no option to diverge from existing path here, or plan a new path
        # NOTE: Currently done in the local planner and behavior functions
        replanned_control = self._plan_path_phase(second_pass=second_pass, debug=debug)
        if replanned_control is not None:
            # The path was updated and the step was already completed by the nested
            # second pass; do not execute the remaining phases a second time.
            return replanned_control

        # Check whether the agent has reached its destination.
        if self.done():
            # NOTE: Might be in NONE phase here.
            self.execute_phase(Phase.DONE | Phase.BEGIN, prior_results=None)
            if self.done():
                # No Rule set a new destination
                logger.info("The target has been reached, stopping the simulation")
                self.execute_phase(Phase.TERMINATING | Phase.BEGIN, prior_results=None)
                raise AgentDoneException
            self.execute_phase(Phase.DONE | Phase.END, prior_results=None)
            return self.run_step(
                debug, second_pass=True
            )  # NOTE! For child classes like the leaderboard agent this calls the higher level run_step.

        # ----------------------------
        # Phase NONE - Before Running step
        # ----------------------------
        try:
            planned_control = self._inner_step(debug=debug)  # debug=True draws waypoints
            if self.detected_hazards:
                # The must_clear_hazard decorator will raise this, but in case the _inner_step is overwritten
                raise EmergencyStopException(self.detected_hazards)  # noqa: TRY301
        # Reraise Exceptions
        except UserInterruption:
            raise

        # Handled Exceptions
        except EmergencyStopException as emergency:
            # ----------------------------
            # Phase Emergency
            # no Rule with Phase.EMERGENCY | BEGIN cleared the provided hazards in ctx.prior_results
            # ----------------------------

            emergency_controls = self.emergency_manager(reasons=emergency.hazards_detected)

            # TODO: somehow backup the control defined before.
            self.execute_phase(
                Phase.EMERGENCY | Phase.END,
                update_controls=emergency_controls,
                prior_results=emergency.hazards_detected,
            )
            planned_control = self.get_control()  # type: ignore[assignment]

        except SkipInnerLoopException as skip:
            self.set_control(skip.planned_control)
            self.current_phase = Phase.USER_CONTROLLED
            planned_control = skip.planned_control
        except UpdatedPathException as update_exception:
            # Check the hard limit first, otherwise it is shadowed by the softer warning limit.
            if second_pass > 50:
                raise RecursionError(
                    "UpdatedPathException was raised more than 50 times. Assuming an infinite loop and terminating"
                ) from update_exception
            if second_pass > 5:
                logger.warning(
                    "UpdatedPathException was raised more than %s times. Warning: This might be an infinite loop.",
                    second_pass,
                )
            else:
                logger.warning(
                    "%s was raised in the inner step, this should be done in Phase.PLAN_PATH instead.",
                    update_exception,
                )
            # `second_pass` doubles as a counter here; `debug` stays positional (see Warning above).
            return self.run_step(debug, second_pass=int(second_pass) + 1)  # type: ignore
        except LunaticAgentException as lae:
            if self.ctx.control is None:
                msg = (
                    f"A VehicleControl object must be set on the agent when {type(lae).__name__} is "
                    "raised during `._inner_step`"
                )
                raise ValueError(msg) from lae
            planned_control = self.get_control()  # type: ignore[assignment]

        # ----------------------------
        # No known Phase multiple exit points
        # ----------------------------

        # ----------------------------
        # Phase RSS - Check RSS
        # ----------------------------

        ctx = self.execute_phase(
            Phase.RSS_EVALUATION | Phase.BEGIN, prior_results=None, update_controls=planned_control
        )
        if AD_RSS_AVAILABLE and self.config.rss and self.config.rss.enabled:
            rss_updated_controls = self._world_model.rss_check_control(ctx.control)  # type: ignore[arg-type]
        else:
            rss_updated_controls = None
        # NOTE: rss_updated_controls could be None.
        ctx = self.execute_phase(Phase.RSS_EVALUATION | Phase.END, prior_results=rss_updated_controls)

    except ContinueLoopException as e:
        logger.debug("ContinueLoopException skipping rest of loop.")
        if self.ctx.control is None:
            msg = f"A VehicleControl object must be set on the agent when {type(e).__name__} is raised during `._inner_step`"
            raise ValueError(msg) from e

    planned_control = self.ctx.control  # type: ignore[assignment]
    planned_control.manual_gear_shift = False
    return self.get_control()  # type: ignore[return-value]
841
[docs]
@must_clear_hazard
@result_to_context("control")
def _inner_step(self, debug: bool = False) -> carla.VehicleControl:
    """
    Internal function that provides the next control object for the agent;
    it should run every tick.

    The detection phases run in a fixed order (traffic lights and pedestrians,
    static obstacles, vehicles). Afterwards a control is produced by exactly one
    of three exits: car following, turning at a junction, or a normal step.

    Args:
        debug: Stored on :py:attr:`debug` and forwarded to :py:meth:`_calculate_control`.

    Returns:
        The result of :py:meth:`get_control` after the last executed phase.

    Raises:
        EmergencyStopException: If :py:attr:`detected_hazards` is not empty when the
            function returns.

    :meta public:
    """
    self.debug = debug

    # ----------------------------
    # Phase 2 - Detection of Pedestrians and Traffic Lights
    # ----------------------------

    # Detect hazards
    # The phases are executed inside detect_hazard:
    # > Phase.DETECT_TRAFFIC_LIGHTS | Phase.BEGIN
    pedestrians_and_tlight_hazard = self.detect_hazard()
    # > Phase.DETECT_PEDESTRIANS | Phase.END

    # Pedestrian avoidance behaviors
    # Currently this results either in an emergency stop (via react_to_hazard) or in nothing.
    if self.detected_hazards:
        # ----------------------------
        # Phase Hazard Detected (traffic light or pedestrian)
        # If no Rule with Phase.EMERGENCY | BEGIN clears pedestrians_or_traffic_light
        # An EmergencyStopException is raised
        # ----------------------------
        self.react_to_hazard(pedestrians_and_tlight_hazard)  # Optional[NoReturn]

    # -----------------------------
    # Phase Detect Static Obstacles
    # -----------------------------

    self.execute_phase(Phase.DETECT_STATIC_OBSTACLES | Phase.BEGIN, prior_results=None)
    static_obstacle_detection_result = self.detect_obstacles_in_path(self.static_obstacles_nearby)
    if static_obstacle_detection_result.obstacle_was_found:
        # Counts the consecutive steps in which a static obstacle blocked the path.
        self.current_states[AgentState.BLOCKED_BY_STATIC] += 1
        # Must plan around it
        self.add_hazard(Hazard.STATIC_OBSTACLE)
    else:
        self.current_states[AgentState.BLOCKED_BY_STATIC] = 0
    # TODO: add a basic rule for circumventing static obstacles
    self.execute_phase(Phase.DETECT_STATIC_OBSTACLES | Phase.END, prior_results=static_obstacle_detection_result)
    # Not throwing an error here yet

    # ----------------------------
    # Phase 3 - Detection of Cars
    # ----------------------------

    self.execute_phase(Phase.DETECT_CARS | Phase.BEGIN, prior_results=None)  # TODO: Maybe add some prio result
    vehicle_detection_result = self.detect_obstacles_in_path(self.vehicles_nearby)

    # TODO: add a way to let the execution overwrite
    if vehicle_detection_result.obstacle_was_found:
        # ----------------------------
        # Phase Car Detected - React to cars in front
        # TODO: turn this into a rule.
        # remove CAR_DETECTED -> pass detection_result to rules
        # TODO some way to circumvent returning control here, like above.
        # TODO: Needs refinement with the car_following_behavior
        # ----------------------------

        self.execute_phase(Phase.CAR_DETECTED | Phase.BEGIN, prior_results=vehicle_detection_result)
        # The detection result is unpacked into (vehicle_detected, vehicle, distance).
        control = self.car_following_behavior(*vehicle_detection_result)  # type: ignore[arg-type]
        # NOTE: might throw EmergencyStopException
        self.execute_phase(
            Phase.CAR_DETECTED | Phase.END, update_controls=control, prior_results=vehicle_detection_result
        )
        # First exit: car following. Note that Phase.DETECT_CARS | Phase.END is not executed on this path.
        return self.get_control()  # type: ignore[return-value]

    # TODO: maybe new phase instead of END or remove CAR_DETECTED and handle as rules (maybe better)
    self.execute_phase(Phase.DETECT_CARS | Phase.END, prior_results=None)  # NOTE: avoiding tailgate here

    # Intersection behavior
    # NOTE: is_taking_turn <- incoming_direction in (RoadOption.LEFT, RoadOption.RIGHT)
    if self.live_info.incoming_waypoint.is_junction and self.is_taking_turn():  # pyright: ignore[reportOptionalMemberAccess]
        # ----------------------------
        # Phase Turning at Junction
        # ----------------------------

        self.execute_phase(Phase.TURNING_AT_JUNCTION | Phase.BEGIN, prior_results=None)
        control = self._calculate_control(debug=debug)
        self.execute_phase(Phase.TURNING_AT_JUNCTION | Phase.END, update_controls=control, prior_results=None)
        # Second exit: turning at a junction.
        return self.get_control()  # type: ignore[return-value]

    # ----------------------------
    # Phase 4 - Plan Path normally
    # ----------------------------

    # Normal behavior
    self.execute_phase(Phase.TAKE_NORMAL_STEP | Phase.BEGIN, prior_results=None)
    control = self._calculate_control(debug=debug)
    self.execute_phase(Phase.TAKE_NORMAL_STEP | Phase.END, prior_results=None, update_controls=control)

    # Third exit: leave the loop and apply the controls outside
    return self.get_control()  # type: ignore[return-value]
944
def _calculate_control(self, debug: bool = False):
    """
    Run the local planner once and return the control it proposes.

    Note:
        This is the innermost function of the agent's run_step function.
        Call it once per step to acquire a desired control object; it may
        also be used inside rules that need a control object.

        **[Context.get_or_calculate_control](#Context.get_or_calculate_control)
        is a safer alternative to this function**

    Warning:
        If you do not use this function in a [`BlockingRule`](#BlockingRule)
        you should raise a `SkipInnerLoopException` or `ContinueLoopException`
        else the planned path will skip a waypoint.

    Warning:
        The control is only calculated and returned.
        **The :py:attr:`agent/ctx.control <control>` attribute, which is the one
        the agent uses in [`apply_control`](#apply_control) to apply the final
        controls, is not set by this function.**
    """
    control_already_set = self.ctx.control is not None
    if control_already_set:
        # Only reported, not prevented: the planner still advances below.
        logger.error("Control was set before calling _calculate_control. This might lead to unexpected behavior.")

    planner = self._local_planner
    return planner.run_step(debug)
972
996
[docs]
def apply_control(self, control: Optional[carla.VehicleControl] = None):
    """
    Applies the control to the agent's actor.
    Will execute the :py:class:`Phase.EXECUTION | Phase.BEGIN <classes.constants.Phase>`
    and :py:class:`Phase.EXECUTION | Phase.END <classes.constants.Phase>` phases.

    Args:
        control: The control to apply. If :python:`None`, the result of
            :py:meth:`get_control` is used.

    Note:
        The final control object that is applied to the agent's actor
        is stored in the :py:attr:`ctx.control <ctx>` attribute.

    Raises:
        ValueError: If no control was passed and :py:meth:`get_control`
            returns :python:`None`.
    """
    if control is None:
        control = self.get_control()
        if control is None:
            raise ValueError(
                "The agent has not yet performed a step this tick and no control object was passed."
            )
    # `|` binds tighter than `!=`: compares against the combined EXECUTION | BEGIN phase.
    if self.current_phase != Phase.EXECUTION | Phase.BEGIN:
        self.execute_phase(Phase.EXECUTION | Phase.BEGIN, prior_results=control, update_controls=control)
    else:
        # NOTE(review): on this path an explicitly passed `control` is not handed to
        # execute_phase; the control applied below is whatever get_control() returns.
        logger.debug("Agent is already in execution phase.")
    # Re-read the control: the EXECUTION | BEGIN phase was given the chance to update it.
    final_control: carla.VehicleControl = self.get_control()  # type: ignore[assignment]
    # Set automatic control-related vehicle lights
    self._update_lights(final_control)
    self._vehicle.apply_control(final_control)
    self.execute_phase(Phase.EXECUTION | Phase.END, prior_results=final_control)
1025
1026 # ------------------ Hazard Detection & Reaction ------------------ #
1027
1028 from agents.substep_managers import detect_traffic_light # -> TrafficLightDetectionResult
1029
1030 traffic_light_manager = detect_traffic_light # pyright: ignore[reportAssignmentType]
1031 """Alias of :py:meth:`detect_traffic_light`"""
1032
[docs]
def detect_hazard(self) -> Set[Hazard]:
    """
    Look for traffic lights and pedestrians in the agent's path and register them as hazards.

    A detected red light is added as :py:attr:`Hazard.TRAFFIC_LIGHT_RED`; any other
    detected light is added as a yellow-light warning. If
    :py:attr:`.LunaticAgentSettings.obstacles.detect_yellow_tlights` is set to :python:`True`,
    then yellow traffic lights will also be regarded as a hazard that can trigger an
    :py:exc:`EmergencyStopException` in :py:meth:`react_to_hazard` that is executed after this
    function.

    Returns:
        The agent's :py:attr:`detected_hazards`.
    """
    # --- Traffic lights ---
    self.execute_phase(Phase.DETECT_TRAFFIC_LIGHTS | Phase.BEGIN, prior_results=None)
    tlight_detection_result: TrafficLightDetectionResult = self.detect_traffic_light()
    if tlight_detection_result.traffic_light_was_found:
        light_is_red = tlight_detection_result.traffic_light.state == carla.TrafficLightState.Red  # pyright: ignore[reportOptionalMemberAccess]
        if light_is_red:
            self.add_hazard(Hazard.TRAFFIC_LIGHT_RED)
        else:  # NOTE: self.config.obstacles.detect_yellow_tlights must be True
            self.add_hazard(Hazard.TRAFFIC_LIGHT_YELLOW, HazardSeverity.WARNING)

    # NOTE: the next traffic light in live_info is the next bounding box and
    # might not be the "correct" one, i.e. it can differ from the detected light.
    self.execute_phase(Phase.DETECT_TRAFFIC_LIGHTS | Phase.END, prior_results=tlight_detection_result)

    # --- Pedestrians ---
    self.execute_phase(Phase.DETECT_PEDESTRIANS | Phase.BEGIN, prior_results=None)
    is_dangerous, detection_result = self.pedestrian_avoidance_behavior()
    if detection_result.obstacle_was_found:
        # Same hazard either way; only the severity depends on the danger flag.
        severity = HazardSeverity.EMERGENCY if is_dangerous else HazardSeverity.WARNING
        self.add_hazard(Hazard.PEDESTRIAN, hazard_level=severity)
    self.execute_phase(Phase.DETECT_PEDESTRIANS | Phase.END, prior_results=(is_dangerous, detection_result))

    return self.detected_hazards
1074
[docs]
def react_to_hazard(self, hazard_detected: Union[Hazard, Iterable[Hazard], None]) -> Optional[NoReturn]:
    """
    Called when a hazard was detected.

    The passed hazard(s) are merged into :py:attr:`detected_hazards` and handed to the
    ``Phase.EMERGENCY | Phase.BEGIN`` phase as the prior result. Rules of that phase
    can clear the hazards; if any remain afterwards, an exception is raised.

    Args:
        hazard_detected: A single hazard, an iterable of hazards, or a falsy value
            to only react to the hazards that are already registered.

    Raises:
        EmergencyStopException: If a hazard was detected and no rule cleared it.
    """
    if hazard_detected:
        if isinstance(hazard_detected, Hazard):
            self.detected_hazards.add(hazard_detected)
        else:
            self.detected_hazards.update(hazard_detected)

    if not self.detected_hazards:
        # Nothing to react to: no emergency phase is executed.
        logger.info("react_to_hazard was called without any detected hazards.")
        logger.info("Hazards have been cleared.")
        return None

    self.execute_phase(Phase.EMERGENCY | Phase.BEGIN, prior_results=hazard_detected)
    # Checked again: rules of the emergency phase may have cleared the hazards.
    if self.detected_hazards:
        raise EmergencyStopException(self.detected_hazards)
    logger.info("Hazards have been cleared.")
    return None
1101
1102 # ------------------ Behaviors ------------------ #
1103 # TODO: Section needs overhaul -> turn into rules
1104
[docs]
def pedestrian_avoidance_behavior(self) -> tuple[bool, ObstacleDetectionResult]:
    """
    Detects pedestrians in the agent's path.

    A detected pedestrian counts as dangerous when its distance, reduced by the
    larger bounding-box extent (x or y) of both the pedestrian and the ego
    vehicle, is below ``config.distance.emergency_braking_distance``.

    Returns:
        A tuple containing a boolean indicating if the detected pedestrian is dangerous
        and the detection result.
    """
    detection_result = self.detect_obstacles_in_path(self.walkers_nearby)
    if not detection_result.obstacle_was_found:
        return False, detection_result

    walker_extent = detection_result.obstacle.bounding_box.extent  # pyright: ignore[reportOptionalMemberAccess]
    ego_extent = self._vehicle.bounding_box.extent
    remaining_gap = (
        detection_result.distance
        - max(walker_extent.y, walker_extent.x)
        - max(ego_extent.y, ego_extent.x)
    )
    if remaining_gap < self.config.distance.emergency_braking_distance:
        # NOTE: should slow down here
        return True, detection_result

    # NOTE detected but not stopping -> ADD avoidance behavior
    logger.debug("Detected a pedestrian but determined no intervention necessary (too far away).")
    return False, detection_result
1131
[docs]
def car_following_behavior(
    self,
    vehicle_detected: bool,  # noqa: ARG002,RUF100 # pylint: disable=unused-argument
    vehicle: carla.Actor,
    distance: float,
) -> carla.VehicleControl:
    """
    React to a vehicle in front and return the control of the car following manager.

    If the bumper-to-bumper distance is below ``config.distance.emergency_braking_distance``,
    :py:attr:`Hazard.CAR` is registered and :py:meth:`react_to_hazard` is called first.

    Parameters:
        Must match :py:class:`.ObstacleDetectionResult`

    Assumes:
        - That an obstacle was detected:
            :py:attr:`vehicle_detected<.ObstacleDetectionResult.obstacle_was_found>` is True and
            :py:attr:`vehicle<.ObstacleDetectionResult.obstacle>` is the detected vehicle.
    """
    other_extent = vehicle.bounding_box.extent
    ego_extent = self._vehicle.bounding_box.extent
    # Reduce the distance by the larger extent (x or y) of both vehicles.
    exact_distance = distance - max(other_extent.y, other_extent.x) - max(ego_extent.y, ego_extent.x)

    too_close = exact_distance < self.config.distance.emergency_braking_distance
    if too_close:
        # Note: If the passed set is not cleared by a Phase.EMERGENCY | Phase.BEGIN rule,
        # an EmergencyStopException is raised.
        self.add_hazard(Hazard.CAR)
        self.react_to_hazard(hazard_detected={Hazard.CAR})  # Optional[NoReturn]
    return self.car_following_manager(vehicle, exact_distance)
1160
1161 # ------------------ External Helpers ------------------ #
1162
1163 # Moved outside of the class for organization
1164 from agents.substep_managers import (
1165 car_following_manager, # pyright: ignore[reportAssignmentType]
1166 emergency_manager,
1167 )
1168
1169 # Subfunction of traffic_light_manager. In traffic_light_manager the parameters are chosen automatically
1170 # which is why traffic_light_manager should be used instead.
1171 # Kept for backwards compatibility and possible future use.
1172 from agents.substep_managers.traffic_light import affected_by_traffic_light
1173 from agents.tools.lunatic_agent_tools import detect_obstacles_in_path
1174
1175 # ----
1176
1177 # @override
[docs]
def add_emergency_stop(
    self, control: carla.VehicleControl, reasons: "Optional[set[str]]" = None
) -> carla.VehicleControl:
    """
    Modifies the control values to perform an emergency stop.

    This is a thin wrapper that forwards both arguments to :py:meth:`emergency_manager`.

    :param control: (carla.VehicleControl) control to be modified
    :param reasons: (set[str], optional) reasons for the emergency stop,
        passed on unchanged. Defaults to None.
    :return: The result of :py:meth:`emergency_manager`.
    """
    return self.emergency_manager(reasons=reasons, control=control)  # type: ignore[arg-type]
1189
[docs]
def lane_change(
    self,
    direction: Literal["left", "right"],
    same_lane_time: float = 0,
    other_lane_time: float = 0,
    lane_change_time: float = 2,
    *,
    check: bool = False,
):
    """
    Changes the path so that the vehicle performs a lane change.
    Use 'direction' to specify either a 'left' or 'right' lane change,
    and the time parameters to fine tune the maneuver.

    Steps for the lane change:
        1. **same_lane_time** seconds in the same lane.
        2. **lane_change_time** seconds to reach the other lane.
        3. **other_lane_time** seconds to stay in the other lane.

    The times are converted to distances with the current speed of the agent;
    the path starts at the agent's current waypoint.

    Parameters:
        direction: The direction of the lane change, either 'left' or 'right'.
        same_lane_time: The time to follow the same lane before the lane change.
        other_lane_time: The time to follow the other lane after the lane change.
        lane_change_time: The time to reach the center of the last lane.
            A low value will make a fast lane change, while a high value will make slow lane change.
        check: If :python:`True`, the function will check if the lane change is possible, i.e.
            if there is a lane of :py:class:`carla.LaneType.Driving <carla.LaneType>` in
            the desired direction. Otherwise it can change to other lane types as well.
            Defaults to :code:`False`.

    Note:
        If no lane change path could be generated, the request is ignored and
        the current global plan is left untouched.

    See Also:
        - :py:func:`agents.tools.lunatic_agent_tools.generate_lane_change_path`
        - :py:meth:`set_global_plan`
    """
    speed = self.live_info.current_speed / 3.6  # km/h -> m/s
    path: list[tuple[carla.Waypoint, RoadOption]] = self._generate_lane_change_path(
        self._current_waypoint,  # NOTE: Assuming exact_waypoint
        direction,
        same_lane_time * speed,  # distance in meters: t * v
        other_lane_time * speed,
        lane_change_time * speed,
        check=check,
        lane_changes=1,  # changes only one lane
        step_distance=self.config.planner.sampling_resolution,
    )
    if not path:
        logger.info("Ignoring the lane change as no path was found")
        # Really ignore it: installing an empty plan would replace the current route.
        return

    # Change path to take now
    # NOTE: use super with arguments here.
    super(LunaticAgent, self).set_global_plan(path)  # noqa: UP008
    # TODO: # CRITICAL: Keep old global plan if it is some end goal -> Restore it.
1244 # TODO: # CRITICAL: Keep old global plan if it is some end goal -> Restore it.
1245
1246 # TODO: Use generate_lane_change_path to finetune
[docs]
def make_lane_change(
    self,
    order: Iterable[Literal["left", "right"]] = ("left", "right"),
    up_angle_th: int = 180,
    low_angle_th: int = 0,
) -> "Literal[True] | None":
    """
    Move to the left/right lane if possible

    The directions in **order** are tried one after another. The first lane that
    may be changed to, has the same driving direction, is a driving lane and in
    which no vehicle is detected is set as the new destination.

    Args:
        order : The order in
            which the agent should try to change lanes. If a single string is given, the agent
            will try to change to that lane. Default is :python:`("left", "right")`.
        up_angle_th : The angle threshold for the upper limit of obstacle detection in the other lane.
            Default is 180 degrees, meaning that the agent will detect obstacles ahead.
        low_angle_th : The angle threshold for the lower limit of obstacle detection in the other lane.
            Default is 0 degrees, meaning that the agent will detect obstacles behind.

    Returns:
        :python:`True` if a lane change was initiated, otherwise :python:`None`.

    Raises:
        ValueError: If a direction is neither 'left' nor 'right'.

    Assumes:
        - :python:`(self.config.live_info.incoming_direction == RoadOption.LANEFOLLOW \
            and not waypoint.is_junction and self.config.live_info.current_speed > 10)`
        - :python:`check_behind.obstacle_was_found and self.config.live_info.current_speed < get_speed(check_behind.obstacle)`
    """
    vehicle_list = self.vehicles_nearby
    waypoint = self._current_waypoint  # todo use a getter

    # Materialize once: a bare string is a single direction, and a one-shot
    # iterable must not be consumed a second time for the log message below.
    order = (order,) if isinstance(order, str) else tuple(order)  # type: ignore
    reason = "Overtaking" if order == ("left", "right") else "Tailgating"

    for direction in order:
        if direction == "right":
            right_turn = waypoint.right_lane_marking.lane_change
            can_change = right_turn in {carla.LaneChange.Right, carla.LaneChange.Both}
            other_wpt = waypoint.get_right_lane()
            lane_offset = 1
        elif direction == "left":
            left_turn = waypoint.left_lane_marking.lane_change
            can_change = left_turn in {carla.LaneChange.Left, carla.LaneChange.Both}
            other_wpt = waypoint.get_left_lane()
            lane_offset = -1
        else:
            msg = f"Direction must be 'left' or 'right', was {direction}"
            raise ValueError(msg)
        if (
            can_change
            and other_wpt is not None  # no neighbouring lane despite the lane marking
            and lanes_have_same_direction(waypoint, other_wpt)
            and other_wpt.lane_type == carla.LaneType.Driving
        ):
            # Detect if the other lane is free
            detection_result = detect_vehicles(
                self,
                vehicle_list,
                self.max_detection_distance("other_lane"),
                up_angle_th=up_angle_th,
                low_angle_th=low_angle_th,
                lane_offset=lane_offset,
            )
            if not detection_result.obstacle_was_found:
                logger.debug("Change Lane, moving to the %s! Reason: %s", direction, reason)

                end_waypoint = self._local_planner.target_waypoint
                # TODO: How to set waypoint order? Or better use generate_lane_change_path!
                self.set_destination(
                    end_location=other_wpt.transform.location,
                    start_location=end_waypoint.transform.location,
                    clean_queue=True,
                )
                return True
    return None
1321
1322 # ------------------ Other Function ------------------ #
1323
def _update_lights(self, vehicle_control: carla.VehicleControl):
    """
    Updates the lights of the vehicle in the simulation.

    The Brake and Reverse flags are derived from **vehicle_control**; the new
    state is only sent to the vehicle if it differs from the stored one.
    """
    light_state = carla.VehicleLightState
    lights: carla.VehicleLightState = self._vehicle_lights
    controlled_flags = (
        (vehicle_control.brake, light_state.Brake),
        (vehicle_control.reverse, light_state.Reverse),
    )
    for is_active, flag in controlled_flags:
        if is_active:
            lights |= flag
        else:  # Remove the flag
            lights &= light_state.All ^ flag
    if lights != self._vehicle_lights:  # Change the light state only if necessary
        self._vehicle_lights = lights
        self._vehicle.set_light_state(carla.VehicleLightState(self._vehicle_lights))
1338
def render_detection_matrix(self, display: "pygame.Surface", **options: Unpack["DetectionMatrix.RenderOptions"]):
    """
    Render the detection matrix onto **display**; does nothing if the agent has none.

    See Also:
        - :py:meth:`DetectionMatrix.render`

    Attention:
        **options** must match the arguments of :py:meth:`DetectionMatrix.render`.

    :meta private:
    """
    matrix = self._detection_matrix
    if not matrix:
        return
    # options should align with CameraConfig.DetectionMatrixHudConfig
    matrix.render(display, **options)
1352
[docs]
def verify_settings(
    self,
    config: Optional[LunaticAgentSettings] = None,
    *,
    verify_dataclass: Union["type[AgentConfig]", bool] = True,
    strictness: Literal[-1, 0, 1, 2, 3, 4] = 0,
):
    """
    Verifies the settings of the LunaticAgent.
    Foremost this checks if the :py:obj:`.planner.dt` value has been set
    when the world runs in synchronous mode.
    Secondly if :python:`verify_dataclass=True` or a different AgentConfig class is provided,
    it will check for correct type usage.

    Args:
        config: The configuration to verify.
            If not provided, the agent's default configuration will be used.
            Defaults to :code:`None`.
        verify_dataclass:
            Determines the dataclass to use for verification.
            If :python:`True`, the :py:attr:`.BASE_SETTINGS` dataclass will be used.
            If falsy, the dataclass verification is skipped.
            See :py:meth:`AgentConfig.check_config` for more details.
            Defaults to True.
        strictness:
            The strictness level for :py:meth:`AgentConfig.check_config`.
            A negative value skips the dataclass verification.
            Defaults to :python:`0`.

    Raises:
        TypeError: If **verify_dataclass** is not a valid AgentConfig subclass or True.
        MissingMandatoryValue: If :python:`config.planner.dt` is not present
            or not a :python:`float`
    """
    config = config or self.config

    if self._world_model.world_settings.synchronous_mode:
        # Assure that dt is set
        if isinstance(config, DictConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
            OmegaConf.select(config, "planner.dt", throw_on_missing=True)
        elif not isinstance(config.planner.dt, float):
            from omegaconf import MissingMandatoryValue

            raise MissingMandatoryValue(
                "`config.planner.dt` needs to be set to a value. Cannot be:",
                type(config.planner.dt),
                config.planner.dt,
            )

    if strictness < 0:
        return

    # NOTE: Below is experimental and might fail as the config at this point has already been setup

    from agents.tools import config_creation

    # Temporarily switch the module-level flag off while the config is checked.
    old_value = config_creation._WARN_LIVE_INFO  # pyright: ignore[reportPrivateUsage]
    config_creation._WARN_LIVE_INFO = False  # pyright: ignore[reportPrivateUsage]
    try:
        if verify_dataclass:
            if verify_dataclass is True:
                dataclass = self.BASE_SETTINGS
            elif isinstance(verify_dataclass, type) and issubclass(verify_dataclass, AgentConfig):
                dataclass = verify_dataclass
            else:
                raise TypeError("`verify_dataclass` must be a `AgentConfig` or `True` to select the BASE_SETTINGS ")
            dataclass.check_config(config, dataclass.get("strict_config", strictness), as_dict_config=True)
    finally:
        # Restore the flag even if the verification raised.
        config_creation._WARN_LIVE_INFO = old_value  # pyright: ignore[reportPrivateUsage]
1420
1421 # ------------------ Getter Function ------------------ #
1422
[docs]
def get_control(self):
    """
    Return the control currently stored on the agent's context (``ctx.control``).

    If retrieved before the local planner has been run, it will return None.
    """
    context = self.ctx
    return context.control
1430
1431 # ------------------ Setter Function ------------------ #
1432
[docs]
def set_control(self, control: carla.VehicleControl):
    """
    Set new controls for the agent. Must be called before apply_control.

    The value is stored as-is on ``ctx.control``; it is not validated here.
    :py:meth:`apply_control` raises a :py:exc:`ValueError` later if no control is available.
    """
    self.ctx.control = control
1441
def _init_detection_matrix(self) -> None:
    """
    Create and start the detection matrix if it is enabled in the config.

    The synchronous :py:class:`DetectionMatrix` is used only if both the
    ``sync`` setting and the world's synchronous mode are active; otherwise the
    :py:class:`AsyncDetectionMatrix` is used. When disabled, the attribute is set
    to :python:`None`. The road matrix counter is reset in every case.
    """
    matrix_settings = self.config.detection_matrix
    if not (matrix_settings and matrix_settings.enabled):
        self._detection_matrix = None
    else:
        run_synchronously = matrix_settings.sync and self._world_model.world_settings.synchronous_mode
        matrix_class = DetectionMatrix if run_synchronously else AsyncDetectionMatrix
        self._detection_matrix = matrix_class(self._vehicle)
        self._detection_matrix.start()
    self._road_matrix_counter = 0
1452
if READTHEDOCS or TYPE_CHECKING:
    # From the parent class, but without type-hints.
    # This stub is only defined when READTHEDOCS or TYPE_CHECKING is truthy and
    # has no implementation of its own; it merely adds annotations to the signature.
    def set_destination(
        self,
        end_location: carla.Location,
        start_location: Optional[carla.Location] = None,
        clean_queue: bool = True,
    ) -> None: ...
1461
def set_vehicle(self, vehicle: carla.Actor) -> None:
    """
    Set the vehicle for the agent (experimental if applied a second time)

    The vehicle is registered with the :py:class:`CarlaDataProvider` unless an
    actor with the same id is already known; in that case the existing map
    entries are re-keyed to **vehicle**. Afterwards the detection matrix and
    the local planner are (re)created.

    :meta private:
    """
    self._vehicle = assure_type("carla.Vehicle", vehicle)

    # do not register same id twice
    velocity_map = CarlaDataProvider._actor_velocity_map  # pyright: ignore[reportPrivateUsage]
    registered_actor = next((actor for actor in velocity_map if actor.id == vehicle.id), None)
    if registered_actor is None:
        with contextlib.suppress(KeyError):
            CarlaDataProvider.register_actor(vehicle, vehicle.get_transform())  # pyright: ignore[reportUnknownMemberType]
    else:
        # Exchange the key for a faster lookup
        actor_maps = (
            velocity_map,
            CarlaDataProvider._actor_transform_map,  # pyright: ignore[reportPrivateUsage]
            CarlaDataProvider._actor_location_map,  # pyright: ignore[reportPrivateUsage]
        )
        for actor_map in actor_maps:
            actor_map[vehicle] = actor_map.pop(registered_actor)

    self._init_detection_matrix()
    self._local_planner = DynamicLocalPlannerWithRss(
        self,
        map_inst=CarlaDataProvider.get_map(),
        world=CarlaDataProvider.get_world(),
        rss_sensor=self._world_model.rss_sensor,
    )
1492
1493 # @override
[docs]
def set_target_speed(self, speed: float) -> None:
    """
    Changes the target speed of the agent.

    A warning is printed if the agent is currently configured to follow the
    speed limits; the target speed is stored in either case.

    :param speed (float): target speed in Km/h
    """
    speed_settings = self.config.speed
    if speed_settings.follow_speed_limits:
        warning = (
            "WARNING: The max speed is currently set to follow the speed limits. "
            "Use 'follow_speed_limits' to deactivate this"
        )
        print(warning)
    speed_settings.target_speed = speed  # shared with planner
1505
[docs]
def follow_speed_limits(self, value: bool = True) -> None:
    """
    (De)activates following the speed limits.

    If active, the agent will dynamically change the target speed according to the speed limits.

    Arguments:
        value: Whether to activate this behavior
    """
    speed_settings = self.config.speed
    speed_settings.follow_speed_limits = value
1514
[docs]
def ignore_traffic_lights(self, active: bool = True) -> None:
    """Set whether the agent ignores traffic lights (``config.obstacles.ignore_traffic_lights``)."""
    obstacle_settings = self.config.obstacles
    obstacle_settings.ignore_traffic_lights = active
1518
[docs]
def ignore_stop_signs(self, active: bool = True) -> None:
    """Set whether the agent ignores stop signs (``config.obstacles.ignore_stop_signs``)."""
    obstacle_settings = self.config.obstacles
    obstacle_settings.ignore_stop_signs = active
1522
[docs]
def ignore_vehicles(self, active: bool = True) -> None:
    """
    (De)activates the checks for other vehicles.

    Arguments:
        active: If :python:`True`, ``config.obstacles.ignore_vehicles`` is enabled.
    """
    self.config.obstacles.ignore_vehicles = active
1526
1533
1534 # ------------------ Overwritten & Outsourced functions ------------------ #
1535
1536 from agents.tools.lunatic_agent_tools import detect_vehicles, max_detection_distance
1537
1538 # signature of parent is str and not Literal["left", "right"]
1539 from agents.tools.lunatic_agent_tools import (
1540 generate_lane_change_path as _generate_lane_change_path, # pyright: ignore[reportAssignmentType]
1541 )
1542
1543 # NOTE: the original pedestrian_avoid_manager is still usable
def pedestrian_avoid_manager(self, waypoint) -> NoReturn:  # type: ignore
    """
    Disabled: this function was replaced by ``substep_managers.pedestrian_detection_manager``.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    replacement = substep_managers.pedestrian_detection_manager
    raise NotImplementedError("This function was replaced by ", replacement)
1551
1552 # @override
def collision_and_car_avoid_manager(self, waypoint) -> NoReturn:  # type: ignore
    """
    Disabled: this function was split into detect_obstacles_in_path and car_following_manager.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    message = "This function was split into detect_obstacles_in_path and car_following_manager"
    raise NotImplementedError(message)
1560
1561 # @override
def _tailgating(self, waypoint, vehicle_list) -> NoReturn:  # type: ignore
    # Disabled on purpose; always raises.
    message = "Tailgating has been implemented as a rule"
    raise NotImplementedError(message)
1564
1565 # @override
def emergency_stop(self) -> NoReturn:
    """
    Disabled: use `add_emergency_stop` instead.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    message = "This function was overwritten use `add_emergency_stop` instead"
    raise NotImplementedError(message)
1571
1572 # ------------------------------------ #
1573 # As reference Parent Functions
1574 # ------------------------------------ #
1575 # def get_local_planner(self):
1576 # def get_global_planner(self):
1577
1578 # def done(self): # from base class self._local_planner.done()
1579
1580 # def set_global_plan(self, plan, stop_waypoint_creation=True, clean_queue=True):
1581 """
1582 Adds a specific plan to the agent.
1583
1584 :param plan: list of [carla.Waypoint, RoadOption] representing the route to be followed
1585 :param stop_waypoint_creation: stops the automatic random creation of waypoints
1586 :param clean_queue: resets the current agent's plan
1587 """
1588
1589 # def trace_route(self, start_waypoint, end_waypoint):
1590 """
1591 Calculates the shortest route between a starting and ending waypoint.
1592
1593 :param start_waypoint (carla.Waypoint): initial waypoint
1594 :param end_waypoint (carla.Waypoint): final waypoint
1595 """
1596
1597 # ---------------- Cleanup ------------------- #
1598
def _destroy_sensor(self):
    """Stop the detection matrix and destroy the collision sensor, clearing both references."""
    matrix = self._detection_matrix
    if matrix:
        matrix.stop()
        self._detection_matrix = None

    sensor = self._collision_sensor
    if sensor:
        sensor.destroy()
        self._collision_sensor = None  # type: ignore[assignment]
1606
[docs]
def destroy(self):
    """
    Resets attributes and destroys helpers like the :py:class:`.DetectionMatrix`.

    The sensors are destroyed first, then the references to the world model,
    the world and the context are dropped, and finally the actor lists are
    emptied. Each actor list is cleared independently, so a missing list does
    not prevent the remaining ones from being cleared.
    """
    self._destroy_sensor()
    self._world_model = None  # type: ignore[assignment]
    self._world = None  # type: ignore[assignment]
    if self.ctx:
        # Break the reference cycle between the agent and its context.
        self.ctx.agent = None  # type: ignore[assignment]
    self.ctx = None  # type: ignore[assignment]
    for actor_list_name in ("all_vehicles", "vehicles_nearby", "all_walkers", "walkers_nearby"):
        with contextlib.suppress(AttributeError):
            getattr(self, actor_list_name).clear()
1622
def __del__(self):
    """Best-effort cleanup: calls :py:meth:`destroy` and suppresses any :py:exc:`Exception` it raises."""
    with contextlib.suppress(Exception):
        self.destroy()