Source code for classes.information_manager
1"""
2Aim of this module is to provide a less convoluted access to information,
3i.e. distill the information from the data and return high level information
4"""
5
6# pyright: reportPrivateUsage=none
7
8from __future__ import annotations
9
10# todo: maybe find another name for this module
11from fnmatch import fnmatch
12from functools import wraps
13from typing import TYPE_CHECKING, Callable, ClassVar, Dict, List, NamedTuple, Optional, TypeVar, Union, cast
14
15from cachetools import cached
16from typing_extensions import Concatenate, ParamSpec, Self
17
18from agents.tools.logs import logger
19from classes.constants import AgentState
20from launch_tools import CarlaDataProvider
21
22if TYPE_CHECKING:
23 import carla
24 from agents.lunatic_agent import LunaticAgent
25
_T = TypeVar("_T")  # return type of a method wrapped by InformationManager._check_state
_P = ParamSpec("_P")  # parameters (besides self) of a method wrapped by InformationManager._check_state

# Speed thresholds in m/s used by the AgentState detection methods of InformationManager.
DRIVING_SPEED_THRESHOLD = 0.05  # m/s; speed >= threshold counts as driving
STOPPED_SPEED_THRESHOLD = 0.05  # m/s; speed < threshold counts as stopped
31
32
[docs]
33class InformationManager:
34 """
35 Tracks global information, e.g. all actors, traffic lights, etc. as well as
36 agent specific information, e.g. current speed, current location, and nearby actors in
37 relation to the agent.
38
39 Tip:
40 global information is attributed by :py:obj:`ClassVar` and :py:obj:`staticmethod` and is
41 shared across all instances, this information is constant for the current tick.
42 """
43
44 _tick: ClassVar[int] = 0
45 """Current tick to not double call :py:meth:`global_tick`"""
46
47 # Instance Variables
48 relevant_traffic_light: Union[carla.TrafficLight, None] = None
49 """Result of :py:meth:`.CarlaDataProvider.get_next_traffic_light`"""
50
51 relevant_traffic_light_distance: float = float("inf")
52 """
53 Distance to the :py:attr:`relevant_traffic_light`
54 Is float('inf') if :py:attr:`relevant_traffic_light` is None.
55 """
56
57 _relevant_traffic_light_location: carla.Location = None # type: ignore[assignment]
58 """
59 Note:
60 Is None if :py:attr:`relevant_traffic_light` is :code:`None`
61 or :py:attr:`relevant_traffic_light_distance` is :python:`float('inf')`
62 """
63
64 state_counter: Dict[AgentState, int]
65 """
66 Tracks different :py:class:`AgentState` and the amount of ticks the agent is in this state.
67 """
68
69 _vehicle_speed: float # m/s
70
71 gathered_information: "InformationManager.Information"
72 """:py:class:`.InformationManager.Information` gathered during :py:meth:`tick`"""
73
74 # Class & Global Variables
75 vehicles: ClassVar["list[carla.Vehicle]"]
76 """List of all tracked vehicles"""
77
78 walkers: ClassVar["list[carla.Walker]"]
79 """List of all tracked pedestrians"""
80
81 static_obstacles: ClassVar["list[carla.Actor]"]
82 """List of all tracked static obstacles"""
83
84 obstacles: ClassVar["list[carla.Actor]"]
85 """Union of :py:attr:`vehicles`, :py:attr:`walkers` and :py:attr:`static_obstacles`"""
86
87 lights_map: ClassVar["Dict[int, carla.Waypoint]"] = {}
88 """Map of traffic lights to their trigger waypoints"""
89
90 frame: ClassVar["int | None"] = None
91 """
92 Last frame the InformationManager was updated.
93
94 The current frame of the world should be passed to the global_tick method.
95 """
96
97 # ---- Agent Specific Information ----
98
[docs]
99 def __init__(self, agent: "LunaticAgent", update_information: bool = True):
100 self._agent = agent
101 self.live_info = agent.live_info
102
103 self._vehicle = agent._vehicle # maybe use a property # pyright: ignore[reportPrivateUsage]
104
105 # Share the dict
106 if getattr(agent, "current_states", None) is not None:
107 self.state_counter = agent.current_states
108 else:
109 self.state_counter = agent.current_states = dict.fromkeys(AgentState, 0)
110 self._states_checked = dict.fromkeys(AgentState, False)
111 if update_information:
112 self.tick()
113
114 # AgentState detection
115
116 # @staticmethod # not possible in python3.7
    def _check_state(state: AgentState):  # pyright: ignore[reportSelfClsParameterName, reportGeneralTypeIssues]
        """
        Decorator factory that updates the state counter and the state checked dict
        whenever the decorated detection method is called.

        It is applied inside the class body as ``@_check_state(AgentState.X)``.

        The result of the decorated method is interpreted as follows:

        - truthy: ``state_counter[state]`` is increased by one.
        - falsy, but not None: ``state_counter[state]`` is reset to 0.
        - None: ``state_counter[state]`` is left untouched.

        The result of the decorated method is returned unchanged.

        Parameters:
            state: The :py:class:`AgentState` that the decorated method detects.
        """

        def wrapper(
            func: Callable[Concatenate[Self, _P], _T],
        ) -> Callable[Concatenate[Self, _P], _T]:  # -> _Wrapped[Callable[Concatenate[Self, _P], Any], bool | Non...:
            @wraps(func)
            def inner(self: Self, *args: _P.args, **kwargs: _P.kwargs):
                result = func(self, *args, **kwargs)
                # None marks "no statement possible", e.g. a check that is not implemented.
                if result is not None:
                    if result:
                        self.state_counter[state] += 1
                    else:
                        self.state_counter[state] = 0
                    # NOTE(review): presumably only marked as checked when a result was produced
                    # - confirm the nesting level of this statement.
                    self._states_checked[state] = True
                return result

            return inner

        return wrapper
