1"""
2This module implements an agent that roams around a track following random
3waypoints and avoiding other vehicles. The agent also responds to traffic lights,
4traffic signs, and has different possible configurations.
5"""
6
7# Decide how to handle imports
8# ruff: noqa: PLC0415
9
10from __future__ import annotations
11
12import contextlib
13import sys
14from copy import deepcopy
15from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, List, NoReturn, Optional, Sequence, Set, Union
16from typing import cast as assure_type
17
18import carla # pyright: ignore[reportMissingTypeStubs]
19import omegaconf
20from omegaconf import DictConfig, OmegaConf
21from typing_extensions import Literal, Self, Unpack
22
23from agents import substep_managers
24from agents.dynamic_planning.dynamic_local_planner import DynamicLocalPlannerWithRss
25from agents.navigation.behavior_agent import BehaviorAgent
26from agents.navigation.global_route_planner import GlobalRoutePlanner
27from agents.rules import rule_from_config
28from agents.tools.config_creation import (
29 AgentConfig,
30 LaunchConfig,
31 LiveInfo,
32 LunaticAgentSettings,
33 RssRoadBoundariesModeAlias,
34 RuleCreatingParameters,
35)
36from agents.tools.logs import logger
37from agents.tools.lunatic_agent_tools import (
38 detect_vehicles,
39 must_clear_hazard,
40 phase_callback, # type: ignore[unused-import] # noqa: F401
41 result_to_context,
42)
43from agents.tools.misc import lanes_have_same_direction
44from classes.constants import AD_RSS_AVAILABLE, READTHEDOCS, AgentState, Hazard, HazardSeverity, Phase, RoadOption
45from classes.exceptions import (
46 AgentDoneException,
47 ContinueLoopException,
48 EmergencyStopException,
49 LunaticAgentException,
50 NoFurtherRulesException,
51 SkipInnerLoopException,
52 UpdatedPathException,
53 UserInterruption,
54)
55from classes.rule import BlockingRule, Context, Rule
56from classes.worldmodel import CarlaDataProvider, WorldModel
57from classes.detection_matrix import AsyncDetectionMatrix, DetectionMatrix
58from classes.information_manager import InformationManager
59
60if TYPE_CHECKING:
61 from agents.tools.hints import ObstacleDetectionResult, TrafficLightDetectionResult
62 import pygame
63
64
[docs]
65class LunaticAgent(BehaviorAgent):
66 """
67 BasicAgent implements an agent that navigates the scene.
68 This agent respects traffic lights and other vehicles, but ignores stop signs.
69 It has several functions available to specify the route that the agent must follow,
70 as well as to change its parameters in case a different driving mode is desired.
71 """
72
73 BASE_SETTINGS: "type[LunaticAgentSettings]" = LunaticAgentSettings
74 """
75 Base AgentConfig class for this agent. This is used to create the default settings for the agent
76 if none are provided.
77 """
78
79 DEFAULT_RULES: ClassVar[Dict[Phase, List[Rule]]] = {k: [] for k in Phase.get_phases()}
80 """
81 Default rules of this agent class when initialized.
82
83 :meta hide-value:
84 """
85
86 rules: Dict[Phase, List[Rule]]
87 """
88 The rules of this agent.
89 When initialized the rules are deep copied from :py:attr:`DEFAULT_RULES`.
90 """
91
92 ctx: Context
93 """The context object of the current step"""
94
95 # Information from the InformationManager
96 walkers_nearby: List[carla.Walker]
97 vehicles_nearby: List[carla.Vehicle]
98 static_obstacles_nearby: List[carla.Actor]
99 """Static obstacles detected by the :py:class:`.InformationManager`"""
100
101 obstacles_nearby: List[carla.Actor]
102 """
103 Combination of :py:attr:`vehicles_nearby`, :py:attr:`walkers_nearby`
104 and :py:attr:`static_obstacles_nearby`.
105 """
106
107 traffic_lights_nearby: List[carla.TrafficLight]
108 traffic_signs_nearby: List[carla.TrafficSign] = NotImplemented
109 """
110 Not yet implemented
111
112 :meta private:
113 """
114
115 current_states: Dict[AgentState, int]
116 """The current states of the agent. The number of steps spent in each state is stored as the value."""
117
118 #
119
120 _world_model: WorldModel
121 """Reference to the attached :py:class:`WorldModel`."""
122
123 _validate_phases = False
124 """
125 Debugging flag to sanity check if the agent passes through the phases in the correct order.
126 Attention: This check is currently not straightforward anymore.
127 """
128
129 _active_blocking_rules: Set[BlockingRule]
130 """Blocking rules that are currently active and have taken over the agents loop."""
131
132 # ------------------ Initialization ------------------ #
133
134 from agents.tools.lunatic_agent_tools import create_agent_config as _create_agent_config
135
[docs]
136 @classmethod
137 def create_world_and_agent(cls, args: LaunchConfig, *,
138 vehicle: Optional[carla.Vehicle] = None,
139 sim_world: carla.World,
140 settings_archetype: "Optional[type[AgentConfig]]" = None,
141 agent_config: Optional["LunaticAgentSettings"] = None,
142 overwrites: Optional[Dict[str, Any]] = None
143 ) -> tuple[Self, WorldModel, GlobalRoutePlanner]:
144 """
145 Setup function to create the agent from the :py:class:`LaunchConfig` settings.
146
147 Note:
148 - :py:meth:`.GameFramework.init_agent_and_interface` is the preferred way to create to
149 instantiate the agent, only use this method if you try not do create a
150 :py:class:`GameFramework` object.
151 """
152 if overwrites is None:
153 overwrites = {}
154
155 if agent_config is None:
156 if hasattr(args, "agent"):
157 if settings_archetype is not None:
158 logger.warning("settings_archetype was passed but using args.agent. "
159 "Ignoring settings_archetype.")
160 agent_config = args.agent
161 elif settings_archetype is not None and isinstance(settings_archetype, object):
162 logger.warning("settings_archetype is an instance. "
163 "To pass an instance use agent_config instead.")
164 agent_config = settings_archetype # type: ignore
165 elif settings_archetype is not None:
166 logger.debug("Creating config from settings_archetype")
167 behavior = settings_archetype(overwrites)
168 agent_config = cls.BASE_SETTINGS.cast(behavior.to_dict_config())
169 else:
170 logger.debug("Using %s._base_settings %s to create config.",
171 cls.__name__, cls.BASE_SETTINGS)
172 agent_config = cls.BASE_SETTINGS.cast(cls.BASE_SETTINGS.to_dict_config())
173 assert agent_config is not None
174 else:
175 logger.debug("A config was passed, using it as is.")
176
177 world_model = WorldModel(agent_config, args=args, carla_world=sim_world, player=vehicle) # TEST: without args
178 agent_config.planner.dt = world_model.world_settings.fixed_delta_seconds or 1 / world_model._args.fps # pyright: ignore[reportPrivateUsage]
179
180 agent = cls(agent_config, world_model)
181 return agent, world_model, agent.get_global_planner()
182
    def __init__(self,
                 settings: Union[str, LunaticAgentSettings],
                 world_model: Optional[WorldModel] = None,
                 *,
                 vehicle: Optional[carla.Vehicle] = None,
                 overwrite_options: Optional[dict[str, Any]] = None,
                 debug: bool = True):
        """
        Initialize the LunaticAgent.

        Builds the agent config, attaches (or creates) the :py:class:`WorldModel`,
        registers the vehicle, sets up the global planner, the collision sensor,
        the rules and the :py:class:`InformationManager`.

        Args:
            settings :
                The settings of the agent to construct the :py:attr:`LunaticAgent.config`.
                It can be either a string pointing to a yaml file or a LunaticAgentSettings.
            world_model : The world model to use. If None, a new WorldModel will be created.
            vehicle : The vehicle controlled by the agent. Can be None then the :code:`world_model.player` will be used.
            overwrite_options : Additional options to overwrite the default agent configuration.
            debug : Whether to enable debug mode.
        """
        self._debug = debug
        """Enables additional debug output."""
        # Without a WorldModel the command line overrides are not applied; warn loudly.
        if world_model is None and len(sys.argv) > 1:
            logger.error("BUG: Beware when not passing a WorldModel, the WorldModel currently ignores command line overrides, "
                         "i.e. will only use the config file.\n> Use `%s.create_world_and_agent` and provide the LaunchConfig instead.", self.__class__.__name__)

        # -------------------- Settings ------------------------
        # Depending on the input type, the settings are created differently.

        self.config = self._create_agent_config(settings, world_model, overwrite_options)

        # -------------------------------

        logger.info("\n\nAgent config is %s", OmegaConf.to_yaml(self.config))

        # World Model
        if world_model is None:
            world_model = WorldModel(self.config, player=vehicle)
            # Use the fixed simulation step if set, otherwise derive it from the target fps.
            self.config.planner.dt = world_model.world_settings.fixed_delta_seconds or 1 / world_model._args.fps  # pyright: ignore[reportPrivateUsage]

        self._world_model: WorldModel = world_model
        self._world: carla.World = world_model.world

        # Register Vehicle
        assert world_model.player is not None
        self.set_vehicle(world_model.player)

        self.current_phase: Phase = Phase.NONE
        """current phase of the agent inside the loop"""

        # No context exists before the first step; `run_step` creates it via `_make_context`.
        self.ctx = None  # type: ignore

        self._last_traffic_light: Optional[carla.TrafficLight] = None
        """Current red traffic light"""

        # TODO: No more hardcoded defaults / set them from opt_dict which must have all parameters; check which are parameters and which are set by other functions (e.g. _look_ahead_steps)

        # Parameters from BehaviorAgent ------------------------------------------
        # todo: check redefinitions

        self._look_ahead_steps: int = 0
        """
        updated in _update_information used for local_planner.get_incoming_waypoint_and_direction
        """

        # Initialize the planners
        self._global_planner = CarlaDataProvider.get_global_route_planner()  # NOTE: THIS does not use self.config.planner.sampling_resolution
        if not self._global_planner:
            # Fallback: create a planner and store it on the CarlaDataProvider as well.
            logger.error("Global Route Planner not set - This should not happen, if the CarlaDataProvider has been initialized.")
            self._global_planner = GlobalRoutePlanner(CarlaDataProvider.get_map(), self.config.planner.sampling_resolution)
            CarlaDataProvider._grp = self._global_planner  # pyright: ignore[reportPrivateUsage]

        # Get the static elements of the scene
        self._traffic_light_map: Dict[carla.TrafficLight, carla.Transform] = CarlaDataProvider._traffic_light_map  # pyright: ignore[reportPrivateUsage]
        self._lights_list = CarlaDataProvider._traffic_light_map.keys()  # pyright: ignore[reportPrivateUsage]
        self._lights_map: Dict[int, carla.Waypoint] = {}
        """Dictionary mapping a traffic light to a wp corresponding to its trigger volume location"""

        # Vehicle Lights
        self._vehicle_lights = carla.VehicleLightState.NONE

        # Collision Sensor # NOTE: another in the WorldModel for the HUD
        self._set_collision_sensor()

        # Rule Framework
        # 1. add all rules from the class, if any - else this is a dict with empty lists
        self.rules = deepcopy(self.__class__.DEFAULT_RULES)  # Copies the ClassVar to the instance

        # 2. add rules from the config
        self.add_config_rules()

        # Information Manager
        self.information_manager = InformationManager(self)
        self.current_states = self.information_manager.state_counter  # share the dict

        # Blocking Rules
        self._active_blocking_rules = set()

        # NOTE(review): `_detection_matrix` and `_road_matrix_counter` are read by
        # `detection_matrix` / `_update_information` but not assigned here; presumably
        # they are set elsewhere (e.g. in `set_vehicle`) - TODO confirm.
