Source code for classes.sensors.rss_sensor

  1# Copyright (c) 2020 Intel Corporation
  2#
  3# Based on CARLA example : https://github.com/carla-simulator/carla/blob/master/PythonAPI/examples/rss/rss_visualization.py
  4
  5
  6# pyrightX: reportUnknownMemberType=none
  7# pyrightX: reportUnknownArgumentType=information
  8# pyrightX: reportUnknownParameterType=information
  9# pyrightX: reportTypeCommentUsage=none
 10# pyrightX: reportAttributeAccessIssue=information
 11
 12import inspect
 13import math
 14from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 15from typing import cast as assure_type
 16
 17import carla
 18
 19from agents.tools.logs import logger
 20from classes.sensors import CustomSensorInterface
 21from classes.constants import AD_RSS_AVAILABLE, RssLogLevel, RssLogLevelAlias
 22from .rss_visualization import (
 23    RssDebugVisualizationMode,
 24    RssDebugVisualizer,
 25    RssUnstructuredSceneVisualizer,
 26)
 27from launch_tools import CarlaDataProvider
 28
 29if AD_RSS_AVAILABLE:
 30    from carla import ad
 31
 32    RssStateEvaluator = ad.rss.state.RssStateEvaluator
 33    EVALUATOR_NONE_STATE = RssStateEvaluator.names["None"]
 34
 35if TYPE_CHECKING:
 36    assert ad  # remove Unbound type # type: ignore
 37    from classes.sensors.rss_visualization import RssBoundingBoxVisualizer, RssStateVisualizer
 38
 39
 40# ==============================================================================
 41# -- RssSensor -----------------------------------------------------------------
 42# ==============================================================================
 43
 44