139
140 @_check_state(AgentState.DRIVING)
141 def detect_driving_state(self) -> bool:
142 """
143 Increased the :py:attr:`AgentState.DRIVING` counter if the vehicle is driving.
144
145 :meta private:
146 """
147 return self._vehicle_speed >= DRIVING_SPEED_THRESHOLD
148
149 @_check_state(AgentState.STOPPED)
150 def detect_stopped_state(self) -> bool:
151 """
152 Increased the :py:attr:`AgentState.STOPPED` counter if the vehicle is stopped.
153
154 :meta private:
155 """
156 return self._vehicle_speed < STOPPED_SPEED_THRESHOLD
157
158 @_check_state(AgentState.REVERSE)
159 def detect_reverse_state(self) -> bool:
160 """
161 Determines if in the last tick the VehicleControl.reverse flag was set.
162
163 Increases the :py:attr:`AgentState.REVERSE` counter if the vehicle is driving in reverse.
164
165 :meta private:
166 """
167 # TODO: can this be detected differently e.g. through the vehicle
168 return self.live_info.last_applied_controls.reverse
169
170 # Not implemented checks
171
    @_check_state(AgentState.AGAINST_LANE_DIRECTION)
    def detect_driving_against_lane_direction(self):
        """
        Placeholder for detecting that the agent drives against the lane direction.

        Not implemented: the method only reads the lane id of the agent's current waypoint
        and implicitly returns None, so the
        :py:attr:`AgentState.AGAINST_LANE_DIRECTION` counter is not modified.

        :meta private:
        """
        self._agent._current_waypoint.lane_id  # positive or negative # noqa # pyright: ignore[reportPrivateUsage]
        # TODO: How detect if the heading is against this direction?
        # Need to also account for reverse state.
        # ... NotImplemented
181
    @_check_state(AgentState.OVERTAKING)
    def detect_overtaking_state(self):
        """
        Placeholder for detecting an overtaking maneuver.

        Not implemented: the body is empty and implicitly returns None, so the
        :py:attr:`AgentState.OVERTAKING` counter is not modified.

        :meta private:
        """
        # ... NotImplemented
        # # Can probably not be done easily, and must be done from outside