279
280 # --------------------- Collision Callback ------------------------
281
282 def _set_collision_sensor(self):
283 # see: https://carla.readthedocs.io/en/latest/ref_sensors/#collision-detector
284 # and https://carla.readthedocs.io/en/latest/python_api/#carla.Sensor.listen
285 blueprint = CarlaDataProvider._blueprint_library.find('sensor.other.collision') # pyright: ignore[reportPrivateUsage]
286 self._collision_sensor: carla.Sensor = assure_type(carla.Sensor, CarlaDataProvider.get_world().spawn_actor(
287 blueprint, carla.Transform(), attach_to=self._vehicle))
288
289 self._collision_sensor.listen(self._collision_event)
290
291 from agents.substep_managers import collision_manager as _collision_manager
292
293 def _collision_event(self, event: carla.CollisionEvent):
294 """
295 Callback function for the collision sensor.
296
297 By default uses :py:func:`agents.substep_managers.collision_manager`.
298
299 Executes Phases:
300 - :py:class:`Phase.COLLISION | Phase.BEGIN<.Phase>`
301 - :py:class:`Phase.COLLISION | Phase.END<.Phase>`
302 """
303 # https://carla.readthedocs.io/en/latest/python_api/#carla.CollisionEvent
304 # e.g. setting ignore_vehicles to False, if it was True before.
305 # do an emergency stop (in certain situations)
306 try:
307 self.execute_phase(Phase.COLLISION | Phase.BEGIN, prior_results=event)
308 except AttributeError as e:
309 if "'NoneType' object has no attribute 'prior_result'" not in str(e):
310 raise e
311 # context not yet set up, very early collision
312 result = self._collision_manager(event)
313 try:
314 self.execute_phase(Phase.COLLISION | Phase.END, prior_results=result)
315 except AttributeError as e:
316 if "'NoneType' object has no attribute 'prior_result'" not in str(e):
317 raise e
318 # context not yet set up, very early collision
319
320 # --------------------- Adding rules ------------------------
321
[docs]
322 def add_rule(self, rule: Rule, position: Union[int, None] = None):
323 """
324 Add a rule to the agent. The rule will be inserted at the given position.
325
326 Args:
327 rule : The rule to add
328 position :
329 The position to insert the rule at.
330 If :python:`None` the rule list will be sorted by priority.
331 Defaults to :python:`None`.
332 """
333 for p in rule.phases:
334 if p not in self.rules:
335 logger.warning("Phase %s from Rule %s is not a default phase. Adding a new phase.", p, rule)
336 self.rules[p] = []
337 if position is None:
338 self.rules[p].append(rule)
339 self.rules[p].sort(key=lambda r: r.priority, reverse=True)
340 else:
341 self.rules[p].insert(position, rule)
342
[docs]
343 def add_rules(self, rules: "Rule | Iterable[Rule]"):
344 """Add a list of rules and sort the agents rules by priority."""
345 if isinstance(rules, Rule):
346 rules = [rules]
347 for rule in rules:
348 for phase in rule.phases:
349 self.rules[phase].append(rule)
350 for phase in self.rules.keys(): # noqa: SIM118
351 self.rules[phase].sort(key=lambda r: r.priority, reverse=True)
352
[docs]
353 def add_config_rules(self, config: Optional[Union[LunaticAgentSettings, List[RuleCreatingParameters]]] = None):
354 """
355 Adds rules
356 """
357 if config is None:
358 config = self.config
359 rule_list: List[RuleCreatingParameters]
360 # "rules" in config is also false for rules == MISSING
361 if isinstance(config, (AgentConfig, DictConfig)) and "rules" in config.keys(): # noqa: SIM118, RUF100
362 try:
363 rule_list = config.rules
364 except omegaconf.MissingMandatoryValue:
365 logger.debug("`rules` key was missing, skipping rule addition.")
366 return
367 else:
368 rule_list: List[RuleCreatingParameters] = config # type: ignore[assignment]
369 logger.debug("Adding rules from config:\n%s", OmegaConf.to_yaml(rule_list))
370 for rule in rule_list:
371 self.add_rules(rule_from_config(rule)) # each call could produce one or more rules
372
373 # --------------------- Properties ------------------------
374
375 @property
376 def live_info(self) -> LiveInfo:
377 return self.config.live_info
378
379 @property
380 def detection_matrix(self):
381 """
382 Returns :any:`DetectionMatrix.getMatrix` if the matrix is set.
383 """
384 if self._detection_matrix:
385 return self._detection_matrix.getMatrix()
386 return None
387
388 # ------------------ Hazard ------------------ #
389
390 @property
391 def detected_hazards(self) -> Set[Hazard]:
392 return self.ctx.detected_hazards
393
394 @property
395 def detected_hazards_info(self) -> Dict[Hazard, Any]:
396 """
397 Information about the detected hazards, e.g. severity.
398 """
399 return self.ctx.detected_hazards_info
400
401 @detected_hazards.setter
402 def detected_hazards(self, hazards: Set[Hazard]):
403 if not isinstance(hazards, set): # pyright: ignore[reportUnnecessaryIsInstance]
404 raise TypeError("detected_hazards must be a set of Hazards.")
405 self.ctx._detected_hazards = hazards # pyright: ignore[reportPrivateUsage]
406
    # Hazard helpers taken directly from Context and exposed on the agent.
    # These have the same signature and can be used interchangeably.
    # NOTE(review): presumably this works because the agent offers the same
    # `detected_hazards` attributes as the Context - confirm against classes.rule.Context.
    discard_hazard = Context.discard_hazard
    add_hazard = Context.add_hazard
    has_hazard = Context.has_hazard
