Source code for classes.carla_originals.sensors

  1"""
  2Sensor classes used by the examples in the CARLA repository.
  3These classes stay true to the originals, but may include slight
  4modifications such as type hints and :py:class:`.CustomSensorInterface`
  5as their base class.
  6"""
  7
  8import collections
  9import math
 10import weakref
 11
 12import carla
 13
 14from classes._sensor_interface import CustomSensorInterface
 15
 16__all__ = [
 17    'CollisionSensor',
 18    'GnssSensor',
 19    'IMUSensor',
 20    'LaneInvasionSensor',
 21    'RadarSensor'
 22]
 23
 24from typing import TYPE_CHECKING
 25
 26if TYPE_CHECKING:
 27    from classes.hud import HUD
 28
 29# ==============================================================================
 30# -- CollisionSensor -----------------------------------------------------------
 31# ==============================================================================
 32
 33
class CollisionSensor(CustomSensorInterface):
    """
    Wrapper class for CARLA collision sensors

    Spawns a ``sensor.other.collision`` actor attached to the parent actor,
    stores ``(frame, intensity)`` pairs for the collision events it receives
    and reports every collision to the HUD.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#collision-detector
    """

    def __init__(self, parent_actor: carla.Actor, hud: "HUD"):
        """
        Spawn the collision sensor on ``parent_actor`` and start listening.

        Args:
            parent_actor: Actor the sensor is attached to.
            hud: HUD that receives the collision notifications.
        """
        self.history: "list[tuple[int, float]]" = []
        self._parent = parent_actor
        self.hud: "HUD" = hud
        carla_world = self._parent.get_world()
        collision_bp = carla_world.get_blueprint_library().find('sensor.other.collision')
        self.sensor = carla_world.spawn_actor(collision_bp, carla.Transform(), attach_to=self._parent)  # type: ignore
        # The callback only gets a weak reference to this object, which
        # avoids a circular reference between the wrapper and its sensor.
        self_ref = weakref.ref(self)
        self.sensor.listen(lambda event: CollisionSensor._on_collision(self_ref, event))  # pyright: ignore[reportArgumentType]

    def get_collision_history(self):
        """
        Gets the history of collisions.

        Returns:
            A ``defaultdict`` mapping each recorded frame to the sum of the
            collision intensities stored for that frame.
        """
        per_frame: dict[int, float] = collections.defaultdict(float)
        for frame, intensity in self.history:
            per_frame[frame] += intensity
        return per_frame

    @staticmethod
    def _on_collision(weak_self: "weakref.ref[CollisionSensor]", event: carla.CollisionEvent):
        """
        Handle a collision event: notify the HUD and record the intensity.

        Args:
            weak_self: Weak reference to the owning :class:`CollisionSensor`.
            event: Collision event delivered by the sensor.
        """
        sensor = weak_self()
        if not sensor:
            # The wrapper is gone (or falsy); nothing to update.
            return
        from classes.hud import get_actor_display_name  # lazy import to avoid circular import # noqa: PLC0415
        other_name = get_actor_display_name(event.other_actor)
        sensor.hud.notification(f'Collision with {other_name!r}')
        impulse = event.normal_impulse
        # Intensity is the Euclidean length of the normal impulse vector.
        intensity = math.sqrt(impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2)
        records = sensor.history
        records.append((event.frame, intensity))
        # Keep at most 4000 entries by dropping the oldest one.
        if len(records) > 4000:
            del records[0]
76 77 78# ============================================================================== 79# -- LaneInvasionSensor -------------------------------------------------------- 80# ============================================================================== 81 82
class LaneInvasionSensor(CustomSensorInterface):
    """
    Wrapper class for CARLA lane invasion sensors

    Spawns a ``sensor.other.lane_invasion`` actor attached to the parent
    actor and reports the crossed lane marking types to the HUD.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#lane-invasion-detector
    """

    def __init__(self, parent_actor: carla.Actor, hud: "HUD"):
        """
        Spawn the lane invasion sensor on ``parent_actor`` and start listening.

        Args:
            parent_actor: Actor the sensor is attached to.
            hud: HUD that receives the lane invasion notifications.
        """
        self._parent = parent_actor
        self.hud = hud
        carla_world = self._parent.get_world()
        invasion_bp = carla_world.get_blueprint_library().find('sensor.other.lane_invasion')
        self.sensor = carla_world.spawn_actor(invasion_bp, carla.Transform(), attach_to=self._parent)  # type: ignore
        # The callback only gets a weak reference to this object, which
        # avoids a circular reference between the wrapper and its sensor.
        self_ref = weakref.ref(self)
        self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(self_ref, event))  # pyright: ignore[reportArgumentType]

    @staticmethod
    def _on_invasion(weak_self: "weakref.ref[LaneInvasionSensor]", event: carla.LaneInvasionEvent):
        """
        Handle a lane invasion event by notifying the HUD.

        Args:
            weak_self: Weak reference to the owning :class:`LaneInvasionSensor`.
            event: Lane invasion event delivered by the sensor.
        """
        sensor = weak_self()
        if not sensor:
            return
        # A set removes duplicate marking types crossed in the same event.
        crossed_types = {marking.type for marking in event.crossed_lane_markings}
        # Only the last whitespace-separated token of each type's string
        # form is shown, quoted via repr().
        labels = [repr(str(lane_type).split()[-1]) for lane_type in crossed_types]
        joined = ' and '.join(labels)
        sensor.hud.notification('Crossed line {}'.format(joined))