189
190 def check_states(self):
191 """
192 Updates all states. Called in :py:meth:`tick`.
193
194 :meta private:
195 """
196 # Updates are handles trough the decorators
197 self.detect_driving_state()
198 self.detect_stopped_state()
199 self.detect_reverse_state()
200
201 # --- Traffic Light ---
202
203 def _get_next_traffic_light(self) -> Optional[carla.TrafficLight]:
204 # TODO: Do not use the CDP but use the planned route instead.
205 self.relevant_traffic_light = CarlaDataProvider.get_next_traffic_light(self._vehicle)
206 if self.relevant_traffic_light:
207 self._relevant_traffic_light_location = self.relevant_traffic_light.get_location()
208 self.relevant_traffic_light_distance = self._relevant_traffic_light_location.distance(
209 CarlaDataProvider.get_location(self._vehicle)
210 ) # pyright: ignore[reportArgumentType]
211 else:
212 # Is at an intersection; always check for tlight or distance
213 self._relevant_traffic_light_location = None # type: ignore[assignment]
214 self.relevant_traffic_light_distance = float("inf")
215 return self.relevant_traffic_light
216
[docs]
217 def detect_next_traffic_light(self):
218 """
219 Set the :py:attr:`relevant_traffic_light` and :py:attr:`relevant_traffic_light_distance` if not set.
220
221 Note:
222 Does not check for planned path but current route along waypoints, might not be exact.
223
224 **This function is automatically called in :py:meth:`tick`**
225 """
226 if self.relevant_traffic_light_distance < float("inf"):
227 tlight_distance = self._relevant_traffic_light_location.distance(self.live_info.current_location)
228 else:
229 tlight_distance = float("inf")
230
231 # Search for a traffic light if none is given or if the distance to the current one increased
232 # 1% tolerance to prevent permanent updates when far away from a traffic light
233 if not self.relevant_traffic_light or tlight_distance > self.relevant_traffic_light_distance * 1.01:
234 # Update if the distance increased, and we might need to target another one; # TODO: This might be circumvented by passing and intersection
235 if self.relevant_traffic_light and tlight_distance > self.relevant_traffic_light_distance * 1.01:
236 logger.debug(
237 "Traffic light distance increased %s, did slow update.", self.relevant_traffic_light_distance
238 )
239 self._get_next_traffic_light()
240 elif self.relevant_traffic_light:
241 self.relevant_traffic_light_distance = tlight_distance
242
243 # ---- Tick ----
244
[docs]
245 def tick(self):
246 """
247 Tick the information manager and update the information for the corresponding agent.
248 """
249 snapshot = CarlaDataProvider.get_world().get_snapshot()
250 self.global_tick(snapshot.frame)
251
252 # --- Vehicle Information ---
253 self.live_info.last_applied_controls = self._vehicle.get_control()
254
255 # - Speed -
256 # NOTE: get_velocity does not take the z axis into account.
257 self.live_info.current_speed_limit = self._vehicle.get_speed_limit()
258
259 self.live_info.velocity_vector = self._vehicle.get_velocity()
260
261 self._vehicle_speed = CarlaDataProvider.get_velocity(self._vehicle) # used for AgentState Checks
262 self.live_info.current_speed = self._vehicle_speed * 3.6 # km/h
263
264 # - Location -
265 # NOTE: That transform.location and location are similar but not identical.
266 self.live_info.current_transform = CarlaDataProvider.get_transform(self._vehicle) # pyright: ignore[reportAttributeAccessIssue]
267 self.live_info.current_location = current_loc = CarlaDataProvider.get_location(
268 self._vehicle
269 ) # NOTE: is None if past run not cleaned # noqa: E501 # type: ignore
270 # Only exact waypoint. TODO: update in agent
271 # Comment should be visible in traceback.
272 current_waypoint = cast(
273 "carla.Waypoint", CarlaDataProvider.get_map().get_waypoint(current_loc)
274 ) # NOTE: Might throw error if past run was not cleaned; or the world did not tick yet. # noqa: E501 # pyright: ignore[reportCallIssue, reportArgumentType]
275
276 # Traffic Light
277 # NOTE: Must be AFTER the location update
278 self.detect_next_traffic_light()
279 self.live_info.next_traffic_light = self.relevant_traffic_light
280 self.live_info.next_traffic_light_distance = self.relevant_traffic_light_distance
281
282 # Nearby actors
283 self.distances: Dict[carla.Actor, float] = {}
284
285 @cached(cache=self.distances)
286 def dist(v: carla.Actor):
287 if not v.is_alive:
288 logger.warning("Actor is not alive - this should not happen.")
289 return v_filter_dist # filter out
290 return v.get_location().distance(current_loc) # pyright: ignore[reportArgumentType]
291
292 # Filter nearby
293 # Vehicles
294 v_filter_dist = self._agent.config.obstacles.nearby_vehicles_max_distance
295 self.vehicles_nearby: List[carla.Vehicle] = []
296 for v in self.vehicles:
297 if v.id != self._vehicle.id and dist(v) < v_filter_dist:
298 self.vehicles_nearby.append(v)
299 self.vehicles_nearby = sorted(self.vehicles_nearby, key=dist)
300
301 # Static obstacles
302 self.static_obstacles_nearby: list[carla.Actor] = []
303 for o in self.static_obstacles:
304 if dist(o) < v_filter_dist:
305 self.static_obstacles_nearby.append(o)
306 self.static_obstacles_nearby = sorted(self.static_obstacles_nearby, key=dist)
307
308 # Walkers
309 v_filter_dist = (
310 self._agent.config.obstacles.nearby_walkers_max_distance
311 ) # in case of a different distance for walkers.
312 self.walkers_nearby: list[carla.Walker] = []
313 for w in self.walkers:
314 if dist(w) < v_filter_dist:
315 self.walkers_nearby.append(w)
316
317 self.walkers_nearby = sorted(self.walkers_nearby, key=dist)
318 # All actors to be tracked
319 self.obstacles_nearby = self.walkers_nearby + self.static_obstacles_nearby + self.vehicles_nearby
320 self.obstacles_nearby = sorted(self.obstacles_nearby, key=dist)
321
322 # Nearby Traffic lights
323 # By default this checks for 5 seconds range + 10 m
324 self.traffic_lights_nearby = [
325 tl
326 for tl in InformationManager.get_traffic_lights()
327 if dist(tl) < self._agent.config.obstacles.nearby_tlights_max_distance
328 ]
329 self.traffic_lights_nearby = sorted(self.traffic_lights_nearby, key=dist)
330
331 self.check_states()
332
333 # ----- Return Summary -----
334
335 # TODO: Extend distances with carla lights
336 self.gathered_information = InformationManager.Information(
337 current_waypoint=current_waypoint,
338 current_speed=self.live_info.current_speed,
339 current_states=self.state_counter,
340 relevant_traffic_light=self.relevant_traffic_light,
341 relevant_traffic_light_distance=self.relevant_traffic_light_distance,
342 vehicles=self.vehicles,
343 walkers=self.walkers,
344 static_obstacles=self.static_obstacles,
345 obstacles=self.obstacles,
346 walkers_nearby=self.walkers_nearby,
347 vehicles_nearby=self.vehicles_nearby,
348 static_obstacles_nearby=self.static_obstacles_nearby,
349 obstacles_nearby=self.obstacles_nearby,
350 traffic_lights_nearby=self.traffic_lights_nearby,
351 distances=self.distances,
352 )
353 return self.gathered_information
354
355 # Helper subclass
356
[docs]
    class Information(NamedTuple):
        """
        Data gathered by the InformationManager which is passed to the agent.

        An instance is created at the end of :py:meth:`InformationManager.tick` and stored in
        :py:attr:`InformationManager.gathered_information`.
        """

        # Waypoint retrieved from the map for the current location of the vehicle
        current_waypoint: carla.Waypoint
        # Taken from ``live_info.current_speed`` (km/h)
        current_speed: float
        # The state counter dict of the InformationManager (same object, not a copy)
        current_states: Dict[AgentState, int]

        relevant_traffic_light: Optional[carla.TrafficLight]
        relevant_traffic_light_distance: float

        vehicles: List[carla.Vehicle]
        walkers: List[carla.Walker]
        static_obstacles: List[carla.Actor]
        """Filtered obstacles by InformationManager.OBSTACLE_FILTER"""
        obstacles: List[carla.Actor]
        """Union of vehicles, walkers and static_obstacles"""

        # The *_nearby lists are filtered by a maximum distance and sorted by distance
        walkers_nearby: List[carla.Walker]
        vehicles_nearby: List[carla.Vehicle]
        static_obstacles_nearby: List[carla.Actor]
        obstacles_nearby: List[carla.Actor]

        traffic_lights_nearby: List[carla.TrafficLight]

        distances: Dict[carla.Actor, float]
        """Distances to all actors in :py:attr:`obstacles`"""