411
412 #
413 @property
414 def current_traffic_light(self) -> "carla.TrafficLight | None":
415 """Alias to :py:attr:`self._last_traffic_light <_last_traffic_light>`."""
416 return self._last_traffic_light
417
418 @current_traffic_light.setter
419 def current_traffic_light(self, traffic_light: carla.TrafficLight):
420 self._last_traffic_light = traffic_light
421
422 @property
423 def phase_results(self) -> Dict[Phase, Any]:
424 """
425 Retrieves :py:attr:`agent.ctx.phase_results <classes.rule.Context.phase_results>`
426
427 Stores the results of the phases the agent has been in.
428 By default the keys are set to :any:`Context.PHASE_NOT_EXECUTED`.
429 """
430 return self.ctx.phase_results
431
432 @property
433 def active_blocking_rules(self) -> Set[BlockingRule]:
434 """
435 Blocking rules that are currently active and have taken over the agents loop.
436 """
437 return self._active_blocking_rules
438
439 @property
440 def _map(self) -> carla.Map:
441 """Get the current map of the world.""" # Needed *only* for set_destination
442 return CarlaDataProvider.get_map()
443
444 # ------------------
445
446 #@property
447 #def ctx(self) -> Union[Context, None]:
448 # print("Getting Context", self._ctx())
449 # return self._ctx() # might be None
450
451 # ------------------ Information functions ------------------ #
452
471
    def _update_information(self, *, second_pass: bool = False) -> None:
        """
        Update the information regarding the ego vehicle based on the surrounding world.

        On the first pass the :py:class:`InformationManager` is ticked and its results
        are copied onto the agent. The route-dependent values (incoming waypoint and
        direction, turn and lane-change flags) are refreshed on every pass.

        Args:
            second_pass : If True, skips the tick-constant calculations. Useful if a
                second call to this method is necessary in the same tick.

        Assumes: second_pass == True => agent.done() == False

        Note:
            Does not execute any Phase.
        """
        # --------------------------------------------------------------------------
        # Information that is CONSTANT DURING THIS TICK and INDEPENDENT OF THE ROUTE
        # --------------------------------------------------------------------------
        if not second_pass:
            if self._debug:  # noqa: SIM102
                # Sanity check: the cached key view should not be empty if the provider knows traffic lights.
                if not self._lights_list and len(CarlaDataProvider._traffic_light_map.keys()):  # pyright: ignore[reportPrivateUsage]
                    logger.error("Traffic light list is empty, but map is not.")

            # ----------------------------
            # First Pass for expensive and tick-constant information

            # --- InformationManager ---
            information: InformationManager.Information = self.information_manager.tick()  # NOTE: # Warning: Currently not route-dependant, might need to be changed later
            self.tick_information = information

            self._current_waypoint = information.current_waypoint
            self.current_states = information.current_states

            # Maybe access over subattribute, or properties
            # NOTE: If other scripts, e.g. the scenario_runner, are used in parallel or sync=False
            # the stored actors might have been destroyed later on.

            # Global Information
            self.all_vehicles = information.vehicles
            self.all_walkers = information.walkers
            self.all_static_obstacles = information.static_obstacles
            # Combination of the three lists
            self.all_obstacles = information.obstacles

            # Nearby subsets as provided by the InformationManager
            self.vehicles_nearby = information.vehicles_nearby
            self.walkers_nearby = information.walkers_nearby
            self.static_obstacles_nearby = information.static_obstacles_nearby

            # Combination of the three lists
            # NOTE(review): the class declares `obstacles_nearby`, but the value is stored
            # as `all_obstacles_nearby` here - confirm which name is intended.
            self.all_obstacles_nearby = information.obstacles_nearby

            self.traffic_lights_nearby = information.traffic_lights_nearby

            # Find vehicles and walkers nearby; could be moved to the information manager

            # ----------------------------

            # Data Matrix
            # update not every frame to save performance
            if self._detection_matrix and self._detection_matrix.sync:
                self._road_matrix_counter += 1
                if (self._road_matrix_counter % self.config.detection_matrix.sync_interval) == 0:
                    #logger.debug("Updating Road Matrix")
                    # TODO: Still prevent async mode from using too much resources and slowing fps down too much.
                    self._detection_matrix.update()  # NOTE: Does nothing if in async mode. self.road_matrix is updated by another thread.
                else:
                    pass

            # used for self._local_planner.get_incoming_waypoint_and_direction
            self._look_ahead_steps = int((self.live_info.current_speed_limit) / 10)  # TODO: Maybe make this an interpolation and make more use of it

            # ----- Follow speed limits -------
            # NOTE: This is once again set in the local planner, but only on the Context config!
            if self.config.speed.follow_speed_limits:
                self.config.speed.target_speed = self.config.live_info.current_speed_limit

            # NOTE: This is the direction used by the planner in the *last* step.
            self.live_info.executed_direction = self._local_planner.target_road_option

        # Also checked on a second pass: the stored direction must still match the planner.
        assert self.live_info.executed_direction == self._local_planner.target_road_option, "Executed direction should not change."

        # -------------------------------------------------------------------
        # Information that NEEDS TO BE UPDATED AFTER a plan / ROUTE CHANGE.
        # -------------------------------------------------------------------
        if not self.done():
            # Look `_look_ahead_steps` waypoints ahead on the planned route.
            self.live_info.incoming_waypoint, self.live_info.incoming_direction = self._local_planner.get_incoming_waypoint_and_direction(  # pyright: ignore[reportAttributeAccessIssue]
                steps=self._look_ahead_steps)
        else:
            assert second_pass is False, "In the second pass the agent should have replanned and agent.done() should be False"
            # Assumes second_pass is False
            # Queue is empty
            self.live_info.incoming_waypoint = None
            self.live_info.incoming_direction = RoadOption.VOID

        # Information that requires updated waypoint and route information:
        self.live_info.is_taking_turn = self.is_taking_turn()
        self.live_info.is_changing_lane = self.is_changing_lane()

        #logger.debug(f"Incoming Direction: {str(self.live_info.incoming_direction):<20} - Second Pass: {second_pass}")

        # RSS
        # todo uncomment if agent is created after world model
        #self.rss_set_road_boundaries_mode() # in case this was adjusted during runtime. # TODO: maybe implement this update differently. As here it is called unnecessarily often.

        if self._debug:
            # Result is discarded; with throw_on_missing=True this raises if live_info has missing values.
            OmegaConf.to_container(self.live_info, resolve=True, throw_on_missing=True)
