1"""
2Sensor classes used by the examples in the CARLA repository.
3These classes are true to the original, however might have slight
4modifications like type hints and the :py:class:`.CustomSensorInterface`
5as base.
6"""
7
8import collections
9import math
10import weakref
11
12import carla
13
14from classes._sensor_interface import CustomSensorInterface
15
16__all__ = [
17 'CollisionSensor',
18 'GnssSensor',
19 'IMUSensor',
20 'LaneInvasionSensor',
21 'RadarSensor'
22]
23
24from typing import TYPE_CHECKING
25
26if TYPE_CHECKING:
27 from classes.hud import HUD
28
29# ==============================================================================
30# -- CollisionSensor -----------------------------------------------------------
31# ==============================================================================
32
33
[docs]
34class CollisionSensor(CustomSensorInterface):
35 """
36 Wrapper class for CARLA collision sensors
37
38 See Also:
39 https://carla.readthedocs.io/en/latest/ref_sensors/#collision-detector
40 """
41
42 def __init__(self, parent_actor: carla.Actor, hud: "HUD"):
43 """Constructor method"""
44 self.history: "list[tuple[int, float]]" = []
45 self._parent = parent_actor
46 self.hud: "HUD" = hud
47 world = self._parent.get_world()
48 blueprint = world.get_blueprint_library().find('sensor.other.collision')
49 self.sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self._parent) # type: ignore
50 # We need to pass the lambda a weak reference to
51 # self to avoid circular reference.
52 weak_self = weakref.ref(self)
53 self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event)) # pyright: ignore[reportArgumentType]
54
[docs]
55 def get_collision_history(self):
56 """Gets the history of collisions"""
57 history: dict[int, float] = collections.defaultdict(float)
58 for frame, intensity in self.history:
59 history[frame] += intensity
60 return history
61
62 @staticmethod
63 def _on_collision(weak_self: "weakref.ref[CollisionSensor]", event: carla.CollisionEvent):
64 """On collision method"""
65 self = weak_self()
66 if not self:
67 return
68 from classes.hud import get_actor_display_name # lazy import to avoid circular import # noqa: PLC0415
69 actor_type = get_actor_display_name(event.other_actor)
70 self.hud.notification(f'Collision with {actor_type!r}')
71 impulse = event.normal_impulse
72 intensity = math.sqrt(impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2)
73 self.history.append((event.frame, intensity))
74 if len(self.history) > 4000:
75 self.history.pop(0)
76
77
78# ==============================================================================
79# -- LaneInvasionSensor --------------------------------------------------------
80# ==============================================================================
81
82
[docs]
83class LaneInvasionSensor(CustomSensorInterface):
84 """
85 Wrapper class for CARLA lane invasion sensors
86
87 See Also:
88 https://carla.readthedocs.io/en/latest/ref_sensors/#lane-invasion-detector
89 """
90
91 def __init__(self, parent_actor: carla.Actor, hud: "HUD"):
92 """Constructor method"""
93 self._parent = parent_actor
94 self.hud = hud
95 world = self._parent.get_world()
96 bp = world.get_blueprint_library().find('sensor.other.lane_invasion')
97 self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) # type: ignore
98 # We need to pass the lambda a weak reference to self to avoid circular
99 # reference.
100 weak_self = weakref.ref(self)
101 self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event)) # pyright: ignore[reportArgumentType]
102
103 @staticmethod
104 def _on_invasion(weak_self: "weakref.ref[LaneInvasionSensor]", event: carla.LaneInvasionEvent):
105 """On invasion method"""
106 self = weak_self()
107 if not self:
108 return
109 lane_types = {x.type for x in event.crossed_lane_markings}
110 text = [repr(str(x).split()[-1]) for x in lane_types]
111 self.hud.notification('Crossed line {}'.format(' and '.join(text)))
112
113
114# ==============================================================================
115# -- GnssSensor --------------------------------------------------------
116# ==============================================================================
117
118
[docs]
119class GnssSensor(CustomSensorInterface):
120 """
121 Wrapper class for CARLA GNSS sensors
122
123 See Also:
124 https://carla.readthedocs.io/en/latest/ref_sensors/#gnss-sensor
125 """
126
127 def __init__(self, parent_actor: carla.Actor):
128 """Constructor method"""
129 self._parent = parent_actor
130 self.lat = 0.0
131 self.lon = 0.0
132 world = self._parent.get_world()
133 blueprint = world.get_blueprint_library().find('sensor.other.gnss')
134 self.sensor = world.spawn_actor(blueprint, carla.Transform(carla.Location(x=1.0, z=2.8)), # type: ignore
135 attach_to=self._parent)
136 # We need to pass the lambda a weak reference to
137 # self to avoid circular reference.
138 weak_self = weakref.ref(self)
139 self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event)) # pyright: ignore[reportArgumentType]
140
141 @staticmethod
142 def _on_gnss_event(weak_self: "weakref.ref[GnssSensor]", event: carla.GnssMeasurement):
143 """GNSS method"""
144 self = weak_self()
145 if not self:
146 return
147 self.lat = event.latitude
148 self.lon = event.longitude
149
150# ==============================================================================
151# -- RadarSensor ---------------------------------------------------------------
152# ==============================================================================
153
154
[docs]
155class RadarSensor(CustomSensorInterface):
156 """
157 Wrapper class for CARLA radar sensors
158
159 See Also:
160 https://carla.readthedocs.io/en/latest/ref_sensors/#radar-sensor
161 """
162
163 def __init__(self, parent_actor: carla.Actor):
164 self._parent = parent_actor
165 bound_x = 0.5 + self._parent.bounding_box.extent.x
166 #bound_y = 0.5 + self._parent.bounding_box.extent.y
167 bound_z = 0.5 + self._parent.bounding_box.extent.z
168
169 self.velocity_range = 7.5 # m/s
170 world = self._parent.get_world()
171 self.debug = world.debug
172 bp = world.get_blueprint_library().find('sensor.other.radar')
173 bp.set_attribute('horizontal_fov', str(35))
174 bp.set_attribute('vertical_fov', str(20))
175 self.sensor = world.spawn_actor( # type: ignore
176 bp,
177 carla.Transform(
178 carla.Location(x=bound_x + 0.05, z=bound_z + 0.05),
179 carla.Rotation(pitch=5)),
180 attach_to=self._parent)
181 # We need a weak reference to self to avoid circular reference.
182 weak_self = weakref.ref(self)
183 self.sensor.listen(
184 lambda radar_data: RadarSensor._Radar_callback(weak_self, radar_data)) # pyright: ignore[reportArgumentType]
185
186 @staticmethod
187 def _Radar_callback(weak_self: "weakref.ref[RadarSensor]", radar_data: carla.RadarMeasurement):
188 self = weak_self()
189 if not self:
190 return
191 # To get a numpy [[vel, altitude, azimuth, depth],...[,,,]]:
192 # points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4'))
193 # points = np.reshape(points, (len(radar_data), 4))
194
195 current_rot = radar_data.transform.rotation
196 for detect in radar_data:
197 azi = math.degrees(detect.azimuth)
198 alt = math.degrees(detect.altitude)
199 # The 0.25 adjusts a bit the distance so the dots can
200 # be properly seen
201 fw_vec = carla.Vector3D(x=detect.depth - 0.25)
202 carla.Transform(
203 carla.Location(),
204 carla.Rotation(
205 pitch=current_rot.pitch + alt,
206 yaw=current_rot.yaw + azi,
207 roll=current_rot.roll)).transform(fw_vec)
208
209 def clamp(min_v, max_v, value):
210 return max(min_v, min(value, max_v))
211
212 norm_velocity = detect.velocity / self.velocity_range # range [-1, 1]
213 r = int(clamp(0.0, 1.0, 1.0 - norm_velocity) * 255.0)
214 g = int(clamp(0.0, 1.0, 1.0 - abs(norm_velocity)) * 255.0)
215 b = int(abs(clamp(- 1.0, 0.0, - 1.0 - norm_velocity)) * 255.0)
216 self.debug.draw_point(
217 radar_data.transform.location + fw_vec, # pyright: ignore[reportArgumentType]
218 size=0.075,
219 life_time=0.06,
220 persistent_lines=False,
221 color=carla.Color(r, g, b))
222
223
224# ==============================================================================
225# -- IMUSensor -----------------------------------------------------------------
226# ==============================================================================
227
[docs]
228class IMUSensor(CustomSensorInterface):
229 """
230 Wrapper class for CARLA IMU sensors
231
232 See Also:
233 https://carla.readthedocs.io/en/latest/ref_sensors/#imu-sensor
234 """
235
236 def __init__(self, parent_actor: carla.Actor):
237 self._parent = parent_actor
238 self.accelerometer = (0.0, 0.0, 0.0)
239 self.gyroscope = (0.0, 0.0, 0.0)
240 self.compass = 0.0
241 world = self._parent.get_world()
242 bp = world.get_blueprint_library().find('sensor.other.imu')
243 self.sensor = world.spawn_actor( # type: ignore
244 bp, carla.Transform(), attach_to=self._parent)
245 # We need to pass the lambda a weak reference to self to avoid circular
246 # reference.
247 weak_self = weakref.ref(self)
248 self.sensor.listen(
249 lambda sensor_data: IMUSensor._IMU_callback(weak_self, sensor_data)) # pyright: ignore[reportArgumentType]
250
251 @staticmethod
252 def _IMU_callback(weak_self: "weakref.ref[IMUSensor]", sensor_data: carla.IMUMeasurement):
253 self = weak_self()
254 if not self:
255 return
256 limits = (-99.9, 99.9)
257 self.accelerometer = (
258 max(limits[0], min(limits[1], sensor_data.accelerometer.x)),
259 max(limits[0], min(limits[1], sensor_data.accelerometer.y)),
260 max(limits[0], min(limits[1], sensor_data.accelerometer.z)))
261 self.gyroscope = (
262 max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.x))),
263 max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.y))),
264 max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.z))))
265 self.compass = math.degrees(sensor_data.compass)