"""
Sensor classes used by the examples in the CARLA repository.

These classes stay true to the originals, but may contain slight
modifications such as type hints and the use of
:py:class:`.CustomSensorInterface` as their base class.
"""
7
8import collections
9import math
10import weakref
11
12import carla
13
14from classes.sensors._sensor_interface import CustomSensorInterface
15
16__all__ = ["CollisionSensor", "GnssSensor", "IMUSensor", "LaneInvasionSensor", "RadarSensor"]
17
18from typing import TYPE_CHECKING
19
20if TYPE_CHECKING:
21 from classes.ui.hud import HUD
22
23# ==============================================================================
24# -- CollisionSensor -----------------------------------------------------------
25# ==============================================================================
26
27
[docs]
class CollisionSensor(CustomSensorInterface):
    """
    Wrapper around a CARLA collision detector attached to an actor.

    Every collision event is announced on the HUD and stored in
    :py:attr:`history` as a ``(frame, intensity)`` pair.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#collision-detector
    """

    def __init__(self, parent_actor: carla.Actor, hud: "HUD"):
        """
        Spawn the collision sensor on *parent_actor* and start listening.

        Args:
            parent_actor: Actor the sensor gets attached to.
            hud: HUD that is notified about each collision.
        """
        self.history: "list[tuple[int, float]]" = []
        self._parent = parent_actor
        self.hud: "HUD" = hud

        carla_world = self._parent.get_world()
        collision_bp = carla_world.get_blueprint_library().find("sensor.other.collision")
        self.sensor = carla_world.spawn_actor(collision_bp, carla.Transform(), attach_to=self._parent)  # type: ignore

        # The callback only closes over a weak reference to this wrapper,
        # which avoids a circular reference between wrapper and sensor.
        weak_self = weakref.ref(self)

        def _dispatch(event):
            return CollisionSensor._on_collision(weak_self, event)

        self.sensor.listen(_dispatch)  # pyright: ignore[reportArgumentType]

    def get_collision_history(self):
        """
        Return the accumulated collision intensity per frame.

        Returns:
            A ``defaultdict(float)`` mapping a frame number to the sum of all
            intensities recorded for that frame.
        """
        per_frame: dict[int, float] = collections.defaultdict(float)
        for frame_id, strength in self.history:
            per_frame[frame_id] += strength
        return per_frame

    @staticmethod
    def _on_collision(weak_self: "weakref.ref[CollisionSensor]", event: carla.CollisionEvent):
        """
        Handle one collision event delivered by the sensor.

        Notifies the HUD, appends ``(frame, intensity)`` to the history and
        drops the oldest entry once more than 4000 entries are stored.
        Does nothing if the wrapper has already been garbage collected.
        """
        sensor = weak_self()
        if not sensor:
            return
        from classes.ui.hud import get_actor_display_name  # lazy import to avoid circular import  # noqa: PLC0415

        other_name = get_actor_display_name(event.other_actor)
        sensor.hud.notification(f"Collision with {other_name!r}")

        # Intensity is the Euclidean length of the normal impulse vector.
        impulse = event.normal_impulse
        squared_length = impulse.x**2 + impulse.y**2 + impulse.z**2
        sensor.history.append((event.frame, math.sqrt(squared_length)))

        # Keep the history bounded by discarding the oldest record.
        if len(sensor.history) > 4000:
            sensor.history.pop(0)
71
72
73# ==============================================================================
74# -- LaneInvasionSensor --------------------------------------------------------
75# ==============================================================================
76
77
[docs]
class LaneInvasionSensor(CustomSensorInterface):
    """
    Wrapper around a CARLA lane invasion detector attached to an actor.

    Each lane invasion event produces a HUD notification that names the
    types of the crossed lane markings.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#lane-invasion-detector
    """

    def __init__(self, parent_actor: carla.Actor, hud: "HUD"):
        """
        Spawn the lane invasion sensor on *parent_actor* and start listening.

        Args:
            parent_actor: Actor the sensor gets attached to.
            hud: HUD that is notified about each lane invasion.
        """
        self._parent = parent_actor
        self.hud = hud

        carla_world = self._parent.get_world()
        invasion_bp = carla_world.get_blueprint_library().find("sensor.other.lane_invasion")
        self.sensor = carla_world.spawn_actor(invasion_bp, carla.Transform(), attach_to=self._parent)  # type: ignore

        # The callback only closes over a weak reference to this wrapper,
        # which avoids a circular reference between wrapper and sensor.
        weak_self = weakref.ref(self)

        def _dispatch(event):
            return LaneInvasionSensor._on_invasion(weak_self, event)

        self.sensor.listen(_dispatch)  # pyright: ignore[reportArgumentType]

    @staticmethod
    def _on_invasion(weak_self: "weakref.ref[LaneInvasionSensor]", event: carla.LaneInvasionEvent):
        """
        Handle one lane invasion event delivered by the sensor.

        Collects the distinct marking types of the event and shows them on
        the HUD, joined by ``" and "``. Does nothing if the wrapper has
        already been garbage collected.
        """
        sensor = weak_self()
        if not sensor:
            return

        # A set removes duplicate marking types within a single event.
        marking_types = {marking.type for marking in event.crossed_lane_markings}
        # Only the last whitespace-separated token of the string form is shown.
        joined_labels = " and ".join(repr(str(marking_type).split()[-1]) for marking_type in marking_types)
        sensor.hud.notification("Crossed line {}".format(joined_labels))