383
384 # ---- Global Information ----
385
386 OBSTACLE_FILTER: str = "static.prop.[cistmw]*"
387 """
388 fnmatch for obstacles that the agent will consider in its path.
389 https://carla.readthedocs.io/en/latest/bp_library/#static
390 """
391
[docs]
392 @staticmethod
393 def get_traffic_lights() -> Dict[carla.TrafficLight, carla.Transform]:
394 return CarlaDataProvider._traffic_light_map
395
[docs]
396 @staticmethod
397 def get_trafficlight_trigger_waypoint(traffic_light: "carla.TrafficLight") -> carla.Waypoint:
398 """
399 Get the location where the traffic light is triggered.
400 """
401 if traffic_light.id in InformationManager.lights_map:
402 return InformationManager.lights_map[traffic_light.id]
403 trigger_location = CarlaDataProvider.get_trafficlight_trigger_location(traffic_light)
404 trigger_wp = CarlaDataProvider.get_map().get_waypoint(trigger_location)
405 InformationManager.lights_map[traffic_light.id] = trigger_wp
406 return trigger_wp
407
[docs]
408 @staticmethod
409 def global_tick(frame: Optional[int] = None) -> None:
410 """
411 Update global information that is constant for the current tick and not agent specific.
412
413 Updates:
414 - :py:attr:`vehicles`
415 - :py:attr:`walkers`
416 - :py:attr:`static_obstacles`
417 - :py:attr:`obstacles`
418 - :py:attr:`frame`
419
420 Parameters:
421 frame: The id of the current frame. If None retrieves the id from the current
422 :py:class:`carla.WorldSnapshot`. Multiple calls with the same frame are ignored.
423 (default: None)
424 """
425 # Assure to call this only once
426 if frame is None:
427 frame = CarlaDataProvider.get_world().get_snapshot().frame
428 else:
429 # DEBUG; TEMP
430 snap_frame = CarlaDataProvider.get_world().get_snapshot().frame
431 if frame != snap_frame:
432 logger.warning(f"Frame {frame} does not match snapshot frame {snap_frame}")
433 if frame == InformationManager.frame:
434 return
435 InformationManager.frame = frame
436
437 # Filter vehicles
438 InformationManager.vehicles = []
439 InformationManager.walkers = []
440 InformationManager.static_obstacles = []
441 InformationManager._other_actors: List[carla.Actor] = []
442 # For traffic lights use: InformationManager.get_traffic_lights(), which is map-constant
443
444 # Use copy and check for None because of updates could be done by threads in parallel
445 for actor_id in CarlaDataProvider._carla_actor_pool.copy():
446 try:
447 actor = CarlaDataProvider._carla_actor_pool[actor_id] # might be deleted in parallel
448 if actor is None or not actor.is_alive: # pyright: ignore[reportUnnecessaryComparison]
449 logger.debug("Detected dead actor in the pool. %s", (actor.id, actor.type_id, actor.attributes)) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
450 del CarlaDataProvider._carla_actor_pool[actor_id]
451 continue
452 except KeyError:
453 # actor was deleted in parallel
454 print("KeyError", actor_id)
455 continue
456 if fnmatch(actor.type_id, "vehicle*"):
457 InformationManager.vehicles.append(actor) # pyright: ignore[reportArgumentType]]
458 elif fnmatch(actor.type_id, "walker.pedestrian*"):
459 InformationManager.walkers.append(actor) # pyright: ignore[reportArgumentType]
460 # TODO: we could assume that these actors are mostly constant and only created in slow intervals
461 elif fnmatch(actor.type_id, InformationManager.OBSTACLE_FILTER):
462 InformationManager.static_obstacles.append(actor)
463 else:
464 InformationManager._other_actors.append(actor)
465
466 InformationManager.obstacles = (
467 InformationManager.walkers + InformationManager.static_obstacles + InformationManager.vehicles
468 )
469
[docs]
470 @staticmethod
471 def get_vehicles() -> List[carla.Vehicle]:
472 return InformationManager.vehicles
473
[docs]
474 @staticmethod
475 def get_walkers() -> List[carla.Walker]:
476 return InformationManager.walkers
477
[docs]
478 @staticmethod
479 def cleanup() -> None:
480 """Resets the global information."""
481 InformationManager.vehicles.clear()
482 InformationManager.walkers.clear()
483 InformationManager.static_obstacles.clear()
484 InformationManager.obstacles.clear()
485 InformationManager._other_actors.clear()
486 InformationManager.frame = None
487 InformationManager._tick = 0