Source code for classes.sensors.rss_visualization

  1"""
  2This module is based on the RSS example shipped with CARLA.
  3"""
  4
  5#
  6# Copyright (c) 2020 Intel Corporation
  7#
  8# pyright: reportAttributeAccessIssue=warning
  9
 10from __future__ import annotations
 11
 12import math
 13import weakref
 14from enum import Enum
 15from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union
 16from typing import cast as assure_type
 17
 18import carla
 19import numpy as np
 20import pygame
 21
 22from classes import CanBeDummy
 23from classes.sensors._sensor_interface import CustomSensorInterface
 24from classes.constants import AD_RSS_AVAILABLE
 25from launch_tools import CarlaDataProvider
 26
 27if AD_RSS_AVAILABLE:
 28    from carla import ad
 29
 30    RssStateEvaluator = ad.rss.state.RssStateEvaluator
 31    EVALUATOR_NONE_STATE = RssStateEvaluator.names["None"]
 32    _RSS_EVALUATOR_SAME_DIRECTION = {
 33        RssStateEvaluator.LongitudinalDistanceSameDirectionOtherInFront,
 34        RssStateEvaluator.LongitudinalDistanceSameDirectionEgoFront,
 35    }
 36    _RSS_EVALUATOR_OPPOSITE_DIRECTION = {
 37        RssStateEvaluator.LongitudinalDistanceOppositeDirectionEgoCorrectLane,
 38        RssStateEvaluator.LongitudinalDistanceOppositeDirection,
 39    }
 40
 41if TYPE_CHECKING:
 42    assert ad  # remove Unbound type # type: ignore
 43    from classes.sensors.rss_sensor import RssStateInfo
 44
 45