578
[docs]
579 def is_taking_turn(self) -> bool:
580 """Checks if the agent is taking a turn in a few steps"""
581 return self.live_info.incoming_direction in (RoadOption.LEFT, RoadOption.RIGHT)
582
[docs]
583 def is_changing_lane(self) -> bool:
584 """Checks if the agent is changing lanes in a few steps"""
585 return self.live_info.incoming_direction in (RoadOption.CHANGELANELEFT, RoadOption.CHANGELANERIGHT)
586
587 # ------------------ Step & Loop Logic ------------------ #
588
[docs]
589 def execute_phase(self, phase: Phase, *, prior_results: Any, update_controls: Optional[carla.VehicleControl] = None) -> Context:
590 """
591 Sets the current phase of the agent and executes all rules that are associated with it.
592
593 Parameters:
594 phase : The phase to execute.
595 prior_results : The results of the previous phase, e.g. :py:attr:`detected_hazards`.
596 update_controls : Optionally controls that should be used from now onward.
597 """
598 normal_next = self.current_phase.next_phase() # sanity checking if everything is correct
599 if self._validate_phases:
600 assert (normal_next in {phase, Phase.USER_CONTROLLED}
601 or phase & Phase.EXCEPTIONS
602 or phase & Phase.USER_CONTROLLED),\
603 f"Phase {phase} is not the next phase of {self.current_phase} or an exception phase. Expected {normal_next}"
604
605 self.current_phase = phase # set next phase
606
607 if update_controls is not None:
608 self.set_control(update_controls)
609 self.ctx.prior_result = prior_results
610 self.ctx.phase_results[phase] = prior_results
611
612 rules_to_check = self.rules.get(phase, ()) # use get if a custom phase is added, without a rule
613 try:
614 for rule in rules_to_check: # todo: maybe dict? grouped by phase?
615 assert self.current_phase in rule.phases, f"Current phase {self.current_phase} not in Rule {rule.phases}" # TODO remove:
616 rule(self.ctx)
617 # NOTE: Blocking rules can change the and above assertion will fail.
618 if phase != self.current_phase:
619 logger.warning("Phase was changed by rule %s to %s. "
620 "Resting self.current_phase to %s. "
621 "To prevent his raise an exception in the rule or adjust the phase.",
622 rule, self.current_phase, phase)
623 self.current_phase = phase
624
625 except NoFurtherRulesException:
626 pass
627 except omegaconf.ReadonlyConfigError:
628 print("WARNING: A action likely tried to change `ctx.config` which is non-permanent. Use `ctx.agent.config.` instead.")
629 raise
630 return self.ctx
631
632 def _plan_path_phase(self, *, second_pass: bool, debug: bool = False):
633 try:
634 self.execute_phase(Phase.PLAN_PATH | Phase.BEGIN, prior_results=None)
635 # User defined action
636 # TODO: when going around corners / junctions and the distance between waypoints is too big,
637 # We should replan and and make a more fine grained plan, to stay on the road.
638 self.execute_phase(Phase.PLAN_PATH | Phase.END, prior_results=None)
639 except UpdatedPathException as e:
640 if second_pass:
641 logger.warning("UpdatedPathException was raised in the second pass. This should not happen: %s. Restrict your rule on ctx.second_pass.", e)
642 return self.run_step(debug, second_pass=True)
643 return None
644
645 def _make_context(self, last_context: Union[Context, None], **kwargs: Any) -> Context:
646 """Creates a new context object for the agent at the start of a step."""
647 if last_context is not None:
648 del last_context.last_context
649 ctx = Context(agent=self, last_context=last_context, **kwargs)
650 self.ctx = ctx
651 return ctx
652
[docs]
653 def __call__(self, debug: bool = False) -> carla.VehicleControl:
654 """Calculates the next vehicle control object."""
655 return self.run_step(debug, second_pass=False) # debug should be positional!
656
657 # Python 3.8+ add / for positional only arguments
[docs]
658 def run_step(self, debug: bool = False, second_pass: bool = False) -> carla.VehicleControl:
659 """
660 Calculates the next vehicle control object.
661
662 Arguments:
663 debug : Whether to enable debug mode.
664 This prints some more information and debug drawings.
665 second_pass : **Internal usage** set to :python:`True`
666 if this function is called a second time, e.g. after a route update.
667
668 Warning:
669 To be compatible with the :py:class:`.LunaticChallenger`, **always pass** :code:`debug` **as a positional argument**,
670 or use the :py:meth:`__call__` method.
671 """
672
673 if not second_pass:
674 ctx = self._make_context(last_context=self.ctx)
675 else:
676 ctx = self.ctx
677 ctx.second_pass = second_pass
678 try:
679 # ----------------------------
680 # Phase 0 - Update Information
681 # ----------------------------
682 # > Phase.UPDATE_INFORMATION | Phase.BEGIN
683 self.update_information(second_pass=second_pass)
684 # > Phase.UPDATE_INFORMATION | Phase.END
685
686 # ----------------------------
687 # Phase 1 - Plan Path
688 # ----------------------------
689
690 # Question: What TODO if the last phase was COLLISION (async), EMERGENCY
691 # Some information to PLAN_PATH should reflect this
692
693 # NOTE: Currently no option to diverge from existing path here, or plan a new path
694 # NOTE: Currently done in the local planner and behavior functions
695 self._plan_path_phase(second_pass=second_pass, debug=debug)
696
697 # Check whether the agent has reached its destination.
698 if self.done():
699 # NOTE: Might be in NONE phase here.
700 self.execute_phase(Phase.DONE | Phase.BEGIN, prior_results=None)
701 if self.done():
702 # No Rule set a new destination
703 logger.info("The target has been reached, stopping the simulation")
704 self.execute_phase(Phase.TERMINATING | Phase.BEGIN, prior_results=None)
705 raise AgentDoneException
706 self.execute_phase(Phase.DONE | Phase.END, prior_results=None)
707 return self.run_step(debug, second_pass=True) # NOTE! For child classes like the leaderboard agent this calls the higher level run_step.
708
709 # ----------------------------
710 # Phase NONE - Before Running step
711 # ----------------------------
712 try:
713 planned_control = self._inner_step(debug=debug) # debug=True draws waypoints
714 if self.detected_hazards:
715 # The must_clear_hazard decorator will raise this, but in case the _inner_step is overwritten
716 raise EmergencyStopException(self.detected_hazards) # noqa: TRY301
717 # Reraise Exceptions
718 except UserInterruption:
719 raise
720
721 # Handled Exceptions
722 except EmergencyStopException as emergency:
723 # ----------------------------
724 # Phase Emergency
725 # no Rule with Phase.EMERGENCY | BEGIN cleared the provided hazards in ctx.prior_results
726 # ----------------------------
727
728 emergency_controls = self.emergency_manager(reasons=emergency.hazards_detected)
729
730 # TODO: somehow backup the control defined before.
731 self.execute_phase(Phase.EMERGENCY | Phase.END,
732 update_controls=emergency_controls,
733 prior_results=emergency.hazards_detected)
734 planned_control = self.get_control() # type: carla.VehicleControl # type: ignore[assignment]
735
736 except SkipInnerLoopException as skip:
737 self.set_control(skip.planned_control)
738 self.current_phase = Phase.USER_CONTROLLED
739 planned_control = skip.planned_control
740 except UpdatedPathException as update_exception:
741 if second_pass > 5:
742 logger.warning("UpdatedPathException was raised more than %s times. "
743 "Warning: This might be an infinite loop.", second_pass)
744 elif second_pass > 50:
745 raise RecursionError("UpdatedPathException was raised more than 50 times. "
746 "Assuming an infinite loop and terminating") from update_exception
747 else:
748 logger.warning("%s was raised in the inner step, this should be done in "
749 "Phase.PLAN_PATH instead.", update_exception)
750 return self.run_step(second_pass=int(second_pass) + 1) # type: ignore
751 except LunaticAgentException as lae:
752 if self.ctx.control is None:
753 raise ValueError(f"A VehicleControl object must be set on the agent when {type(lae).__name__} is "
754 "raised during `._inner_step`") from lae
755 planned_control = self.get_control() # type: ignore[assignment]
756 # assert ctx.control
757
758 # ----------------------------
759 # No known Phase multiple exit points
760 # ----------------------------
761
762 # ----------------------------
763 # Phase RSS - Check RSS
764 # ----------------------------
765
766 ctx = self.execute_phase(Phase.RSS_EVALUATION | Phase.BEGIN,
767 prior_results=None,
768 update_controls=planned_control)
769 if AD_RSS_AVAILABLE and self.config.rss and self.config.rss.enabled:
770 rss_updated_controls = self._world_model.rss_check_control(ctx.control) # type: ignore[arg-type]
771 else:
772 rss_updated_controls = None
773 # NOTE: rss_updated_controls could be None.
774 ctx = self.execute_phase(Phase.RSS_EVALUATION | Phase.END, prior_results=rss_updated_controls)
775
776 #if ctx.control is not planned_control:
777 # logger.debug("RSS updated control accepted.")
778
779 except ContinueLoopException as e:
780 logger.debug("ContinueLoopException skipping rest of loop.")
781 if self.ctx.control is None:
782 raise ValueError(f"A VehicleControl object must be set on the agent when {type(e).__name__} is raised during `._inner_step`") from e
783
784 planned_control = self.ctx.control # type: carla.VehicleControl # type: ignore[assignment]
785 planned_control.manual_gear_shift = False
786 return self.get_control() # type: ignore[return-value]
787
[docs]
@must_clear_hazard
@result_to_context("control")
def _inner_step(self, debug: bool = False) -> carla.VehicleControl:
    """
    Internal function that provides the next control object for the agent;
    it should run every tick.

    The phases are walked in this order: hazard detection (traffic lights and
    pedestrians), static obstacle detection, vehicle detection and finally the
    planning step, which is either ``TURNING_AT_JUNCTION`` or
    ``TAKE_NORMAL_STEP``. If a vehicle is detected in the path the function
    returns early after the ``CAR_DETECTED`` phases.

    Parameters:
        debug: Stored on the agent and forwarded to :py:meth:`_calculate_control`.

    Returns:
        The value of :py:meth:`get_control` after the last executed phase.

    Raises:
        EmergencyStopException: If :py:attr:`detected_hazards` is not empty when the
            function returns.

    :meta public:
    """
    self.debug = debug

    # ----------------------------
    # Phase 2 - Detection of Pedestrians and Traffic Lights
    # ----------------------------
    # detect_hazard executes the DETECT_TRAFFIC_LIGHTS and DETECT_PEDESTRIANS
    # phases (BEGIN and END) internally.
    found_hazards = self.detect_hazard()

    # Currently: either an emergency stop (react_to_hazard) or nothing.
    if self.detected_hazards:
        # ----------------------------
        # Phase Hazard Detected (traffic light or pedestrian)
        # If no Rule with Phase.EMERGENCY | BEGIN clears the hazards,
        # an EmergencyStopException is raised.
        # ----------------------------
        self.react_to_hazard(found_hazards)  # Optional[NoReturn]

    # -----------------------------
    # Phase Detect Static Obstacles
    # -----------------------------
    self.execute_phase(Phase.DETECT_STATIC_OBSTACLES | Phase.BEGIN, prior_results=None)
    static_result = self.detect_obstacles_in_path(self.static_obstacles_nearby)
    if not static_result.obstacle_was_found:
        self.current_states[AgentState.BLOCKED_BY_STATIC] = 0
    else:
        self.current_states[AgentState.BLOCKED_BY_STATIC] += 1
        # Must plan around it
        self.add_hazard(Hazard.STATIC_OBSTACLE)
    # TODO: add a basic rule for circumventing static obstacles
    self.execute_phase(Phase.DETECT_STATIC_OBSTACLES | Phase.END, prior_results=static_result)
    # Not throwing an error here yet

    # ----------------------------
    # Phase 3 - Detection of Cars
    # ----------------------------
    self.execute_phase(Phase.DETECT_CARS | Phase.BEGIN, prior_results=None)  # TODO: Maybe add some prio result
    vehicle_result = self.detect_obstacles_in_path(self.vehicles_nearby)

    # TODO: add a way to let the execution overwrite
    if vehicle_result.obstacle_was_found:
        # ----------------------------
        # Phase 2.A - React to cars in front
        # TODO: turn this into a rule.
        #   remove CAR_DETECTED -> pass detection_result to rules
        # TODO some way to circumvent returning control here, like above.
        # TODO: Needs refinement with the car_following_behavior
        # ----------------------------
        self.execute_phase(Phase.CAR_DETECTED | Phase.BEGIN, prior_results=vehicle_result)
        # NOTE: might throw EmergencyStopException
        follow_control = self.car_following_behavior(*vehicle_result)  # type: ignore[arg-type]
        self.execute_phase(Phase.CAR_DETECTED | Phase.END,
                           update_controls=follow_control,
                           prior_results=vehicle_result)
        return self.get_control()  # type: ignore[return-value]

    # TODO: maybe new phase instead of END or remove CAR_DETECTED and handle as rules (maybe better)
    self.execute_phase(Phase.DETECT_CARS | Phase.END, prior_results=None)  # NOTE: avoiding tailgate here

    # ----------------------------
    # Phase Turning at Junction  /  Phase 4 - Plan Path normally
    # ----------------------------
    # Both variants run the same BEGIN -> plan -> END sequence; only the phase differs.
    # NOTE: is_taking_turn <- incoming_direction in (RoadOption.LEFT, RoadOption.RIGHT)
    turning_at_junction = (self.live_info.incoming_waypoint.is_junction  # pyright: ignore[reportOptionalMemberAccess]
                           and self.is_taking_turn())
    step_phase = Phase.TURNING_AT_JUNCTION if turning_at_junction else Phase.TAKE_NORMAL_STEP

    self.execute_phase(step_phase | Phase.BEGIN, prior_results=None)
    planned_control = self._calculate_control(debug=debug)
    self.execute_phase(step_phase | Phase.END, update_controls=planned_control, prior_results=None)

    # Leave loop and apply controls outside
    return self.get_control()  # type: ignore[return-value]
