Source code for classes.sensors.rss_sensor
1# Copyright (c) 2020 Intel Corporation
2#
3# Based on CARLA example : https://github.com/carla-simulator/carla/blob/master/PythonAPI/examples/rss/rss_visualization.py
4
5
6# pyrightX: reportUnknownMemberType=none
7# pyrightX: reportUnknownArgumentType=information
8# pyrightX: reportUnknownParameterType=information
9# pyrightX: reportTypeCommentUsage=none
10# pyrightX: reportAttributeAccessIssue=information
11
12import inspect
13import math
14from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
15from typing import cast as assure_type
16
17import carla
18
19from agents.tools.logs import logger
20from classes.sensors import CustomSensorInterface
21from classes.constants import AD_RSS_AVAILABLE, RssLogLevel, RssLogLevelAlias
22from .rss_visualization import (
23 RssDebugVisualizationMode,
24 RssDebugVisualizer,
25 RssUnstructuredSceneVisualizer,
26)
27from launch_tools import CarlaDataProvider
28
29if AD_RSS_AVAILABLE:
30 from carla import ad
31
32 RssStateEvaluator = ad.rss.state.RssStateEvaluator
33 EVALUATOR_NONE_STATE = RssStateEvaluator.names["None"]
34
35if TYPE_CHECKING:
36 assert ad # remove Unbound type # type: ignore
37 from classes.sensors.rss_visualization import RssBoundingBoxVisualizer, RssStateVisualizer
38
39
40# ==============================================================================
41# -- RssSensor -----------------------------------------------------------------
42# ==============================================================================
43
44
[docs]
45class RssStateInfo:
[docs]
46 def __init__(self, rss_state, ego_dynamics_on_route, world_model):
47 # type: (ad.rss.state.RssState, carla.RssEgoDynamicsOnRoute, ad.rss.world.WorldModel) -> None
48 self.rss_state = rss_state
49 self.distance = -1
50 self.is_dangerous = ad.rss.state.isDangerous(rss_state)
51 if rss_state.situationType == ad.rss.situation.SituationType.Unstructured:
52 self.actor_calculation_mode = ad.rss.map.RssMode.Unstructured
53 else:
54 self.actor_calculation_mode = ad.rss.map.RssMode.Structured
55
56 # calculate distance to other vehicle
57 object_state = None
58 for scene in world_model.scenes:
59 if scene.object.objectId == rss_state.objectId:
60 object_state = scene.object.state
61 break
62
63 if object_state:
64 self.distance = math.sqrt(
65 (float(ego_dynamics_on_route.ego_center.x) - float(object_state.centerPoint.x)) ** 2
66 + (float(ego_dynamics_on_route.ego_center.y) - float(object_state.centerPoint.y)) ** 2
67 )
68
69 self.longitudinal_margin = float(
70 rss_state.longitudinalState.rssStateInformation.currentDistance
71 - rss_state.longitudinalState.rssStateInformation.safeDistance
72 )
73 self.margin = max(0, self.longitudinal_margin)
74 self.lateral_margin = None
75 if rss_state.lateralStateLeft.rssStateInformation.evaluator != EVALUATOR_NONE_STATE:
76 self.lateral_margin = (
77 rss_state.lateralStateLeft.rssStateInformation.currentDistance
78 - rss_state.lateralStateLeft.rssStateInformation.safeDistance
79 )
80 if rss_state.lateralStateRight.rssStateInformation.evaluator != EVALUATOR_NONE_STATE:
81 lateral_margin_right = (
82 rss_state.lateralStateRight.rssStateInformation.currentDistance
83 - rss_state.lateralStateRight.rssStateInformation.safeDistance
84 )
85 if self.lateral_margin is None or self.lateral_margin > lateral_margin_right:
86 self.lateral_margin = lateral_margin_right
87 if self.lateral_margin is not None and self.lateral_margin > 0:
88 self.margin += float(self.lateral_margin)
89
[docs]
90 def get_actor(self, world):
91 # type: (carla.World) -> carla.Actor | None
92 if self.rss_state.objectId == 18446744073709551614:
93 return None # "Border Left"
94 if self.rss_state.objectId == 18446744073709551615:
95 return None # "Border Right"
96 return world.get_actor(self.rss_state.objectId)
97
98 def __str__(self):
99 return "RssStateInfo: object=" + str(self.rss_state.objectId) + " dangerous=" + str(self.is_dangerous)
100
101
[docs]
102class RssSensor(CustomSensorInterface):
[docs]
103 def __init__(
104 self,
105 parent_actor: carla.Vehicle,
106 unstructured_scene_visualizer: "RssUnstructuredSceneVisualizer",
107 bounding_box_visualizer: "RssBoundingBoxVisualizer",
108 state_visualizer: "RssStateVisualizer",
109 *,
110 visualizer_mode: RssDebugVisualizationMode = RssDebugVisualizationMode.Off,
111 routing_targets: Optional[Iterable[carla.Transform]] = None,
112 log_level: RssLogLevelAlias = RssLogLevel.off,
113 ):
114 world = CarlaDataProvider.get_world()
115 assert world
116 self.unstructured_scene_visualizer = unstructured_scene_visualizer
117 self.bounding_box_visualizer = bounding_box_visualizer
118 self._parent: carla.Vehicle = parent_actor
119 self.timestamp = None
120 self.response_valid = False
121 self.proper_response = None
122 self.rss_state_snapshot = None
123 self.situation_snapshot = None
124 self.world_model = None
125 self.individual_rss_states = []
126 self._allowed_heading_ranges: Iterable[ad.rss.state.HeadingRange] = []
127 self.ego_dynamics_on_route: Optional[carla.RssEgoDynamicsOnRoute] = None # set on response
128 self.current_vehicle_parameters = self.get_default_parameters()
129 self.route = None
130 self.debug_visualizer = RssDebugVisualizer(parent_actor, world, visualizer_mode)
131 self.state_visualizer = state_visualizer
132 self.change_to_unstructured_position_map = {}
133
134 # get max steering angle
135 physics_control = parent_actor.get_physics_control()
136 self._max_steer_angle = 0.0
137 for wheel in physics_control.wheels:
138 self._max_steer_angle = max(self._max_steer_angle, wheel.max_steer_angle)
139 self._max_steer_angle = math.radians(self._max_steer_angle)
140
141 bp = CarlaDataProvider._blueprint_library.find("sensor.other.rss")
142 self.sensor: carla.RssSensor = assure_type(
143 "carla.RssSensor", world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
144 )
145 # We need to pass the lambda a weak reference to self to avoid circular
146 # reference.
147
148 def check_rss_class(clazz):
149 return inspect.isclass(clazz) and "RssSensor" in clazz.__name__
150
151 if not inspect.getmembers(carla, check_rss_class):
152 raise RuntimeError('CARLA PythonAPI not compiled in RSS variant, please "make PythonAPI.rss"')
153
154 self.log_level = log_level
155 self.map_log_level = log_level
156
157 self.set_default_parameters()
158
159 self.sensor.register_actor_constellation_callback(self._on_actor_constellation_request)
160
161 self.sensor.listen(self._on_rss_response)
162 assert isinstance(self.log_level, carla.RssLogLevel)
163 assert isinstance(self.map_log_level, carla.RssLogLevel)
164 logger.info(f"Setting log level to {log_level}")
165 self.sensor.set_log_level(self.log_level)
166 self.sensor.set_map_log_level(self.map_log_level)
167
168 # only relevant if actor constellation callback is not registered
169 # self.sensor.ego_vehicle_dynamics = self.current_vehicle_parameters
170
171 self.sensor.road_boundaries_mode = carla.RssRoadBoundariesMode.Off
172
173 self.sensor.reset_routing_targets()
174 if routing_targets:
175 for target in routing_targets:
176 self.sensor.append_routing_target(target)
177
178 def _on_actor_constellation_request(self, actor_constellation_data: "carla.RssActorConstellationData"):
179 # print("_on_actor_constellation_request: ", str(actor_constellation_data))
180
181 actor_constellation_result = carla.RssActorConstellationResult()
182 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.NotRelevant
183 actor_constellation_result.restrict_speed_limit_mode = (
184 ad.rss.map.RssSceneCreation.RestrictSpeedLimitMode.IncreasedSpeedLimit10
185 )
186 actor_constellation_result.ego_vehicle_dynamics = self.current_vehicle_parameters
187 actor_constellation_result.actor_object_type = ad.rss.world.ObjectType.Invalid
188 actor_constellation_result.actor_dynamics = self.current_vehicle_parameters
189
190 actor_id = -1
191 # actor_type_id = "none"
192 if actor_constellation_data.other_actor is not None:
193 actor_id = actor_constellation_data.other_actor.id
194 # actor_type_id = actor_constellation_data.other_actor.type_id
195
196 ego_on_the_sidewalk = False
197 ego_on_routeable_road = False
198 for occupied_region in actor_constellation_data.ego_match_object.mapMatchedBoundingBox.laneOccupiedRegions:
199 lane = ad.map.lane.getLane(occupied_region.laneId)
200 if lane.type == ad.map.lane.LaneType.PEDESTRIAN:
201 # if not ego_on_the_sidewalk:
202 # print ( "ego-{} on lane of lane type {} => sidewalk".format(actor_id, lane.type))
203 ego_on_the_sidewalk = True
204 elif ad.map.lane.isRouteable(lane):
205 # if not ego_on_routeable_road:
206 # print ( "ego-{} on lane of lane type {} => road".format(actor_id, lane.type))
207 ego_on_routeable_road = True
208
209 if "walker.pedestrian" in actor_constellation_data.other_actor.type_id:
210 # determine if the pedestrian is walking on the sidewalk or on the road
211 pedestrian_on_the_road = False
212 pedestrian_on_the_sidewalk = False
213 for (
214 occupied_region
215 ) in actor_constellation_data.other_match_object.mapMatchedBoundingBox.laneOccupiedRegions:
216 lane = ad.map.lane.getLane(occupied_region.laneId)
217 if lane.type == ad.map.lane.LaneType.PEDESTRIAN:
218 # if not pedestrian_on_the_sidewalk:
219 # print ( "pedestrian-{} on lane of lane type {} => sidewalk".format(actor_id, lane.type))
220 pedestrian_on_the_sidewalk = True
221 else:
222 # if not pedestrian_on_the_road:
223 # print ( "pedestrian-{} on lane of lane type {} => road".format(actor_id, lane.type))
224 pedestrian_on_the_road = True
225 if (
226 ego_on_routeable_road
227 and not ego_on_the_sidewalk
228 and not pedestrian_on_the_road
229 and pedestrian_on_the_sidewalk
230 ):
231 # pedestrian is not on the road, but on the sidewalk: then common sense is that vehicle has priority
232 # This analysis can and should be done more detailed, but this is a basic starting point for the decision
233 # In addition, the road network has to be correct to work best
234 # (currently there are no sidewalks in intersection areas)
235 # print ( "pedestrian-{} Off".format(actor_id))
236 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.NotRelevant
237 else:
238 # print ( "pedestrian-{} Unstructured".format(actor_id))
239 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.Unstructured
240 actor_constellation_result.actor_object_type = ad.rss.world.ObjectType.Pedestrian
241 actor_constellation_result.actor_dynamics = self.get_pedestrian_parameters()
242 elif "vehicle" in actor_constellation_data.other_actor.type_id:
243 actor_constellation_result.actor_object_type = ad.rss.world.ObjectType.OtherVehicle
244
245 # set the response time of others vehicles to 2 seconds; the rest stays the same
246 actor_constellation_result.actor_dynamics.responseTime = 2.0
247
248 # per default, if ego is not on the road -> unstructured
249 if ego_on_routeable_road:
250 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.Structured
251 else:
252 # print("vehicle-{} unstructured: reason other ego not on routeable road".format(actor_id))
253 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.Unstructured
254
255 # special handling for vehicles standing still
256 actor_vel = actor_constellation_data.other_actor.get_velocity()
257 actor_speed = math.sqrt(actor_vel.x**2 + actor_vel.y**2 + actor_vel.z**2)
258 if actor_speed < 0.01:
259 # reduce response time
260 actor_constellation_result.actor_dynamics.responseTime = 1.0
261 # still in structured?
262 if actor_constellation_result.rss_calculation_mode == ad.rss.map.RssMode.Structured:
263 actor_distance = math.sqrt(
264 float(
265 actor_constellation_data.ego_match_object.enuPosition.centerPoint.x
266 - actor_constellation_data.other_match_object.enuPosition.centerPoint.x
267 )
268 ** 2
269 + float(
270 actor_constellation_data.ego_match_object.enuPosition.centerPoint.y
271 - actor_constellation_data.other_match_object.enuPosition.centerPoint.y
272 )
273 ** 2
274 )
275 # print("vehicle-{} unstructured check: other distance {}".format(actor_id, actor_distance))
276
277 if actor_constellation_data.ego_dynamics_on_route.ego_speed < 0.01:
278 # both vehicles stand still, so we have to analyze in detail if we possibly want to use
279 # unstructured mode to cope with blockades on the road...
280
281 if actor_distance < 10:
282 # the other has to be near enough to trigger a switch to unstructured
283 other_outside_routeable_road = False
284 for occupied_region in actor_constellation_data.other_match_object.mapMatchedBoundingBox.laneOccupiedRegions:
285 lane = ad.map.lane.getLane(occupied_region.laneId)
286 if not ad.map.lane.isRouteable(lane):
287 other_outside_routeable_road = True
288
289 if other_outside_routeable_road:
290 # if the other is somewhat outside the standard routeable road (e.g. parked at the side, ...)
291 # we immediately decide for unstructured
292 # print("vehicle-{} unstructured: reason other outside routeable
293 # road".format(actor_id))
294 actor_constellation_result.rss_calculation_mode = ad.rss.map.RssMode.Unstructured
295 else:
296 # otherwise we have to look in the orientation delta in addition to get some basic idea of the
297 # constellation (we don't want to go into unstructured if we both waiting
298 # behind a red light...)
299 heading_delta = abs(
300 float(
301 actor_constellation_data.ego_match_object.enuPosition.heading
302 - actor_constellation_data.other_match_object.enuPosition.heading
303 )
304 )
305 if heading_delta > 0.2: # around 11 degree
306 # print("vehicle-{} unstructured: reason heading delta
307 # {}".format(actor_id, heading_delta))
308 actor_constellation_result.rss_calculation_mode = (
309 ad.rss.map.RssMode.Unstructured
310 )
311 self.change_to_unstructured_position_map[actor_id] = (
312 actor_constellation_data.other_match_object.enuPosition
313 )
314 else:
315 # ego moves
316 if actor_distance < 10:
317 # if the ego moves, the other actor doesn't move an the mode was
318 # previously set to unstructured, keep it
319 try:
320 if (
321 self.change_to_unstructured_position_map[actor_id]
322 == actor_constellation_data.other_match_object.enuPosition
323 ):
324 heading_delta = abs(
325 float(
326 actor_constellation_data.ego_match_object.enuPosition.heading
327 - actor_constellation_data.other_match_object.enuPosition.heading
328 )
329 )
330 if heading_delta > 0.2:
331 actor_constellation_result.rss_calculation_mode = (
332 ad.rss.map.RssMode.Unstructured
333 )
334 else:
335 del self.change_to_unstructured_position_map[actor_id]
336 except (AttributeError, KeyError):
337 pass
338 else:
339 if actor_id in self.change_to_unstructured_position_map:
340 del self.change_to_unstructured_position_map[actor_id]
341
342 # still in structured?
343 if actor_constellation_result.rss_calculation_mode == ad.rss.map.RssMode.Structured:
344 # in structured case we have to cope with not yet implemented lateral intersection checks in core RSS implementation
345 # if the other is standing still, we don't assume that he will accelerate
346 # otherwise if standing at the intersection the acceleration within reaction time
347 # will allow to enter the intersection which current RSS implementation will immediately consider
348 # as dangerous
349 # print("_on_actor_constellation_result({}) setting accelMax to
350 # zero".format(actor_constellation_data.other_actor.id))
351 actor_constellation_result.actor_dynamics.alphaLon.accelMax = 0.0
352 actor_constellation_result.actor_dynamics.alphaLat.accelMax = 0.0
353 else:
354 # store route for debug drawings
355 self.route = actor_constellation_data.ego_route
356 # since the ego vehicle is controlled manually, it is easy possible that the ego vehicle
357 # accelerates far more in lateral direction than the ego_dynamics indicate
358 # in an automated vehicle this would be considered by the low-level controller when the RSS restriction
359 # is taken into account properly
360 # but the simple RSS restrictor within CARLA is not able to do so...
361 # So we should at least tell RSS about the fact that we acceleration more than this
362 # to be able to react on this
363 abs_avg_route_accel_lat = abs(float(actor_constellation_data.ego_dynamics_on_route.avg_route_accel_lat))
364 if abs_avg_route_accel_lat > actor_constellation_result.ego_vehicle_dynamics.alphaLat.accelMax:
365 # print("!! Route lateral dynamics exceed expectations: route:{} expected:{} !!".format(abs_avg_route_accel_lat,
366 # actor_constellation_result.ego_vehicle_dynamics.alphaLat.accelMax))
367 actor_constellation_result.ego_vehicle_dynamics.alphaLat.accelMax = min(20.0, abs_avg_route_accel_lat)
368
369 # print("_on_actor_constellation_result({}-{}): ".format(actor_id,
370 # actor_type_id), str(actor_constellation_result))
371 return actor_constellation_result
372
[docs]
373 def destroy(self):
374 logger.info("Stopping RSS sensor")
375 super().destroy()
376 self.unstructured_scene_visualizer = None
377
380
[docs]
381 def increase_log_level(self):
382 print(f"increase {self.log_level}")
383 if self.log_level < carla.RssLogLevel.off:
384 self.log_level = carla.RssLogLevel.values[self.log_level + 1]
385 self.sensor.set_log_level(self.log_level)
386
[docs]
387 def decrease_log_level(self):
388 print(f"decrease {self.log_level}")
389 if self.log_level > carla.RssLogLevel.trace:
390 self.log_level = carla.RssLogLevel.values[self.log_level - 1]
391 self.sensor.set_log_level(self.log_level)
392
[docs]
393 def increase_map_log_level(self):
394 if self.map_log_level < carla.RssLogLevel.off:
395 self.map_log_level = self.map_log_level + 1
396 self.sensor.set_map_log_level(self.map_log_level)
397
[docs]
398 def decrease_map_log_level(self):
399 if self.map_log_level > carla.RssLogLevel.trace:
400 self.map_log_level = self.map_log_level - 1
401 self.sensor.set_map_log_level(self.map_log_level)
402
405
[docs]
406 @staticmethod
407 def get_default_parameters() -> "ad.rss.world.RssDynamics":
408 ego_dynamics = ad.rss.world.RssDynamics()
409 ego_dynamics.alphaLon.accelMax = 5
410 ego_dynamics.alphaLon.brakeMax = -8
411 ego_dynamics.alphaLon.brakeMin = -4
412 ego_dynamics.alphaLon.brakeMinCorrect = -3
413 ego_dynamics.alphaLat.accelMax = 0.2
414 ego_dynamics.alphaLat.brakeMin = -0.8
415 ego_dynamics.lateralFluctuationMargin = 0.1
416 ego_dynamics.responseTime = 0.5
417 ego_dynamics.maxSpeedOnAcceleration = 100
418 ego_dynamics.unstructuredSettings.pedestrianTurningRadius = 2.0
419 ego_dynamics.unstructuredSettings.driveAwayMaxAngle = 2.4
420 ego_dynamics.unstructuredSettings.vehicleYawRateChange = 1.3
421 ego_dynamics.unstructuredSettings.vehicleMinRadius = 3.5
422 ego_dynamics.unstructuredSettings.vehicleTrajectoryCalculationStep = 0.2
423 ego_dynamics.unstructuredSettings.vehicleFrontIntermediateYawRateChangeRatioSteps = 4
424 ego_dynamics.unstructuredSettings.vehicleBackIntermediateYawRateChangeRatioSteps = 0
425 ego_dynamics.unstructuredSettings.vehicleContinueForwardIntermediateAccelerationSteps = 3
426 ego_dynamics.unstructuredSettings.vehicleBrakeIntermediateAccelerationSteps = 3
427 ego_dynamics.unstructuredSettings.pedestrianTurningRadius = 2.0
428 ego_dynamics.unstructuredSettings.pedestrianContinueForwardIntermediateHeadingChangeRatioSteps = 3
429 ego_dynamics.unstructuredSettings.pedestrianContinueForwardIntermediateAccelerationSteps = 0
430 ego_dynamics.unstructuredSettings.pedestrianBrakeIntermediateAccelerationSteps = 3
431 ego_dynamics.unstructuredSettings.pedestrianFrontIntermediateHeadingChangeRatioSteps = 4
432 ego_dynamics.unstructuredSettings.pedestrianBackIntermediateHeadingChangeRatioSteps = 0
433 return ego_dynamics
434
[docs]
435 def set_default_parameters(self):
436 logger.info("Use 'default' RSS Parameters")
437 self.current_vehicle_parameters = self.get_default_parameters()
438
[docs]
439 @staticmethod
440 def get_pedestrian_parameters() -> "ad.rss.world.RssDynamics":
441 pedestrian_dynamics = ad.rss.world.RssDynamics()
442 pedestrian_dynamics.alphaLon.accelMax = 2.0
443 pedestrian_dynamics.alphaLon.brakeMax = -2.0
444 pedestrian_dynamics.alphaLon.brakeMin = -2.0
445 pedestrian_dynamics.alphaLon.brakeMinCorrect = -2.0
446 pedestrian_dynamics.alphaLat.accelMax = 0.001
447 pedestrian_dynamics.alphaLat.brakeMin = -0.001
448 pedestrian_dynamics.lateralFluctuationMargin = 0.1
449 pedestrian_dynamics.responseTime = 0.8
450 pedestrian_dynamics.maxSpeedOnAcceleration = 10
451 pedestrian_dynamics.unstructuredSettings.pedestrianTurningRadius = 2.0
452 pedestrian_dynamics.unstructuredSettings.driveAwayMaxAngle = 2.4
453 pedestrian_dynamics.unstructuredSettings.pedestrianContinueForwardIntermediateHeadingChangeRatioSteps = 3
454 pedestrian_dynamics.unstructuredSettings.pedestrianContinueForwardIntermediateAccelerationSteps = 0
455 pedestrian_dynamics.unstructuredSettings.pedestrianBrakeIntermediateAccelerationSteps = 3
456 pedestrian_dynamics.unstructuredSettings.pedestrianFrontIntermediateHeadingChangeRatioSteps = 4
457 pedestrian_dynamics.unstructuredSettings.pedestrianBackIntermediateHeadingChangeRatioSteps = 0
458
459 # not used:
460 pedestrian_dynamics.unstructuredSettings.vehicleYawRateChange = 1.3
461 pedestrian_dynamics.unstructuredSettings.vehicleMinRadius = 3.5
462 pedestrian_dynamics.unstructuredSettings.vehicleTrajectoryCalculationStep = 0.2
463 pedestrian_dynamics.unstructuredSettings.vehicleFrontIntermediateYawRateChangeRatioSteps = 4
464 pedestrian_dynamics.unstructuredSettings.vehicleBackIntermediateYawRateChangeRatioSteps = 0
465 pedestrian_dynamics.unstructuredSettings.vehicleContinueForwardIntermediateAccelerationSteps = 3
466 pedestrian_dynamics.unstructuredSettings.vehicleBrakeIntermediateAccelerationSteps = 3
467 return pedestrian_dynamics
468
[docs]
469 def get_steering_ranges(self) -> List[Tuple[float, float]]:
470 return [
471 (
472 (float(self.ego_dynamics_on_route.ego_heading) - float(heading_range.begin)) / self._max_steer_angle, # pyright: ignore[reportOptionalMemberAccess]
473 (float(self.ego_dynamics_on_route.ego_heading) - float(heading_range.end)) / self._max_steer_angle,
474 ) # pyright: ignore[reportOptionalMemberAccess]
475 for heading_range in self._allowed_heading_ranges
476 ]
477
478 def _on_rss_response(self, response: "carla.RssResponse"):
479 if not self or not response:
480 return
481 delta_time = 0.1
482 if self.timestamp:
483 delta_time = response.timestamp - self.timestamp
484 if delta_time > -0.05:
485 self.timestamp = response.timestamp
486 self.response_valid = response.response_valid
487 self.proper_response = response.proper_response
488 self.ego_dynamics_on_route = response.ego_dynamics_on_route
489 self.rss_state_snapshot = response.rss_state_snapshot
490 self.situation_snapshot = response.situation_snapshot
491 self.world_model = response.world_model
492
493 # calculate the allowed heading ranges:
494 if response.proper_response.headingRanges:
495 heading = float(response.ego_dynamics_on_route.ego_heading)
496 heading_ranges: "list[ad.rss.state.HeadingRange] | ad.rss.state.HeadingRangeVector" = (
497 response.proper_response.headingRanges
498 )
499 steering_range = ad.rss.state.HeadingRange()
500 steering_range.begin = -self._max_steer_angle + heading
501 steering_range.end = self._max_steer_angle + heading
502 # Updates heading_ranges -> bool if overlap exists
503 ad.rss.unstructured.getHeadingOverlap(steering_range, heading_ranges)
504 self._allowed_heading_ranges = heading_ranges
505 else:
506 self._allowed_heading_ranges = []
507
508 if self.unstructured_scene_visualizer:
509 self.unstructured_scene_visualizer.tick(response.frame, response, self._allowed_heading_ranges)
510
511 new_states = [
512 RssStateInfo(rss_state, response.ego_dynamics_on_route, response.world_model)
513 for rss_state in response.rss_state_snapshot.individualResponses
514 ]
515 if len(new_states) > 0:
516 new_states.sort(key=lambda rss_states: rss_states.distance)
517 self.individual_rss_states = new_states
518 if self.bounding_box_visualizer:
519 self.bounding_box_visualizer.tick(response.frame, self.individual_rss_states)
520 if self.state_visualizer:
521 self.state_visualizer.tick(self.individual_rss_states)
522 self.debug_visualizer.tick(
523 self.route, not response.proper_response.isSafe, self.individual_rss_states, self.ego_dynamics_on_route
524 )
525
526 else:
527 print(f"ignore outdated RSS response {delta_time}")