[docs] 45class RssStateInfo:
[docs] 46 def __init__(self, rss_state, ego_dynamics_on_route, world_model): 47 # type: (ad.rss.state.RssState, carla.RssEgoDynamicsOnRoute, ad.rss.world.WorldModel) -> None 48 self.rss_state = rss_state 49 self.distance = -1 50 self.is_dangerous = ad.rss.state.isDangerous(rss_state) 51 if rss_state.situationType == ad.rss.situation.SituationType.Unstructured: 52 self.actor_calculation_mode = ad.rss.map.RssMode.Unstructured 53 else: 54 self.actor_calculation_mode = ad.rss.map.RssMode.Structured 55 56 # calculate distance to other vehicle 57 object_state = None 58 for scene in world_model.scenes: 59 if scene.object.objectId == rss_state.objectId: 60 object_state = scene.object.state 61 break 62 63 if object_state: 64 self.distance = math.sqrt( 65 (float(ego_dynamics_on_route.ego_center.x) - float(object_state.centerPoint.x)) ** 2 66 + (float(ego_dynamics_on_route.ego_center.y) - float(object_state.centerPoint.y)) ** 2 67 ) 68 69 self.longitudinal_margin = float( 70 rss_state.longitudinalState.rssStateInformation.currentDistance 71 - rss_state.longitudinalState.rssStateInformation.safeDistance 72 ) 73 self.margin = max(0, self.longitudinal_margin) 74 self.lateral_margin = None 75 if rss_state.lateralStateLeft.rssStateInformation.evaluator != EVALUATOR_NONE_STATE: 76 self.lateral_margin = ( 77 rss_state.lateralStateLeft.rssStateInformation.currentDistance 78 - rss_state.lateralStateLeft.rssStateInformation.safeDistance 79 ) 80 if rss_state.lateralStateRight.rssStateInformation.evaluator != EVALUATOR_NONE_STATE: 81 lateral_margin_right = ( 82 rss_state.lateralStateRight.rssStateInformation.currentDistance 83 - rss_state.lateralStateRight.rssStateInformation.safeDistance 84 ) 85 if self.lateral_margin is None or self.lateral_margin > lateral_margin_right: 86 self.lateral_margin = lateral_margin_right 87 if self.lateral_margin is not None and self.lateral_margin > 0: 88 self.margin += float(self.lateral_margin)
89
[docs] 90 def get_actor(self, world): 91 # type: (carla.World) -> carla.Actor | None 92 if self.rss_state.objectId == 18446744073709551614: 93 return None # "Border Left" 94 if self.rss_state.objectId == 18446744073709551615: 95 return None # "Border Right" 96 return world.get_actor(self.rss_state.objectId)
97 98 def __str__(self): 99 return "RssStateInfo: object=" + str(self.rss_state.objectId) + " dangerous=" + str(self.is_dangerous)
100 101
[docs] 102class RssSensor(CustomSensorInterface):
[docs] 103 def __init__( 104 self, 105 parent_actor: carla.Vehicle, 106 unstructured_scene_visualizer: "RssUnstructuredSceneVisualizer", 107 bounding_box_visualizer: "RssBoundingBoxVisualizer", 108 state_visualizer: "RssStateVisualizer", 109 *, 110 visualizer_mode: RssDebugVisualizationMode = RssDebugVisualizationMode.Off, 111 routing_targets: Optional[Iterable[carla.Transform]] = None, 112 log_level: RssLogLevelAlias = RssLogLevel.off, 113 ): 114 world = CarlaDataProvider.get_world() 115 assert world 116 self.unstructured_scene_visualizer = unstructured_scene_visualizer 117 self.bounding_box_visualizer = bounding_box_visualizer 118 self._parent: carla.Vehicle = parent_actor 119 self.timestamp = None 120 self.response_valid = False 121 self.proper_response = None 122 self.rss_state_snapshot = None 123 self.situation_snapshot = None 124 self.world_model = None 125 self.individual_rss_states = [] 126 self._allowed_heading_ranges: Iterable[ad.rss.state.HeadingRange] = [] 127 self.ego_dynamics_on_route: Optional[carla.RssEgoDynamicsOnRoute] = None # set on response 128 self.current_vehicle_parameters = self.get_default_parameters() 129 self.route = None 130 self.debug_visualizer = RssDebugVisualizer(parent_actor, world, visualizer_mode) 131 self.state_visualizer = state_visualizer 132 self.change_to_unstructured_position_map = {} 133 134 # get max steering angle 135 physics_control = parent_actor.get_physics_control() 136 self._max_steer_angle = 0.0 137 for wheel in physics_control.wheels: 138 self._max_steer_angle = max(self._max_steer_angle, wheel.max_steer_angle) 139 self._max_steer_angle = math.radians(self._max_steer_angle) 140 141 bp = CarlaDataProvider._blueprint_library.find("sensor.other.rss") 142 self.sensor: carla.RssSensor = assure_type( 143 "carla.RssSensor", world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) 144 ) 145 # We need to pass the lambda a weak reference to self to avoid circular 146 # reference. 147 148 def check_rss_class(clazz): 149 return inspect.isclass(clazz) and "RssSensor" in clazz.__name__ 150 151 if not inspect.getmembers(carla, check_rss_class): 152 raise RuntimeError('CARLA PythonAPI not compiled in RSS variant, please "make PythonAPI.rss"') 153 154 self.log_level = log_level 155 self.map_log_level = log_level 156 157 self.set_default_parameters() 158 159 self.sensor.register_actor_constellation_callback(self._on_actor_constellation_request) 160 161 self.sensor.listen(self._on_rss_response) 162 assert isinstance(self.log_level, carla.RssLogLevel) 163 assert isinstance(self.map_log_level, carla.RssLogLevel) 164 logger.info(f"Setting log level to {log_level}") 165 self.sensor.set_log_level(self.log_level) 166 self.sensor.set_map_log_level(self.map_log_level) 167 168 # only relevant if actor constellation callback is not registered 169 # self.sensor.ego_vehicle_dynamics = self.current_vehicle_parameters 170 171 self.sensor.road_boundaries_mode = carla.RssRoadBoundariesMode.Off 172 173 self.sensor.reset_routing_targets() 174 if routing_targets: 175 for target in routing_targets: 176 self.sensor.append_routing_target(target)
177 178 def _on_actor_constellation_request(self, actor_constellation_data: "carla.RssActorConstellationData"): 179 # print("_on_actor_constellation_request: ", str(actor_constellation_data)) 180 181 actor_constellation_result = carla.RssActorConstellationResult() 182 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.NotRelevant 183 actor_constellation_result.restrict_speed_limit_mode = ( 184 ad.rss.map.RssSceneCreation.RestrictSpeedLimitMode.IncreasedSpeedLimit10 185 ) 186 actor_constellation_result.ego_vehicle_dynamics = self.current_vehicle_parameters 187 actor_constellation_result.actor_object_type = ad.rss.world.ObjectType.Invalid 188 actor_constellation_result.actor_dynamics = self.current_vehicle_parameters 189 190 actor_id = -1 191 # actor_type_id = "none" 192 if actor_constellation_data.other_actor is not None: 193 actor_id = actor_constellation_data.other_actor.id 194 # actor_type_id = actor_constellation_data.other_actor.type_id 195 196 ego_on_the_sidewalk = False 197 ego_on_routeable_road = False 198 for occupied_region in actor_constellation_data.ego_match_object.mapMatchedBoundingBox.laneOccupiedRegions: 199 lane = ad.map.lane.getLane(occupied_region.laneId) 200 if lane.type == ad.map.lane.LaneType.PEDESTRIAN: 201 # if not ego_on_the_sidewalk: 202 # print ( "ego-{} on lane of lane type {} => sidewalk".format(actor_id, lane.type)) 203 ego_on_the_sidewalk = True 204 elif ad.map.lane.isRouteable(lane): 205 # if not ego_on_routeable_road: 206 # print ( "ego-{} on lane of lane type {} => road".format(actor_id, lane.type)) 207 ego_on_routeable_road = True 208 209 if "walker.pedestrian" in actor_constellation_data.other_actor.type_id: 210 # determine if the pedestrian is walking on the sidewalk or on the road 211 pedestrian_on_the_road = False 212 pedestrian_on_the_sidewalk = False 213 for ( 214 occupied_region 215 ) in actor_constellation_data.other_match_object.mapMatchedBoundingBox.laneOccupiedRegions: 216 lane = ad.map.lane.getLane(occupied_region.laneId) 217 if lane.type == ad.map.lane.LaneType.PEDESTRIAN: 218 # if not pedestrian_on_the_sidewalk: 219 # print ( "pedestrian-{} on lane of lane type {} => sidewalk".format(actor_id, lane.type)) 220 pedestrian_on_the_sidewalk = True 221 else: 222 # if not pedestrian_on_the_road: 223 # print ( "pedestrian-{} on lane of lane type {} => road".format(actor_id, lane.type)) 224 pedestrian_on_the_road = True 225 if ( 226 ego_on_routeable_road 227 and not ego_on_the_sidewalk 228 and not pedestrian_on_the_road 229 and pedestrian_on_the_sidewalk 230 ): 231 # pedestrian is not on the road, but on the sidewalk: then common sense is that vehicle has priority 232 # This analysis can and should be done more detailed, but this is a basic starting point for the decision 233 # In addition, the road network has to be correct to work best 234 # (currently there are no sidewalks in intersection areas) 235 # print ( "pedestrian-{} Off".format(actor_id)) 236 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.NotRelevant 237 else: 238 # print ( "pedestrian-{} Unstructured".format(actor_id)) 239 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.Unstructured 240 actor_constellation_result.actor_object_type = ad.rss.world.ObjectType.Pedestrian 241 actor_constellation_result.actor_dynamics = self.get_pedestrian_parameters() 242 elif "vehicle" in actor_constellation_data.other_actor.type_id: 243 actor_constellation_result.actor_object_type = ad.rss.world.ObjectType.OtherVehicle 244 245 # set the response time of others vehicles to 2 seconds; the rest stays the same 246 actor_constellation_result.actor_dynamics.responseTime = 2.0 247 248 # per default, if ego is not on the road -> unstructured 249 if ego_on_routeable_road: 250 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.Structured 251 else: 252 # print("vehicle-{} unstructured: reason other ego not on routeable road".format(actor_id)) 253 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.Unstructured 254 255 # special handling for vehicles standing still 256 actor_vel = actor_constellation_data.other_actor.get_velocity() 257 actor_speed = math.sqrt(actor_vel.x**2 + actor_vel.y**2 + actor_vel.z**2) 258 if actor_speed < 0.01: 259 # reduce response time 260 actor_constellation_result.actor_dynamics.responseTime = 1.0 261 # still in structured? 262 if actor_constellation_result.rss_calculation_mode == ad.rss.map.RssMode.Structured: 263 actor_distance = math.sqrt( 264 float( 265 actor_constellation_data.ego_match_object.enuPosition.centerPoint.x 266 - actor_constellation_data.other_match_object.enuPosition.centerPoint.x 267 ) 268 ** 2 269 + float( 270 actor_constellation_data.ego_match_object.enuPosition.centerPoint.y 271 - actor_constellation_data.other_match_object.enuPosition.centerPoint.y 272 ) 273 ** 2 274 ) 275 # print("vehicle-{} unstructured check: other distance {}".format(actor_id, actor_distance)) 276 277 if actor_constellation_data.ego_dynamics_on_route.ego_speed < 0.01: 278 # both vehicles stand still, so we have to analyze in detail if we possibly want to use 279 # unstructured mode to cope with blockades on the road... 280 281 if actor_distance < 10: 282 # the other has to be near enough to trigger a switch to unstructured 283 other_outside_routeable_road = False 284 for occupied_region in actor_constellation_data.other_match_object.mapMatchedBoundingBox.laneOccupiedRegions: 285 lane = ad.map.lane.getLane(occupied_region.laneId) 286 if not ad.map.lane.isRouteable(lane): 287 other_outside_routeable_road = True 288 289 if other_outside_routeable_road: 290 # if the other is somewhat outside the standard routeable road (e.g. parked at the side, ...) 291 # we immediately decide for unstructured 292 # print("vehicle-{} unstructured: reason other outside routeable 293 # road".format(actor_id)) 294 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.Unstructured 295 else: 296 # otherwise we have to look in the orientation delta in addition to get some basic idea of the 297 # constellation (we don't want to go into unstructured if we both waiting 298 # behind a red light...) 299 heading_delta = abs( 300 float( 301 actor_constellation_data.ego_match_object.enuPosition.heading 302 - actor_constellation_data.other_match_object.enuPosition.heading 303 ) 304 ) 305 if heading_delta > 0.2: # around 11 degree 306 # print("vehicle-{} unstructured: reason heading delta 307 # {}".format(actor_id, heading_delta)) 308 actor_constellation_result.rss_calculation_mode = ( 309 ad.rss.map.RssMode.Unstructured 310 ) 311 self.change_to_unstructured_position_map[actor_id] = ( 312 actor_constellation_data.other_match_object.enuPosition 313 ) 314 else: 315 # ego moves 316 if actor_distance < 10: 317 # if the ego moves, the other actor doesn't move an the mode was 318 # previously set to unstructured, keep it 319 try: 320 if ( 321 self.change_to_unstructured_position_map[actor_id] 322 == actor_constellation_data.other_match_object.enuPosition 323 ): 324 heading_delta = abs( 325 float( 326 actor_constellation_data.ego_match_object.enuPosition.heading 327 - actor_constellation_data.other_match_object.enuPosition.heading 328 ) 329 ) 330 if heading_delta > 0.2: 331 actor_constellation_result.rss_calculation_mode = ( 332 ad.rss.map.RssMode.Unstructured 333 ) 334 else: 335 del self.change_to_unstructured_position_map[actor_id] 336 except (AttributeError, KeyError): 337 pass 338 else: 339 if actor_id in self.change_to_unstructured_position_map: 340 del self.change_to_unstructured_position_map[actor_id] 341 342 # still in structured? 343 if actor_constellation_result.rss_calculation_mode == ad.rss.map.RssMode.Structured: 344 # in structured case we have to cope with not yet implemented lateral intersection checks in core RSS implementation 345 # if the other is standing still, we don't assume that he will accelerate 346 # otherwise if standing at the intersection the acceleration within reaction time 347 # will allow to enter the intersection which current RSS implementation will immediately consider 348 # as dangerous 349 # print("_on_actor_constellation_result({}) setting accelMax to 350 # zero".format(actor_constellation_data.other_actor.id)) 351 actor_constellation_result.actor_dynamics.alphaLon.accelMax = 0.0 352 actor_constellation_result.actor_dynamics.alphaLat.accelMax = 0.0 353 else: 354 # store route for debug drawings 355 self.route = actor_constellation_data.ego_route 356 # since the ego vehicle is controlled manually, it is easy possible that the ego vehicle 357 # accelerates far more in lateral direction than the ego_dynamics indicate 358 # in an automated vehicle this would be considered by the low-level controller when the RSS restriction 359 # is taken into account properly 360 # but the simple RSS restrictor within CARLA is not able to do so... 361 # So we should at least tell RSS about the fact that we acceleration more than this 362 # to be able to react on this 363 abs_avg_route_accel_lat = abs(float(actor_constellation_data.ego_dynamics_on_route.avg_route_accel_lat)) 364 if abs_avg_route_accel_lat > actor_constellation_result.ego_vehicle_dynamics.alphaLat.accelMax: 365 # print("!! Route lateral dynamics exceed expectations: route:{} expected:{} !!".format(abs_avg_route_accel_lat, 366 # actor_constellation_result.ego_vehicle_dynamics.alphaLat.accelMax)) 367 actor_constellation_result.ego_vehicle_dynamics.alphaLat.accelMax = min(20.0, abs_avg_route_accel_lat) 368 369 # print("_on_actor_constellation_result({}-{}): ".format(actor_id, 370 # actor_type_id), str(actor_constellation_result)) 371 return actor_constellation_result 372
[docs] 373 def destroy(self): 374 logger.info("Stopping RSS sensor") 375 super().destroy() 376 self.unstructured_scene_visualizer = None
377
[docs] 378 def toggle_debug_visualization_mode(self): 379 self.debug_visualizer.toggleMode()
380
[docs] 381 def increase_log_level(self): 382 print(f"increase {self.log_level}") 383 if self.log_level < carla.RssLogLevel.off: 384 self.log_level = carla.RssLogLevel.values[self.log_level + 1] 385 self.sensor.set_log_level(self.log_level)
386
[docs] 387 def decrease_log_level(self): 388 print(f"decrease {self.log_level}") 389 if self.log_level > carla.RssLogLevel.trace: 390 self.log_level = carla.RssLogLevel.values[self.log_level - 1] 391 self.sensor.set_log_level(self.log_level)
392
[docs] 393 def increase_map_log_level(self): 394 if self.map_log_level < carla.RssLogLevel.off: 395 self.map_log_level = self.map_log_level + 1 396 self.sensor.set_map_log_level(self.map_log_level)
397
[docs] 398 def decrease_map_log_level(self): 399 if self.map_log_level > carla.RssLogLevel.trace: 400 self.map_log_level = self.map_log_level - 1 401 self.sensor.set_map_log_level(self.map_log_level)
402
[docs] 403 def drop_route(self): 404 self.sensor.drop_route()
405
[docs] 406 @staticmethod 407 def get_default_parameters() -> "ad.rss.world.RssDynamics": 408 ego_dynamics = ad.rss.world.RssDynamics() 409 ego_dynamics.alphaLon.accelMax = 5 410 ego_dynamics.alphaLon.brakeMax = -8 411 ego_dynamics.alphaLon.brakeMin = -4 412 ego_dynamics.alphaLon.brakeMinCorrect = -3 413 ego_dynamics.alphaLat.accelMax = 0.2 414 ego_dynamics.alphaLat.brakeMin = -0.8 415 ego_dynamics.lateralFluctuationMargin = 0.1 416 ego_dynamics.responseTime = 0.5 417 ego_dynamics.maxSpeedOnAcceleration = 100 418 ego_dynamics.unstructuredSettings.pedestrianTurningRadius = 2.0 419 ego_dynamics.unstructuredSettings.driveAwayMaxAngle = 2.4 420 ego_dynamics.unstructuredSettings.vehicleYawRateChange = 1.3 421 ego_dynamics.unstructuredSettings.vehicleMinRadius = 3.5 422 ego_dynamics.unstructuredSettings.vehicleTrajectoryCalculationStep = 0.2 423 ego_dynamics.unstructuredSettings.vehicleFrontIntermediateYawRateChangeRatioSteps = 4 424 ego_dynamics.unstructuredSettings.vehicleBackIntermediateYawRateChangeRatioSteps = 0 425 ego_dynamics.unstructuredSettings.vehicleContinueForwardIntermediateAccelerationSteps = 3 426 ego_dynamics.unstructuredSettings.vehicleBrakeIntermediateAccelerationSteps = 3 427 ego_dynamics.unstructuredSettings.pedestrianTurningRadius = 2.0 428 ego_dynamics.unstructuredSettings.pedestrianContinueForwardIntermediateHeadingChangeRatioSteps = 3 429 ego_dynamics.unstructuredSettings.pedestrianContinueForwardIntermediateAccelerationSteps = 0 430 ego_dynamics.unstructuredSettings.pedestrianBrakeIntermediateAccelerationSteps = 3 431 ego_dynamics.unstructuredSettings.pedestrianFrontIntermediateHeadingChangeRatioSteps = 4 432 ego_dynamics.unstructuredSettings.pedestrianBackIntermediateHeadingChangeRatioSteps = 0 433 return ego_dynamics
434
[docs] 435 def set_default_parameters(self): 436 logger.info("Use 'default' RSS Parameters") 437 self.current_vehicle_parameters = self.get_default_parameters()
438
[docs] 439 @staticmethod 440 def get_pedestrian_parameters() -> "ad.rss.world.RssDynamics": 441 pedestrian_dynamics = ad.rss.world.RssDynamics() 442 pedestrian_dynamics.alphaLon.accelMax = 2.0 443 pedestrian_dynamics.alphaLon.brakeMax = -2.0 444 pedestrian_dynamics.alphaLon.brakeMin = -2.0 445 pedestrian_dynamics.alphaLon.brakeMinCorrect = -2.0 446 pedestrian_dynamics.alphaLat.accelMax = 0.001 447 pedestrian_dynamics.alphaLat.brakeMin = -0.001 448 pedestrian_dynamics.lateralFluctuationMargin = 0.1 449 pedestrian_dynamics.responseTime = 0.8 450 pedestrian_dynamics.maxSpeedOnAcceleration = 10 451 pedestrian_dynamics.unstructuredSettings.pedestrianTurningRadius = 2.0 452 pedestrian_dynamics.unstructuredSettings.driveAwayMaxAngle = 2.4 453 pedestrian_dynamics.unstructuredSettings.pedestrianContinueForwardIntermediateHeadingChangeRatioSteps = 3 454 pedestrian_dynamics.unstructuredSettings.pedestrianContinueForwardIntermediateAccelerationSteps = 0 455 pedestrian_dynamics.unstructuredSettings.pedestrianBrakeIntermediateAccelerationSteps = 3 456 pedestrian_dynamics.unstructuredSettings.pedestrianFrontIntermediateHeadingChangeRatioSteps = 4 457 pedestrian_dynamics.unstructuredSettings.pedestrianBackIntermediateHeadingChangeRatioSteps = 0 458 459 # not used: 460 pedestrian_dynamics.unstructuredSettings.vehicleYawRateChange = 1.3 461 pedestrian_dynamics.unstructuredSettings.vehicleMinRadius = 3.5 462 pedestrian_dynamics.unstructuredSettings.vehicleTrajectoryCalculationStep = 0.2 463 pedestrian_dynamics.unstructuredSettings.vehicleFrontIntermediateYawRateChangeRatioSteps = 4 464 pedestrian_dynamics.unstructuredSettings.vehicleBackIntermediateYawRateChangeRatioSteps = 0 465 pedestrian_dynamics.unstructuredSettings.vehicleContinueForwardIntermediateAccelerationSteps = 3 466 pedestrian_dynamics.unstructuredSettings.vehicleBrakeIntermediateAccelerationSteps = 3 467 return pedestrian_dynamics
468
[docs] 469 def get_steering_ranges(self) -> List[Tuple[float, float]]: 470 return [ 471 ( 472 (float(self.ego_dynamics_on_route.ego_heading) - float(heading_range.begin)) / self._max_steer_angle, # pyright: ignore[reportOptionalMemberAccess] 473 (float(self.ego_dynamics_on_route.ego_heading) - float(heading_range.end)) / self._max_steer_angle, 474 ) # pyright: ignore[reportOptionalMemberAccess] 475 for heading_range in self._allowed_heading_ranges 476 ]
477 478 def _on_rss_response(self, response: "carla.RssResponse"): 479 if not self or not response: 480 return 481 delta_time = 0.1 482 if self.timestamp: 483 delta_time = response.timestamp - self.timestamp 484 if delta_time > -0.05: 485 self.timestamp = response.timestamp 486 self.response_valid = response.response_valid 487 self.proper_response = response.proper_response 488 self.ego_dynamics_on_route = response.ego_dynamics_on_route 489 self.rss_state_snapshot = response.rss_state_snapshot 490 self.situation_snapshot = response.situation_snapshot 491 self.world_model = response.world_model 492 493 # calculate the allowed heading ranges: 494 if response.proper_response.headingRanges: 495 heading = float(response.ego_dynamics_on_route.ego_heading) 496 heading_ranges: "list[ad.rss.state.HeadingRange] | ad.rss.state.HeadingRangeVector" = ( 497 response.proper_response.headingRanges 498 ) 499 steering_range = ad.rss.state.HeadingRange() 500 steering_range.begin = -self._max_steer_angle + heading 501 steering_range.end = self._max_steer_angle + heading 502 # Updates heading_ranges -> bool if overlap exists 503 ad.rss.unstructured.getHeadingOverlap(steering_range, heading_ranges) 504 self._allowed_heading_ranges = heading_ranges 505 else: 506 self._allowed_heading_ranges = [] 507 508 if self.unstructured_scene_visualizer: 509 self.unstructured_scene_visualizer.tick(response.frame, response, self._allowed_heading_ranges) 510 511 new_states = [ 512 RssStateInfo(rss_state, response.ego_dynamics_on_route, response.world_model) 513 for rss_state in response.rss_state_snapshot.individualResponses 514 ] 515 if len(new_states) > 0: 516 new_states.sort(key=lambda rss_states: rss_states.distance) 517 self.individual_rss_states = new_states 518 if self.bounding_box_visualizer: 519 self.bounding_box_visualizer.tick(response.frame, self.individual_rss_states) 520 if self.state_visualizer: 521 self.state_visualizer.tick(self.individual_rss_states) 522 self.debug_visualizer.tick( 523 self.route, not response.proper_response.isSafe, self.individual_rss_states, self.ego_dynamics_on_route 524 ) 525 526 else: 527 print(f"ignore outdated RSS response {delta_time}")