891
def _calculate_control(self, debug: bool = False):
    """
    Plan the next step of the agent by running the local planner and
    return the control it produces.

    Note:
        This is the innermost function of the agents run_step function.
        It should be called each step to acquire a desired control object.
        Use this function inside rules if a control object is desired.

        **[Context.get_or_calculate_control](#Context.get_or_calculate_control)
        is a safer alternative to this function**

    Warning:
        If you do not use this function in a [`BlockingRule`](#BlockingRule)
        you should raise a `SkipInnerLoopException` or `ContinueLoopException`
        else the planned path will skip a waypoint.

    Warning:
        The control is only calculated and returned.
        **It is not stored in the :py:attr:`agent/ctx.control <control>` attribute, which is the one
        the agent uses in [`apply_control`](#apply_control) to apply the final controls.**

    Parameters:
        debug: Passed on to the local planner's ``run_step``.
    """
    control_already_set = self.ctx.control is not None
    if control_already_set:
        # Only reported, not prevented: the planner is still executed below.
        logger.error("Control was set before calling _calculate_control. This might lead to unexpected behavior.")

    planner = self._local_planner
    return planner.run_step(debug)
919
942
[docs]
def apply_control(self, control: Optional[carla.VehicleControl] = None):
    """
    Applies the control to the agent's actor.
    Will execute the :py:class:`Phase.EXECUTION | Phase.BEGIN <classes.constants.Phase>`
    (unless the agent is already in that phase)
    and :py:class:`Phase.EXECUTION | Phase.END <classes.constants.Phase>` phases.

    Note:
        The control that is finally applied is the one returned by
        :py:meth:`get_control` *after* the ``EXECUTION | BEGIN`` phase.

    Parameters:
        control: The control to apply. If :python:`None`, the value of
            :py:meth:`get_control` is used.

    Raises:
        ValueError: If no control was passed and :py:meth:`get_control`
            returns :python:`None`.
    """
    if control is None:
        control = self.get_control()
        if control is None:
            raise ValueError("The agent has not yet performed a step this tick "
                             "and no control object was passed.")

    # NOTE: `|` binds tighter than `!=`, i.e. compared against the combined flag.
    if self.current_phase != Phase.EXECUTION | Phase.BEGIN:
        self.execute_phase(Phase.EXECUTION | Phase.BEGIN, prior_results=control, update_controls=control)
    else:
        logger.debug("Agent is already in execution phase.")

    # Read the control again after the BEGIN phase; this is what gets applied.
    final_control: carla.VehicleControl = self.get_control()  # type: ignore[assignment]
    # Set automatic control-related vehicle lights
    self._update_lights(final_control)
    self._vehicle.apply_control(final_control)
    self.execute_phase(Phase.EXECUTION | Phase.END, prior_results=final_control)
970
971 # ------------------ Hazard Detection & Reaction ------------------ #
972
973 from agents.substep_managers import detect_traffic_light # -> TrafficLightDetectionResult
974 traffic_light_manager = detect_traffic_light # pyright: ignore[reportAssignmentType]
975 """Alias of :py:meth:`detect_traffic_light`"""
976
[docs]
def detect_hazard(self) -> Set[Hazard]:
    """
    Checks for traffic lights and pedestrians in the agents path and
    registers the matching hazards via :py:meth:`add_hazard`.

    If :py:attr:`.LunaticAgentSettings.obstacles.detect_yellow_tlights` is set to :python:`True`,
    then yellow traffic lights will also be regarded as a hazard that can trigger an
    :py:exc:`EmergencyStopException` in :py:meth:`react_to_hazard` that is executed after this
    function.

    Executes the ``DETECT_TRAFFIC_LIGHTS`` and ``DETECT_PEDESTRIANS`` phases
    (``BEGIN`` and ``END`` each).

    Returns:
        :py:attr:`detected_hazards`
    """
    # ---- Red lights and stops behavior ----
    self.execute_phase(Phase.DETECT_TRAFFIC_LIGHTS | Phase.BEGIN, prior_results=None)
    tlight_result: TrafficLightDetectionResult = self.detect_traffic_light()
    if tlight_result.traffic_light_was_found:
        light_is_red = (tlight_result.traffic_light.state  # pyright: ignore[reportOptionalMemberAccess]
                        == carla.TrafficLightState.Red)
        if light_is_red:
            self.add_hazard(Hazard.TRAFFIC_LIGHT_RED)
        else:
            # NOTE: self.config.obstacles.detect_yellow_tlights must be True
            self.add_hazard(Hazard.TRAFFIC_LIGHT_YELLOW, HazardSeverity.WARNING)

    # NOTE: live_info.next_traffic_light is the next bounding box and might not be
    # the "correct" next one, i.e. its id can differ from the detected light's id.
    self.execute_phase(Phase.DETECT_TRAFFIC_LIGHTS | Phase.END, prior_results=tlight_result)

    # ---- Pedestrian avoidance behaviors ----
    self.execute_phase(Phase.DETECT_PEDESTRIANS | Phase.BEGIN, prior_results=None)
    is_dangerous, walker_result = self.pedestrian_avoidance_behavior()
    if walker_result.obstacle_was_found:
        # NOTE: its a flag, could pack all in one bin
        # Pro: easier to check
        # Con: when to remove other states like warning
        # Make it a dict with the state as key and the detection result as value!
        severity = HazardSeverity.EMERGENCY if is_dangerous else HazardSeverity.WARNING
        self.add_hazard(Hazard.PEDESTRIAN, hazard_level=severity)
    self.execute_phase(Phase.DETECT_PEDESTRIANS | Phase.END, prior_results=(is_dangerous, walker_result))

    return self.detected_hazards
