1from __future__ import annotations
2
3import contextlib
4import threading
5import time
6from agents.tools.logs import logger
7from classes._data_gathering.car_detection_matrix.informationUtils import (
8 RoadLaneId,
9 check_ego_on_highway,
10 create_city_matrix,
11 detect_surrounding_cars,
12 get_all_road_lane_ids,
13)
14
15from launch_tools import CarlaDataProvider
16
17import matplotlib # noqa: ICN001
18
19matplotlib.use("Agg")
20import matplotlib.backends.backend_agg as agg
21import matplotlib.pyplot as plt
22import numpy as np
23import pygame
24
25import signal
26from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set
27from typing_extensions import TypedDict
28
29if TYPE_CHECKING:
30 import carla
31 from _data_gathering.car_detection_matrix.informationUtils import HighWayShape
32 from matplotlib.axes import Axes as MplAxes
33
34
[docs]
def matrix_for_actor(
    ego_vehicle: carla.Actor,
    road_lane_ids: "set[RoadLaneId]",
    radius: float = 100.0,
    highway_shape: Optional["HighWayShape"] = None,
):
    """
    Build the detection matrix around the given actor.

    The world and map are fetched from :py:class:`.CarlaDataProvider`, a city
    matrix is created for the actor's current location and then passed to
    ``detect_surrounding_cars`` together with the remaining arguments.

    Parameters:
        ego_vehicle: The ego vehicle; its current location is the reference point.
        road_lane_ids: Road/lane identifiers forwarded to the highway check,
            the matrix creation and the car detection.
        radius: Forwarded unchanged to ``detect_surrounding_cars``.
        highway_shape (tuple): Tuple containing highway_type, number of straight highway lanes,
            entry waypoint tuple and/or exit waypoint tuple.
            Format: (highway_type: string, straight_lanes: int, entry_wps: ([wp,..], [wp,..]), exit_wps: ([wp,..], [wp,..]))

    Returns:
        A dict whose keys are the running index (0, 1, ...) of each lane entry
        and whose values are the corresponding matrix rows, or ``None`` when no
        (or an empty) city matrix could be created for the current location.

    Note:
        :py:class:`.CarlaDataProvider` needs to be set up before calling this function.
    """
    carla_world = CarlaDataProvider.get_world()
    carla_map = CarlaDataProvider.get_map()
    location = ego_vehicle.get_location()
    on_highway = check_ego_on_highway(location, road_lane_ids, carla_map)

    # NOTE: in rare unsupported cases create_city_matrix yields None.
    city_matrix = create_city_matrix(location, road_lane_ids, carla_map)
    if not city_matrix:
        return None

    populated, _unused = detect_surrounding_cars(
        location,
        ego_vehicle,
        city_matrix,
        road_lane_ids,
        carla_world,
        radius,
        on_highway,
        highway_shape,
    )

    # The named keys (e.g. "left_outer_lane") are dropped in favour of numeric
    # indices. TODO: possibly keep the original keys to stay closer to the
    # source and to be able to differentiate the cases.
    return dict(enumerate(populated.values()))