112 113 114# ============================================================================== 115# -- GnssSensor -------------------------------------------------------- 116# ============================================================================== 117 118
class GnssSensor(CustomSensorInterface):
    """
    Wrapper class for CARLA GNSS sensors

    Spawns a ``sensor.other.gnss`` actor attached to the parent actor and
    keeps the most recently received latitude and longitude in
    :attr:`lat` and :attr:`lon`.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#gnss-sensor
    """

    def __init__(self, parent_actor: carla.Actor):
        """
        Spawn the GNSS sensor on ``parent_actor`` and start listening.

        Args:
            parent_actor: Actor the sensor is attached to.
        """
        self._parent = parent_actor
        # Both coordinates stay at 0.0 until the first measurement arrives.
        self.lat = 0.0
        self.lon = 0.0
        carla_world = self._parent.get_world()
        gnss_bp = carla_world.get_blueprint_library().find('sensor.other.gnss')
        # Mounting offset of the sensor relative to the parent actor.
        mount = carla.Transform(carla.Location(x=1.0, z=2.8))
        self.sensor = carla_world.spawn_actor(gnss_bp, mount, attach_to=self._parent)  # type: ignore
        # The callback only gets a weak reference to this object, which
        # avoids a circular reference between the wrapper and its sensor.
        self_ref = weakref.ref(self)
        self.sensor.listen(lambda event: GnssSensor._on_gnss_event(self_ref, event))  # pyright: ignore[reportArgumentType]

    @staticmethod
    def _on_gnss_event(weak_self: "weakref.ref[GnssSensor]", event: carla.GnssMeasurement):
        """
        Store the latitude and longitude of a GNSS measurement.

        Args:
            weak_self: Weak reference to the owning :class:`GnssSensor`.
            event: GNSS measurement delivered by the sensor.
        """
        sensor = weak_self()
        if not sensor:
            return
        sensor.lat = event.latitude
        sensor.lon = event.longitude