1018
[docs]
def react_to_hazard(self, hazard_detected: Union[Hazard, Iterable[Hazard], None]) -> Optional[NoReturn]:
    """
    Called when a hazard was detected.

    The passed hazard(s) are merged into :py:attr:`detected_hazards` and the
    ``Phase.EMERGENCY | Phase.BEGIN`` phase is executed with **hazard_detected**
    as ``prior_results``. If :py:attr:`detected_hazards` is still non-empty
    afterwards, an :py:exc:`EmergencyStopException` is raised.

    If there is no hazard at all, neither passed nor already stored, the
    function only logs this and returns without executing the phase.

    Parameters:
        hazard_detected: A single :py:class:`Hazard`, an iterable of hazards
            or :python:`None`.

    Raises:
        EmergencyStopException: If a hazard was detected and no rule cleared it.
    """
    # update state? prevent flooding of log information
    if hazard_detected:
        if isinstance(hazard_detected, Hazard):
            self.detected_hazards.add(hazard_detected)
        else:
            self.detected_hazards.update(hazard_detected)

    if not self.detected_hazards:
        # Nothing to react to; do not claim afterwards that hazards "have been cleared".
        logger.info("react_to_hazard was called without any detected hazards.")
        return None

    # Rules of this phase get the chance to clear `detected_hazards`.
    self.execute_phase(Phase.EMERGENCY | Phase.BEGIN, prior_results=hazard_detected)
    if self.detected_hazards:
        raise EmergencyStopException(self.detected_hazards)
    logger.info("Hazards have been cleared.")
    return None
1045
1046 # ------------------ Behaviors ------------------ #
1047 # TODO: Section needs overhaul -> turn into rules
1048
[docs]
def pedestrian_avoidance_behavior(self) -> tuple[bool, ObstacleDetectionResult]:
    """
    Detects pedestrians in the agents path.

    A detected pedestrian counts as dangerous when the detection distance,
    reduced by the larger x/y bounding box extent of the pedestrian and of
    the ego vehicle, is below ``config.distance.emergency_braking_distance``.

    Returns:
        A tuple containing a boolean indicating if the detected pedestrian is dangerous
        and the detection result.
    """
    # note ego_vehicle_wp is the current waypoint self._current_waypoint
    walker_result = self.detect_obstacles_in_path(self.walkers_nearby)
    if not walker_result.obstacle_was_found:
        return False, walker_result

    walker_extent = walker_result.obstacle.bounding_box.extent  # pyright: ignore[reportOptionalMemberAccess]
    ego_extent = self._vehicle.bounding_box.extent
    free_distance = (walker_result.distance
                     - max(walker_extent.y, walker_extent.x)
                     - max(ego_extent.y, ego_extent.x))

    if free_distance < self.config.distance.emergency_braking_distance:
        # NOTE: should slow down here
        return True, walker_result

    # NOTE detected but not stopping -> ADD avoidance behavior
    logger.debug("Detected a pedestrian but determined no intervention necessary (too far away).")
    return False, walker_result
1072
[docs]
def car_following_behavior(self,
                           vehicle_detected: bool,  # noqa: ARG002,RUF100 # pylint: disable=unused-argument
                           vehicle: carla.Actor,
                           distance: float) -> carla.VehicleControl:
    """
    React to a vehicle that was detected in the agent's path.

    Parameters:
        Must match :py:class:`.ObstacleDetectionResult`; **vehicle_detected** is not used.

    Assumes:
        - That an obstacle was detected:
            :py:attr:`vehicle_detected<.ObstacleDetectionResult.obstacle_was_found>` is True and
            :py:attr:`vehicle<.ObstacleDetectionResult.obstacle>` is the detected vehicle.

    Returns:
        The result of :py:meth:`car_following_manager` for the vehicle and the
        distance reduced by the larger x/y bounding box extent of both vehicles.
    """
    other_extent = vehicle.bounding_box.extent
    ego_extent = self._vehicle.bounding_box.extent
    exact_distance = distance - max(other_extent.y, other_extent.x) - max(ego_extent.y, ego_extent.x)

    too_close = exact_distance < self.config.distance.emergency_braking_distance
    if too_close:
        # Note: If the passed set is not cleared by a Phase.EMERGENCY | Phase.BEGIN rule,
        # an EmergencyStopException is raised.
        self.add_hazard(Hazard.CAR)
        self.react_to_hazard(hazard_detected={Hazard.CAR})  # Optional[NoReturn]

    return self.car_following_manager(vehicle, exact_distance)
1096
1097 # ------------------ External Helpers ------------------ #
1098
1099 # Moved outside of the class for organization
1100 from agents.substep_managers import (
1101 car_following_manager, # pyright: ignore[reportAssignmentType]
1102 emergency_manager,
1103 )
1104
1105 # Subfunction of traffic_light_manager. In traffic_light_manager the parameters are chosen automatically
1106 # which is why traffic_light_manager should be used instead.
1107 # Kept for backwards compatibility and possible future use.
1108 from agents.substep_managers.traffic_light import affected_by_traffic_light
1109 from agents.tools.lunatic_agent_tools import detect_obstacles_in_path
1110
1111 # ----
1112
1113 #@override
[docs]
def add_emergency_stop(self, control: carla.VehicleControl, reasons: "Optional[set[str]]" = None) -> carla.VehicleControl:
    """
    Modifies the control values to perform an emergency stop by delegating
    to :py:meth:`emergency_manager`.

    :param control: (carla.VehicleControl) control to be modified
    :param reasons: (set[str], optional) forwarded unchanged to the emergency manager
    :return: the result of :py:meth:`emergency_manager`
    """
    stopped_control = self.emergency_manager(control=control, reasons=reasons)  # type: ignore[arg-type]
    return stopped_control
1123
[docs]
def lane_change(self,
                direction: Literal['left', 'right'],
                same_lane_time: float = 0,
                other_lane_time: float = 0,
                lane_change_time: float = 2,
                *,
                check: bool = False):
    """
    Changes the path so that the vehicle performs a lane change.
    Use 'direction' to specify either a 'left' or 'right' lane change,
    and the time parameters to fine tune the maneuver.

    Steps for the lane change:
        1. **same_lane_time** seconds in the same lane.
        2. **lane_change_time** seconds to reach the other lane.
        3. **other_lane_time** seconds to stay in the other lane.

    The times are converted to distances with the current speed
    (:python:`live_info.current_speed / 3.6`, i.e. m/s) and the path starts
    at the agent's current waypoint.

    If no lane change path could be generated the current plan is kept
    unchanged and only an info message is logged.

    Parameters:
        direction: The direction of the lane change, either 'left' or 'right'.
        same_lane_time: The time to follow the same lane before the lane change.
        other_lane_time: The time to follow the other lane after the lane change.
        lane_change_time: The time to reach the center of the last lane.
            A low value will make a fast lane change, while a high value will make slow lane change.
        check: If :python:`True`, the function will check if the lane change is possible, i.e.
            if there is a lane of :py:class:`carla.LaneType.Driving <carla.LaneType>` in
            the desired direction. Otherwise it can change to other lane types as well.
            Defaults to :code:`False`.

    See Also:
        - :py:func:`agents.tools.lunatic_agent_tools.generate_lane_change_path`
        - :py:meth:`set_global_plan`
    """
    speed = self.live_info.current_speed / 3.6  # m/s
    # This is a staticfunction from BasicAgent function
    path: list[tuple[carla.Waypoint, RoadOption]] = self._generate_lane_change_path(
        self._current_waypoint,  # NOTE: Assuming exact_waypoint
        direction,
        same_lane_time * speed,  # get direction in meters t*V
        other_lane_time * speed,
        lane_change_time * speed,
        check=check,
        lane_changes=1,  # changes only one lane
        step_distance=self.config.planner.sampling_resolution
    )
    if not path:
        logger.info("Ignoring the lane change as no path was found")
        # Actually ignore it: an empty path must not replace the current plan.
        return

    # Change path to take now
    # NOTE: use super with arguments here.
    super(LunaticAgent, self).set_global_plan(path)  # noqa: UP008
    # TODO: # CRITICAL: Keep old global plan if it is some end goal -> Restore it.
