Source code for classes.detection_matrix

  1from __future__ import annotations
  2
  3import contextlib
  4import threading
  5import time
  6from agents.tools.logs import logger
  7from classes._data_gathering.car_detection_matrix.informationUtils import (
  8    RoadLaneId,
  9    check_ego_on_highway,
 10    create_city_matrix,
 11    detect_surrounding_cars,
 12    get_all_road_lane_ids,
 13)
 14
 15from launch_tools import CarlaDataProvider
 16
 17import matplotlib  # noqa: ICN001
 18
 19matplotlib.use("Agg")
 20import matplotlib.backends.backend_agg as agg
 21import matplotlib.pyplot as plt
 22import numpy as np
 23import pygame
 24
 25import signal
 26from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set
 27from typing_extensions import TypedDict
 28
 29if TYPE_CHECKING:
 30    import carla
 31    from _data_gathering.car_detection_matrix.informationUtils import HighWayShape
 32    from matplotlib.axes import Axes as MplAxes
 33
 34
[docs] 35def matrix_for_actor( 36 ego_vehicle: carla.Actor, 37 road_lane_ids: "set[RoadLaneId]", 38 radius: float = 100.0, 39 highway_shape: Optional["HighWayShape"] = None, 40): 41 """ 42 Calculates the detection matrix for the given actor. 43 44 Parameters: 45 ego_vehicle: The ego vehicle 46 highway_shape (tuple): Tuple containing highway_type, number of straight highway lanes, entry waypoint tuple and/ exit waypoint tuple. 47 Format: (highway_type: string, straight_lanes: int, entry_wps: ([wp,..], [wp,..]), exit_wps: ([wp,..], [wp,..])) 48 49 Note: 50 :py:class:`.CarlaDataProvider` needs to be set up before calling this function. 51 """ 52 world = CarlaDataProvider.get_world() 53 world_map = CarlaDataProvider.get_map() 54 ego_location = ego_vehicle.get_location() 55 # ego_waypoint = world_map.get_waypoint(ego_location) 56 ego_on_highway = check_ego_on_highway(ego_location, road_lane_ids, world_map) 57 58 # current_lanes = [rl_id[1] for rl_id in road_lane_ids if rl_id[0] == ego_waypoint.road_id] 59 60 # Normal Road; TODO: Check if this is useful 61 # if ego_on_highway: 62 # street_type = StreetType.ON_HIGHWAY 63 # else: 64 # street_type = StreetType.NON_HIGHWAY_STREET 65 66 # NOTE: in rare unsupported cases, the function will return None 67 matrix = create_city_matrix(ego_location, road_lane_ids, world_map) 68 69 if matrix: 70 matrix, _ = detect_surrounding_cars( 71 ego_location, ego_vehicle, matrix, road_lane_ids, world, radius, ego_on_highway, highway_shape 72 ) 73 else: 74 return None 75 # Removes the information about "left_outer_lane" by replacing it with numeric values. 76 # TODO: Should possibly revert this to be more compatible with source and differentiate cases! 77 # new_matrix = dict(enumerate(matrix.values())) 78 # matrix = new_matrix 79 # return matrix 80 return dict(enumerate(matrix.values()))
81 82
[docs] 83class DetectionMatrix: 84 """ 85 Automatically create a matrix representing the lanes around the ego vehicle 86 on each :py:meth:`update` call. 87 """ 88 89 matrix: Dict[int, List[int]] 90 """ 91 A :py:class:`collections.OrderedDict`: An ordered dictionary representing the city matrix. The keys for existing lanes are the lane IDs in the format "road_id_lane_id". 92 For non-existing lanes different placeholder exist, e.g. left_outer_lane, left_inner_lane, No_4th_lane, No_opposing_direction 93 The values indicate whether a vehicle is present: 0 - No vehicle, 1 - Ego vehicle, 3 - No road. 94 Format example: 95 96 .. code-block:: python 97 98 { 99 "left_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3], 100 "left_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3], 101 "1_2": [0, 0, 0, 0, 0, 0, 0, 0], 102 "1_1": [0, 0, 0, 0, 0, 0, 0, 0], 103 "1_-1": [0, 0, 0, 0, 0, 0, 0, 0], 104 "1_-2": [0, 0, 0, 0, 0, 0, 0, 0], 105 "right_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3], 106 "right_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3], 107 } 108 109 Attention: 110 - Currently the keys are replaces by numbers. 111 - In case of an unsupported layout or before the very first update 112 the matrix can be :code:`None`. Be aware of this when using :py:class:`AsyncDetectionMatrix`. 113 """ 114
[docs] 115 def __init__( 116 self, 117 ego_vehicle: carla.Actor, 118 road_lane_ids: Optional[Set[RoadLaneId]] = None, 119 radius: float = 100.0, 120 ): 121 self._ego_vehicle = ego_vehicle 122 self.running = True 123 """If the matrix will perform updates.""" 124 self._sync = True 125 self._road_lane_ids = road_lane_ids or get_all_road_lane_ids(CarlaDataProvider._map) 126 """A set containing unique road and lane identifiers in the format "roadId_laneId".""" 127 self.matrix = None # type: ignore[assignment] 128 self._add_signal_handler() 129 self._radius = radius
130 131 def _calculate_update(self): 132 return matrix_for_actor(self._ego_vehicle, self._road_lane_ids, radius=self._radius) 133
[docs] 134 def update(self) -> "Dict[int, List[int]] | None": 135 """ 136 If the matrix is :py:attr:`running`, it will update the matrix and return it, 137 otherwise returns :python:`None`. 138 """ 139 if self.running: 140 self.matrix = self._calculate_update() 141 return self.matrix 142 return None
143
[docs] 144 def getMatrix(self) -> Dict[int, List[int]]: 145 return self.matrix
146
[docs] 147 def to_list(self) -> "list[list[int]] | None": 148 """ 149 Returns the values of :py:attr:`matrix` as a list. 150 """ 151 if self.matrix is None: 152 return None 153 return list(self.matrix.values())
154
[docs] 155 def to_numpy(self) -> "np.ndarray[int, Any] | None": 156 """ 157 Returns the values of :py:attr:`matrix` as a numpy array. 158 """ 159 if self.matrix is None: 160 return None 161 return np.array(self.to_list())
162 163 if TYPE_CHECKING: 164 165 class RenderOptions(TypedDict, total=False, closed=True): 166 """Signature for :py:meth:`.DetectionMatrix.render`.""" 167 168 imshow_settings: dict[str, Any] 169 vertical: bool 170 draw_values: bool 171 text_settings: dict[str, Any] 172 draw: bool 173
[docs] 174 def render( 175 self, 176 display: pygame.Surface, 177 imshow_settings: dict[str, Any] = {"cmap": "jet"}, # noqa: B006 178 vertical: bool = True, 179 draw_values: bool = True, 180 text_settings: dict[str, Any] = {"color": "orange"}, # noqa: B006 181 *, 182 draw: bool = True, 183 ) -> None: 184 """ 185 Renders the matrix on the given **surface** using :py:mod:`matplotlib`. 186 187 Parameters: 188 display: The surface to render the matrix on. 189 imshow_settings: The settings for :py:meth:`matplotlib.pyplot.imshow`. 190 Defaults to :python:`{'cmap': 'jet'}`. 191 vertical: If the lanes should be displayed vertically. Defaults to :python:`True`. 192 draw_values: If the entries should be displayed as text. Defaults to :python:`True`. 193 text_settings: The settings for :py:meth:`matplotlib.pyplot.text` when **draw_values**. 194 Defaults to :python:`{'color': 'orange'}`. 195 draw: If the matrix should be drawn. If :code:`False`, this function will do nothing. 196 """ 197 if not draw: 198 return 199 matrix = self.to_numpy() # lanes are horizontal, OneLane: left to right, Left Lane at the top. 200 if matrix is None: 201 return 202 ax: MplAxes 203 fig, ax = plt.subplots(figsize=(2, 2), dpi=100) 204 if vertical: 205 matrix = np.rot90(matrix) # 1st/3rd perspective 206 ax.imshow(matrix, **imshow_settings) 207 if draw_values: 208 for (i, j), val in np.ndenumerate(matrix): 209 ax.text(j, i, val, ha="center", va="center", **text_settings) 210 ax.axis("off") 211 fig.tight_layout(pad=0) 212 213 canvas: agg.FigureCanvasAgg = fig.canvas # type: ignore[assignment] 214 canvas.draw() 215 buffer_data: memoryview = canvas.buffer_rgba() 216 217 size = canvas.get_width_height() 218 surf = pygame.image.frombuffer(buffer_data, size, "RGBA") 219 220 display.blit(surf, (220, display.get_height() - surf.get_height() - 40)) 221 plt.close(fig)
222 223 @property 224 def sync(self): 225 """ 226 Weather the matrix is synchronous or not. 227 228 :meta private: 229 """ 230 return self._sync 231
[docs] 232 def start(self): 233 """Allows the matrix to update.""" 234 self.running = True
235
[docs] 236 def stop(self): 237 """Prevents the matrix from updating.""" 238 self.running = False 239 self.matrix = None # prevent rendering # type: ignore[assignment]
240 241 def __del__(self): 242 if self.running: 243 with contextlib.suppress(Exception): 244 self.stop() 245 246 def _signal_handler(self, signum: int, _): 247 """ 248 Signal handler for stopping the simulation, e.g. when pressing Ctrl+C 249 in the terminal. 250 251 Calls :py:meth:`.stop`. 252 253 :meta private: 254 """ 255 from classes.ui.keyboard_controls import RSSKeyboardControl # noqa: PLC0415 # lazy import 256 257 logger.info(f"DetectionMatrix: signal {signum} received. Stopping.") 258 self.stop() 259 # Can only have one signal handler! 260 RSSKeyboardControl._signal_handler(signum, _) 261 262 def _add_signal_handler(self): 263 """ 264 Adds the signal handler for stopping the simulation. 265 266 :meta private: 267 """ 268 signal.signal(signal.SIGINT, self._signal_handler)
269 270
[docs] 271class AsyncDetectionMatrix(DetectionMatrix): 272 """ 273 Asynchronous version of the :py:class:`DetectionMatrix`. 274 275 Will calculate the matrix update in a separate thread. 276 """ 277
[docs] 278 def __init__( 279 self, ego_vehicle: carla.Actor, *, road_lane_ids: Optional[Set[RoadLaneId]] = None, sleep_time: float = 0.1 280 ): 281 """ 282 Parameters: 283 ego_vehicle: The ego vehicle. 284 sleep_time: The time to sleep between updates. Defaults to 0.1 seconds 285 road_lane_ids: The road and lane IDs to consider. If not provided, all will be considered. 286 """ 287 super().__init__(ego_vehicle, road_lane_ids) 288 self._sync = False 289 self.sleep_time = sleep_time 290 self.lock = threading.Lock() 291 self.worker_thread = threading.Thread(target=self._worker, daemon=True)
292 293 # TODO: add signal handler to interrupt the thread faster 294
[docs] 295 def update(self) -> None: 296 """Not available in the async version."""
297 298 def _worker(self) -> None: 299 while self.running: 300 try: 301 new_matrix = self._calculate_update() 302 with self.lock: 303 self.matrix = new_matrix 304 except (RuntimeError, OSError) as e: 305 print(f"Fatal Error in matrix calculation: {e}") 306 raise 307 except Exception as e: 308 print(f"Error in matrix calculation: {e}") 309 if self.sleep_time: 310 time.sleep(self.sleep_time) 311
[docs] 312 def getMatrix(self): 313 with self.lock: 314 return self.matrix
315
[docs] 316 def start(self): 317 self.running = True 318 self.worker_thread.start() # NOTE: This does not allow restart
319
[docs] 320 def stop(self, timeout: float | None = None): 321 """ 322 See Also: 323 :meth:`threading.Thread.join` 324 325 Raises: 326 RuntimeError: if **timeout** is not None and the thread is still alive after the time. 327 """ 328 self.running = False 329 if self.worker_thread.is_alive(): 330 self.worker_thread.join(timeout) 331 else: 332 logger.info("DetectionMatrix.stop called multiple times.") 333 self.matrix = None # prevent rendering # type: ignore[assignment]
334 335 def __del__(self): 336 self.running = False 337 with contextlib.suppress(Exception): 338 self.worker_thread.join(3.0) 339 self.matrix = None # type: ignore[assignment]