149 150# ============================================================================== 151# -- RadarSensor --------------------------------------------------------------- 152# ============================================================================== 153 154
class RadarSensor(CustomSensorInterface):
    """
    Wrapper class for CARLA radar sensors

    Spawns a ``sensor.other.radar`` actor attached to the parent actor and
    draws one debug point per radar detection, coloured by the detection's
    velocity.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#radar-sensor
    """

    def __init__(self, parent_actor: carla.Actor):
        """
        Spawn the radar sensor on ``parent_actor`` and start listening.

        The sensor is placed just beyond the parent's bounding box extent
        along x and z and pitched by 5.

        Args:
            parent_actor: Actor the sensor is attached to; its
                ``bounding_box`` is read to position the sensor.
        """
        self._parent = parent_actor
        bound_x = 0.5 + self._parent.bounding_box.extent.x
        bound_z = 0.5 + self._parent.bounding_box.extent.z

        self.velocity_range = 7.5  # m/s
        world = self._parent.get_world()
        self.debug = world.debug
        bp = world.get_blueprint_library().find('sensor.other.radar')
        bp.set_attribute('horizontal_fov', str(35))
        bp.set_attribute('vertical_fov', str(20))
        self.sensor = world.spawn_actor(  # type: ignore
            bp,
            carla.Transform(
                carla.Location(x=bound_x + 0.05, z=bound_z + 0.05),
                carla.Rotation(pitch=5)),
            attach_to=self._parent)
        # We need a weak reference to self to avoid circular reference.
        weak_self = weakref.ref(self)
        self.sensor.listen(
            lambda radar_data: RadarSensor._Radar_callback(weak_self, radar_data))  # pyright: ignore[reportArgumentType]

    @staticmethod
    def _Radar_callback(weak_self: "weakref.ref[RadarSensor]", radar_data: carla.RadarMeasurement):
        """
        Draw a coloured debug point for every detection in ``radar_data``.

        Args:
            weak_self: Weak reference to the owning :class:`RadarSensor`.
            radar_data: Radar measurement delivered by the sensor; iterating
                it yields the individual detections.
        """
        self = weak_self()
        if not self:
            return
        # To get a numpy [[vel, altitude, azimuth, depth],...[,,,]]:
        # points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4'))
        # points = np.reshape(points, (len(radar_data), 4))

        def clamp(min_v, max_v, value):
            """Limit ``value`` to the closed interval [min_v, max_v]."""
            return max(min_v, min(value, max_v))

        current_rot = radar_data.transform.rotation
        for detect in radar_data:
            azi = math.degrees(detect.azimuth)
            alt = math.degrees(detect.altitude)
            # The 0.25 adjusts a bit the distance so the dots can
            # be properly seen
            fw_vec = carla.Vector3D(x=detect.depth - 0.25)
            # The return value is ignored here, so this relies on
            # transform() updating fw_vec in place.
            # NOTE(review): confirm against the CARLA API version in use.
            carla.Transform(
                carla.Location(),
                carla.Rotation(
                    pitch=current_rot.pitch + alt,
                    yaw=current_rot.yaw + azi,
                    roll=current_rot.roll)).transform(fw_vec)

            # Nominally in [-1, 1]; values outside are clamped below.
            norm_velocity = detect.velocity / self.velocity_range
            r = int(clamp(0.0, 1.0, 1.0 - norm_velocity) * 255.0)
            g = int(clamp(0.0, 1.0, 1.0 - abs(norm_velocity)) * 255.0)
            b = int(abs(clamp(- 1.0, 0.0, - 1.0 - norm_velocity)) * 255.0)
            self.debug.draw_point(
                radar_data.transform.location + fw_vec,  # pyright: ignore[reportArgumentType]
                size=0.075,
                life_time=0.06,
                persistent_lines=False,
                color=carla.Color(r, g, b))
222 223 224# ============================================================================== 225# -- IMUSensor ----------------------------------------------------------------- 226# ============================================================================== 227
class IMUSensor(CustomSensorInterface):
    """
    Wrapper class for CARLA IMU sensors

    Spawns a ``sensor.other.imu`` actor attached to the parent actor and
    keeps the latest accelerometer, gyroscope and compass readings.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#imu-sensor
    """

    def __init__(self, parent_actor: carla.Actor):
        """
        Spawn the IMU sensor on ``parent_actor`` and start listening.

        Args:
            parent_actor: Actor the sensor is attached to.
        """
        self._parent = parent_actor
        # All readings stay at zero until the first measurement arrives.
        self.accelerometer = (0.0, 0.0, 0.0)
        self.gyroscope = (0.0, 0.0, 0.0)
        self.compass = 0.0
        carla_world = self._parent.get_world()
        imu_bp = carla_world.get_blueprint_library().find('sensor.other.imu')
        self.sensor = carla_world.spawn_actor(  # type: ignore
            imu_bp, carla.Transform(), attach_to=self._parent)
        # The callback only gets a weak reference to this object, which
        # avoids a circular reference between the wrapper and its sensor.
        self_ref = weakref.ref(self)
        self.sensor.listen(
            lambda sensor_data: IMUSensor._IMU_callback(self_ref, sensor_data))  # pyright: ignore[reportArgumentType]

    @staticmethod
    def _IMU_callback(weak_self: "weakref.ref[IMUSensor]", sensor_data: carla.IMUMeasurement):
        """
        Store the readings of an IMU measurement.

        Accelerometer values are stored as received and gyroscope values are
        converted with ``math.degrees``; both are limited to [-99.9, 99.9].
        The compass value is converted with ``math.degrees`` and stored
        without limiting.

        Args:
            weak_self: Weak reference to the owning :class:`IMUSensor`.
            sensor_data: IMU measurement delivered by the sensor.
        """
        imu = weak_self()
        if not imu:
            return
        lower, upper = -99.9, 99.9

        def bounded(value):
            """Limit ``value`` to the closed interval [lower, upper]."""
            return max(lower, min(upper, value))

        accel = sensor_data.accelerometer
        imu.accelerometer = (bounded(accel.x), bounded(accel.y), bounded(accel.z))
        gyro = sensor_data.gyroscope
        imu.gyroscope = (
            bounded(math.degrees(gyro.x)),
            bounded(math.degrees(gyro.y)),
            bounded(math.degrees(gyro.z)))
        imu.compass = math.degrees(sensor_data.compass)