1177
1178 # TODO: Use generate_lane_change_path to finetune
[docs]
def make_lane_change(self,
                     order: Sequence[Literal["left", "right"]] = ("left", "right"),
                     up_angle_th: int = 180,
                     low_angle_th: int = 0) -> "None | Literal[True]":
    """
    Move to the left/right lane if possible

    For each direction in **order** the lane is only taken if the lane marking
    allows the change, a neighbouring lane exists, it has the same direction,
    is of type ``Driving`` and no vehicle is detected on it. The first
    direction that fulfils this is used to set a new destination.

    Args:
        order : The order in
            which the agent should try to change lanes. If a single string is given, the agent
            will try to change to that lane.
        up_angle_th : The angle threshold for the upper limit of obstacle detection in the other lane.
            Default is 180 degrees, meaning that the agent will detect obstacles ahead.
        low_angle_th : The angle threshold for the lower limit of obstacle detection in the other lane.
            Default is 0 degrees, meaning that the agent will detect obstacles behind.

    Returns:
        :python:`True` if a lane change was initiated, otherwise :python:`None`.

    Raises:
        ValueError: If an entry of **order** is neither ``"left"`` nor ``"right"``.

    Assumes:
        - :python:`(self.config.live_info.incoming_direction == RoadOption.LANEFOLLOW \
            and not waypoint.is_junction and self.config.live_info.current_speed > 10)`
        - :python:`check_behind.obstacle_was_found and self.config.live_info.current_speed < get_speed(check_behind.obstacle)`
    """
    vehicle_list = self.vehicles_nearby
    waypoint = self._current_waypoint  # todo use a getter

    # Allow a single direction to be passed as plain string
    if isinstance(order, str):
        order = [order]

    for direction in order:
        if direction == "right":
            right_turn = waypoint.right_lane_marking.lane_change
            can_change = (right_turn in {carla.LaneChange.Right, carla.LaneChange.Both})
            other_wpt = waypoint.get_right_lane()
            lane_offset = 1
        elif direction == "left":
            left_turn = waypoint.left_lane_marking.lane_change
            can_change = (left_turn in {carla.LaneChange.Left, carla.LaneChange.Both})
            other_wpt = waypoint.get_left_lane()
            lane_offset = -1
        else:
            raise ValueError(f"Direction must be 'left' or 'right', was {direction}")

        if (can_change
                and other_wpt is not None  # no neighbouring lane -> nothing to change to
                and lanes_have_same_direction(waypoint, other_wpt)
                and other_wpt.lane_type == carla.LaneType.Driving):
            # Detect if the other lane is free
            detection_result = detect_vehicles(self, vehicle_list,
                                               self.max_detection_distance("other_lane"),
                                               up_angle_th=up_angle_th,
                                               low_angle_th=low_angle_th,
                                               lane_offset=lane_offset)
            if not detection_result.obstacle_was_found:
                logger.debug("Change Lane, moving to the %s! Reason: %s", direction, "Overtaking" if tuple(order) == ("left", "right") else "Tailgating")

                end_waypoint = self._local_planner.target_waypoint
                # TODO: How to set waypoint order? Or better use generate_lane_change_path!
                self.set_destination(end_location=other_wpt.transform.location,
                                     start_location=end_waypoint.transform.location, clean_queue=True)
                return True
    return None
1238
1239 # ------------------ Other Function ------------------ #
1240
def _update_lights(self, vehicle_control: carla.VehicleControl):
    """
    Updates the light of the vehicle in the simulation.

    The ``Brake`` and ``Reverse`` light flags are set or removed depending on
    ``vehicle_control.brake`` and ``vehicle_control.reverse``. The simulator
    is only updated if the resulting state differs from the stored one.
    """
    light_flags = carla.VehicleLightState
    lights: carla.VehicleLightState = self._vehicle_lights

    wanted_states = (
        (vehicle_control.brake, light_flags.Brake),
        (vehicle_control.reverse, light_flags.Reverse),
    )
    for is_active, flag in wanted_states:
        if is_active:
            lights |= flag
        else:  # Remove the flag
            lights &= light_flags.All ^ flag

    if lights != self._vehicle_lights:  # Change the light state only if necessary
        self._vehicle_lights = lights
        self._vehicle.set_light_state(carla.VehicleLightState(self._vehicle_lights))
1255
def render_detection_matrix(self, display: "pygame.Surface", **options: Unpack["DetectionMatrix.RenderOptions"]):
    """
    Render the detection matrix onto **display**; does nothing if the
    agent has no detection matrix.

    See Also:
        - :py:meth:`DetectionMatrix.render`

    Attention:
        **options** must match the arguments of :py:meth:`DetectionMatrix.render`.

    :meta private:
    """
    matrix = self._detection_matrix
    if not matrix:
        return
    # options should align with CameraConfig.DetectionMatrixHudConfig
    matrix.render(display, **options)