81
82
[docs]
class DetectionMatrix:
    """
    Automatically create a matrix representing the lanes around the ego vehicle
    on each :py:meth:`update` call.
    """

    matrix: Dict[int, List[int]]
    """
    A :py:class:`collections.OrderedDict`: An ordered dictionary representing the city matrix. The keys for existing lanes are the lane IDs in the format "road_id_lane_id".
    For non-existing lanes different placeholders exist, e.g. left_outer_lane, left_inner_lane, No_4th_lane, No_opposing_direction
    The values indicate whether a vehicle is present: 0 - No vehicle, 1 - Ego vehicle, 3 - No road.
    Format example:

    .. code-block:: python

        {
            "left_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3],
            "left_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3],
            "1_2": [0, 0, 0, 0, 0, 0, 0, 0],
            "1_1": [0, 0, 0, 0, 0, 0, 0, 0],
            "1_-1": [0, 0, 0, 0, 0, 0, 0, 0],
            "1_-2": [0, 0, 0, 0, 0, 0, 0, 0],
            "right_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3],
            "right_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3],
        }

    Attention:
        - Currently the keys are replaced by numbers.
        - In case of an unsupported layout or before the very first update
          the matrix can be :code:`None`. Be aware of this when using :py:class:`AsyncDetectionMatrix`.
    """

    def __init__(
        self,
        ego_vehicle: carla.Actor,
        road_lane_ids: Optional[Set[RoadLaneId]] = None,
        radius: float = 100.0,
    ):
        """
        Parameters:
            ego_vehicle: The ego vehicle the matrix is calculated for.
            road_lane_ids: The road and lane IDs to consider. If not provided (or empty),
                they are gathered from the map held by :py:class:`.CarlaDataProvider`.
            radius: Passed on to :py:func:`matrix_for_actor` on every update.

        Note:
            Installs a ``SIGINT`` handler, replacing any previously registered one.
        """
        self._ego_vehicle = ego_vehicle
        self.running = True
        """If the matrix will perform updates."""
        self._sync = True
        self._road_lane_ids = road_lane_ids or get_all_road_lane_ids(CarlaDataProvider._map)
        """A set containing unique road and lane identifiers in the format "roadId_laneId"."""
        self.matrix = None  # type: ignore[assignment]
        self._radius = radius
        # Install the handler last so the instance is fully initialised
        # before it becomes reachable from a signal.
        self._add_signal_handler()

    def _calculate_update(self):
        """Calculate a fresh matrix for the ego vehicle; can be :code:`None`."""
        return matrix_for_actor(self._ego_vehicle, self._road_lane_ids, radius=self._radius)

    def update(self) -> "Dict[int, List[int]] | None":
        """
        If the matrix is :py:attr:`running`, it will update the matrix and return it,
        otherwise returns :python:`None`.
        """
        if self.running:
            self.matrix = self._calculate_update()
            return self.matrix
        return None

    def getMatrix(self) -> "Dict[int, List[int]] | None":
        """
        Returns the current :py:attr:`matrix`.

        This is :code:`None` before the first update, after :py:meth:`stop`
        and when the last update did not produce a matrix.
        """
        return self.matrix

    def to_list(self) -> "list[list[int]] | None":
        """
        Returns the values of :py:attr:`matrix` as a list.
        """
        if self.matrix is None:
            return None
        return list(self.matrix.values())

    def to_numpy(self) -> "np.ndarray[int, Any] | None":
        """
        Returns the values of :py:attr:`matrix` as a numpy array.
        """
        if self.matrix is None:
            return None
        return np.array(self.to_list())

    if TYPE_CHECKING:

        class RenderOptions(TypedDict, total=False, closed=True):
            """Signature for :py:meth:`.DetectionMatrix.render`."""

            imshow_settings: dict[str, Any]
            vertical: bool
            draw_values: bool
            text_settings: dict[str, Any]
            draw: bool

    def render(
        self,
        display: pygame.Surface,
        imshow_settings: dict[str, Any] = {"cmap": "jet"},  # noqa: B006
        vertical: bool = True,
        draw_values: bool = True,
        text_settings: dict[str, Any] = {"color": "orange"},  # noqa: B006
        *,
        draw: bool = True,
    ) -> None:
        """
        Renders the matrix on the given **surface** using :py:mod:`matplotlib`.

        Does nothing if **draw** is false or if there is currently no matrix.

        Parameters:
            display: The surface to render the matrix on.
            imshow_settings: The settings for :py:meth:`matplotlib.pyplot.imshow`.
                Defaults to :python:`{'cmap': 'jet'}`.
            vertical: If the lanes should be displayed vertically. Defaults to :python:`True`.
            draw_values: If the entries should be displayed as text. Defaults to :python:`True`.
            text_settings: The settings for :py:meth:`matplotlib.pyplot.text` when **draw_values**.
                Defaults to :python:`{'color': 'orange'}`.
            draw: If the matrix should be drawn. If :code:`False`, this function will do nothing.
        """
        if not draw:
            return
        matrix = self.to_numpy()  # lanes are horizontal, OneLane: left to right, Left Lane at the top.
        if matrix is None:
            return
        ax: MplAxes
        fig, ax = plt.subplots(figsize=(2, 2), dpi=100)
        # pyplot keeps every figure alive until it is closed explicitly, so
        # close it even when drawing or blitting fails.
        try:
            if vertical:
                matrix = np.rot90(matrix)  # 1st/3rd perspective
            ax.imshow(matrix, **imshow_settings)
            if draw_values:
                for (i, j), val in np.ndenumerate(matrix):
                    ax.text(j, i, val, ha="center", va="center", **text_settings)
            ax.axis("off")
            fig.tight_layout(pad=0)

            canvas: agg.FigureCanvasAgg = fig.canvas  # type: ignore[assignment]
            canvas.draw()
            buffer_data: memoryview = canvas.buffer_rgba()

            size = canvas.get_width_height()
            surf = pygame.image.frombuffer(buffer_data, size, "RGBA")

            # Fixed placement: 220 px from the left, 40 px above the bottom edge.
            display.blit(surf, (220, display.get_height() - surf.get_height() - 40))
        finally:
            plt.close(fig)

    @property
    def sync(self):
        """
        Whether the matrix is synchronous or not.

        :meta private:
        """
        return self._sync

    def start(self):
        """Allows the matrix to update."""
        self.running = True

    def stop(self):
        """Prevents the matrix from updating."""
        self.running = False
        self.matrix = None  # prevent rendering # type: ignore[assignment]

    def __del__(self):
        # `running` does not exist if __init__ failed early or was bypassed;
        # a finalizer must not raise in that case.
        if getattr(self, "running", False):
            with contextlib.suppress(Exception):
                self.stop()

    def _signal_handler(self, signum: int, _):
        """
        Signal handler for stopping the simulation, e.g. when pressing Ctrl+C
        in the terminal.

        Calls :py:meth:`.stop`.

        :meta private:
        """
        from classes.ui.keyboard_controls import RSSKeyboardControl  # noqa: PLC0415 # lazy import

        logger.info(f"DetectionMatrix: signal {signum} received. Stopping.")
        self.stop()
        # Can only have one signal handler! Therefore forward the signal.
        RSSKeyboardControl._signal_handler(signum, _)

    def _add_signal_handler(self):
        """
        Adds the signal handler for stopping the simulation.

        :meta private:
        """
        signal.signal(signal.SIGINT, self._signal_handler)
