Source code for classes.information_manager

  1"""
  2This module aims to provide less convoluted access to information,
  3i.e. to distill the raw data and return high-level information.
  4"""
  5
  6# pyright: reportPrivateUsage=none
  7
  8from __future__ import annotations
  9
 10# todo: maybe find another name for this module
 11from fnmatch import fnmatch
 12from functools import wraps
 13from typing import TYPE_CHECKING, Callable, ClassVar, Dict, List, NamedTuple, Optional, TypeVar, Union, cast
 14
 15import carla
 16from cachetools import cached
 17from typing_extensions import Concatenate, ParamSpec, Self
 18
 19from agents.tools.logs import logger
 20from classes.constants import AgentState
 21from launch_tools import CarlaDataProvider
 22
 23if TYPE_CHECKING:
 24    from agents.lunatic_agent import LunaticAgent
 25
 26_T = TypeVar("_T")
 27_P = ParamSpec("_P")
 28
 29DRIVING_SPEED_THRESHOLD = 0.05  # m/s >= check
 30STOPPED_SPEED_THRESHOLD = 0.05  # m/s < check
 31
 32
class InformationManager:
    """
    Tracks global information, e.g. all actors, traffic lights, etc. as well as
    agent specific information, e.g. current speed, current location, and nearby actors in
    relation to the agent.

    Tip:
        global information is attributed by :py:obj:`ClassVar` and :py:obj:`staticmethod` and is
        shared across all instances, this information is constant for the current tick.
    """

    _tick: ClassVar[int] = 0
    """
    Tick counter; reset to 0 by :py:meth:`cleanup`.

    Note:
        Repeated calls of :py:meth:`global_tick` are currently detected through :py:attr:`frame`.
    """

    # Instance Variables
    relevant_traffic_light: Union[carla.TrafficLight, None] = None
    """Result of :py:meth:`.CarlaDataProvider.get_next_traffic_light`"""

    relevant_traffic_light_distance: float = float('inf')
    """
    Distance to the :py:attr:`relevant_traffic_light`
    Is float('inf') if :py:attr:`relevant_traffic_light` is None.
    """

    _relevant_traffic_light_location: carla.Location = None  # type: ignore[assignment]
    """
    Note:
        Is None if :py:attr:`relevant_traffic_light` is :code:`None`
        or :py:attr:`relevant_traffic_light_distance` is :python:`float('inf')`
    """

    state_counter: Dict[AgentState, int]
    """
    Tracks different :py:class:`AgentState` and the amount of ticks the agent is in this state.
    """

    _vehicle_speed: float  # m/s

    gathered_information: "InformationManager.Information"
    """:py:class:`.InformationManager.Information` gathered during :py:meth:`tick`"""

    # Class & Global Variables
    vehicles: ClassVar["list[carla.Vehicle]"]
    """List of all tracked vehicles"""

    walkers: ClassVar["list[carla.Walker]"]
    """List of all tracked pedestrians"""

    static_obstacles: ClassVar["list[carla.Actor]"]
    """List of all tracked static obstacles"""

    obstacles: ClassVar["list[carla.Actor]"]
    """Union of :py:attr:`vehicles`, :py:attr:`walkers` and :py:attr:`static_obstacles`"""

    _other_actors: ClassVar["list[carla.Actor]"]
    """Actors of the pool that match none of the other categories; filled by :py:meth:`global_tick`"""

    lights_map: ClassVar["Dict[int, carla.Waypoint]"] = {}
    """Map of traffic light ids to their trigger waypoints"""

    frame: ClassVar["int | None"] = None
    """
    Last frame the InformationManager was updated.

    The current frame of the world should be passed to the global_tick method.
    """

    # ---- Agent Specific Information ----

    def __init__(self, agent: "LunaticAgent", update_information: bool = True):
        """
        Bind the manager to an agent.

        Parameters:
            agent: The agent this manager gathers information for. Its ``live_info``
                and ``current_states`` objects are shared with the manager.
            update_information: If True, :py:meth:`tick` is called once at the end
                of the initialization. (default: True)
        """
        self._agent = agent
        self.live_info = agent.live_info

        self._vehicle = agent._vehicle  # maybe use a property # pyright: ignore[reportPrivateUsage]

        # Share the dict
        if getattr(agent, "current_states", None) is not None:
            self.state_counter = agent.current_states
        else:
            self.state_counter = agent.current_states = dict.fromkeys(AgentState, 0)
        self._states_checked = dict.fromkeys(AgentState, False)
        if update_information:
            self.tick()

    # AgentState detection

    #@staticmethod # not possible in python3.7
    def _check_state(state: AgentState):  # pyright: ignore[reportSelfClsParameterName, reportGeneralTypeIssues]
        """
        Updates the state counter and the state checked dict when the function is called.

        The decorated detector may return ``None`` to signal "not evaluated"; in that
        case neither the counter nor the checked flag is touched.
        """
        def wrapper(func: Callable[Concatenate[Self, _P], _T]) -> Callable[Concatenate[Self, _P], _T]:
            @wraps(func)
            def inner(self: Self, *args: _P.args, **kwargs: _P.kwargs):
                result = func(self, *args, **kwargs)
                if result is not None:
                    if result:
                        self.state_counter[state] += 1
                    else:
                        # A falsy result ends the streak.
                        self.state_counter[state] = 0
                    self._states_checked[state] = True
                return result
            return inner
        return wrapper

    @_check_state(AgentState.DRIVING)
    def detect_driving_state(self) -> bool:
        """
        Increases the :py:attr:`AgentState.DRIVING` counter if the vehicle is driving.

        :meta private:
        """
        return self._vehicle_speed >= DRIVING_SPEED_THRESHOLD

    @_check_state(AgentState.STOPPED)
    def detect_stopped_state(self) -> bool:
        """
        Increases the :py:attr:`AgentState.STOPPED` counter if the vehicle is stopped.

        :meta private:
        """
        return self._vehicle_speed < STOPPED_SPEED_THRESHOLD

    @_check_state(AgentState.REVERSE)
    def detect_reverse_state(self) -> bool:
        """
        Determines if in the last tick the VehicleControl.reverse flag was set.

        Increases the :py:attr:`AgentState.REVERSE` counter if the vehicle is driving in reverse.

        :meta private:
        """
        # TODO: can this be detected differently e.g. through the vehicle
        return self.live_info.last_applied_controls.reverse

    # Not implemented checks

    @_check_state(AgentState.AGAINST_LANE_DIRECTION)
    def detect_driving_against_lane_direction(self):
        """
        Not implemented; returns ``None`` so the state counter is left untouched.

        :meta private:
        """
        self._agent._current_waypoint.lane_id  # positive or negative # noqa # pyright: ignore[reportPrivateUsage]
        # TODO: How detect if the heading is against this direction?
        # Need to also account for reverse state.
        # ... NotImplemented

    @_check_state(AgentState.OVERTAKING)
    def detect_overtaking_state(self):
        """
        Not implemented; returns ``None`` so the state counter is left untouched.

        :meta private:
        """
        #... NotImplemented
        # # Can probably not be done easily, and must be done from outside

    def check_states(self):
        """
        Updates all states. Called in :py:meth:`tick`.

        :meta private:
        """
        # Updates are handled through the decorators
        self.detect_driving_state()
        self.detect_stopped_state()
        self.detect_reverse_state()

    # --- Traffic Light ---

    def _get_next_traffic_light(self) -> Optional[carla.TrafficLight]:
        """
        Query the next traffic light and refresh the cached light, location and distance.

        Returns:
            The new :py:attr:`relevant_traffic_light`, which may be None.
        """
        # TODO: Do not use the CDP but use the planned route instead.
        self.relevant_traffic_light = CarlaDataProvider.get_next_traffic_light(self._vehicle)
        if self.relevant_traffic_light:
            self._relevant_traffic_light_location = self.relevant_traffic_light.get_location()
            self.relevant_traffic_light_distance = self._relevant_traffic_light_location.distance(
                CarlaDataProvider.get_location(self._vehicle))  # pyright: ignore[reportArgumentType]
        else:
            # Is at an intersection; always check for tlight or distance
            self._relevant_traffic_light_location = None  # type: ignore[assignment]
            self.relevant_traffic_light_distance = float('inf')
        return self.relevant_traffic_light

    def detect_next_traffic_light(self):
        """
        Set the :py:attr:`relevant_traffic_light` and :py:attr:`relevant_traffic_light_distance` if not set.

        Note:
            Does not check for planned path but current route along waypoints, might not be exact.

        **This function is automatically called in :py:meth:`tick`**
        """
        if self.relevant_traffic_light_distance < float('inf'):
            tlight_distance = self._relevant_traffic_light_location.distance(self.live_info.current_location)
        else:
            tlight_distance = float('inf')

        # 1% tolerance to prevent permanent updates when far away from a traffic light
        moved_away = tlight_distance > self.relevant_traffic_light_distance * 1.01

        if self.relevant_traffic_light and not moved_away:
            # Still approaching the known light: only refresh the distance.
            self.relevant_traffic_light_distance = tlight_distance
            return

        # Search for a traffic light if none is given or if the distance to the current one increased
        if self.relevant_traffic_light:
            # Update if the distance increased, and we might need to target another one;
            # TODO: This might be circumvented by passing and intersection
            logger.debug("Traffic light distance increased %s, did slow update.",
                         self.relevant_traffic_light_distance)
        self._get_next_traffic_light()

    # ---- Tick ----

    def tick(self):
        """
        Tick the information manager and update the information for the corresponding agent.

        Returns:
            The :py:class:`Information` gathered in this tick; it is also stored in
            :py:attr:`gathered_information`.
        """
        snapshot = CarlaDataProvider.get_world().get_snapshot()
        self.global_tick(snapshot.frame)

        # --- Vehicle Information ---
        self.live_info.last_applied_controls = self._vehicle.get_control()

        # - Speed -
        # NOTE: get_velocity does not take the z axis into account.
        self.live_info.current_speed_limit = self._vehicle.get_speed_limit()

        self.live_info.velocity_vector = self._vehicle.get_velocity()

        self._vehicle_speed = CarlaDataProvider.get_velocity(self._vehicle)  # used for AgentState Checks
        self.live_info.current_speed = self._vehicle_speed * 3.6  # km/h

        # - Location -
        # NOTE: That transform.location and location are similar but not identical.
        self.live_info.current_transform = CarlaDataProvider.get_transform(self._vehicle)  # pyright: ignore[reportAttributeAccessIssue]
        self.live_info.current_location = _current_loc = CarlaDataProvider.get_location(self._vehicle)  # NOTE: is None if past run not cleaned # noqa: E501 # type: ignore
        # Only exact waypoint. TODO: update in agent
        # Comment should be visible in traceback.
        current_waypoint = cast(carla.Waypoint, CarlaDataProvider.get_map().get_waypoint(_current_loc))  # NOTE: Might throw error if past run was not cleaned; or the world did not tick yet. # noqa: E501 # pyright: ignore[reportCallIssue, reportArgumentType]

        # Traffic Light
        # NOTE: Must be AFTER the location update
        self.detect_next_traffic_light()
        self.live_info.next_traffic_light = self.relevant_traffic_light
        self.live_info.next_traffic_light_distance = self.relevant_traffic_light_distance

        # Nearby actors
        self.distances: Dict[carla.Actor, float] = {}

        # key=identity stores the actor itself as the dict key; the default cachetools
        # key would store a one-element hashkey tuple, contradicting the declared
        # ``Dict[carla.Actor, float]`` type of ``distances``.
        @cached(cache=self.distances, key=lambda actor: actor)
        def dist(v: carla.Actor) -> float:
            if not v.is_alive:
                logger.warning("Actor is not alive - this should not happen.")
                # Infinity fails every "< max_distance" filter, independent of
                # which category's range is currently being checked.
                return float('inf')
            return v.get_location().distance(_current_loc)  # pyright: ignore[reportArgumentType]

        obstacle_config = self._agent.config.obstacles

        # Filter nearby
        # Vehicles
        vehicle_range = obstacle_config.nearby_vehicles_max_distance
        ego_id = self._vehicle.id
        self.vehicles_nearby: List[carla.Vehicle] = sorted(
            (v for v in self.vehicles if v.id != ego_id and dist(v) < vehicle_range),
            key=dist,
        )

        # Static obstacles; use the same range as vehicles
        self.static_obstacles_nearby: list[carla.Actor] = sorted(
            (o for o in self.static_obstacles if dist(o) < vehicle_range),
            key=dist,
        )

        # Walkers; in case of a different distance for walkers.
        walker_range = obstacle_config.nearby_walkers_max_distance
        self.walkers_nearby: list[carla.Walker] = sorted(
            (w for w in self.walkers if dist(w) < walker_range),
            key=dist,
        )

        # All actors to be tracked
        self.obstacles_nearby = sorted(
            self.walkers_nearby + self.static_obstacles_nearby + self.vehicles_nearby,
            key=dist,
        )

        # Nearby Traffic lights
        # By default this checks for 5 seconds range + 10 m
        tlight_range = obstacle_config.nearby_tlights_max_distance
        self.traffic_lights_nearby = sorted(
            (tl for tl in InformationManager.get_traffic_lights() if dist(tl) < tlight_range),
            key=dist,
        )

        self.check_states()

        # ----- Return Summary -----

        # TODO: Extend distances with carla lights
        self.gathered_information = InformationManager.Information(
            current_waypoint=current_waypoint,
            current_speed=self.live_info.current_speed,
            current_states=self.state_counter,

            relevant_traffic_light=self.relevant_traffic_light,
            relevant_traffic_light_distance=self.relevant_traffic_light_distance,

            vehicles=self.vehicles,
            walkers=self.walkers,
            static_obstacles=self.static_obstacles,
            obstacles=self.obstacles,

            walkers_nearby=self.walkers_nearby,
            vehicles_nearby=self.vehicles_nearby,
            static_obstacles_nearby=self.static_obstacles_nearby,
            obstacles_nearby=self.obstacles_nearby,
            traffic_lights_nearby=self.traffic_lights_nearby,

            distances=self.distances,
        )
        return self.gathered_information

    # Helper subclass

    class Information(NamedTuple):
        """Data gathered by the InformationManager which is passed to the agent."""

        current_waypoint: carla.Waypoint
        current_speed: float
        current_states: Dict[AgentState, int]

        relevant_traffic_light: Optional[carla.TrafficLight]
        relevant_traffic_light_distance: float

        vehicles: List[carla.Vehicle]
        walkers: List[carla.Walker]
        static_obstacles: List[carla.Actor]
        """Filtered obstacles by InformationManager.OBSTACLE_FILTER"""
        obstacles: List[carla.Actor]
        """Union of vehicles, walkers and static_obstacles"""

        walkers_nearby: List[carla.Walker]
        vehicles_nearby: List[carla.Vehicle]
        static_obstacles_nearby: List[carla.Actor]
        obstacles_nearby: List[carla.Actor]

        traffic_lights_nearby: List[carla.TrafficLight]

        distances: Dict[carla.Actor, float]
        """Distances to the actors that were checked during :py:meth:`InformationManager.tick`"""

    # ---- Global Information ----

    OBSTACLE_FILTER: str = "static.prop.[cistmw]*"
    """
    fnmatch for obstacles that the agent will consider in its path.
    https://carla.readthedocs.io/en/latest/bp_library/#static
    """

    @staticmethod
    def get_traffic_lights() -> Dict[carla.TrafficLight, carla.Transform]:
        """Return the traffic light map held by the :py:class:`CarlaDataProvider`."""
        return CarlaDataProvider._traffic_light_map

    @staticmethod
    def get_trafficlight_trigger_waypoint(traffic_light: "carla.TrafficLight") -> carla.Waypoint:
        """
        Get the location where the traffic light is triggered.

        The waypoint is looked up once per traffic light id and then served
        from :py:attr:`lights_map`.
        """
        try:
            return InformationManager.lights_map[traffic_light.id]
        except KeyError:
            pass
        trigger_location = CarlaDataProvider.get_trafficlight_trigger_location(traffic_light)
        trigger_wp = CarlaDataProvider.get_map().get_waypoint(trigger_location)
        InformationManager.lights_map[traffic_light.id] = trigger_wp
        return trigger_wp

    @staticmethod
    def global_tick(frame: Optional[int] = None) -> None:
        """
        Update global information that is constant for the current tick and not agent specific.

        Updates:
            - :py:attr:`vehicles`
            - :py:attr:`walkers`
            - :py:attr:`static_obstacles`
            - :py:attr:`obstacles`
            - :py:attr:`frame`

        Parameters:
            frame: The id of the current frame. If None retrieves the id from the current
                :py:class:`carla.WorldSnapshot`. Multiple calls with the same frame are ignored.
                (default: None)
        """
        # Assure to call this only once
        if frame is None:
            frame = CarlaDataProvider.get_world().get_snapshot().frame
        else:
            # DEBUG; TEMP
            snap_frame = CarlaDataProvider.get_world().get_snapshot().frame
            if frame != snap_frame:
                logger.warning("Frame %s does not match snapshot frame %s", frame, snap_frame)
        if frame == InformationManager.frame:
            return
        InformationManager.frame = frame

        # Filter vehicles
        InformationManager.vehicles = []
        InformationManager.walkers = []
        InformationManager.static_obstacles = []
        InformationManager._other_actors = []
        # For traffic lights use: InformationManager.get_traffic_lights(), which is map-constant

        # Use copy and check for None because of updates could be done by threads in parallel
        for actor_id in CarlaDataProvider._carla_actor_pool.copy():
            try:
                actor = CarlaDataProvider._carla_actor_pool[actor_id]  # might be deleted in parallel
                if actor is None:  # pyright: ignore[reportUnnecessaryComparison]
                    # No attributes can be read from a missing actor; report the pool id only.
                    logger.debug("Detected dead actor in the pool. %s", (actor_id, None, None))
                    del CarlaDataProvider._carla_actor_pool[actor_id]
                    continue
                if not actor.is_alive:
                    logger.debug("Detected dead actor in the pool. %s", (actor.id,
                                                                         actor.type_id,
                                                                         actor.attributes))  # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
                    del CarlaDataProvider._carla_actor_pool[actor_id]
                    continue
            except KeyError:
                # actor was deleted in parallel
                logger.debug("KeyError %s", actor_id)
                continue
            if fnmatch(actor.type_id, "vehicle*"):
                InformationManager.vehicles.append(actor)  # pyright: ignore[reportArgumentType]
            elif fnmatch(actor.type_id, "walker.pedestrian*"):
                InformationManager.walkers.append(actor)  # pyright: ignore[reportArgumentType]
            # TODO: we could assume that these actors are mostly constant and only created in slow intervals
            elif fnmatch(actor.type_id, InformationManager.OBSTACLE_FILTER):
                InformationManager.static_obstacles.append(actor)
            else:
                InformationManager._other_actors.append(actor)

        InformationManager.obstacles = (InformationManager.walkers
                                        + InformationManager.static_obstacles
                                        + InformationManager.vehicles)

    @staticmethod
    def get_vehicles() -> List[carla.Vehicle]:
        """Return :py:attr:`vehicles`."""
        return InformationManager.vehicles

    @staticmethod
    def get_walkers() -> List[carla.Walker]:
        """Return :py:attr:`walkers`."""
        return InformationManager.walkers

    @staticmethod
    def cleanup() -> None:
        """
        Resets the global information.

        Safe to call before the first :py:meth:`global_tick`; actor lists that
        have not been created yet are skipped.
        """
        for list_name in ("vehicles", "walkers", "static_obstacles", "obstacles", "_other_actors"):
            tracked = getattr(InformationManager, list_name, None)
            if tracked is not None:
                tracked.clear()
        InformationManager.frame = None
        InformationManager._tick = 0