Source code for classes.information_manager

  1"""
  2Aim of this module is to provide a less convoluted access to information,
  3i.e. distill the information from the data and return high level information
  4"""
  5
  6# pyright: reportPrivateUsage=none
  7
  8from __future__ import annotations
  9
 10# todo: maybe find another name for this module
 11from fnmatch import fnmatch
 12from functools import wraps
 13from typing import TYPE_CHECKING, Callable, ClassVar, Dict, List, NamedTuple, Optional, TypeVar, Union, cast
 14
 15from cachetools import cached
 16from typing_extensions import Concatenate, ParamSpec, Self
 17
 18from agents.tools.logs import logger
 19from classes.constants import AgentState
 20from launch_tools import CarlaDataProvider
 21
 22if TYPE_CHECKING:
 23    import carla
 24    from agents.lunatic_agent import LunaticAgent
 25
# Generic helpers used to type the `_check_state` decorator of InformationManager.
_T = TypeVar("_T")
_P = ParamSpec("_P")

# Speed (m/s) at or above which the vehicle counts as driving (`>=` comparison).
DRIVING_SPEED_THRESHOLD = 0.05  # m/s >= check
# Speed (m/s) below which the vehicle counts as stopped (`<` comparison).
STOPPED_SPEED_THRESHOLD = 0.05  # m/s < check
 31
 32