[docs] 46class Color: 47 black = (0, 0, 0) 48 gray = (128, 128, 128) 49 white = (255, 255, 255) 50 red = (255, 0, 0) 51 green = (0, 255, 0) 52 blue = (0, 0, 255) 53 54 carla_gray = carla.Color(150, 150, 150) 55 carla_red = carla.Color(255, 0, 0) 56 carla_green = carla.Color(0, 255, 0) 57 carla_blue = carla.Color(0, 0, 255)
58 59 60class RssStateVisualizer(CanBeDummy): 61 def __init__(self, display_dimensions: tuple[int, int], font: pygame.font.Font, world: carla.World): # noqa: ARG002 62 self._surface = None 63 self._display_dimensions = display_dimensions 64 self._font = font 65 self._world = CarlaDataProvider.get_world() 66 67 def tick(self, individual_rss_states: Iterable["RssStateInfo"]) -> None: 68 state_surface = pygame.Surface((220, self._display_dimensions[1])) 69 state_surface.set_colorkey(pygame.Color("black")) 70 v_offset = 0 71 72 if individual_rss_states: 73 surface = self._font.render("RSS States:", True, (255, 255, 255)) 74 state_surface.blit(surface, (8, v_offset)) 75 v_offset += 26 76 for state in individual_rss_states: 77 object_name = "Obj" 78 if state.rss_state.objectId == 18446744073709551614: 79 object_name = "Border Left" 80 elif state.rss_state.objectId == 18446744073709551615: 81 object_name = "Border Right" 82 else: 83 other_actor = state.get_actor(self._world) 84 if other_actor: 85 li = list(other_actor.type_id.split(".")) 86 if li: 87 li.pop(0) 88 li = [element.capitalize() for element in li] 89 90 object_name = " ".join(li).strip()[:15] 91 92 mode = "?" 93 if state.actor_calculation_mode == ad.rss.map.RssMode.Structured: 94 mode = "S" 95 elif state.actor_calculation_mode == ad.rss.map.RssMode.Unstructured: 96 mode = "U" 97 elif state.actor_calculation_mode == ad.rss.map.RssMode.NotRelevant: 98 mode = "-" 99 item = f"{mode:>4} {state.distance:>2.0f}m {object_name:>8}" 100 101 surface = self._font.render(item, True, Color.white) 102 state_surface.blit(surface, (5, v_offset)) 103 color = Color.gray 104 if state.actor_calculation_mode != ad.rss.map.RssMode.NotRelevant: 105 color = Color.red if state.is_dangerous else Color.green 106 pygame.draw.circle(state_surface, color, (12, v_offset + 7), 5) 107 xpos = 184 108 if state.actor_calculation_mode == ad.rss.map.RssMode.Structured: 109 # Unsafe longitudinalState 110 if not state.rss_state.longitudinalState.isSafe: 111 if state.rss_state.longitudinalState.rssStateInformation.evaluator in _RSS_EVALUATOR_SAME_DIRECTION: 112 pygame.draw.polygon( 113 state_surface, 114 (255, 255, 255), 115 ( 116 (xpos + 1, v_offset + 1 + 4), 117 (xpos + 6, v_offset + 1 + 0), 118 (xpos + 11, v_offset + 1 + 4), 119 (xpos + 7, v_offset + 1 + 4), 120 (xpos + 7, v_offset + 1 + 12), 121 (xpos + 5, v_offset + 1 + 12), 122 (xpos + 5, v_offset + 1 + 4), 123 ), 124 ) 125 xpos += 14 126 elif ( 127 state.rss_state.longitudinalState.rssStateInformation.evaluator 128 in _RSS_EVALUATOR_OPPOSITE_DIRECTION 129 ): 130 pygame.draw.polygon( 131 state_surface, 132 (255, 255, 255), 133 ( 134 (xpos + 2, v_offset + 1 + 8), 135 (xpos + 6, v_offset + 1 + 12), 136 (xpos + 10, v_offset + 1 + 8), 137 (xpos + 7, v_offset + 1 + 8), 138 (xpos + 7, v_offset + 1 + 0), 139 (xpos + 5, v_offset + 1 + 0), 140 (xpos + 5, v_offset + 1 + 8), 141 ), 142 ) 143 xpos += 14 144 145 # Unsafe Laterals: Draw Left/Right arrows 146 # Right 147 if ( 148 not state.rss_state.lateralStateRight.isSafe 149 and state.rss_state.lateralStateRight.rssStateInformation.evaluator != EVALUATOR_NONE_STATE 150 ): 151 pygame.draw.polygon( 152 state_surface, 153 (255, 255, 255), 154 ( 155 (xpos + 0, v_offset + 1 + 4), 156 (xpos + 8, v_offset + 1 + 4), 157 (xpos + 8, v_offset + 1 + 1), 158 (xpos + 12, v_offset + 1 + 6), 159 (xpos + 8, v_offset + 1 + 10), 160 (xpos + 8, v_offset + 1 + 8), 161 (xpos + 0, v_offset + 1 + 8), 162 ), 163 ) 164 xpos += 14 165 # Left 166 if ( 167 not state.rss_state.lateralStateLeft.isSafe 168 and state.rss_state.lateralStateLeft.rssStateInformation.evaluator != EVALUATOR_NONE_STATE 169 ): 170 pygame.draw.polygon( 171 state_surface, 172 (255, 255, 255), 173 ( 174 (xpos + 0, v_offset + 1 + 6), 175 (xpos + 4, v_offset + 1 + 1), 176 (xpos + 4, v_offset + 1 + 4), 177 (xpos + 12, v_offset + 1 + 4), 178 (xpos + 12, v_offset + 1 + 8), 179 (xpos + 4, v_offset + 1 + 8), 180 (xpos + 4, v_offset + 1 + 10), 181 ), 182 ) 183 xpos += 14 184 elif state.actor_calculation_mode == ad.rss.map.RssMode.Unstructured: 185 text = "" 186 if state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.DriveAway: 187 text = " D" 188 elif ( 189 state.rss_state.unstructuredSceneState.response 190 == ad.rss.state.UnstructuredSceneResponse.ContinueForward 191 ): 192 text = " C" 193 elif state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.Brake: 194 text = " B" 195 surface = self._font.render(text, True, Color.white) 196 state_surface.blit(surface, (xpos, v_offset)) 197 198 v_offset += 14 199 self._surface = state_surface 200 201 def render(self, display: pygame.Surface, v_offset: int): 202 if self._surface: 203 display.blit(self._surface, (0, v_offset)) 204 205 206# ============================================================================== 207# -- RssUnstructuredSceneVisualizer ------------------------------------------------ 208# ============================================================================== 209 210
[docs] 211class RssUnstructuredSceneVisualizerMode(Enum): 212 disabled = 1 213 window = 2 214 fullscreen = 3
215 216
[docs] 217class RssUnstructuredSceneVisualizer(CanBeDummy, CustomSensorInterface): 218 """Provides a top-view over the setting?""" 219
[docs] 220 def __init__( 221 self, parent_actor: carla.Actor, world, display_dimensions: tuple[int, int], gamma_correction: float = 2.2 222 ): # noqa: ARG002 223 self._last_rendered_frame = -1 224 self._surface = None 225 self._current_rss_surface: Optional[Tuple[int, pygame.Surface]] = None 226 self.current_camera_surface: Tuple[int, pygame.Surface] = (0, None) 227 self._world: carla.World = CarlaDataProvider.get_world() 228 self._parent_actor = parent_actor 229 self._display_dimensions = display_dimensions 230 self._camera = None # type: ignore[assignment] 231 self._mode = RssUnstructuredSceneVisualizerMode.disabled 232 self._gamma = gamma_correction 233 234 self.restart(RssUnstructuredSceneVisualizerMode.window)
235 236 @property 237 def sensor(self) -> carla.Sensor: 238 return self._camera 239 240 @sensor.setter 241 def sensor(self, value): 242 # Needed for SensorInterface.destroy 243 if value is None: 244 self._camera = None # type: ignore[assignment] 245 else: 246 raise ValueError("Cannot set sensor of RssUnstructuredSceneVisualizer. Use _camera instead.") 247
[docs] 248 def destroy(self): 249 super().destroy() 250 self._camera = None # type: ignore[assignment]
251
[docs] 252 def restart(self, mode): 253 # setup up top down camera 254 self.destroy() 255 self._mode = mode 256 257 spawn_sensor = False 258 if mode == RssUnstructuredSceneVisualizerMode.window: 259 self._dim = (self._display_dimensions[0] / 3, self._display_dimensions[1] / 2) 260 spawn_sensor = True 261 elif mode == RssUnstructuredSceneVisualizerMode.fullscreen: 262 self._dim = (self._display_dimensions[0], self._display_dimensions[1]) 263 spawn_sensor = True 264 else: 265 self._surface = None 266 267 if spawn_sensor: 268 self._calibration = np.identity(3) 269 self._calibration[0, 2] = self._dim[0] / 2.0 270 self._calibration[1, 2] = self._dim[1] / 2.0 271 self._calibration[0, 0] = self._calibration[1, 1] = self._dim[0] / ( 272 2.0 * np.tan(90.0 * np.pi / 360.0) 273 ) # fov default: 90.0 274 275 bp_library = CarlaDataProvider._blueprint_library 276 bp: carla.ActorBlueprint = bp_library.find("sensor.camera.rgb") 277 bp.set_attribute("image_size_x", str(self._dim[0])) 278 bp.set_attribute("image_size_y", str(self._dim[1])) 279 if bp.has_attribute("gamma"): 280 bp.set_attribute("gamma", str(self._gamma)) 281 282 self._camera: carla.Sensor = assure_type( 283 "carla.Sensor", 284 self._world.spawn_actor( 285 bp, 286 carla.Transform(carla.Location(x=7.5, z=10), carla.Rotation(pitch=-90)), 287 attach_to=self._parent_actor, 288 ), 289 ) 290 # We need to pass the lambda a weak reference to self to avoid 291 # circular reference. 292 weak_self = weakref.ref(self) 293 self._camera.listen( 294 lambda image: self._parse_image(weak_self, image) # type: ignore[arg-type] 295 )
296
[docs] 297 def update_surface(self, cam_frame: Union[int, None], rss_frame: Union[int, None]): 298 if self._mode == RssUnstructuredSceneVisualizerMode.disabled: 299 return 300 render = False 301 302 if cam_frame and self._current_rss_surface and self._current_rss_surface[0] == cam_frame: 303 render = True 304 305 if rss_frame and self.current_camera_surface and self.current_camera_surface[0] == rss_frame: 306 render = True 307 308 if render: 309 surface = self.current_camera_surface[1] 310 surface.blit(self._current_rss_surface[1], (0, 0)) # pyright: ignore[reportOptionalSubscript] 311 rect = pygame.Rect((0, 0), (2, surface.get_height())) 312 pygame.draw.rect(surface, Color.black, rect, 0) 313 rect = pygame.Rect((0, 0), (surface.get_width(), 2)) 314 pygame.draw.rect(surface, Color.black, rect, 0) 315 rect = pygame.Rect((0, surface.get_height() - 2), (surface.get_width(), surface.get_height())) 316 pygame.draw.rect(surface, Color.black, rect, 0) 317 rect = pygame.Rect((surface.get_width() - 2, 0), (surface.get_width(), surface.get_width())) 318 pygame.draw.rect(surface, Color.black, rect, 0) 319 self._surface = surface
320
[docs] 321 def toggle_camera(self): 322 print("Toggle RssUnstructuredSceneVisualizer") 323 if self._mode == RssUnstructuredSceneVisualizerMode.window: 324 self.restart(RssUnstructuredSceneVisualizerMode.fullscreen) 325 elif self._mode == RssUnstructuredSceneVisualizerMode.fullscreen: 326 self.restart(RssUnstructuredSceneVisualizerMode.disabled) 327 elif self._mode == RssUnstructuredSceneVisualizerMode.disabled: 328 self.restart(RssUnstructuredSceneVisualizerMode.window)
329 330 @staticmethod 331 def _parse_image(weak_self: "weakref.ReferenceType[RssUnstructuredSceneVisualizer]", image: carla.Image): 332 self = weak_self() 333 if not self: 334 return 335 image.convert(carla.ColorConverter.Raw) 336 array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) 337 array = np.reshape(array, (image.height, image.width, 4)) 338 array = array[:, :, :3] 339 array = array[:, :, ::-1] 340 surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) 341 self.current_camera_surface = (image.frame, surface) 342 self.update_surface(image.frame, None) 343
[docs] 344 @staticmethod 345 def rotate_around_point(xy, radians, origin): 346 """Rotate a point around a given point.""" 347 x, y = xy 348 offset_x, offset_y = origin 349 adjusted_x = x - offset_x 350 adjusted_y = y - offset_y 351 cos_rad = math.cos(radians) 352 sin_rad = math.sin(radians) 353 qx = offset_x + cos_rad * adjusted_x - sin_rad * adjusted_y 354 qy = offset_y + sin_rad * adjusted_x + cos_rad * adjusted_y 355 356 return qx, qy
357
[docs] 358 def tick(self, frame, rss_response, allowed_heading_ranges): 359 if not self._camera: 360 return 361 surface = pygame.Surface(self._dim) 362 surface.set_colorkey(pygame.Color("black")) 363 surface.set_alpha(180) 364 try: 365 lines = RssUnstructuredSceneVisualizer.get_trajectory_sets( 366 rss_response.rss_state_snapshot, self._camera.get_transform(), self._calibration 367 ) 368 369 polygons = [ 370 ( 371 RssUnstructuredSceneVisualizer.transform_points( 372 RssUnstructuredSceneVisualizer._get_points_from_pairs( 373 RssUnstructuredSceneVisualizer.draw_heading_range( 374 heading_range, rss_response.ego_dynamics_on_route 375 ) 376 ), 377 self._camera.get_transform(), 378 self._calibration, 379 ), 380 Color.blue, 381 ) 382 for heading_range in allowed_heading_ranges 383 ] 384 385 RssUnstructuredSceneVisualizer.draw_lines(surface, lines) 386 RssUnstructuredSceneVisualizer.draw_polygons(surface, polygons) 387 except RuntimeError as e: 388 print(f"ERROR {e}") 389 self._current_rss_surface = (frame, surface) 390 self.update_surface(None, frame)
391
[docs] 392 def render(self, display): 393 if self._surface: 394 display.blit(self._surface, (display.get_width() - self._dim[0], 0))
395
[docs] 396 @staticmethod 397 def draw_heading_range(heading_range, ego_dynamics_on_route): 398 line = [(float(ego_dynamics_on_route.ego_center.x), float(ego_dynamics_on_route.ego_center.y))] 399 length = 3.0 400 current_angle = float(heading_range.begin) 401 max_angle = float(heading_range.end) 402 if heading_range.end < heading_range.begin: 403 max_angle += 2.0 * np.pi 404 405 while current_angle < max_angle: 406 line.append(( 407 float(ego_dynamics_on_route.ego_center.x) + length * np.cos(current_angle), 408 float(ego_dynamics_on_route.ego_center.y) + length * np.sin(current_angle), 409 )) 410 current_angle += 0.2 411 412 if current_angle != max_angle: 413 line.append(( 414 float(ego_dynamics_on_route.ego_center.x) + length * np.cos(max_angle), 415 float(ego_dynamics_on_route.ego_center.y) + length * np.sin(max_angle), 416 )) 417 418 line.append((float(ego_dynamics_on_route.ego_center.x), float(ego_dynamics_on_route.ego_center.y))) 419 return line
420
[docs] 421 @staticmethod 422 def get_trajectory_sets(rss_state_snapshot: "ad.rss.state.RssStateSnapshot", camera_transform, calibration): 423 """ 424 Creates 3D bounding boxes based on carla vehicle list and camera. 425 """ 426 trajectory_sets = [ 427 # ego 428 ( 429 RssUnstructuredSceneVisualizer.transform_points( 430 RssUnstructuredSceneVisualizer._get_trajectory_set_points( 431 rss_state_snapshot.unstructuredSceneEgoInformation.brakeTrajectorySet 432 ), 433 camera_transform, 434 calibration, 435 ), 436 Color.red, 437 ), 438 ( 439 RssUnstructuredSceneVisualizer.transform_points( 440 RssUnstructuredSceneVisualizer._get_trajectory_set_points( 441 rss_state_snapshot.unstructuredSceneEgoInformation.continueForwardTrajectorySet 442 ), 443 camera_transform, 444 calibration, 445 ), 446 Color.green, 447 ), 448 ] 449 450 # others 451 for state in rss_state_snapshot.individualResponses: 452 if state.unstructuredSceneState.rssStateInformation.brakeTrajectorySet: 453 trajectory_sets.append(( 454 RssUnstructuredSceneVisualizer.transform_points( 455 RssUnstructuredSceneVisualizer._get_trajectory_set_points( 456 state.unstructuredSceneState.rssStateInformation.brakeTrajectorySet 457 ), 458 camera_transform, 459 calibration, 460 ), 461 Color.red, 462 )) 463 if state.unstructuredSceneState.rssStateInformation.continueForwardTrajectorySet: 464 trajectory_sets.append(( 465 RssUnstructuredSceneVisualizer.transform_points( 466 RssUnstructuredSceneVisualizer._get_trajectory_set_points( 467 state.unstructuredSceneState.rssStateInformation.continueForwardTrajectorySet 468 ), 469 camera_transform, 470 calibration, 471 ), 472 Color.green, 473 )) 474 475 return trajectory_sets
476
[docs] 477 @staticmethod 478 def draw_lines(surface, lines): 479 """ 480 Draws lines on pygame display. 481 """ 482 for line, color in lines: 483 if len(line) > 1: 484 pygame.draw.lines(surface, color, True, line, 2)
485
[docs] 486 @staticmethod 487 def draw_polygons(surface, polygons): 488 """ 489 Draws polygons on pygame display. 490 """ 491 for polygon, color in polygons: 492 if len(polygon) > 1: 493 pygame.draw.polygon(surface, color, polygon)
494
[docs] 495 @staticmethod 496 def transform_points(world_cords, camera_transform, calibration): 497 """ 498 Returns trajectory set projected to camera view 499 """ 500 if world_cords.size == 0: 501 return [] 502 world_cords = np.transpose(world_cords) 503 cords_x_y_z = RssUnstructuredSceneVisualizer._world_to_sensor(world_cords, camera_transform)[:3, :] 504 cords_y_minus_z_x = np.stack([cords_x_y_z[1, :], -cords_x_y_z[2, :], cords_x_y_z[0, :]]) 505 ts = np.transpose(np.dot(calibration, cords_y_minus_z_x)) 506 camera_ts = np.stack([ts[:, 0] / ts[:, 2], ts[:, 1] / ts[:, 2], ts[:, 2]], axis=1) 507 return [(int(point[0]), int(point[1])) for point in camera_ts] # line_to_draw
508 509 @staticmethod 510 def _get_trajectory_set_points(trajectory_set): 511 """ """ 512 cords = np.zeros((len(trajectory_set), 4)) 513 for i, pt in enumerate(trajectory_set): 514 cords[i, :] = [pt.x, -pt.y, 0, 1] 515 return cords 516 517 @staticmethod 518 def _get_points_from_pairs(trajectory_set): 519 """ """ 520 cords = np.zeros((len(trajectory_set), 4)) 521 for i, pt in enumerate(trajectory_set): 522 cords[i, :] = [pt[0], -pt[1], 0, 1] 523 return cords 524 525 @staticmethod 526 def _world_to_sensor(cords, camera_transform): 527 """ 528 Transforms world coordinates to sensor. 529 """ 530 world_sensor_matrix = camera_transform.get_inverse_matrix() 531 sensor_cords = np.dot(world_sensor_matrix, cords) 532 return sensor_cords
533 534 535# ============================================================================== 536# -- RssBoundingBoxVisualizer ------------------------------------------------------ 537# ============================================================================== 538 539 540class RssBoundingBoxVisualizer(CanBeDummy): 541 def __init__(self, display_dimensions, world, camera: carla.Sensor): 542 self._last_camera_frame = 0 543 self._surface_for_frame = [] 544 self._world = world 545 self._dim = display_dimensions 546 self._calibration = np.identity(3) 547 self._calibration[0, 2] = self._dim[0] / 2.0 548 self._calibration[1, 2] = self._dim[1] / 2.0 549 self._calibration[0, 0] = self._calibration[1, 1] = self._dim[0] / ( 550 2.0 * np.tan(90.0 * np.pi / 360.0) 551 ) # fov default: 90.0 552 self._camera = camera 553 554 def tick(self, frame, individual_rss_states): 555 if len(self._surface_for_frame) > 0: 556 try: 557 while self._surface_for_frame[0][0] < self._last_camera_frame: 558 self._surface_for_frame.pop(0) 559 except IndexError: 560 return 561 562 # only render on new frame 563 if len(self._surface_for_frame) > 0 and self._surface_for_frame[0][0] == frame: 564 return 565 566 surface = pygame.Surface(self._dim) 567 surface.set_colorkey(pygame.Color("black")) 568 surface.set_alpha(80) 569 try: 570 bounding_boxes = RssBoundingBoxVisualizer.get_bounding_boxes( 571 individual_rss_states, self._camera.get_transform(), self._calibration, self._world 572 ) 573 RssBoundingBoxVisualizer.draw_bounding_boxes(surface, bounding_boxes) 574 self._surface_for_frame.append((frame, surface, len(bounding_boxes))) 575 except RuntimeError: 576 pass 577 578 def render(self, display, current_camera_frame): 579 rendered = False 580 boxes_to_render = 0 581 for frame, surface, box_count in self._surface_for_frame: 582 if frame == current_camera_frame: 583 display.blit(surface, (0, 0)) 584 boxes_to_render = box_count 585 rendered = True 586 break 587 if not rendered and boxes_to_render > 0: 588 print(f"Warning: {boxes_to_render} bounding boxes were not drawn.") 589 self._last_camera_frame = current_camera_frame 590 591 @staticmethod 592 def get_bounding_boxes(individual_rss_states, camera_transform, calibration, world: carla.World): 593 """ 594 Creates 3D bounding boxes based on carla vehicle list and camera. 595 """ 596 bounding_boxes = [] 597 for state in individual_rss_states: 598 if state.actor_calculation_mode != ad.rss.map.RssMode.NotRelevant and state.is_dangerous: 599 other_actor = state.get_actor(world) 600 if other_actor: 601 bounding_boxes.append( 602 RssBoundingBoxVisualizer.get_bounding_box(other_actor, camera_transform, calibration) 603 ) 604 # filter objects behind camera 605 bounding_boxes = [bb for bb in bounding_boxes if all(bb[:, 2] > 0)] 606 return bounding_boxes 607 608 @staticmethod 609 def draw_bounding_boxes( 610 surface: pygame.Surface, bounding_boxes: list[np.ndarray], color: pygame.Color = pygame.Color("red") # noqa: B008 611 ) -> None: # noqa: B008 612 """ 613 Draws bounding boxes on pygame display. 614 """ 615 for bbox in bounding_boxes: 616 points = [(int(bbox[i, 0]), int(bbox[i, 1])) for i in range(8)] 617 # draw lines 618 # base 619 polygon = [points[0], points[1], points[2], points[3]] 620 pygame.draw.polygon(surface, color, polygon) 621 # top 622 polygon = [points[4], points[5], points[6], points[7]] 623 pygame.draw.polygon(surface, color, polygon) 624 # base-top 625 polygon = [points[0], points[1], points[5], points[4]] 626 pygame.draw.polygon(surface, color, polygon) 627 polygon = [points[1], points[2], points[6], points[5]] 628 pygame.draw.polygon(surface, color, polygon) 629 polygon = [points[2], points[6], points[7], points[3]] 630 pygame.draw.polygon(surface, color, polygon) 631 polygon = [points[0], points[4], points[7], points[3]] 632 pygame.draw.polygon(surface, color, polygon) 633 634 @staticmethod 635 def get_bounding_box(vehicle, camera_transform, calibration): 636 """ 637 Returns 3D bounding box for a vehicle based on camera view. 638 """ 639 640 bb_cords = RssBoundingBoxVisualizer._create_bb_points(vehicle) 641 cords_x_y_z = RssBoundingBoxVisualizer._vehicle_to_sensor(bb_cords, vehicle, camera_transform)[:3, :] 642 cords_y_minus_z_x = np.stack([cords_x_y_z[1, :], -cords_x_y_z[2, :], cords_x_y_z[0, :]]) 643 bbox = np.transpose(np.dot(calibration, cords_y_minus_z_x)) 644 camera_bbox = np.stack([bbox[:, 0] / bbox[:, 2], bbox[:, 1] / bbox[:, 2], bbox[:, 2]], axis=1) 645 return camera_bbox 646 647 @staticmethod 648 def _create_bb_points(vehicle: carla.Actor): 649 """ 650 Returns 3D bounding box for a vehicle. 651 """ 652 653 cords = np.empty((8, 4)) 654 extent = vehicle.bounding_box.extent 655 x = extent.x 656 y = extent.y 657 z = extent.z 658 cords[0] = [x, y, -z, 1.0] 659 cords[1] = [-x, y, -z, 1.0] 660 cords[2] = [-x, -y, -z, 1.0] 661 cords[3] = [x, -y, -z, 1.0] 662 cords[4] = [x, y, z, 1.0] 663 cords[5] = [-x, y, z, 1.0] 664 cords[6] = [-x, -y, z, 1.0] 665 cords[7] = [x, -y, z, 1.0] 666 return cords 667 668 @staticmethod 669 def _vehicle_to_sensor(cords, vehicle, camera_transform): 670 """ 671 Transforms coordinates of a vehicle bounding box to sensor. 672 """ 673 674 world_cord = RssBoundingBoxVisualizer._vehicle_to_world(cords, vehicle) 675 # Sensor coordinates 676 return RssBoundingBoxVisualizer._world_to_sensor(world_cord, camera_transform) 677 678 @staticmethod 679 def _vehicle_to_world(cords, vehicle: carla.Actor): 680 """ 681 Transforms coordinates of a vehicle bounding box to world. 682 """ 683 684 bb_vehicle_matrix = carla.Transform(vehicle.bounding_box.location).get_matrix() 685 vehicle_world_matrix = vehicle.get_transform().get_matrix() 686 bb_world_matrix = np.dot(vehicle_world_matrix, bb_vehicle_matrix) 687 # World coordinates 688 return np.dot(bb_world_matrix, np.transpose(cords)) 689 690 @staticmethod 691 def _world_to_sensor(cords, camera_transform): 692 """ 693 Transforms world coordinates to sensor. 694 """ 695 696 world_sensor_matrix = camera_transform.get_inverse_matrix() 697 # Sensor coordinates 698 return np.dot(world_sensor_matrix, cords) 699 700 701# ============================================================================== 702# -- RssDebugVisualizer ------------------------------------------------------------ 703# ============================================================================== 704 705
[docs] 706class RssDebugVisualizationMode(Enum): 707 Off = 1 708 RouteOnly = 2 709 VehicleStateOnly = 3 710 VehicleStateAndRoute = 4 711 All = 5
712 713
[docs] 714class RssDebugVisualizer:
[docs] 715 def __init__( 716 self, 717 player: carla.Vehicle, 718 world: carla.World, 719 visualization_mode: Union[RssDebugVisualizationMode, str, int] = RssDebugVisualizationMode.Off, 720 ): 721 self._world = world 722 self._player = player 723 if isinstance(visualization_mode, str): 724 visualization_mode = RssDebugVisualizationMode[visualization_mode] 725 elif isinstance(visualization_mode, int): 726 visualization_mode = RssDebugVisualizationMode(visualization_mode) 727 self._visualization_mode: RssDebugVisualizationMode = visualization_mode
728
[docs] 729 def toggleMode(self): 730 if self._visualization_mode == RssDebugVisualizationMode.All: 731 self._visualization_mode = RssDebugVisualizationMode.Off 732 elif self._visualization_mode == RssDebugVisualizationMode.Off: 733 self._visualization_mode = RssDebugVisualizationMode.RouteOnly 734 elif self._visualization_mode == RssDebugVisualizationMode.RouteOnly: 735 self._visualization_mode = RssDebugVisualizationMode.VehicleStateOnly 736 elif self._visualization_mode == RssDebugVisualizationMode.VehicleStateOnly: 737 self._visualization_mode = RssDebugVisualizationMode.VehicleStateAndRoute 738 elif self._visualization_mode == RssDebugVisualizationMode.VehicleStateAndRoute: 739 self._visualization_mode = RssDebugVisualizationMode.All 740 print(f"New Debug Visualizer Mode {self._visualization_mode}")
741
[docs] 742 def tick(self, route, dangerous, individual_rss_states, ego_dynamics_on_route): 743 if self._visualization_mode in { 744 RssDebugVisualizationMode.RouteOnly, 745 RssDebugVisualizationMode.VehicleStateAndRoute, 746 RssDebugVisualizationMode.All, 747 }: 748 self.visualize_route(dangerous, route) 749 750 if self._visualization_mode in { 751 RssDebugVisualizationMode.VehicleStateOnly, 752 RssDebugVisualizationMode.VehicleStateAndRoute, 753 RssDebugVisualizationMode.All, 754 }: 755 self.visualize_rss_results(individual_rss_states) 756 757 if self._visualization_mode == RssDebugVisualizationMode.All: 758 self.visualize_ego_dynamics(ego_dynamics_on_route)
759
[docs] 760 def visualize_route(self, dangerous: bool, route: Optional[ad.map.route.FullRoute]): 761 if not route: 762 return 763 right_lane_edges = {} 764 left_lane_edges = {} 765 766 for road_segment in route.roadSegments: 767 right_most_lane = road_segment.drivableLaneSegments[0] 768 if right_most_lane.laneInterval.laneId not in right_lane_edges: 769 edge = ad.map.route.getRightProjectedENUEdge(right_most_lane.laneInterval) 770 right_lane_edges[right_most_lane.laneInterval.laneId] = edge 771 intersection_lane = ad.map.intersection.Intersection.isLanePartOfAnIntersection( 772 right_most_lane.laneInterval.laneId 773 ) 774 775 color = carla.Color(r=(128 if dangerous else 255)) 776 if intersection_lane: 777 color.b = 128 if dangerous else 255 778 779 self.visualize_enu_edge(edge, color, self._player.get_location().z) 780 781 left_most_lane = road_segment.drivableLaneSegments[-1] 782 if left_most_lane.laneInterval.laneId not in left_lane_edges: 783 edge = ad.map.route.getLeftProjectedENUEdge(left_most_lane.laneInterval) 784 left_lane_edges[left_most_lane.laneInterval.laneId] = edge 785 intersection_lane = ad.map.intersection.Intersection.isLanePartOfAnIntersection( 786 left_most_lane.laneInterval.laneId 787 ) 788 color = carla.Color(g=(128 if dangerous else 255)) 789 if intersection_lane: 790 color.b = 128 if dangerous else 255 791 792 self.visualize_enu_edge(edge, color, self._player.get_location().z)
793
[docs] 794 def visualize_enu_edge(self, edge, color, z_offset): 795 for point in edge: 796 carla_point = carla.Location(x=float(point.x), y=-1.0 * float(point.y), z=float(point.z) + z_offset) 797 self._world.debug.draw_point(carla_point, 0.1, color, 0.1, False)
798
[docs] 799 def visualize_rss_results(self, state_snapshot: Iterable[RssStateInfo]): 800 for state in state_snapshot: 801 other_actor = state.get_actor(self._world) 802 if not other_actor: 803 # print("Actor not found. Skip visualizing state {}".format(state)) 804 continue 805 ego_point = self._player.get_location() 806 ego_point.z += 0.05 807 yaw = self._player.get_transform().rotation.yaw 808 cosine = math.cos(math.radians(yaw)) 809 sine = math.sin(math.radians(yaw)) 810 line_offset = carla.Location(-sine * 0.1, cosine * 0.1, 0.0) 811 812 point = other_actor.get_location() 813 point.z += 0.05 814 indicator_color = Color.carla_green 815 dangerous = ad.rss.state.isDangerous(state.rss_state) 816 if dangerous: 817 indicator_color = Color.carla_red 818 elif state.rss_state.situationType == ad.rss.situation.SituationType.NotRelevant: 819 indicator_color = Color.carla_gray 820 821 if self._visualization_mode == RssDebugVisualizationMode.All: 822 # the connection lines are only visualized if All is requested 823 lon_color = carla.Color(indicator_color.r, indicator_color.g, indicator_color.b) 824 lat_l_color = carla.Color(indicator_color.r, indicator_color.g, indicator_color.b) 825 lat_r_color = carla.Color(indicator_color.r, indicator_color.g, indicator_color.b) 826 if not state.rss_state.longitudinalState.isSafe: 827 lon_color.r = 255 828 lon_color.g = 0 if dangerous else 255 829 if not state.rss_state.lateralStateLeft.isSafe: 830 lat_l_color.r = 255 831 lat_l_color.g = 0 if dangerous else 255 832 if not state.rss_state.lateralStateRight.isSafe: 833 lat_r_color.r = 255 834 lat_r_color.g = 0 if dangerous else 255 835 self._world.debug.draw_line(ego_point, point, 0.1, lon_color, 0.066, False) 836 self._world.debug.draw_line( 837 ego_point - line_offset, point - line_offset, 0.1, lat_l_color, 0.066, False 838 ) 839 self._world.debug.draw_line( 840 ego_point + line_offset, point + line_offset, 0.1, lat_r_color, 0.066, False 841 ) 842 point.z += 3.0 843 self._world.debug.draw_point(point, 0.2, indicator_color, 0.066, False)
844
[docs] 845 def visualize_ego_dynamics(self, ego_dynamics_on_route): 846 sin_heading = math.sin(float(ego_dynamics_on_route.route_heading)) 847 cos_heading = math.cos(float(ego_dynamics_on_route.route_heading)) 848 849 heading_location_start = self._player.get_location() 850 heading_location_start.x -= cos_heading * 10.0 851 heading_location_start.y += sin_heading * 10.0 852 heading_location_start.z += 0.5 853 heading_location_end = self._player.get_location() 854 heading_location_end.x += cos_heading * 10.0 855 heading_location_end.y -= sin_heading * 10.0 856 heading_location_end.z += 0.5 857 858 self._world.debug.draw_arrow( 859 heading_location_start, heading_location_end, 0.1, 0.1, Color.carla_blue, 0.066, False 860 ) 861 862 sin_center = math.sin(float(ego_dynamics_on_route.route_heading) + math.pi / 2.0) 863 cos_center = math.cos(float(ego_dynamics_on_route.route_heading) + math.pi / 2.0) 864 center_location_start = self._player.get_location() 865 center_location_start.x -= cos_center * 2.0 866 center_location_start.y += sin_center * 2.0 867 center_location_start.z += 0.5 868 center_location_end = self._player.get_location() 869 center_location_end.x += cos_center * 2.0 870 center_location_end.y -= sin_center * 2.0 871 center_location_end.z += 0.5 872 873 self._world.debug.draw_line(center_location_start, center_location_end, 0.1, Color.carla_blue, 0.066, False)