Source code for classes.information_manager
"""
This module provides less convoluted access to information, i.e. it distills
the information from the data and returns high-level information.
"""
5
6# pyright: reportPrivateUsage=none
7
8from __future__ import annotations
9
10# todo: maybe find another name for this module
11from fnmatch import fnmatch
12from functools import wraps
13from typing import TYPE_CHECKING, Callable, ClassVar, Dict, List, NamedTuple, Optional, TypeVar, Union, cast
14
15import carla
16from cachetools import cached
17from typing_extensions import Concatenate, ParamSpec, Self
18
19from agents.tools.logs import logger
20from classes.constants import AgentState
21from launch_tools import CarlaDataProvider
22
23if TYPE_CHECKING:
24 from agents.lunatic_agent import LunaticAgent
25
# Generic helpers used to type the ``_check_state`` decorator of the InformationManager:
# ``_T`` is the return type and ``_P`` the parameters of the decorated method.
_T = TypeVar("_T")
_P = ParamSpec("_P")

# Speed thresholds in m/s used by the AgentState detection.
# A speed >= DRIVING_SPEED_THRESHOLD counts as driving.
DRIVING_SPEED_THRESHOLD = 0.05 # m/s >= check
# A speed < STOPPED_SPEED_THRESHOLD counts as stopped.
STOPPED_SPEED_THRESHOLD = 0.05 # m/s < check
31
32
[docs]
class InformationManager:
    """
    Tracks global information, e.g. all actors, traffic lights, etc. as well as
    agent specific information, e.g. current speed, current location, and nearby actors in
    relation to the agent.

    Tip:
        global information is attributed by :py:obj:`ClassVar` and :py:obj:`staticmethod` and is
        shared across all instances, this information is constant for the current tick.
    """

    _tick: ClassVar[int] = 0
    """
    Tick counter meant to avoid calling :py:meth:`global_tick` twice.

    Note:
        The deduplication in :py:meth:`global_tick` itself is done via :py:attr:`frame`.
    """

    # Instance Variables
    relevant_traffic_light: Union[carla.TrafficLight, None] = None
    """Result of :py:meth:`.CarlaDataProvider.get_next_traffic_light`"""

    relevant_traffic_light_distance: float = float('inf')
    """
    Distance to the :py:attr:`relevant_traffic_light`
    Is float('inf') if :py:attr:`relevant_traffic_light` is None.
    """

    _relevant_traffic_light_location: carla.Location = None  # type: ignore[assignment]
    """
    Note:
        Is None if :py:attr:`relevant_traffic_light` is :code:`None`
        or :py:attr:`relevant_traffic_light_distance` is :python:`float('inf')`
    """

    state_counter: Dict[AgentState, int]
    """
    Tracks different :py:class:`AgentState` and the amount of ticks the agent is in this state.
    """

    _vehicle_speed: float  # m/s

    gathered_information: "InformationManager.Information"
    """:py:class:`.InformationManager.Information` gathered during :py:meth:`tick`"""

    # Class & Global Variables
    vehicles: ClassVar["list[carla.Vehicle]"]
    """List of all tracked vehicles"""

    walkers: ClassVar["list[carla.Walker]"]
    """List of all tracked pedestrians"""

    static_obstacles: ClassVar["list[carla.Actor]"]
    """List of all tracked static obstacles"""

    obstacles: ClassVar["list[carla.Actor]"]
    """Union of :py:attr:`vehicles`, :py:attr:`walkers` and :py:attr:`static_obstacles`"""

    # Actors of the pool that are neither vehicle, walker nor matched by OBSTACLE_FILTER.
    _other_actors: ClassVar["list[carla.Actor]"]

    lights_map: ClassVar["Dict[int, carla.Waypoint]"] = {}
    """Map of traffic light ids to their trigger waypoints"""

    frame: ClassVar["int | None"] = None
    """
    Last frame the InformationManager was updated.

    The current frame of the world should be passed to the global_tick method.
    """

    # ---- Agent Specific Information ----

    def __init__(self, agent: "LunaticAgent", update_information: bool = True):
        """
        Bind the manager to an agent and share the agent's state counter.

        Parameters:
            agent: The agent this manager gathers information for. If the agent has no
                ``current_states`` dict yet, a zero-initialized one is created and
                stored on the agent.
            update_information: If True, :py:meth:`tick` is called once at the end of
                the initialization. (default: True)
        """
        self._agent = agent
        self.live_info = agent.live_info

        self._vehicle = agent._vehicle  # maybe use a property  # pyright: ignore[reportPrivateUsage]

        # Share the dict, so that agent and manager always see the same counters.
        if getattr(agent, "current_states", None) is not None:
            self.state_counter = agent.current_states
        else:
            self.state_counter = agent.current_states = dict.fromkeys(AgentState, 0)
        self._states_checked = dict.fromkeys(AgentState, False)
        if update_information:
            self.tick()

    # AgentState detection

    # @staticmethod  # not possible in python3.7
    def _check_state(state: AgentState):  # pyright: ignore[reportSelfClsParameterName, reportGeneralTypeIssues]
        """
        Decorator factory for the ``detect_*`` methods.

        When the decorated method is called its result updates the bookkeeping for ``state``:

        - truthy result: the counter in :py:attr:`state_counter` is increased by one
        - falsy result (but not None): the counter is reset to 0
        - None: nothing is updated; used by checks that are not implemented.

        In the first two cases the state is additionally marked as checked.
        The result of the decorated method is returned unchanged.
        """
        def wrapper(func: Callable[Concatenate[Self, _P], _T]) -> Callable[Concatenate[Self, _P], _T]:
            @wraps(func)
            def inner(self: Self, *args: _P.args, **kwargs: _P.kwargs) -> _T:
                result = func(self, *args, **kwargs)
                if result is not None:
                    if result:
                        self.state_counter[state] += 1
                    else:
                        self.state_counter[state] = 0
                    self._states_checked[state] = True
                return result
            return inner
        return wrapper

    @_check_state(AgentState.DRIVING)
    def detect_driving_state(self) -> bool:
        """
        Increases the :py:attr:`AgentState.DRIVING` counter if the vehicle is driving,
        i.e. its speed is at least ``DRIVING_SPEED_THRESHOLD``; otherwise resets the counter.

        :meta private:
        """
        return self._vehicle_speed >= DRIVING_SPEED_THRESHOLD

    @_check_state(AgentState.STOPPED)
    def detect_stopped_state(self) -> bool:
        """
        Increases the :py:attr:`AgentState.STOPPED` counter if the vehicle is stopped,
        i.e. its speed is below ``STOPPED_SPEED_THRESHOLD``; otherwise resets the counter.

        :meta private:
        """
        return self._vehicle_speed < STOPPED_SPEED_THRESHOLD

    @_check_state(AgentState.REVERSE)
    def detect_reverse_state(self) -> bool:
        """
        Determines if in the last tick the VehicleControl.reverse flag was set.

        Increases the :py:attr:`AgentState.REVERSE` counter if the vehicle is driving in reverse.

        :meta private:
        """
        # TODO: can this be detected differently e.g. through the vehicle
        return self.live_info.last_applied_controls.reverse

    # Not implemented checks

    @_check_state(AgentState.AGAINST_LANE_DIRECTION)
    def detect_driving_against_lane_direction(self):
        """
        Not implemented; returns None, therefore no counter is updated.

        :meta private:
        """
        self._agent._current_waypoint.lane_id  # positive or negative # noqa # pyright: ignore[reportPrivateUsage]
        # TODO: How detect if the heading is against this direction?
        # Need to also account for reverse state.
        # ... NotImplemented

    @_check_state(AgentState.OVERTAKING)
    def detect_overtaking_state(self):
        """
        Not implemented; returns None, therefore no counter is updated.

        :meta private:
        """
        # ... NotImplemented
        # # Can probably not be done easily, and must be done from outside

    def check_states(self):
        """
        Updates all states. Called in :py:meth:`tick`.

        :meta private:
        """
        # Updates are handled through the decorators
        self.detect_driving_state()
        self.detect_stopped_state()
        self.detect_reverse_state()

    # --- Traffic Light ---

    def _get_next_traffic_light(self) -> Optional[carla.TrafficLight]:
        """
        Query the CarlaDataProvider for the next traffic light of the vehicle and update
        :py:attr:`relevant_traffic_light`, its cached location and
        :py:attr:`relevant_traffic_light_distance`.

        Returns:
            The traffic light that was found, or None.
        """
        # TODO: Do not use the CDP but use the planned route instead.
        self.relevant_traffic_light = CarlaDataProvider.get_next_traffic_light(self._vehicle)
        if self.relevant_traffic_light:
            self._relevant_traffic_light_location = self.relevant_traffic_light.get_location()
            self.relevant_traffic_light_distance = self._relevant_traffic_light_location.distance(
                CarlaDataProvider.get_location(self._vehicle))  # pyright: ignore[reportArgumentType]
        else:
            # Is at an intersection; always check for tlight or distance
            self._relevant_traffic_light_location = None  # type: ignore[assignment]
            self.relevant_traffic_light_distance = float('inf')
        return self.relevant_traffic_light

    def detect_next_traffic_light(self):
        """
        Set the :py:attr:`relevant_traffic_light` and :py:attr:`relevant_traffic_light_distance` if not set.

        If a traffic light is already tracked only its distance is refreshed, unless the
        distance grew by more than 1%; then a new search is performed.

        Note:
            Does not check for planned path but current route along waypoints, might not be exact.

        **This function is automatically called in :py:meth:`tick`**
        """
        if self.relevant_traffic_light_distance < float('inf'):
            tlight_distance = self._relevant_traffic_light_location.distance(self.live_info.current_location)
        else:
            tlight_distance = float('inf')

        # 1% tolerance to prevent permanent updates when far away from a traffic light
        distance_increased = tlight_distance > self.relevant_traffic_light_distance * 1.01

        if self.relevant_traffic_light and not distance_increased:
            # Still approaching the known traffic light; the cheap distance update is enough.
            self.relevant_traffic_light_distance = tlight_distance
            return

        # Search for a traffic light if none is given or if the distance to the current one increased
        if self.relevant_traffic_light:
            # Update if the distance increased, and we might need to target another one;
            # TODO: This might be circumvented by passing an intersection
            logger.debug("Traffic light distance increased %s, did slow update.", self.relevant_traffic_light_distance)
        self._get_next_traffic_light()

    # ---- Tick ----

    def tick(self):
        """
        Tick the information manager and update the information for the corresponding agent.

        Updates the global information via :py:meth:`global_tick`, the ``live_info`` of
        the agent, the relevant traffic light, the nearby actors and the agent states.

        Returns:
            The :py:class:`InformationManager.Information` gathered in this tick; it is
            also stored in :py:attr:`gathered_information`.
        """
        snapshot = CarlaDataProvider.get_world().get_snapshot()
        self.global_tick(snapshot.frame)

        # --- Vehicle Information ---
        self.live_info.last_applied_controls = self._vehicle.get_control()

        # - Speed -
        # NOTE: get_velocity does not take the z axis into account.
        self.live_info.current_speed_limit = self._vehicle.get_speed_limit()

        self.live_info.velocity_vector = self._vehicle.get_velocity()

        self._vehicle_speed = CarlaDataProvider.get_velocity(self._vehicle)  # used for AgentState Checks
        self.live_info.current_speed = self._vehicle_speed * 3.6  # km/h

        # - Location -
        # NOTE: That transform.location and location are similar but not identical.
        self.live_info.current_transform = CarlaDataProvider.get_transform(self._vehicle)  # pyright: ignore[reportAttributeAccessIssue]
        self.live_info.current_location = _current_loc = CarlaDataProvider.get_location(self._vehicle)  # NOTE: is None if past run not cleaned # noqa: E501 # type: ignore
        # Only exact waypoint. TODO: update in agent
        # Comment should be visible in traceback.
        current_waypoint = cast(carla.Waypoint, CarlaDataProvider.get_map().get_waypoint(_current_loc))  # NOTE: Might throw error if past run was not cleaned; or the world did not tick yet. # noqa: E501 # pyright: ignore[reportCallIssue, reportArgumentType]

        # Traffic Light
        # NOTE: Must be AFTER the location update
        self.detect_next_traffic_light()
        self.live_info.next_traffic_light = self.relevant_traffic_light
        self.live_info.next_traffic_light_distance = self.relevant_traffic_light_distance

        # Nearby actors
        obstacle_config = self._agent.config.obstacles
        self.distances: Dict[carla.Actor, float] = {}

        # Read by ``dist`` for dead actors; reassigned below before the walkers are filtered.
        _v_filter_dist = obstacle_config.nearby_vehicles_max_distance

        # The identity key stores the actor itself as dict key. The default key function of
        # cachetools wraps the arguments in a tuple, which would not match the
        # ``Dict[carla.Actor, float]`` contract of ``distances``.
        @cached(cache=self.distances, key=lambda actor: actor)
        def dist(v: carla.Actor):
            if not v.is_alive:
                logger.warning("Actor is not alive - this should not happen.")
                return _v_filter_dist  # filter out; fails the strict "<" comparison
            return v.get_location().distance(_current_loc)  # pyright: ignore[reportArgumentType]

        # Filter nearby
        # Vehicles, without the vehicle of the agent itself
        own_id = self._vehicle.id
        self.vehicles_nearby: List[carla.Vehicle] = sorted(
            (v for v in self.vehicles if v.id != own_id and dist(v) < _v_filter_dist),
            key=dist,
        )

        # Static obstacles; use the same maximum distance as vehicles
        self.static_obstacles_nearby: List[carla.Actor] = sorted(
            (o for o in self.static_obstacles if dist(o) < _v_filter_dist),
            key=dist,
        )

        # Walkers
        _v_filter_dist = obstacle_config.nearby_walkers_max_distance  # in case of a different distance for walkers.
        self.walkers_nearby: List[carla.Walker] = sorted(
            (w for w in self.walkers if dist(w) < _v_filter_dist),
            key=dist,
        )

        # All actors to be tracked
        self.obstacles_nearby = sorted(
            self.walkers_nearby + self.static_obstacles_nearby + self.vehicles_nearby,
            key=dist,
        )

        # Nearby Traffic lights
        # By default this checks for 5 seconds range + 10 m
        tlights_max_distance = obstacle_config.nearby_tlights_max_distance
        self.traffic_lights_nearby = sorted(
            (tl for tl in InformationManager.get_traffic_lights() if dist(tl) < tlights_max_distance),
            key=dist,
        )

        self.check_states()

        # ----- Return Summary -----

        # TODO: Extend distances with carla lights
        self.gathered_information = InformationManager.Information(
            current_waypoint=current_waypoint,
            current_speed=self.live_info.current_speed,
            current_states=self.state_counter,

            relevant_traffic_light=self.relevant_traffic_light,
            relevant_traffic_light_distance=self.relevant_traffic_light_distance,

            vehicles=self.vehicles,
            walkers=self.walkers,
            static_obstacles=self.static_obstacles,
            obstacles=self.obstacles,

            walkers_nearby=self.walkers_nearby,
            vehicles_nearby=self.vehicles_nearby,
            static_obstacles_nearby=self.static_obstacles_nearby,
            obstacles_nearby=self.obstacles_nearby,
            traffic_lights_nearby=self.traffic_lights_nearby,

            distances=self.distances,
        )
        return self.gathered_information

    # Helper subclass

    class Information(NamedTuple):
        """Data gathered by the InformationManager which is passed to the agent."""

        current_waypoint: carla.Waypoint
        current_speed: float
        current_states: Dict[AgentState, int]

        relevant_traffic_light: Optional[carla.TrafficLight]
        relevant_traffic_light_distance: float

        vehicles: List[carla.Vehicle]
        walkers: List[carla.Walker]
        static_obstacles: List[carla.Actor]
        """Filtered obstacles by InformationManager.OBSTACLE_FILTER"""
        obstacles: List[carla.Actor]
        """Union of vehicles, walkers and static_obstacles"""

        walkers_nearby: List[carla.Walker]
        vehicles_nearby: List[carla.Vehicle]
        static_obstacles_nearby: List[carla.Actor]
        obstacles_nearby: List[carla.Actor]

        traffic_lights_nearby: List[carla.TrafficLight]

        distances: Dict[carla.Actor, float]
        """
        Distances from the agent to the actors that were evaluated during the tick,
        i.e. the :py:attr:`obstacles` (without the vehicle of the agent) and the traffic lights.
        """

    # ---- Global Information ----

    OBSTACLE_FILTER: str = "static.prop.[cistmw]*"
    """
    fnmatch for obstacles that the agent will consider in its path.
    https://carla.readthedocs.io/en/latest/bp_library/#static
    """

    @staticmethod
    def get_traffic_lights() -> Dict[carla.TrafficLight, carla.Transform]:
        """Return the traffic light map that is kept by the CarlaDataProvider."""
        return CarlaDataProvider._traffic_light_map

    @staticmethod
    def get_trafficlight_trigger_waypoint(traffic_light: "carla.TrafficLight") -> carla.Waypoint:
        """
        Get the waypoint at the location where the traffic light is triggered.

        The result is cached in :py:attr:`lights_map` under the id of the traffic light.
        """
        if traffic_light.id in InformationManager.lights_map:
            return InformationManager.lights_map[traffic_light.id]
        trigger_location = CarlaDataProvider.get_trafficlight_trigger_location(traffic_light)
        trigger_wp = CarlaDataProvider.get_map().get_waypoint(trigger_location)
        InformationManager.lights_map[traffic_light.id] = trigger_wp
        return trigger_wp

    @staticmethod
    def global_tick(frame: Optional[int] = None) -> None:
        """
        Update global information that is constant for the current tick and not agent specific.

        Updates:
            - :py:attr:`vehicles`
            - :py:attr:`walkers`
            - :py:attr:`static_obstacles`
            - :py:attr:`obstacles`
            - :py:attr:`frame`

        Actors in the pool of the CarlaDataProvider that are None or not alive
        are removed from the pool.

        Parameters:
            frame: The id of the current frame. If None retrieves the id from the current
                :py:class:`carla.WorldSnapshot`. Multiple calls with the same frame are ignored.
                (default: None)
        """
        # Assure to call this only once
        if frame is None:
            frame = CarlaDataProvider.get_world().get_snapshot().frame
        else:
            # DEBUG; TEMP
            snap_frame = CarlaDataProvider.get_world().get_snapshot().frame
            if frame != snap_frame:
                logger.warning("Frame %s does not match snapshot frame %s", frame, snap_frame)
        if frame == InformationManager.frame:
            return
        InformationManager.frame = frame

        # Filter vehicles
        InformationManager.vehicles = []
        InformationManager.walkers = []
        InformationManager.static_obstacles = []
        InformationManager._other_actors = []
        # For traffic lights use: InformationManager.get_traffic_lights(), which is map-constant

        # Use copy and check for None because of updates could be done by threads in parallel
        for actor_id in CarlaDataProvider._carla_actor_pool.copy():
            try:
                actor = CarlaDataProvider._carla_actor_pool[actor_id]  # might be deleted in parallel
                if actor is None:  # pyright: ignore[reportUnnecessaryComparison]
                    # No attributes can be reported for a missing actor.
                    logger.debug("Detected dead actor in the pool. %s", (actor_id, None, None))
                    del CarlaDataProvider._carla_actor_pool[actor_id]
                    continue
                if not actor.is_alive:
                    logger.debug("Detected dead actor in the pool. %s", (actor.id,
                                                                         actor.type_id,
                                                                         actor.attributes))  # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
                    del CarlaDataProvider._carla_actor_pool[actor_id]
                    continue
            except KeyError:
                # actor was deleted in parallel
                logger.debug("KeyError %s", actor_id)
                continue
            if fnmatch(actor.type_id, "vehicle*"):
                InformationManager.vehicles.append(actor)  # pyright: ignore[reportArgumentType]
            elif fnmatch(actor.type_id, "walker.pedestrian*"):
                InformationManager.walkers.append(actor)  # pyright: ignore[reportArgumentType]
            # TODO: we could assume that these actors are mostly constant and only created in slow intervals
            elif fnmatch(actor.type_id, InformationManager.OBSTACLE_FILTER):
                InformationManager.static_obstacles.append(actor)
            else:
                InformationManager._other_actors.append(actor)

        InformationManager.obstacles = (
            InformationManager.walkers + InformationManager.static_obstacles + InformationManager.vehicles
        )

    @staticmethod
    def get_vehicles() -> List[carla.Vehicle]:
        """Return :py:attr:`vehicles`, the vehicles found by the last :py:meth:`global_tick`."""
        return InformationManager.vehicles

    @staticmethod
    def get_walkers() -> List[carla.Walker]:
        """Return :py:attr:`walkers`, the pedestrians found by the last :py:meth:`global_tick`."""
        return InformationManager.walkers

    @staticmethod
    def cleanup() -> None:
        """
        Resets the global information.

        The actor lists are cleared in place. Lists that do not exist yet, because
        :py:meth:`global_tick` was never called, are skipped.
        """
        for list_name in ("vehicles", "walkers", "static_obstacles", "obstacles", "_other_actors"):
            tracked = getattr(InformationManager, list_name, None)
            if tracked is not None:
                tracked.clear()
        # The cached trigger waypoints were queried from the map that was loaded at that time;
        # drop them so that they are looked up again after the cleanup.
        InformationManager.lights_map.clear()
        InformationManager.frame = None
        InformationManager._tick = 0