1269
[docs]
def verify_settings(self, config: Optional[LunaticAgentSettings] = None, *,
                    verify_dataclass: Union["type[AgentConfig]", bool] = True,
                    strictness: Literal[-1, 0, 1, 2, 3, 4] = 0):
    """
    Verifies the settings of the LunaticAgent.
    Foremost this checks if the :py:obj:`.planner.dt` value has been set
    when the world runs in synchronous mode.
    Secondly if :python:`verify_dataclass=True` or a different AgentConfig class is provided,
    it will check for correct type usage.

    Args:
        config: The configuration to verify.
            If not provided, the agent's configuration will be used.
            Defaults to :code:`None`.
        verify_dataclass:
            Determines the dataclass to use for verification.
            If :python:`True`, the :py:attr:`.BASE_SETTINGS` dataclass will be used.
            If falsy, the dataclass check is skipped.
            See :py:meth:`AgentConfig.check_config` for more details.
            Defaults to True.
        strictness:
            The strictness level for :py:meth:`AgentConfig.check_config`.
            A negative value skips the dataclass check entirely.
            Defaults to :python:`0`.

    Raises:
        TypeError: If **verify_dataclass** is not a valid AgentConfig subclass or True.
        MissingMandatoryValue: If :python:`config.planner.dt` is not present
            or not a :python:`float`
    """
    config = config or self.config

    if self._world_model.world_settings.synchronous_mode:
        # Assure that dt is set
        if isinstance(config, DictConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
            OmegaConf.select(config,
                             "planner.dt",
                             throw_on_missing=True
                             )
        elif not isinstance(config.planner.dt, float):
            from omegaconf import MissingMandatoryValue
            raise MissingMandatoryValue("`config.planner.dt` needs to be set to a value. "
                                        "Cannot be:", type(config.planner.dt), config.planner.dt)

    if strictness < 0:
        return

    # NOTE: Below is experimental and might fail as the config at this point has already been setup

    from agents.tools import config_creation
    _old = config_creation._WARN_LIVE_INFO  # pyright: ignore[reportPrivateUsage]
    config_creation._WARN_LIVE_INFO = False  # pyright: ignore[reportPrivateUsage]
    try:
        if verify_dataclass:
            if verify_dataclass is True:
                dataclass = self.BASE_SETTINGS
            elif issubclass(verify_dataclass, AgentConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
                dataclass = verify_dataclass
            else:
                raise TypeError("`verify_dataclass` must be a `AgentConfig` or `True` to select the BASE_SETTINGS ")
            dataclass.check_config(config, dataclass.get("strict_config", strictness), as_dict_config=True)
    finally:
        # Restore the module flag even if the check above raises.
        config_creation._WARN_LIVE_INFO = _old  # pyright: ignore[reportPrivateUsage]
1331
1332 # ------------------ Getter Function ------------------ #
1333
[docs]
def get_control(self):
    """
    Returns the currently planned control of the agent, i.e. the
    ``control`` attribute of the agent's context.

    If retrieved before the local planner has been run, it will return None.
    """
    context = self.ctx
    return context.control
1341
1342 # ------------------ Setter Function ------------------ #
1343
[docs]
def set_control(self, control: carla.VehicleControl):
    """
    Set new controls for the agent. Must be called before apply_control.

    The value is stored as ``control`` on the agent's context without
    any validation.
    """
    context = self.ctx
    context.control = control
1352
def _init_detection_matrix(self) -> None:
    """
    Create and start the detection matrix according to ``config.detection_matrix``.

    If the detection matrix is missing or disabled in the config, the
    attribute is set to :python:`None`. Otherwise a :py:class:`DetectionMatrix`
    is used when both ``detection_matrix.sync`` and the world's synchronous
    mode are set, else an :py:class:`AsyncDetectionMatrix`.
    The road matrix counter is reset to 0 in every case.
    """
    matrix_cfg = self.config.detection_matrix
    if not (matrix_cfg and matrix_cfg.enabled):
        self._detection_matrix = None
    else:
        run_sync = matrix_cfg.sync and self._world_model.world_settings.synchronous_mode
        matrix_cls = DetectionMatrix if run_sync else AsyncDetectionMatrix
        self._detection_matrix = matrix_cls(self._vehicle)
        self._detection_matrix.start()
    self._road_matrix_counter = 0
1363
if READTHEDOCS or TYPE_CHECKING:
    # Re-declaration of the parent class method that only adds type-hints.
    # It is defined solely for the documentation build and for type checkers.
    def set_destination(
        self,
        end_location: carla.Location,
        start_location: Optional[carla.Location] = None,
        clean_queue: bool = True,
    ) -> None:
        ...
1371
def set_vehicle(self, vehicle: carla.Actor) -> None:
    """
    Set the vehicle for the agent (experimental if applied a second time)

    The vehicle is registered at the :py:class:`CarlaDataProvider` unless an
    actor with the same id is already known; in that case the existing map
    entries are re-keyed to the passed vehicle object. Afterwards the
    detection matrix and the local planner are created anew.

    Raises:
        ValueError: If the id is reported as registered but no matching key is found.

    :meta private:
    """
    self._vehicle = assure_type(carla.Vehicle, vehicle)

    velocity_map = CarlaDataProvider._actor_velocity_map  # pyright: ignore[reportPrivateUsage]
    id_is_known = any(actor.id == vehicle.id for actor in velocity_map)
    if not id_is_known:
        # do not register same id twice
        with contextlib.suppress(KeyError):
            CarlaDataProvider.register_actor(vehicle, vehicle.get_transform())  # pyright: ignore[reportUnknownMemberType]
    else:
        # Exchange the key for a faster lookup
        old_key = next((actor for actor in velocity_map if actor.id == vehicle.id), None)
        if old_key is None:
            # This does not happen because of the check above
            raise ValueError("The vehicle was not registered in the actor map.")
        actor_maps = (
            velocity_map,
            CarlaDataProvider._actor_transform_map,  # pyright: ignore[reportPrivateUsage]
            CarlaDataProvider._actor_location_map,  # pyright: ignore[reportPrivateUsage]
        )
        for actor_map in actor_maps:
            actor_map[vehicle] = actor_map.pop(old_key)

    self._init_detection_matrix()
    self._local_planner = DynamicLocalPlannerWithRss(self,
                                                     map_inst=CarlaDataProvider.get_map(),
                                                     world=CarlaDataProvider.get_world(),
                                                     rss_sensor=self._world_model.rss_sensor)
1400
1401 #@override
[docs]
def set_target_speed(self, speed: float) -> None:
    """
    Changes the target speed of the agent.

    A warning is printed if the agent is currently configured to follow
    the speed limits; the target speed is stored in either case.

    :param speed (float): target speed in Km/h
    """
    speed_config = self.config.speed
    if speed_config.follow_speed_limits:
        print("WARNING: The max speed is currently set to follow the speed limits. "
              "Use 'follow_speed_limits' to deactivate this")
    speed_config.target_speed = speed  # shared with planner
1411
[docs]
def follow_speed_limits(self, value: bool = True) -> None:
    """
    If active, the agent will dynamically change the target speed according to the speed limits.

    Arguments:
        value: Whether to activate this behavior; stored as
            ``config.speed.follow_speed_limits``.
    """
    speed_config = self.config.speed
    speed_config.follow_speed_limits = value
1420
[docs]
def ignore_traffic_lights(self, active: bool = True) -> None:
    """(De)activates the checks for traffic lights via ``config.obstacles.ignore_traffic_lights``."""
    obstacle_config = self.config.obstacles
    obstacle_config.ignore_traffic_lights = active
1424
[docs]
def ignore_stop_signs(self, active: bool = True) -> None:
    """(De)activates the checks for stop signs via ``config.obstacles.ignore_stop_signs``."""
    obstacle_config = self.config.obstacles
    obstacle_config.ignore_stop_signs = active
1428
[docs]
def ignore_vehicles(self, active: bool = True) -> None:
    """(De)activates the checks for vehicles via ``config.obstacles.ignore_vehicles``."""
    obstacle_config = self.config.obstacles
    obstacle_config.ignore_vehicles = active
1432
1439
1440 # ------------------ Overwritten & Outsourced functions ------------------ #
1441
1442 from agents.tools.lunatic_agent_tools import detect_vehicles, max_detection_distance
1443
1444 # signature of parent is str and not Literal["left", "right"]
1445 from agents.tools.lunatic_agent_tools import (
1446 generate_lane_change_path as _generate_lane_change_path, # pyright: ignore[reportAssignmentType]
1447 )
1448
1449 # NOTE: the original pedestrian_avoid_manager is still usable
def pedestrian_avoid_manager(self, waypoint) -> NoReturn:  # type: ignore
    """
    Disabled parent function; it was replaced by
    ``substep_managers.pedestrian_detection_manager``.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    replacement = substep_managers.pedestrian_detection_manager
    raise NotImplementedError("This function was replaced by ", replacement)
1457
1458 #@override
def collision_and_car_avoid_manager(self, waypoint) -> NoReturn:  # type: ignore
    """
    Disabled parent function; it was split into
    detect_obstacles_in_path and car_following_manager.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    reason = "This function was split into detect_obstacles_in_path and car_following_manager"
    raise NotImplementedError(reason)
1466
1467 #@override
def _tailgating(self, waypoint, vehicle_list) -> NoReturn:  # type: ignore
    # Disabled parent function: always raises NotImplementedError.
    reason = "Tailgating has been implemented as a rule"
    raise NotImplementedError(reason)
1470
1471 #@override
def emergency_stop(self) -> NoReturn:
    """
    Disabled parent function; use `add_emergency_stop` instead.

    Raises:
        NotImplementedError: Always.

    :meta private:
    """
    reason = "This function was overwritten use `add_emergency_stop` instead"
    raise NotImplementedError(reason)
1477
1478 # ------------------------------------ #
1479 # As reference Parent Functions
1480 # ------------------------------------ #
1481 #def get_local_planner(self):
1482 #def get_global_planner(self):
1483
1484 #def done(self): # from base class self._local_planner.done()
1485
1486 #def set_global_plan(self, plan, stop_waypoint_creation=True, clean_queue=True):
1487 """
1488 Adds a specific plan to the agent.
1489
1490 :param plan: list of [carla.Waypoint, RoadOption] representing the route to be followed
1491 :param stop_waypoint_creation: stops the automatic random creation of waypoints
1492 :param clean_queue: resets the current agent's plan
1493 """
1494
1495 #def trace_route(self, start_waypoint, end_waypoint):
1496 """
1497 Calculates the shortest route between a starting and ending waypoint.
1498
1499 :param start_waypoint (carla.Waypoint): initial waypoint
1500 :param end_waypoint (carla.Waypoint): final waypoint
1501 """
1502
1503 # ---------------- Cleanup ------------------- #
1504
def _destroy_sensor(self):
    """
    Stop the detection matrix and destroy the collision sensor, if present,
    and reset both attributes to :python:`None`.
    """
    matrix = self._detection_matrix
    if matrix:
        matrix.stop()
        self._detection_matrix = None

    sensor = self._collision_sensor
    if sensor:
        sensor.destroy()
        self._collision_sensor = None  # type: ignore[assignment]
1512
[docs]
def destroy(self):
    """
    Resets attributes and destroys helpers like the :py:class:`.DetectionMatrix`.

    The sensors are destroyed via :py:meth:`_destroy_sensor`, the references
    to the world model, the world and the context are dropped, and the actor
    collections are cleared. Every collection is cleared on its own, so one
    missing attribute does not keep the others from being cleared.
    """
    self._destroy_sensor()
    self._world_model = None  # type: ignore[assignment]
    self._world = None  # type: ignore[assignment]
    if self.ctx:
        # Break the reference cycle agent <-> context
        self.ctx.agent = None  # type: ignore[assignment]
    self.ctx = None  # type: ignore[assignment]

    for collection_name in ("all_vehicles", "vehicles_nearby", "all_walkers", "walkers_nearby"):
        # Best effort: the attribute might not exist (or not be clearable) yet.
        with contextlib.suppress(AttributeError):
            getattr(self, collection_name).clear()
1528
def __del__(self):
    # Best-effort cleanup on garbage collection: any exception raised by
    # destroy() is suppressed.
    ignore_errors = contextlib.suppress(Exception)
    with ignore_errors:
        self.destroy()