Source code for classes.sensors.carla_originals.sensors

  1"""
  2Sensor classes used by the examples in the CARLA repository.
  3These classes are true to the original, however might have slight
  4modifications like type hints and the :py:class:`.CustomSensorInterface`
  5as base.
  6"""
  7
  8import collections
  9import math
 10import weakref
 11
 12import carla
 13
 14from classes.sensors._sensor_interface import CustomSensorInterface
 15
 16__all__ = ["CollisionSensor", "GnssSensor", "IMUSensor", "LaneInvasionSensor", "RadarSensor"]
 17
 18from typing import TYPE_CHECKING
 19
 20if TYPE_CHECKING:
 21    from classes.ui.hud import HUD
 22
 23# ==============================================================================
 24# -- CollisionSensor -----------------------------------------------------------
 25# ==============================================================================
 26
 27
[docs] 28class CollisionSensor(CustomSensorInterface): 29 """ 30 Wrapper class for CARLA collision sensors 31 32 See Also: 33 https://carla.readthedocs.io/en/latest/ref_sensors/#collision-detector 34 """ 35 36 def __init__(self, parent_actor: carla.Actor, hud: "HUD"): 37 """Constructor method""" 38 self.history: "list[tuple[int, float]]" = [] 39 self._parent = parent_actor 40 self.hud: "HUD" = hud 41 world = self._parent.get_world() 42 blueprint = world.get_blueprint_library().find("sensor.other.collision") 43 self.sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self._parent) # type: ignore 44 # We need to pass the lambda a weak reference to 45 # self to avoid circular reference. 46 weak_self = weakref.ref(self) 47 self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event)) # pyright: ignore[reportArgumentType] 48
[docs] 49 def get_collision_history(self): 50 """Gets the history of collisions""" 51 history: dict[int, float] = collections.defaultdict(float) 52 for frame, intensity in self.history: 53 history[frame] += intensity 54 return history
55 56 @staticmethod 57 def _on_collision(weak_self: "weakref.ref[CollisionSensor]", event: carla.CollisionEvent): 58 """On collision method""" 59 self = weak_self() 60 if not self: 61 return 62 from classes.ui.hud import get_actor_display_name # lazy import to avoid circular import # noqa: PLC0415 63 64 actor_type = get_actor_display_name(event.other_actor) 65 self.hud.notification(f"Collision with {actor_type!r}") 66 impulse = event.normal_impulse 67 intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2) 68 self.history.append((event.frame, intensity)) 69 if len(self.history) > 4000: 70 self.history.pop(0)
71 72 73# ============================================================================== 74# -- LaneInvasionSensor -------------------------------------------------------- 75# ============================================================================== 76 77
[docs] 78class LaneInvasionSensor(CustomSensorInterface): 79 """ 80 Wrapper class for CARLA lane invasion sensors 81 82 See Also: 83 https://carla.readthedocs.io/en/latest/ref_sensors/#lane-invasion-detector 84 """ 85 86 def __init__(self, parent_actor: carla.Actor, hud: "HUD"): 87 """Constructor method""" 88 self._parent = parent_actor 89 self.hud = hud 90 world = self._parent.get_world() 91 bp = world.get_blueprint_library().find("sensor.other.lane_invasion") 92 self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) # type: ignore 93 # We need to pass the lambda a weak reference to self to avoid circular 94 # reference. 95 weak_self = weakref.ref(self) 96 self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event)) # pyright: ignore[reportArgumentType] 97 98 @staticmethod 99 def _on_invasion(weak_self: "weakref.ref[LaneInvasionSensor]", event: carla.LaneInvasionEvent): 100 """On invasion method""" 101 self = weak_self() 102 if not self: 103 return 104 lane_types = {x.type for x in event.crossed_lane_markings} 105 text = [repr(str(x).split()[-1]) for x in lane_types] 106 self.hud.notification("Crossed line {}".format(" and ".join(text)))
107 108 109# ============================================================================== 110# -- GnssSensor -------------------------------------------------------- 111# ============================================================================== 112 113
[docs] 114class GnssSensor(CustomSensorInterface): 115 """ 116 Wrapper class for CARLA GNSS sensors 117 118 See Also: 119 https://carla.readthedocs.io/en/latest/ref_sensors/#gnss-sensor 120 """ 121 122 def __init__(self, parent_actor: carla.Actor): 123 """Constructor method""" 124 self._parent = parent_actor 125 self.lat = 0.0 126 self.lon = 0.0 127 world = self._parent.get_world() 128 blueprint = world.get_blueprint_library().find("sensor.other.gnss") 129 self.sensor = world.spawn_actor( 130 blueprint, 131 carla.Transform(carla.Location(x=1.0, z=2.8)), # type: ignore 132 attach_to=self._parent, 133 ) 134 # We need to pass the lambda a weak reference to 135 # self to avoid circular reference. 136 weak_self = weakref.ref(self) 137 self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event)) # pyright: ignore[reportArgumentType] 138 139 @staticmethod 140 def _on_gnss_event(weak_self: "weakref.ref[GnssSensor]", event: carla.GnssMeasurement): 141 """GNSS method""" 142 self = weak_self() 143 if not self: 144 return 145 self.lat = event.latitude 146 self.lon = event.longitude
147 148 149# ============================================================================== 150# -- RadarSensor --------------------------------------------------------------- 151# ============================================================================== 152 153
[docs] 154class RadarSensor(CustomSensorInterface): 155 """ 156 Wrapper class for CARLA radar sensors 157 158 See Also: 159 https://carla.readthedocs.io/en/latest/ref_sensors/#radar-sensor 160 """ 161 162 def __init__(self, parent_actor: carla.Actor): 163 self._parent = parent_actor 164 bound_x = 0.5 + self._parent.bounding_box.extent.x 165 # bound_y = 0.5 + self._parent.bounding_box.extent.y 166 bound_z = 0.5 + self._parent.bounding_box.extent.z 167 168 self.velocity_range = 7.5 # m/s 169 world = self._parent.get_world() 170 self.debug = world.debug 171 bp = world.get_blueprint_library().find("sensor.other.radar") 172 bp.set_attribute("horizontal_fov", str(35)) 173 bp.set_attribute("vertical_fov", str(20)) 174 self.sensor = world.spawn_actor( # type: ignore 175 bp, 176 carla.Transform(carla.Location(x=bound_x + 0.05, z=bound_z + 0.05), carla.Rotation(pitch=5)), 177 attach_to=self._parent, 178 ) 179 # We need a weak reference to self to avoid circular reference. 180 weak_self = weakref.ref(self) 181 self.sensor.listen(lambda radar_data: RadarSensor._Radar_callback(weak_self, radar_data)) # pyright: ignore[reportArgumentType] 182 183 @staticmethod 184 def _Radar_callback(weak_self: "weakref.ref[RadarSensor]", radar_data: carla.RadarMeasurement): 185 self = weak_self() 186 if not self: 187 return 188 # To get a numpy [[vel, altitude, azimuth, depth],...[,,,]]: 189 # points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4')) 190 # points = np.reshape(points, (len(radar_data), 4)) 191 192 current_rot = radar_data.transform.rotation 193 for detect in radar_data: 194 azi = math.degrees(detect.azimuth) 195 alt = math.degrees(detect.altitude) 196 # The 0.25 adjusts a bit the distance so the dots can 197 # be properly seen 198 fw_vec = carla.Vector3D(x=detect.depth - 0.25) 199 carla.Transform( 200 carla.Location(), 201 carla.Rotation(pitch=current_rot.pitch + alt, yaw=current_rot.yaw + azi, roll=current_rot.roll), 202 ).transform(fw_vec) 203 204 def clamp(min_v, max_v, value): 205 return max(min_v, min(value, max_v)) 206 207 norm_velocity = detect.velocity / self.velocity_range # range [-1, 1] 208 r = int(clamp(0.0, 1.0, 1.0 - norm_velocity) * 255.0) 209 g = int(clamp(0.0, 1.0, 1.0 - abs(norm_velocity)) * 255.0) 210 b = int(abs(clamp(-1.0, 0.0, -1.0 - norm_velocity)) * 255.0) 211 self.debug.draw_point( 212 radar_data.transform.location + fw_vec, # pyright: ignore[reportArgumentType] 213 size=0.075, 214 life_time=0.06, 215 persistent_lines=False, 216 color=carla.Color(r, g, b), 217 )
218 219 220# ============================================================================== 221# -- IMUSensor ----------------------------------------------------------------- 222# ============================================================================== 223 224
[docs] 225class IMUSensor(CustomSensorInterface): 226 """ 227 Wrapper class for CARLA IMU sensors 228 229 See Also: 230 https://carla.readthedocs.io/en/latest/ref_sensors/#imu-sensor 231 """ 232 233 def __init__(self, parent_actor: carla.Actor): 234 self._parent = parent_actor 235 self.accelerometer = (0.0, 0.0, 0.0) 236 self.gyroscope = (0.0, 0.0, 0.0) 237 self.compass = 0.0 238 world = self._parent.get_world() 239 bp = world.get_blueprint_library().find("sensor.other.imu") 240 self.sensor = world.spawn_actor( # type: ignore 241 bp, carla.Transform(), attach_to=self._parent 242 ) 243 # We need to pass the lambda a weak reference to self to avoid circular 244 # reference. 245 weak_self = weakref.ref(self) 246 self.sensor.listen(lambda sensor_data: IMUSensor._IMU_callback(weak_self, sensor_data)) # pyright: ignore[reportArgumentType] 247 248 @staticmethod 249 def _IMU_callback(weak_self: "weakref.ref[IMUSensor]", sensor_data: carla.IMUMeasurement): 250 self = weak_self() 251 if not self: 252 return 253 limits = (-99.9, 99.9) 254 self.accelerometer = ( 255 max(limits[0], min(limits[1], sensor_data.accelerometer.x)), 256 max(limits[0], min(limits[1], sensor_data.accelerometer.y)), 257 max(limits[0], min(limits[1], sensor_data.accelerometer.z)), 258 ) 259 self.gyroscope = ( 260 max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.x))), 261 max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.y))), 262 max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.z))), 263 ) 264 self.compass = math.degrees(sensor_data.compass)