Source code for classes.sensors.rss_visualization
1"""
2This module is based on the RSS example shipped with CARLA.
3"""
4
5#
6# Copyright (c) 2020 Intel Corporation
7#
8# pyright: reportAttributeAccessIssue=warning
9
10from __future__ import annotations
11
12import math
13import weakref
14from enum import Enum
15from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union
16from typing import cast as assure_type
17
18import carla
19import numpy as np
20import pygame
21
22from classes import CanBeDummy
23from classes.sensors._sensor_interface import CustomSensorInterface
24from classes.constants import AD_RSS_AVAILABLE
25from launch_tools import CarlaDataProvider
26
27if AD_RSS_AVAILABLE:
28 from carla import ad
29
30 RssStateEvaluator = ad.rss.state.RssStateEvaluator
31 EVALUATOR_NONE_STATE = RssStateEvaluator.names["None"]
32 _RSS_EVALUATOR_SAME_DIRECTION = {
33 RssStateEvaluator.LongitudinalDistanceSameDirectionOtherInFront,
34 RssStateEvaluator.LongitudinalDistanceSameDirectionEgoFront,
35 }
36 _RSS_EVALUATOR_OPPOSITE_DIRECTION = {
37 RssStateEvaluator.LongitudinalDistanceOppositeDirectionEgoCorrectLane,
38 RssStateEvaluator.LongitudinalDistanceOppositeDirection,
39 }
40
41if TYPE_CHECKING:
42 assert ad # remove Unbound type # type: ignore
43 from classes.sensors.rss_sensor import RssStateInfo
44
45
[docs]
46class Color:
47 black = (0, 0, 0)
48 gray = (128, 128, 128)
49 white = (255, 255, 255)
50 red = (255, 0, 0)
51 green = (0, 255, 0)
52 blue = (0, 0, 255)
53
54 carla_gray = carla.Color(150, 150, 150)
55 carla_red = carla.Color(255, 0, 0)
56 carla_green = carla.Color(0, 255, 0)
57 carla_blue = carla.Color(0, 0, 255)
58
59
60class RssStateVisualizer(CanBeDummy):
61 def __init__(self, display_dimensions: tuple[int, int], font: pygame.font.Font, world: carla.World): # noqa: ARG002
62 self._surface = None
63 self._display_dimensions = display_dimensions
64 self._font = font
65 self._world = CarlaDataProvider.get_world()
66
67 def tick(self, individual_rss_states: Iterable["RssStateInfo"]) -> None:
68 state_surface = pygame.Surface((220, self._display_dimensions[1]))
69 state_surface.set_colorkey(pygame.Color("black"))
70 v_offset = 0
71
72 if individual_rss_states:
73 surface = self._font.render("RSS States:", True, (255, 255, 255))
74 state_surface.blit(surface, (8, v_offset))
75 v_offset += 26
76 for state in individual_rss_states:
77 object_name = "Obj"
78 if state.rss_state.objectId == 18446744073709551614:
79 object_name = "Border Left"
80 elif state.rss_state.objectId == 18446744073709551615:
81 object_name = "Border Right"
82 else:
83 other_actor = state.get_actor(self._world)
84 if other_actor:
85 li = list(other_actor.type_id.split("."))
86 if li:
87 li.pop(0)
88 li = [element.capitalize() for element in li]
89
90 object_name = " ".join(li).strip()[:15]
91
92 mode = "?"
93 if state.actor_calculation_mode == ad.rss.map.RssMode.Structured:
94 mode = "S"
95 elif state.actor_calculation_mode == ad.rss.map.RssMode.Unstructured:
96 mode = "U"
97 elif state.actor_calculation_mode == ad.rss.map.RssMode.NotRelevant:
98 mode = "-"
99 item = f"{mode:>4} {state.distance:>2.0f}m {object_name:>8}"
100
101 surface = self._font.render(item, True, Color.white)
102 state_surface.blit(surface, (5, v_offset))
103 color = Color.gray
104 if state.actor_calculation_mode != ad.rss.map.RssMode.NotRelevant:
105 color = Color.red if state.is_dangerous else Color.green
106 pygame.draw.circle(state_surface, color, (12, v_offset + 7), 5)
107 xpos = 184
108 if state.actor_calculation_mode == ad.rss.map.RssMode.Structured:
109 # Unsafe longitudinalState
110 if not state.rss_state.longitudinalState.isSafe:
111 if state.rss_state.longitudinalState.rssStateInformation.evaluator in _RSS_EVALUATOR_SAME_DIRECTION:
112 pygame.draw.polygon(
113 state_surface,
114 (255, 255, 255),
115 (
116 (xpos + 1, v_offset + 1 + 4),
117 (xpos + 6, v_offset + 1 + 0),
118 (xpos + 11, v_offset + 1 + 4),
119 (xpos + 7, v_offset + 1 + 4),
120 (xpos + 7, v_offset + 1 + 12),
121 (xpos + 5, v_offset + 1 + 12),
122 (xpos + 5, v_offset + 1 + 4),
123 ),
124 )
125 xpos += 14
126 elif (
127 state.rss_state.longitudinalState.rssStateInformation.evaluator
128 in _RSS_EVALUATOR_OPPOSITE_DIRECTION
129 ):
130 pygame.draw.polygon(
131 state_surface,
132 (255, 255, 255),
133 (
134 (xpos + 2, v_offset + 1 + 8),
135 (xpos + 6, v_offset + 1 + 12),
136 (xpos + 10, v_offset + 1 + 8),
137 (xpos + 7, v_offset + 1 + 8),
138 (xpos + 7, v_offset + 1 + 0),
139 (xpos + 5, v_offset + 1 + 0),
140 (xpos + 5, v_offset + 1 + 8),
141 ),
142 )
143 xpos += 14
144
145 # Unsafe Laterals: Draw Left/Right arrows
146 # Right
147 if (
148 not state.rss_state.lateralStateRight.isSafe
149 and state.rss_state.lateralStateRight.rssStateInformation.evaluator != EVALUATOR_NONE_STATE
150 ):
151 pygame.draw.polygon(
152 state_surface,
153 (255, 255, 255),
154 (
155 (xpos + 0, v_offset + 1 + 4),
156 (xpos + 8, v_offset + 1 + 4),
157 (xpos + 8, v_offset + 1 + 1),
158 (xpos + 12, v_offset + 1 + 6),
159 (xpos + 8, v_offset + 1 + 10),
160 (xpos + 8, v_offset + 1 + 8),
161 (xpos + 0, v_offset + 1 + 8),
162 ),
163 )
164 xpos += 14
165 # Left
166 if (
167 not state.rss_state.lateralStateLeft.isSafe
168 and state.rss_state.lateralStateLeft.rssStateInformation.evaluator != EVALUATOR_NONE_STATE
169 ):
170 pygame.draw.polygon(
171 state_surface,
172 (255, 255, 255),
173 (
174 (xpos + 0, v_offset + 1 + 6),
175 (xpos + 4, v_offset + 1 + 1),
176 (xpos + 4, v_offset + 1 + 4),
177 (xpos + 12, v_offset + 1 + 4),
178 (xpos + 12, v_offset + 1 + 8),
179 (xpos + 4, v_offset + 1 + 8),
180 (xpos + 4, v_offset + 1 + 10),
181 ),
182 )
183 xpos += 14
184 elif state.actor_calculation_mode == ad.rss.map.RssMode.Unstructured:
185 text = ""
186 if state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.DriveAway:
187 text = " D"
188 elif (
189 state.rss_state.unstructuredSceneState.response
190 == ad.rss.state.UnstructuredSceneResponse.ContinueForward
191 ):
192 text = " C"
193 elif state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.Brake:
194 text = " B"
195 surface = self._font.render(text, True, Color.white)
196 state_surface.blit(surface, (xpos, v_offset))
197
198 v_offset += 14
199 self._surface = state_surface
200
201 def render(self, display: pygame.Surface, v_offset: int):
202 if self._surface:
203 display.blit(self._surface, (0, v_offset))
204
205
206# ==============================================================================
207# -- RssUnstructuredSceneVisualizer ------------------------------------------------
208# ==============================================================================
209
210
[docs]
211class RssUnstructuredSceneVisualizerMode(Enum):
212 disabled = 1
213 window = 2
214 fullscreen = 3
215
216
[docs]
217class RssUnstructuredSceneVisualizer(CanBeDummy, CustomSensorInterface):
218 """Provides a top-view over the setting?"""
219
[docs]
220 def __init__(
221 self, parent_actor: carla.Actor, world, display_dimensions: tuple[int, int], gamma_correction: float = 2.2
222 ): # noqa: ARG002
223 self._last_rendered_frame = -1
224 self._surface = None
225 self._current_rss_surface: Optional[Tuple[int, pygame.Surface]] = None
226 self.current_camera_surface: Tuple[int, pygame.Surface] = (0, None)
227 self._world: carla.World = CarlaDataProvider.get_world()
228 self._parent_actor = parent_actor
229 self._display_dimensions = display_dimensions
230 self._camera = None # type: ignore[assignment]
231 self._mode = RssUnstructuredSceneVisualizerMode.disabled
232 self._gamma = gamma_correction
233
234 self.restart(RssUnstructuredSceneVisualizerMode.window)
235
236 @property
237 def sensor(self) -> carla.Sensor:
238 return self._camera
239
240 @sensor.setter
241 def sensor(self, value):
242 # Needed for SensorInterface.destroy
243 if value is None:
244 self._camera = None # type: ignore[assignment]
245 else:
246 raise ValueError("Cannot set sensor of RssUnstructuredSceneVisualizer. Use _camera instead.")
247
[docs]
248 def destroy(self):
249 super().destroy()
250 self._camera = None # type: ignore[assignment]
251
[docs]
252 def restart(self, mode):
253 # setup up top down camera
254 self.destroy()
255 self._mode = mode
256
257 spawn_sensor = False
258 if mode == RssUnstructuredSceneVisualizerMode.window:
259 self._dim = (self._display_dimensions[0] / 3, self._display_dimensions[1] / 2)
260 spawn_sensor = True
261 elif mode == RssUnstructuredSceneVisualizerMode.fullscreen:
262 self._dim = (self._display_dimensions[0], self._display_dimensions[1])
263 spawn_sensor = True
264 else:
265 self._surface = None
266
267 if spawn_sensor:
268 self._calibration = np.identity(3)
269 self._calibration[0, 2] = self._dim[0] / 2.0
270 self._calibration[1, 2] = self._dim[1] / 2.0
271 self._calibration[0, 0] = self._calibration[1, 1] = self._dim[0] / (
272 2.0 * np.tan(90.0 * np.pi / 360.0)
273 ) # fov default: 90.0
274
275 bp_library = CarlaDataProvider._blueprint_library
276 bp: carla.ActorBlueprint = bp_library.find("sensor.camera.rgb")
277 bp.set_attribute("image_size_x", str(self._dim[0]))
278 bp.set_attribute("image_size_y", str(self._dim[1]))
279 if bp.has_attribute("gamma"):
280 bp.set_attribute("gamma", str(self._gamma))
281
282 self._camera: carla.Sensor = assure_type(
283 "carla.Sensor",
284 self._world.spawn_actor(
285 bp,
286 carla.Transform(carla.Location(x=7.5, z=10), carla.Rotation(pitch=-90)),
287 attach_to=self._parent_actor,
288 ),
289 )
290 # We need to pass the lambda a weak reference to self to avoid
291 # circular reference.
292 weak_self = weakref.ref(self)
293 self._camera.listen(
294 lambda image: self._parse_image(weak_self, image) # type: ignore[arg-type]
295 )
296
[docs]
297 def update_surface(self, cam_frame: Union[int, None], rss_frame: Union[int, None]):
298 if self._mode == RssUnstructuredSceneVisualizerMode.disabled:
299 return
300 render = False
301
302 if cam_frame and self._current_rss_surface and self._current_rss_surface[0] == cam_frame:
303 render = True
304
305 if rss_frame and self.current_camera_surface and self.current_camera_surface[0] == rss_frame:
306 render = True
307
308 if render:
309 surface = self.current_camera_surface[1]
310 surface.blit(self._current_rss_surface[1], (0, 0)) # pyright: ignore[reportOptionalSubscript]
311 rect = pygame.Rect((0, 0), (2, surface.get_height()))
312 pygame.draw.rect(surface, Color.black, rect, 0)
313 rect = pygame.Rect((0, 0), (surface.get_width(), 2))
314 pygame.draw.rect(surface, Color.black, rect, 0)
315 rect = pygame.Rect((0, surface.get_height() - 2), (surface.get_width(), surface.get_height()))
316 pygame.draw.rect(surface, Color.black, rect, 0)
317 rect = pygame.Rect((surface.get_width() - 2, 0), (surface.get_width(), surface.get_width()))
318 pygame.draw.rect(surface, Color.black, rect, 0)
319 self._surface = surface
320
[docs]
321 def toggle_camera(self):
322 print("Toggle RssUnstructuredSceneVisualizer")
323 if self._mode == RssUnstructuredSceneVisualizerMode.window:
324 self.restart(RssUnstructuredSceneVisualizerMode.fullscreen)
325 elif self._mode == RssUnstructuredSceneVisualizerMode.fullscreen:
326 self.restart(RssUnstructuredSceneVisualizerMode.disabled)
327 elif self._mode == RssUnstructuredSceneVisualizerMode.disabled:
328 self.restart(RssUnstructuredSceneVisualizerMode.window)
329
330 @staticmethod
331 def _parse_image(weak_self: "weakref.ReferenceType[RssUnstructuredSceneVisualizer]", image: carla.Image):
332 self = weak_self()
333 if not self:
334 return
335 image.convert(carla.ColorConverter.Raw)
336 array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
337 array = np.reshape(array, (image.height, image.width, 4))
338 array = array[:, :, :3]
339 array = array[:, :, ::-1]
340 surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
341 self.current_camera_surface = (image.frame, surface)
342 self.update_surface(image.frame, None)
343
[docs]
344 @staticmethod
345 def rotate_around_point(xy, radians, origin):
346 """Rotate a point around a given point."""
347 x, y = xy
348 offset_x, offset_y = origin
349 adjusted_x = x - offset_x
350 adjusted_y = y - offset_y
351 cos_rad = math.cos(radians)
352 sin_rad = math.sin(radians)
353 qx = offset_x + cos_rad * adjusted_x - sin_rad * adjusted_y
354 qy = offset_y + sin_rad * adjusted_x + cos_rad * adjusted_y
355
356 return qx, qy
357
[docs]
358 def tick(self, frame, rss_response, allowed_heading_ranges):
359 if not self._camera:
360 return
361 surface = pygame.Surface(self._dim)
362 surface.set_colorkey(pygame.Color("black"))
363 surface.set_alpha(180)
364 try:
365 lines = RssUnstructuredSceneVisualizer.get_trajectory_sets(
366 rss_response.rss_state_snapshot, self._camera.get_transform(), self._calibration
367 )
368
369 polygons = [
370 (
371 RssUnstructuredSceneVisualizer.transform_points(
372 RssUnstructuredSceneVisualizer._get_points_from_pairs(
373 RssUnstructuredSceneVisualizer.draw_heading_range(
374 heading_range, rss_response.ego_dynamics_on_route
375 )
376 ),
377 self._camera.get_transform(),
378 self._calibration,
379 ),
380 Color.blue,
381 )
382 for heading_range in allowed_heading_ranges
383 ]
384
385 RssUnstructuredSceneVisualizer.draw_lines(surface, lines)
386 RssUnstructuredSceneVisualizer.draw_polygons(surface, polygons)
387 except RuntimeError as e:
388 print(f"ERROR {e}")
389 self._current_rss_surface = (frame, surface)
390 self.update_surface(None, frame)
391
[docs]
392 def render(self, display):
393 if self._surface:
394 display.blit(self._surface, (display.get_width() - self._dim[0], 0))
395
[docs]
396 @staticmethod
397 def draw_heading_range(heading_range, ego_dynamics_on_route):
398 line = [(float(ego_dynamics_on_route.ego_center.x), float(ego_dynamics_on_route.ego_center.y))]
399 length = 3.0
400 current_angle = float(heading_range.begin)
401 max_angle = float(heading_range.end)
402 if heading_range.end < heading_range.begin:
403 max_angle += 2.0 * np.pi
404
405 while current_angle < max_angle:
406 line.append((
407 float(ego_dynamics_on_route.ego_center.x) + length * np.cos(current_angle),
408 float(ego_dynamics_on_route.ego_center.y) + length * np.sin(current_angle),
409 ))
410 current_angle += 0.2
411
412 if current_angle != max_angle:
413 line.append((
414 float(ego_dynamics_on_route.ego_center.x) + length * np.cos(max_angle),
415 float(ego_dynamics_on_route.ego_center.y) + length * np.sin(max_angle),
416 ))
417
418 line.append((float(ego_dynamics_on_route.ego_center.x), float(ego_dynamics_on_route.ego_center.y)))
419 return line
420
[docs]
421 @staticmethod
422 def get_trajectory_sets(rss_state_snapshot: "ad.rss.state.RssStateSnapshot", camera_transform, calibration):
423 """
424 Creates 3D bounding boxes based on carla vehicle list and camera.
425 """
426 trajectory_sets = [
427 # ego
428 (
429 RssUnstructuredSceneVisualizer.transform_points(
430 RssUnstructuredSceneVisualizer._get_trajectory_set_points(
431 rss_state_snapshot.unstructuredSceneEgoInformation.brakeTrajectorySet
432 ),
433 camera_transform,
434 calibration,
435 ),
436 Color.red,
437 ),
438 (
439 RssUnstructuredSceneVisualizer.transform_points(
440 RssUnstructuredSceneVisualizer._get_trajectory_set_points(
441 rss_state_snapshot.unstructuredSceneEgoInformation.continueForwardTrajectorySet
442 ),
443 camera_transform,
444 calibration,
445 ),
446 Color.green,
447 ),
448 ]
449
450 # others
451 for state in rss_state_snapshot.individualResponses:
452 if state.unstructuredSceneState.rssStateInformation.brakeTrajectorySet:
453 trajectory_sets.append((
454 RssUnstructuredSceneVisualizer.transform_points(
455 RssUnstructuredSceneVisualizer._get_trajectory_set_points(
456 state.unstructuredSceneState.rssStateInformation.brakeTrajectorySet
457 ),
458 camera_transform,
459 calibration,
460 ),
461 Color.red,
462 ))
463 if state.unstructuredSceneState.rssStateInformation.continueForwardTrajectorySet:
464 trajectory_sets.append((
465 RssUnstructuredSceneVisualizer.transform_points(
466 RssUnstructuredSceneVisualizer._get_trajectory_set_points(
467 state.unstructuredSceneState.rssStateInformation.continueForwardTrajectorySet
468 ),
469 camera_transform,
470 calibration,
471 ),
472 Color.green,
473 ))
474
475 return trajectory_sets
476
[docs]
477 @staticmethod
478 def draw_lines(surface, lines):
479 """
480 Draws lines on pygame display.
481 """
482 for line, color in lines:
483 if len(line) > 1:
484 pygame.draw.lines(surface, color, True, line, 2)
485
[docs]
486 @staticmethod
487 def draw_polygons(surface, polygons):
488 """
489 Draws polygons on pygame display.
490 """
491 for polygon, color in polygons:
492 if len(polygon) > 1:
493 pygame.draw.polygon(surface, color, polygon)
494
[docs]
495 @staticmethod
496 def transform_points(world_cords, camera_transform, calibration):
497 """
498 Returns trajectory set projected to camera view
499 """
500 if world_cords.size == 0:
501 return []
502 world_cords = np.transpose(world_cords)
503 cords_x_y_z = RssUnstructuredSceneVisualizer._world_to_sensor(world_cords, camera_transform)[:3, :]
504 cords_y_minus_z_x = np.stack([cords_x_y_z[1, :], -cords_x_y_z[2, :], cords_x_y_z[0, :]])
505 ts = np.transpose(np.dot(calibration, cords_y_minus_z_x))
506 camera_ts = np.stack([ts[:, 0] / ts[:, 2], ts[:, 1] / ts[:, 2], ts[:, 2]], axis=1)
507 return [(int(point[0]), int(point[1])) for point in camera_ts] # line_to_draw
508
509 @staticmethod
510 def _get_trajectory_set_points(trajectory_set):
511 """ """
512 cords = np.zeros((len(trajectory_set), 4))
513 for i, pt in enumerate(trajectory_set):
514 cords[i, :] = [pt.x, -pt.y, 0, 1]
515 return cords
516
517 @staticmethod
518 def _get_points_from_pairs(trajectory_set):
519 """ """
520 cords = np.zeros((len(trajectory_set), 4))
521 for i, pt in enumerate(trajectory_set):
522 cords[i, :] = [pt[0], -pt[1], 0, 1]
523 return cords
524
525 @staticmethod
526 def _world_to_sensor(cords, camera_transform):
527 """
528 Transforms world coordinates to sensor.
529 """
530 world_sensor_matrix = camera_transform.get_inverse_matrix()
531 sensor_cords = np.dot(world_sensor_matrix, cords)
532 return sensor_cords
533
534
535# ==============================================================================
536# -- RssBoundingBoxVisualizer ------------------------------------------------------
537# ==============================================================================
538
539
540class RssBoundingBoxVisualizer(CanBeDummy):
541 def __init__(self, display_dimensions, world, camera: carla.Sensor):
542 self._last_camera_frame = 0
543 self._surface_for_frame = []
544 self._world = world
545 self._dim = display_dimensions
546 self._calibration = np.identity(3)
547 self._calibration[0, 2] = self._dim[0] / 2.0
548 self._calibration[1, 2] = self._dim[1] / 2.0
549 self._calibration[0, 0] = self._calibration[1, 1] = self._dim[0] / (
550 2.0 * np.tan(90.0 * np.pi / 360.0)
551 ) # fov default: 90.0
552 self._camera = camera
553
554 def tick(self, frame, individual_rss_states):
555 if len(self._surface_for_frame) > 0:
556 try:
557 while self._surface_for_frame[0][0] < self._last_camera_frame:
558 self._surface_for_frame.pop(0)
559 except IndexError:
560 return
561
562 # only render on new frame
563 if len(self._surface_for_frame) > 0 and self._surface_for_frame[0][0] == frame:
564 return
565
566 surface = pygame.Surface(self._dim)
567 surface.set_colorkey(pygame.Color("black"))
568 surface.set_alpha(80)
569 try:
570 bounding_boxes = RssBoundingBoxVisualizer.get_bounding_boxes(
571 individual_rss_states, self._camera.get_transform(), self._calibration, self._world
572 )
573 RssBoundingBoxVisualizer.draw_bounding_boxes(surface, bounding_boxes)
574 self._surface_for_frame.append((frame, surface, len(bounding_boxes)))
575 except RuntimeError:
576 pass
577
578 def render(self, display, current_camera_frame):
579 rendered = False
580 boxes_to_render = 0
581 for frame, surface, box_count in self._surface_for_frame:
582 if frame == current_camera_frame:
583 display.blit(surface, (0, 0))
584 boxes_to_render = box_count
585 rendered = True
586 break
587 if not rendered and boxes_to_render > 0:
588 print(f"Warning: {boxes_to_render} bounding boxes were not drawn.")
589 self._last_camera_frame = current_camera_frame
590
591 @staticmethod
592 def get_bounding_boxes(individual_rss_states, camera_transform, calibration, world: carla.World):
593 """
594 Creates 3D bounding boxes based on carla vehicle list and camera.
595 """
596 bounding_boxes = []
597 for state in individual_rss_states:
598 if state.actor_calculation_mode != ad.rss.map.RssMode.NotRelevant and state.is_dangerous:
599 other_actor = state.get_actor(world)
600 if other_actor:
601 bounding_boxes.append(
602 RssBoundingBoxVisualizer.get_bounding_box(other_actor, camera_transform, calibration)
603 )
604 # filter objects behind camera
605 bounding_boxes = [bb for bb in bounding_boxes if all(bb[:, 2] > 0)]
606 return bounding_boxes
607
608 @staticmethod
609 def draw_bounding_boxes(
610 surface: pygame.Surface, bounding_boxes: list[np.ndarray], color: pygame.Color = pygame.Color("red") # noqa: B008
611 ) -> None: # noqa: B008
612 """
613 Draws bounding boxes on pygame display.
614 """
615 for bbox in bounding_boxes:
616 points = [(int(bbox[i, 0]), int(bbox[i, 1])) for i in range(8)]
617 # draw lines
618 # base
619 polygon = [points[0], points[1], points[2], points[3]]
620 pygame.draw.polygon(surface, color, polygon)
621 # top
622 polygon = [points[4], points[5], points[6], points[7]]
623 pygame.draw.polygon(surface, color, polygon)
624 # base-top
625 polygon = [points[0], points[1], points[5], points[4]]
626 pygame.draw.polygon(surface, color, polygon)
627 polygon = [points[1], points[2], points[6], points[5]]
628 pygame.draw.polygon(surface, color, polygon)
629 polygon = [points[2], points[6], points[7], points[3]]
630 pygame.draw.polygon(surface, color, polygon)
631 polygon = [points[0], points[4], points[7], points[3]]
632 pygame.draw.polygon(surface, color, polygon)
633
634 @staticmethod
635 def get_bounding_box(vehicle, camera_transform, calibration):
636 """
637 Returns 3D bounding box for a vehicle based on camera view.
638 """
639
640 bb_cords = RssBoundingBoxVisualizer._create_bb_points(vehicle)
641 cords_x_y_z = RssBoundingBoxVisualizer._vehicle_to_sensor(bb_cords, vehicle, camera_transform)[:3, :]
642 cords_y_minus_z_x = np.stack([cords_x_y_z[1, :], -cords_x_y_z[2, :], cords_x_y_z[0, :]])
643 bbox = np.transpose(np.dot(calibration, cords_y_minus_z_x))
644 camera_bbox = np.stack([bbox[:, 0] / bbox[:, 2], bbox[:, 1] / bbox[:, 2], bbox[:, 2]], axis=1)
645 return camera_bbox
646
647 @staticmethod
648 def _create_bb_points(vehicle: carla.Actor):
649 """
650 Returns 3D bounding box for a vehicle.
651 """
652
653 cords = np.empty((8, 4))
654 extent = vehicle.bounding_box.extent
655 x = extent.x
656 y = extent.y
657 z = extent.z
658 cords[0] = [x, y, -z, 1.0]
659 cords[1] = [-x, y, -z, 1.0]
660 cords[2] = [-x, -y, -z, 1.0]
661 cords[3] = [x, -y, -z, 1.0]
662 cords[4] = [x, y, z, 1.0]
663 cords[5] = [-x, y, z, 1.0]
664 cords[6] = [-x, -y, z, 1.0]
665 cords[7] = [x, -y, z, 1.0]
666 return cords
667
668 @staticmethod
669 def _vehicle_to_sensor(cords, vehicle, camera_transform):
670 """
671 Transforms coordinates of a vehicle bounding box to sensor.
672 """
673
674 world_cord = RssBoundingBoxVisualizer._vehicle_to_world(cords, vehicle)
675 # Sensor coordinates
676 return RssBoundingBoxVisualizer._world_to_sensor(world_cord, camera_transform)
677
678 @staticmethod
679 def _vehicle_to_world(cords, vehicle: carla.Actor):
680 """
681 Transforms coordinates of a vehicle bounding box to world.
682 """
683
684 bb_vehicle_matrix = carla.Transform(vehicle.bounding_box.location).get_matrix()
685 vehicle_world_matrix = vehicle.get_transform().get_matrix()
686 bb_world_matrix = np.dot(vehicle_world_matrix, bb_vehicle_matrix)
687 # World coordinates
688 return np.dot(bb_world_matrix, np.transpose(cords))
689
690 @staticmethod
691 def _world_to_sensor(cords, camera_transform):
692 """
693 Transforms world coordinates to sensor.
694 """
695
696 world_sensor_matrix = camera_transform.get_inverse_matrix()
697 # Sensor coordinates
698 return np.dot(world_sensor_matrix, cords)
699
700
701# ==============================================================================
702# -- RssDebugVisualizer ------------------------------------------------------------
703# ==============================================================================
704
705
[docs]
706class RssDebugVisualizationMode(Enum):
707 Off = 1
708 RouteOnly = 2
709 VehicleStateOnly = 3
710 VehicleStateAndRoute = 4
711 All = 5
712
713
[docs]
714class RssDebugVisualizer:
[docs]
715 def __init__(
716 self,
717 player: carla.Vehicle,
718 world: carla.World,
719 visualization_mode: Union[RssDebugVisualizationMode, str, int] = RssDebugVisualizationMode.Off,
720 ):
721 self._world = world
722 self._player = player
723 if isinstance(visualization_mode, str):
724 visualization_mode = RssDebugVisualizationMode[visualization_mode]
725 elif isinstance(visualization_mode, int):
726 visualization_mode = RssDebugVisualizationMode(visualization_mode)
727 self._visualization_mode: RssDebugVisualizationMode = visualization_mode
728
[docs]
729 def toggleMode(self):
730 if self._visualization_mode == RssDebugVisualizationMode.All:
731 self._visualization_mode = RssDebugVisualizationMode.Off
732 elif self._visualization_mode == RssDebugVisualizationMode.Off:
733 self._visualization_mode = RssDebugVisualizationMode.RouteOnly
734 elif self._visualization_mode == RssDebugVisualizationMode.RouteOnly:
735 self._visualization_mode = RssDebugVisualizationMode.VehicleStateOnly
736 elif self._visualization_mode == RssDebugVisualizationMode.VehicleStateOnly:
737 self._visualization_mode = RssDebugVisualizationMode.VehicleStateAndRoute
738 elif self._visualization_mode == RssDebugVisualizationMode.VehicleStateAndRoute:
739 self._visualization_mode = RssDebugVisualizationMode.All
740 print(f"New Debug Visualizer Mode {self._visualization_mode}")
741
[docs]
742 def tick(self, route, dangerous, individual_rss_states, ego_dynamics_on_route):
743 if self._visualization_mode in {
744 RssDebugVisualizationMode.RouteOnly,
745 RssDebugVisualizationMode.VehicleStateAndRoute,
746 RssDebugVisualizationMode.All,
747 }:
748 self.visualize_route(dangerous, route)
749
750 if self._visualization_mode in {
751 RssDebugVisualizationMode.VehicleStateOnly,
752 RssDebugVisualizationMode.VehicleStateAndRoute,
753 RssDebugVisualizationMode.All,
754 }:
755 self.visualize_rss_results(individual_rss_states)
756
757 if self._visualization_mode == RssDebugVisualizationMode.All:
758 self.visualize_ego_dynamics(ego_dynamics_on_route)
759
[docs]
760 def visualize_route(self, dangerous: bool, route: Optional[ad.map.route.FullRoute]):
761 if not route:
762 return
763 right_lane_edges = {}
764 left_lane_edges = {}
765
766 for road_segment in route.roadSegments:
767 right_most_lane = road_segment.drivableLaneSegments[0]
768 if right_most_lane.laneInterval.laneId not in right_lane_edges:
769 edge = ad.map.route.getRightProjectedENUEdge(right_most_lane.laneInterval)
770 right_lane_edges[right_most_lane.laneInterval.laneId] = edge
771 intersection_lane = ad.map.intersection.Intersection.isLanePartOfAnIntersection(
772 right_most_lane.laneInterval.laneId
773 )
774
775 color = carla.Color(r=(128 if dangerous else 255))
776 if intersection_lane:
777 color.b = 128 if dangerous else 255
778
779 self.visualize_enu_edge(edge, color, self._player.get_location().z)
780
781 left_most_lane = road_segment.drivableLaneSegments[-1]
782 if left_most_lane.laneInterval.laneId not in left_lane_edges:
783 edge = ad.map.route.getLeftProjectedENUEdge(left_most_lane.laneInterval)
784 left_lane_edges[left_most_lane.laneInterval.laneId] = edge
785 intersection_lane = ad.map.intersection.Intersection.isLanePartOfAnIntersection(
786 left_most_lane.laneInterval.laneId
787 )
788 color = carla.Color(g=(128 if dangerous else 255))
789 if intersection_lane:
790 color.b = 128 if dangerous else 255
791
792 self.visualize_enu_edge(edge, color, self._player.get_location().z)
793
[docs]
794 def visualize_enu_edge(self, edge, color, z_offset):
795 for point in edge:
796 carla_point = carla.Location(x=float(point.x), y=-1.0 * float(point.y), z=float(point.z) + z_offset)
797 self._world.debug.draw_point(carla_point, 0.1, color, 0.1, False)
798
[docs]
799 def visualize_rss_results(self, state_snapshot: Iterable[RssStateInfo]):
800 for state in state_snapshot:
801 other_actor = state.get_actor(self._world)
802 if not other_actor:
803 # print("Actor not found. Skip visualizing state {}".format(state))
804 continue
805 ego_point = self._player.get_location()
806 ego_point.z += 0.05
807 yaw = self._player.get_transform().rotation.yaw
808 cosine = math.cos(math.radians(yaw))
809 sine = math.sin(math.radians(yaw))
810 line_offset = carla.Location(-sine * 0.1, cosine * 0.1, 0.0)
811
812 point = other_actor.get_location()
813 point.z += 0.05
814 indicator_color = Color.carla_green
815 dangerous = ad.rss.state.isDangerous(state.rss_state)
816 if dangerous:
817 indicator_color = Color.carla_red
818 elif state.rss_state.situationType == ad.rss.situation.SituationType.NotRelevant:
819 indicator_color = Color.carla_gray
820
821 if self._visualization_mode == RssDebugVisualizationMode.All:
822 # the connection lines are only visualized if All is requested
823 lon_color = carla.Color(indicator_color.r, indicator_color.g, indicator_color.b)
824 lat_l_color = carla.Color(indicator_color.r, indicator_color.g, indicator_color.b)
825 lat_r_color = carla.Color(indicator_color.r, indicator_color.g, indicator_color.b)
826 if not state.rss_state.longitudinalState.isSafe:
827 lon_color.r = 255
828 lon_color.g = 0 if dangerous else 255
829 if not state.rss_state.lateralStateLeft.isSafe:
830 lat_l_color.r = 255
831 lat_l_color.g = 0 if dangerous else 255
832 if not state.rss_state.lateralStateRight.isSafe:
833 lat_r_color.r = 255
834 lat_r_color.g = 0 if dangerous else 255
835 self._world.debug.draw_line(ego_point, point, 0.1, lon_color, 0.066, False)
836 self._world.debug.draw_line(
837 ego_point - line_offset, point - line_offset, 0.1, lat_l_color, 0.066, False
838 )
839 self._world.debug.draw_line(
840 ego_point + line_offset, point + line_offset, 0.1, lat_r_color, 0.066, False
841 )
842 point.z += 3.0
843 self._world.debug.draw_point(point, 0.2, indicator_color, 0.066, False)
844
[docs]
845 def visualize_ego_dynamics(self, ego_dynamics_on_route):
846 sin_heading = math.sin(float(ego_dynamics_on_route.route_heading))
847 cos_heading = math.cos(float(ego_dynamics_on_route.route_heading))
848
849 heading_location_start = self._player.get_location()
850 heading_location_start.x -= cos_heading * 10.0
851 heading_location_start.y += sin_heading * 10.0
852 heading_location_start.z += 0.5
853 heading_location_end = self._player.get_location()
854 heading_location_end.x += cos_heading * 10.0
855 heading_location_end.y -= sin_heading * 10.0
856 heading_location_end.z += 0.5
857
858 self._world.debug.draw_arrow(
859 heading_location_start, heading_location_end, 0.1, 0.1, Color.carla_blue, 0.066, False
860 )
861
862 sin_center = math.sin(float(ego_dynamics_on_route.route_heading) + math.pi / 2.0)
863 cos_center = math.cos(float(ego_dynamics_on_route.route_heading) + math.pi / 2.0)
864 center_location_start = self._player.get_location()
865 center_location_start.x -= cos_center * 2.0
866 center_location_start.y += sin_center * 2.0
867 center_location_start.z += 0.5
868 center_location_end = self._player.get_location()
869 center_location_end.x += cos_center * 2.0
870 center_location_end.y -= sin_center * 2.0
871 center_location_end.z += 0.5
872
873 self._world.debug.draw_line(center_location_start, center_location_end, 0.1, Color.carla_blue, 0.066, False)