107
108
109# ==============================================================================
110# -- GnssSensor --------------------------------------------------------
111# ==============================================================================
112
113
[docs]
class GnssSensor(CustomSensorInterface):
    """
    Wrapper around a CARLA GNSS sensor attached to an actor.

    The most recently reported position is available through the
    ``lat`` and ``lon`` attributes.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#gnss-sensor
    """

    def __init__(self, parent_actor: carla.Actor):
        """
        Spawn the GNSS sensor on *parent_actor* and start listening.

        Args:
            parent_actor: Actor the sensor gets attached to.
        """
        self._parent = parent_actor
        # Both coordinates stay at 0.0 until the first measurement arrives.
        self.lat = 0.0
        self.lon = 0.0

        carla_world = self._parent.get_world()
        gnss_bp = carla_world.get_blueprint_library().find("sensor.other.gnss")
        # Offset of the sensor relative to the actor it is attached to.
        mount_point = carla.Transform(carla.Location(x=1.0, z=2.8))  # type: ignore
        self.sensor = carla_world.spawn_actor(gnss_bp, mount_point, attach_to=self._parent)

        # The callback only closes over a weak reference to this wrapper,
        # which avoids a circular reference between wrapper and sensor.
        weak_self = weakref.ref(self)

        def _dispatch(event):
            return GnssSensor._on_gnss_event(weak_self, event)

        self.sensor.listen(_dispatch)  # pyright: ignore[reportArgumentType]

    @staticmethod
    def _on_gnss_event(weak_self: "weakref.ref[GnssSensor]", event: carla.GnssMeasurement):
        """
        Store the latitude and longitude of a new GNSS measurement.

        Does nothing if the wrapper has already been garbage collected.
        """
        sensor = weak_self()
        if not sensor:
            return
        sensor.lat = event.latitude
        sensor.lon = event.longitude