class InformationManager:
    """
    Tracks global information, e.g. all actors, traffic lights, etc. as well as
    agent specific information, e.g. current speed, current location, and nearby actors in
    relation to the agent.

    Tip:
        global information is attributed by :py:obj:`ClassVar` and :py:obj:`staticmethod` and is
        shared across all instances, this information is constant for the current tick.
    """

    _tick: ClassVar[int] = 0
    """
    Tick counter intended to prevent calling :py:meth:`global_tick` twice;
    currently it is only reset by :py:meth:`cleanup`.
    """

    # Instance Variables
    relevant_traffic_light: Union[carla.TrafficLight, None] = None
    """Result of :py:meth:`.CarlaDataProvider.get_next_traffic_light`"""

    relevant_traffic_light_distance: float = float("inf")
    """
    Distance to the :py:attr:`relevant_traffic_light`
    Is float('inf') if :py:attr:`relevant_traffic_light` is None.
    """

    _relevant_traffic_light_location: carla.Location = None  # type: ignore[assignment]
    """
    Note:
        Is None if :py:attr:`relevant_traffic_light` is :code:`None`
        or :py:attr:`relevant_traffic_light_distance` is :python:`float('inf')`
    """

    state_counter: Dict[AgentState, int]
    """
    Tracks different :py:class:`AgentState` and the amount of ticks the agent is in this state.
    """

    _vehicle_speed: float  # m/s

    gathered_information: "InformationManager.Information"
    """:py:class:`.InformationManager.Information` gathered during :py:meth:`tick`"""

    # Class & Global Variables
    vehicles: ClassVar["list[carla.Vehicle]"]
    """List of all tracked vehicles"""

    walkers: ClassVar["list[carla.Walker]"]
    """List of all tracked pedestrians"""

    static_obstacles: ClassVar["list[carla.Actor]"]
    """List of all tracked static obstacles"""

    obstacles: ClassVar["list[carla.Actor]"]
    """Union of :py:attr:`vehicles`, :py:attr:`walkers` and :py:attr:`static_obstacles`"""

    _other_actors: ClassVar["list[carla.Actor]"]
    """Actors from the actor pool that match none of the categories above"""

    lights_map: ClassVar["Dict[int, carla.Waypoint]"] = {}
    """Map of traffic light ids to their trigger waypoints"""

    frame: ClassVar["int | None"] = None
    """
    Last frame the InformationManager was updated.

    The current frame of the world should be passed to the global_tick method.
    """

    # ---- Agent Specific Information ----

    def __init__(self, agent: "LunaticAgent", update_information: bool = True):
        """
        Bind the manager to an agent and share its state counter.

        Parameters:
            agent: The agent whose vehicle, config and ``live_info`` are used.
            update_information: If True, :py:meth:`tick` is called once at the end
                of the initialization. (default: True)
        """
        self._agent = agent
        self.live_info = agent.live_info

        self._vehicle = agent._vehicle  # maybe use a property # pyright: ignore[reportPrivateUsage]

        # Share the dict with the agent; create it on the agent if it does not exist yet.
        if getattr(agent, "current_states", None) is not None:
            self.state_counter = agent.current_states
        else:
            self.state_counter = agent.current_states = dict.fromkeys(AgentState, 0)
        self._states_checked = dict.fromkeys(AgentState, False)
        if update_information:
            self.tick()

    # AgentState detection

    # @staticmethod # not possible in python3.7
    def _check_state(state: AgentState):  # pyright: ignore[reportSelfClsParameterName, reportGeneralTypeIssues]
        """
        Updates the state counter and the state checked dict when the function is called.

        The decorated function's result decides the update: a truthy result increments
        the counter of ``state``, a falsy result resets it to 0, and :code:`None`
        leaves both the counter and the checked flag untouched.
        """

        def wrapper(
            func: Callable[Concatenate[Self, _P], _T],
        ) -> Callable[Concatenate[Self, _P], _T]:
            @wraps(func)
            def inner(self: Self, *args: _P.args, **kwargs: _P.kwargs):
                result = func(self, *args, **kwargs)
                if result is not None:
                    if result:
                        self.state_counter[state] += 1
                    else:
                        self.state_counter[state] = 0
                    self._states_checked[state] = True
                return result

            return inner

        return wrapper

    @_check_state(AgentState.DRIVING)
    def detect_driving_state(self) -> bool:
        """
        Increases the :py:attr:`AgentState.DRIVING` counter if the vehicle is driving.

        :meta private:
        """
        return self._vehicle_speed >= DRIVING_SPEED_THRESHOLD

    @_check_state(AgentState.STOPPED)
    def detect_stopped_state(self) -> bool:
        """
        Increases the :py:attr:`AgentState.STOPPED` counter if the vehicle is stopped.

        :meta private:
        """
        return self._vehicle_speed < STOPPED_SPEED_THRESHOLD

    @_check_state(AgentState.REVERSE)
    def detect_reverse_state(self) -> bool:
        """
        Determines if in the last tick the VehicleControl.reverse flag was set.

        Increases the :py:attr:`AgentState.REVERSE` counter if the vehicle is driving in reverse.

        :meta private:
        """
        # TODO: can this be detected differently e.g. through the vehicle
        return self.live_info.last_applied_controls.reverse

    # Not implemented checks

    @_check_state(AgentState.AGAINST_LANE_DIRECTION)
    def detect_driving_against_lane_direction(self):
        """
        Not implemented; returns :code:`None` so the state counter is not updated.

        :meta private:
        """
        self._agent._current_waypoint.lane_id  # positive or negative # noqa # pyright: ignore[reportPrivateUsage]
        # TODO: How detect if the heading is against this direction?
        # Need to also account for reverse state.
        # ... NotImplemented

    @_check_state(AgentState.OVERTAKING)
    def detect_overtaking_state(self):
        """
        Not implemented; returns :code:`None` so the state counter is not updated.

        :meta private:
        """
        # ... NotImplemented
        # # Can probably not be done easily, and must be done from outside

    def check_states(self):
        """
        Updates all states. Called in :py:meth:`tick`.

        :meta private:
        """
        # Updates are handled through the decorators
        self.detect_driving_state()
        self.detect_stopped_state()
        self.detect_reverse_state()

    # --- Traffic Light ---

    def _get_next_traffic_light(self) -> Optional[carla.TrafficLight]:
        """
        Query the next traffic light and refresh the cached location and distance.

        Returns:
            The new :py:attr:`relevant_traffic_light`, or None if there is none.
        """
        # TODO: Do not use the CDP but use the planned route instead.
        self.relevant_traffic_light = CarlaDataProvider.get_next_traffic_light(self._vehicle)
        if self.relevant_traffic_light:
            self._relevant_traffic_light_location = self.relevant_traffic_light.get_location()
            self.relevant_traffic_light_distance = self._relevant_traffic_light_location.distance(
                CarlaDataProvider.get_location(self._vehicle)
            )  # pyright: ignore[reportArgumentType]
        else:
            # Is at an intersection; always check for tlight or distance
            self._relevant_traffic_light_location = None  # type: ignore[assignment]
            self.relevant_traffic_light_distance = float("inf")
        return self.relevant_traffic_light

    def detect_next_traffic_light(self):
        """
        Set the :py:attr:`relevant_traffic_light` and :py:attr:`relevant_traffic_light_distance` if not set.

        Note:
            Does not check for planned path but current route along waypoints, might not be exact.

        **This function is automatically called in :py:meth:`tick`**
        """
        if self.relevant_traffic_light_distance < float("inf"):
            tlight_distance = self._relevant_traffic_light_location.distance(self.live_info.current_location)
        else:
            tlight_distance = float("inf")

        # Search for a traffic light if none is given or if the distance to the current one increased
        # 1% tolerance to prevent permanent updates when far away from a traffic light
        distance_increased = tlight_distance > self.relevant_traffic_light_distance * 1.01
        if not self.relevant_traffic_light or distance_increased:
            # Update if the distance increased, and we might need to target another one;
            # TODO: This might be circumvented by passing and intersection
            if self.relevant_traffic_light and distance_increased:
                logger.debug(
                    "Traffic light distance increased %s, did slow update.", self.relevant_traffic_light_distance
                )
            self._get_next_traffic_light()
        elif self.relevant_traffic_light:
            self.relevant_traffic_light_distance = tlight_distance

    # ---- Tick ----

    def tick(self):
        """
        Tick the information manager and update the information for the corresponding agent.

        Returns:
            The :py:class:`Information` gathered in this tick; it is also stored in
            :py:attr:`gathered_information`.
        """
        snapshot = CarlaDataProvider.get_world().get_snapshot()
        self.global_tick(snapshot.frame)

        # --- Vehicle Information ---
        self.live_info.last_applied_controls = self._vehicle.get_control()

        # - Speed -
        # NOTE: get_velocity does not take the z axis into account.
        self.live_info.current_speed_limit = self._vehicle.get_speed_limit()

        self.live_info.velocity_vector = self._vehicle.get_velocity()

        self._vehicle_speed = CarlaDataProvider.get_velocity(self._vehicle)  # used for AgentState Checks
        self.live_info.current_speed = self._vehicle_speed * 3.6  # km/h

        # - Location -
        # NOTE: That transform.location and location are similar but not identical.
        self.live_info.current_transform = CarlaDataProvider.get_transform(self._vehicle)  # pyright: ignore[reportAttributeAccessIssue]
        self.live_info.current_location = current_loc = CarlaDataProvider.get_location(
            self._vehicle
        )  # NOTE: is None if past run not cleaned # noqa: E501 # type: ignore
        # Only exact waypoint. TODO: update in agent
        # Comment should be visible in traceback.
        current_waypoint = cast(
            "carla.Waypoint", CarlaDataProvider.get_map().get_waypoint(current_loc)
        )  # NOTE: Might throw error if past run was not cleaned; or the world did not tick yet. # noqa: E501 # pyright: ignore[reportCallIssue, reportArgumentType]

        # Traffic Light
        # NOTE: Must be AFTER the location update
        self.detect_next_traffic_light()
        self.live_info.next_traffic_light = self.relevant_traffic_light
        self.live_info.next_traffic_light_distance = self.relevant_traffic_light_distance

        # Nearby actors
        self.distances: Dict[carla.Actor, float] = {}

        # The explicit key stores the actor itself as the dict key. cachetools' default
        # key function would store 1-tuples ``(actor,)``, contradicting the annotation.
        @cached(cache=self.distances, key=lambda actor: actor)
        def dist(actor: carla.Actor) -> float:
            if not actor.is_alive:
                logger.warning("Actor is not alive - this should not happen.")
                # Infinity is excluded by every `<` range check below, whatever the
                # range is, and the cached value cannot be mistaken for a real distance.
                return float("inf")
            return actor.get_location().distance(current_loc)  # pyright: ignore[reportArgumentType]

        obstacle_config = self._agent.config.obstacles

        # Filter nearby
        # Vehicles (the agent's own vehicle is excluded)
        vehicle_range = obstacle_config.nearby_vehicles_max_distance
        own_id = self._vehicle.id
        self.vehicles_nearby: List[carla.Vehicle] = sorted(
            (v for v in self.vehicles if v.id != own_id and dist(v) < vehicle_range), key=dist
        )

        # Static obstacles; these use the same range as the vehicles
        self.static_obstacles_nearby: list[carla.Actor] = sorted(
            (o for o in self.static_obstacles if dist(o) < vehicle_range), key=dist
        )

        # Walkers; in case of a different distance for walkers.
        walker_range = obstacle_config.nearby_walkers_max_distance
        self.walkers_nearby: list[carla.Walker] = sorted(
            (w for w in self.walkers if dist(w) < walker_range), key=dist
        )

        # All actors to be tracked
        self.obstacles_nearby = sorted(
            self.walkers_nearby + self.static_obstacles_nearby + self.vehicles_nearby, key=dist
        )

        # Nearby Traffic lights
        # By default this checks for 5 seconds range + 10 m
        tlight_range = obstacle_config.nearby_tlights_max_distance
        self.traffic_lights_nearby = sorted(
            (tl for tl in InformationManager.get_traffic_lights() if dist(tl) < tlight_range), key=dist
        )

        self.check_states()

        # ----- Return Summary -----

        # TODO: Extend distances with carla lights
        self.gathered_information = InformationManager.Information(
            current_waypoint=current_waypoint,
            current_speed=self.live_info.current_speed,
            current_states=self.state_counter,
            relevant_traffic_light=self.relevant_traffic_light,
            relevant_traffic_light_distance=self.relevant_traffic_light_distance,
            vehicles=self.vehicles,
            walkers=self.walkers,
            static_obstacles=self.static_obstacles,
            obstacles=self.obstacles,
            walkers_nearby=self.walkers_nearby,
            vehicles_nearby=self.vehicles_nearby,
            static_obstacles_nearby=self.static_obstacles_nearby,
            obstacles_nearby=self.obstacles_nearby,
            traffic_lights_nearby=self.traffic_lights_nearby,
            distances=self.distances,
        )
        return self.gathered_information

    # Helper subclass

    class Information(NamedTuple):
        """Data gathered by the InformationManager which is passed to the agent."""

        current_waypoint: carla.Waypoint
        current_speed: float
        current_states: Dict[AgentState, int]

        relevant_traffic_light: Optional[carla.TrafficLight]
        relevant_traffic_light_distance: float

        vehicles: List[carla.Vehicle]
        walkers: List[carla.Walker]
        static_obstacles: List[carla.Actor]
        """Filtered obstacles by InformationManager.OBSTACLE_FILTER"""
        obstacles: List[carla.Actor]
        """Union of vehicles, walkers and static_obstacles"""

        walkers_nearby: List[carla.Walker]
        vehicles_nearby: List[carla.Vehicle]
        static_obstacles_nearby: List[carla.Actor]
        obstacles_nearby: List[carla.Actor]

        traffic_lights_nearby: List[carla.TrafficLight]

        distances: Dict[carla.Actor, float]
        """Distance from the agent's vehicle to every actor evaluated during the tick, keyed by actor"""

    # ---- Global Information ----

    OBSTACLE_FILTER: str = "static.prop.[cistmw]*"
    """
    fnmatch for obstacles that the agent will consider in its path.
    https://carla.readthedocs.io/en/latest/bp_library/#static
    """

    @staticmethod
    def get_traffic_lights() -> Dict[carla.TrafficLight, carla.Transform]:
        """Return the traffic light map held by the :py:class:`CarlaDataProvider`."""
        return CarlaDataProvider._traffic_light_map

    @staticmethod
    def get_trafficlight_trigger_waypoint(traffic_light: "carla.TrafficLight") -> carla.Waypoint:
        """
        Get the location where the traffic light is triggered.

        The waypoint is cached in :py:attr:`lights_map` by the id of the traffic light.
        """
        try:
            return InformationManager.lights_map[traffic_light.id]
        except KeyError:
            trigger_location = CarlaDataProvider.get_trafficlight_trigger_location(traffic_light)
            trigger_wp = CarlaDataProvider.get_map().get_waypoint(trigger_location)
            InformationManager.lights_map[traffic_light.id] = trigger_wp
            return trigger_wp

    @staticmethod
    def global_tick(frame: Optional[int] = None) -> None:
        """
        Update global information that is constant for the current tick and not agent specific.

        Updates:
            - :py:attr:`vehicles`
            - :py:attr:`walkers`
            - :py:attr:`static_obstacles`
            - :py:attr:`obstacles`
            - :py:attr:`frame`

        Parameters:
            frame: The id of the current frame. If None retrieves the id from the current
                :py:class:`carla.WorldSnapshot`. Multiple calls with the same frame are ignored.
                (default: None)
        """
        # Assure to call this only once
        if frame is None:
            frame = CarlaDataProvider.get_world().get_snapshot().frame
        else:
            # DEBUG; TEMP
            snap_frame = CarlaDataProvider.get_world().get_snapshot().frame
            if frame != snap_frame:
                logger.warning(f"Frame {frame} does not match snapshot frame {snap_frame}")
        if frame == InformationManager.frame:
            return
        InformationManager.frame = frame

        # Filter vehicles
        InformationManager.vehicles = []
        InformationManager.walkers = []
        InformationManager.static_obstacles = []
        InformationManager._other_actors = []
        # For traffic lights use: InformationManager.get_traffic_lights(), which is map-constant

        # Use copy and check for None because of updates could be done by threads in parallel
        for actor_id in CarlaDataProvider._carla_actor_pool.copy():
            try:
                actor = CarlaDataProvider._carla_actor_pool[actor_id]  # might be deleted in parallel
            except KeyError:
                # actor was deleted in parallel
                logger.debug("Actor %s was removed from the pool in parallel.", actor_id)
                continue
            if actor is None:  # pyright: ignore[reportUnnecessaryComparison]
                # There is no actor object to describe, so only the id can be reported.
                logger.debug("Detected empty entry for actor id %s in the pool.", actor_id)
                CarlaDataProvider._carla_actor_pool.pop(actor_id, None)
                continue
            if not actor.is_alive:
                logger.debug("Detected dead actor in the pool. %s", (actor.id, actor.type_id, actor.attributes))  # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
                # pop() tolerates the entry having been removed in parallel meanwhile.
                CarlaDataProvider._carla_actor_pool.pop(actor_id, None)
                continue
            if fnmatch(actor.type_id, "vehicle*"):
                InformationManager.vehicles.append(actor)  # pyright: ignore[reportArgumentType]
            elif fnmatch(actor.type_id, "walker.pedestrian*"):
                InformationManager.walkers.append(actor)  # pyright: ignore[reportArgumentType]
            # TODO: we could assume that these actors are mostly constant and only created in slow intervals
            elif fnmatch(actor.type_id, InformationManager.OBSTACLE_FILTER):
                InformationManager.static_obstacles.append(actor)
            else:
                InformationManager._other_actors.append(actor)

        InformationManager.obstacles = (
            InformationManager.walkers + InformationManager.static_obstacles + InformationManager.vehicles
        )

    @staticmethod
    def get_vehicles() -> List[carla.Vehicle]:
        """Return the list of all tracked vehicles (:py:attr:`vehicles`)."""
        return InformationManager.vehicles

    @staticmethod
    def get_walkers() -> List[carla.Walker]:
        """Return the list of all tracked pedestrians (:py:attr:`walkers`)."""
        return InformationManager.walkers

    @staticmethod
    def cleanup() -> None:
        """
        Resets the global information.

        Safe to call before the first :py:meth:`global_tick`; lists that do not
        exist yet are created empty instead of raising an AttributeError.
        """
        for name in ("vehicles", "walkers", "static_obstacles", "obstacles", "_other_actors"):
            tracked = getattr(InformationManager, name, None)
            if tracked is None:
                setattr(InformationManager, name, [])
            else:
                # Clear in place so existing references to the lists are emptied as well.
                tracked.clear()
        # Cached trigger waypoints belong to the previous world; do not reuse them.
        InformationManager.lights_map.clear()
        InformationManager.frame = None
        InformationManager._tick = 0