270
[docs]
class AsyncDetectionMatrix(DetectionMatrix):
    """
    Asynchronous version of the :py:class:`DetectionMatrix`.

    Will calculate the matrix update in a separate thread.
    """

    def __init__(
        self,
        ego_vehicle: carla.Actor,
        *,
        road_lane_ids: Optional[Set[RoadLaneId]] = None,
        sleep_time: float = 0.1,
        radius: float = 100.0,
    ):
        """
        Parameters:
            ego_vehicle: The ego vehicle.
            sleep_time: The time to sleep between updates. Defaults to 0.1 seconds
            road_lane_ids: The road and lane IDs to consider. If not provided, all will be considered.
            radius: Forwarded to :py:class:`DetectionMatrix`. Defaults to 100.0.
        """
        super().__init__(ego_vehicle, road_lane_ids, radius)
        self._sync = False
        self.sleep_time = sleep_time
        self.lock = threading.Lock()
        self.worker_thread = self._new_worker_thread()

        # TODO: add signal handler to interrupt the thread faster

    def _new_worker_thread(self) -> threading.Thread:
        """Create a not yet started daemon thread that runs :py:meth:`_worker`."""
        return threading.Thread(target=self._worker, daemon=True)

    def update(self) -> None:
        """Not available in the async version."""

    def _worker(self) -> None:
        """
        Thread target: recalculates the matrix for as long as :py:attr:`running` is true.

        ``RuntimeError`` and ``OSError`` are reported and re-raised, which ends
        the thread; any other exception is reported and the loop continues.
        """
        while self.running:
            try:
                new_matrix = self._calculate_update()
                with self.lock:
                    self.matrix = new_matrix
            except (RuntimeError, OSError) as e:
                print(f"Fatal Error in matrix calculation: {e}")
                raise
            except Exception as e:
                print(f"Error in matrix calculation: {e}")
            if self.sleep_time:
                time.sleep(self.sleep_time)

    def getMatrix(self):
        """Returns the current matrix (can be :code:`None`) while holding :py:attr:`lock`."""
        with self.lock:
            return self.matrix

    def start(self):
        """
        Starts the worker thread.

        Calling this while the worker is alive only (re-)enables updates.
        A thread object can be started only once, so a fresh thread is
        created when the previous worker has already finished, e.g. after
        :py:meth:`stop`.
        """
        self.running = True
        if self.worker_thread.is_alive():
            return
        if self.worker_thread.ident is not None:
            # The previous worker was started and has ended; it cannot be reused.
            self.worker_thread = self._new_worker_thread()
        self.worker_thread.start()

    def stop(self, timeout: float | None = None):
        """
        Stops updating and waits for the worker thread to finish.

        Parameters:
            timeout: Maximum time in seconds to wait for the worker thread;
                :code:`None` waits until it has finished.

        Note:
            No exception is raised when **timeout** elapses; check
            ``worker_thread.is_alive()`` afterwards to find out whether the
            worker has really ended.

        See Also:
            :meth:`threading.Thread.join`

        Raises:
            RuntimeError: if called from within the worker thread itself
                (a thread cannot join itself).
        """
        self.running = False
        if self.worker_thread.is_alive():
            self.worker_thread.join(timeout)
            if self.worker_thread.is_alive():
                # join() returns silently on timeout, make it visible.
                logger.warning(f"AsyncDetectionMatrix: worker thread still alive after {timeout} s.")
        else:
            logger.info("DetectionMatrix.stop called multiple times.")
        self.matrix = None  # prevent rendering # type: ignore[assignment]

    def __del__(self):
        self.running = False
        # Also covers a missing or never started worker_thread.
        with contextlib.suppress(Exception):
            self.worker_thread.join(3.0)
        self.matrix = None  # type: ignore[assignment]