147
148
149# ==============================================================================
150# -- RadarSensor ---------------------------------------------------------------
151# ==============================================================================
152
153
[docs]
class RadarSensor(CustomSensorInterface):
    """
    Wrapper class for CARLA radar sensors

    Every detection of a measurement is drawn as a short-lived debug point
    in the world, colored by its velocity relative to ``velocity_range``.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#radar-sensor
    """

    def __init__(self, parent_actor: carla.Actor):
        """
        Spawn the radar sensor on *parent_actor* and start listening.

        The sensor is placed slightly beyond the actor's bounding box extent
        along x and z and pitched by 5 degrees.

        Args:
            parent_actor: Actor the sensor gets attached to; its bounding box
                determines the mounting offset.
        """
        self._parent = parent_actor
        bound_x = 0.5 + self._parent.bounding_box.extent.x
        bound_z = 0.5 + self._parent.bounding_box.extent.z

        self.velocity_range = 7.5  # m/s
        world = self._parent.get_world()
        self.debug = world.debug
        bp = world.get_blueprint_library().find("sensor.other.radar")
        bp.set_attribute("horizontal_fov", str(35))
        bp.set_attribute("vertical_fov", str(20))
        self.sensor = world.spawn_actor(  # type: ignore
            bp,
            carla.Transform(carla.Location(x=bound_x + 0.05, z=bound_z + 0.05), carla.Rotation(pitch=5)),
            attach_to=self._parent,
        )
        # We need a weak reference to self to avoid circular reference.
        weak_self = weakref.ref(self)
        self.sensor.listen(lambda radar_data: RadarSensor._Radar_callback(weak_self, radar_data))  # pyright: ignore[reportArgumentType]

    @staticmethod
    def _Radar_callback(weak_self: "weakref.ref[RadarSensor]", radar_data: carla.RadarMeasurement):
        """
        Draw one debug point per detection of a radar measurement.

        The point color encodes ``detect.velocity / velocity_range``:
        a ratio of 0 yields white, a ratio of +1 or more yields blue and a
        ratio of -1 or less yields red. Does nothing if the wrapper has
        already been garbage collected.
        """
        self = weak_self()
        if not self:
            return
        # To get a numpy [[vel, altitude, azimuth, depth],...[,,,]]:
        # points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4'))
        # points = np.reshape(points, (len(radar_data), 4))

        # Defined once per measurement instead of once per detection.
        def clamp(min_v, max_v, value):
            return max(min_v, min(value, max_v))

        current_rot = radar_data.transform.rotation
        for detect in radar_data:
            azi = math.degrees(detect.azimuth)
            alt = math.degrees(detect.altitude)
            # The 0.25 adjusts a bit the distance so the dots can
            # be properly seen
            fw_vec = carla.Vector3D(x=detect.depth - 0.25)
            # NOTE(review): the return value is unused, so this relies on
            # ``transform`` rotating ``fw_vec`` in place - confirm against
            # the CARLA version in use.
            carla.Transform(
                carla.Location(),
                carla.Rotation(pitch=current_rot.pitch + alt, yaw=current_rot.yaw + azi, roll=current_rot.roll),
            ).transform(fw_vec)

            norm_velocity = detect.velocity / self.velocity_range  # range [-1, 1]
            r = int(clamp(0.0, 1.0, 1.0 - norm_velocity) * 255.0)
            g = int(clamp(0.0, 1.0, 1.0 - abs(norm_velocity)) * 255.0)
            b = int(abs(clamp(-1.0, 0.0, -1.0 - norm_velocity)) * 255.0)
            self.debug.draw_point(
                radar_data.transform.location + fw_vec,  # pyright: ignore[reportArgumentType]
                size=0.075,
                life_time=0.06,
                persistent_lines=False,
                color=carla.Color(r, g, b),
            )
218
219
220# ==============================================================================
221# -- IMUSensor -----------------------------------------------------------------
222# ==============================================================================
223
224
[docs]
class IMUSensor(CustomSensorInterface):
    """
    Wrapper around a CARLA IMU sensor attached to an actor.

    The latest readings are exposed through the ``accelerometer``,
    ``gyroscope`` and ``compass`` attributes.

    See Also:
        https://carla.readthedocs.io/en/latest/ref_sensors/#imu-sensor
    """

    def __init__(self, parent_actor: carla.Actor):
        """
        Spawn the IMU sensor on *parent_actor* and start listening.

        Args:
            parent_actor: Actor the sensor gets attached to.
        """
        self._parent = parent_actor
        # All readings stay at zero until the first measurement arrives.
        self.accelerometer = (0.0, 0.0, 0.0)
        self.gyroscope = (0.0, 0.0, 0.0)
        self.compass = 0.0

        carla_world = self._parent.get_world()
        imu_bp = carla_world.get_blueprint_library().find("sensor.other.imu")
        self.sensor = carla_world.spawn_actor(imu_bp, carla.Transform(), attach_to=self._parent)  # type: ignore

        # The callback only closes over a weak reference to this wrapper,
        # which avoids a circular reference between wrapper and sensor.
        weak_self = weakref.ref(self)

        def _dispatch(sensor_data):
            return IMUSensor._IMU_callback(weak_self, sensor_data)

        self.sensor.listen(_dispatch)  # pyright: ignore[reportArgumentType]

    @staticmethod
    def _IMU_callback(weak_self: "weakref.ref[IMUSensor]", sensor_data: carla.IMUMeasurement):
        """
        Store the readings of a new IMU measurement.

        Accelerometer components are limited to ``[-99.9, 99.9]``. Gyroscope
        components are passed through ``math.degrees`` and then limited to
        the same interval. The compass value is passed through
        ``math.degrees`` without limiting. Does nothing if the wrapper has
        already been garbage collected.
        """
        sensor = weak_self()
        if not sensor:
            return

        lower, upper = -99.9, 99.9

        def bounded(value):
            # Same evaluation order as max(lower, min(upper, value)).
            return max(lower, min(upper, value))

        accel = sensor_data.accelerometer
        sensor.accelerometer = (bounded(accel.x), bounded(accel.y), bounded(accel.z))

        gyro = sensor_data.gyroscope
        sensor.gyroscope = (
            bounded(math.degrees(gyro.x)),
            bounded(math.degrees(gyro.y)),
            bounded(math.degrees(gyro.z)),
        )

        sensor.compass = math.degrees(sensor_data.compass)