1import weakref
2from threading import Thread
3from typing import TYPE_CHECKING, ClassVar, List, Optional, cast
4
5import carla
6import numpy as np
7import pygame
8from carla import ColorConverter as cc
9from carla import AttachmentType
10from typing_extensions import Self, Final
11
12from agents.tools import logger
13from agents.tools.hints import CameraBlueprint
14from classes import MockDummy
15from classes.sensors import CustomSensorInterface
16from classes.ui import spectator_follow_actor
17from launch_tools import CarlaDataProvider, class_or_instance_method
18
19from . import _follow_car_event
20
21if TYPE_CHECKING:
22 from agents.tools.config_creation import LaunchConfig
23 from classes.ui.hud import HUD
24
25__all__ = [
26 "CameraBlueprints",
27 "CameraBlueprintsSimple",
28 "CameraManager",
29]
30
31
# Each spec is (blueprint id, color converter, display name); the display name
# doubles as the dictionary key, so it is written only once per entry.
CameraBlueprints: Final = {
    display_name: CameraBlueprint(blueprint_id, converter, display_name)
    for blueprint_id, converter, display_name in (
        ("sensor.camera.rgb", cc.Raw, "Camera RGB"),
        ("sensor.camera.depth", cc.Raw, "Camera Depth (Raw)"),
        ("sensor.camera.depth", cc.Depth, "Camera Depth (Gray Scale)"),
        ("sensor.camera.depth", cc.LogarithmicDepth, "Camera Depth (Logarithmic Gray Scale)"),
        ("sensor.camera.semantic_segmentation", cc.Raw, "Camera Semantic Segmentation (Raw)"),
        (
            "sensor.camera.semantic_segmentation",
            cc.CityScapesPalette,
            "Camera Semantic Segmentation (CityScapes Palette)",
        ),
        ("sensor.lidar.ray_cast", cc.Raw, "Lidar (Ray-Cast)"),
    )
}
"""
Camera blueprints used by the CARLA examples, keyed by their display name.

Todo:
    Should be replaced by :py:class:`.CameraConfig.camera_blueprints`
"""
53
CameraBlueprintsSimple: List[CameraBlueprint] = [CameraBlueprints["Camera RGB"]]
"""A single RGB camera. Default value of the **sensors** parameter of :py:class:`.CameraManager`."""
56
57
58# ==============================================================================
59# -- CameraManager -------------------------------------------------------------
60# ==============================================================================
61
62
[docs]
class CameraManager(MockDummy.CanBeDummy, CustomSensorInterface):
    """
    Class for camera management.

    Spawns one of the configured sensors attached to a parent actor, converts
    every received measurement into a pygame surface and blits it on request.
    Optionally starts a thread that lets the spectator follow the actor.
    """

    default_blueprints: ClassVar[List[CameraBlueprint]] = list(CameraBlueprints.values())
    """
    Blueprints that are used if **sensors** is :code:`None` or an empty list.

    Attention:
        The default argument for **sensors** is a list with just a simple RGB camera.

    :meta private:
    """

    def __init__(
        self,
        parent_actor: carla.Actor,
        hud: "HUD",
        args: "LaunchConfig",
        sensors: Optional[List[CameraBlueprint]] = CameraBlueprintsSimple,
    ):
        """
        Constructor method.

        :py:meth:`set_sensor` should be called after init to set :py:attr:`sensor`
        and :py:attr:`index` to a valid value.

        Parameters:
            parent_actor: The actor to which the camera is attached.
            hud: The HUD; its :code:`dim` is used as image size and it receives notifications.
            args: The launch configuration; the :code:`camera` section is read
                (:code:`gamma`, :code:`spectator`, :code:`recorder`).
            sensors: The sensors for the user interface to be used with the :py:class:`.HUD`.
                Defaults to one RGB camera. :code:`None` or an empty list selects
                :py:attr:`default_blueprints`. The passed list is copied and not modified.
        """
        self.sensor: Optional[carla.Sensor] = None  # Needs call to set_sensor
        self.index: Optional[int] = None  # Needs call to set_sensor

        self._surface: Optional[pygame.Surface] = None  # set on _parse_image, # type: ignore
        self._parent = parent_actor
        self._hud: "HUD" = hud
        self.current_frame = -1
        self.recording = False
        self._args = args
        self._frame_interval = args.camera.recorder.frame_interval  # todo freeze
        self.outpath = args.camera.recorder.output_path  # todo freeze

        # Camera positions are expressed relative to the (padded) bounding box of the parent.
        extent = self._parent.bounding_box.extent
        bound_x = 0.5 + extent.x
        bound_y = 0.5 + extent.y
        bound_z = 0.5 + extent.z

        # Maybe use args.camera.camera_blueprints
        self._camera_transforms = [
            (
                carla.Transform(
                    carla.Location(x=-2.0 * bound_x, y=+0.0 * bound_y, z=2.0 * bound_z), carla.Rotation(pitch=8.0)
                ),
                AttachmentType.SpringArmGhost,
            ),
            (
                carla.Transform(carla.Location(x=+0.8 * bound_x, y=+0.0 * bound_y, z=1.3 * bound_z)),
                AttachmentType.Rigid,
            ),
            (
                carla.Transform(carla.Location(x=+1.9 * bound_x, y=+1.0 * bound_y, z=1.2 * bound_z)),
                AttachmentType.SpringArmGhost,
            ),
            (
                carla.Transform(
                    carla.Location(x=-2.8 * bound_x, y=+0.0 * bound_y, z=4.6 * bound_z), carla.Rotation(pitch=6.0)
                ),
                AttachmentType.SpringArmGhost,
            ),
            (carla.Transform(carla.Location(x=-1.0, y=-1.0 * bound_y, z=0.4 * bound_z)), AttachmentType.Rigid),
        ]

        self.transform_index = 1
        # NOTE: These are remnants from the original code, for our purpose most sensors are not relevant
        # -> Move to globals or some config which should be used (also saves resources)
        # Work on a copy: the entries are replaced below, which must neither leak into the
        # caller's list nor into the shared module-level default. An empty selection falls
        # back to the defaults, as an empty list would make set_sensor divide by zero.
        self.sensors: List[CameraBlueprint] = list(sensors) if sensors else self.default_blueprints.copy()
        bp_library = CarlaDataProvider._blueprint_library
        for i, item in enumerate(self.sensors):
            # Entries that already carry a blueprint are used as they are.
            # Plain tuples have no such attribute and are always configured here.
            if getattr(item, "actual_blueprint", None) is not None:
                continue
            blueprint_id = item[0]
            blp = bp_library.find(blueprint_id)
            if blueprint_id.startswith("sensor.camera"):
                blp.set_attribute("image_size_x", str(hud.dim[0]))
                blp.set_attribute("image_size_y", str(hud.dim[1]))
                if blp.has_attribute("gamma"):
                    blp.set_attribute("gamma", str(args.camera.gamma))
            elif blueprint_id.startswith("sensor.lidar"):
                blp.set_attribute("range", "50")
            try:
                # Named tuple
                self.sensors[i] = item._replace(actual_blueprint=blp)  # update with actual blueprint added
            except AttributeError:
                self.sensors[i] = CameraBlueprint(item[0], item[1], item[2], blp)

        # Update spectator in UE editor.
        if args.camera.spectator:
            self.follow_actor(self._parent)

    def toggle_camera(self) -> None:
        """Switch to the next camera position and respawn the current sensor there."""
        self.transform_index = (self.transform_index + 1) % len(self._camera_transforms)
        self.set_sensor(self.index if self.index is not None else 0, notify=False, force_respawn=True)

    def set_sensor(self, index: Optional[int], notify=True, force_respawn=False) -> None:
        """
        Set the sensor that should be used for the camera output.

        Parameters:
            index: Index into :py:attr:`sensors`; :code:`None` is treated as 0 and
                values are wrapped around the number of sensors.
            notify: If True the name of the sensor is shown as HUD notification.
            force_respawn: Respawn the sensor even if the blueprint id does not change.
        """
        index = (index or 0) % len(self.sensors)
        # Entries sharing a blueprint id differ only in the color converter applied in
        # _parse_image, so the running sensor can be reused for them.
        needs_respawn = (
            self.index is None or force_respawn or self.sensors[index][0] != self.sensors[self.index][0]
        )
        if needs_respawn:
            if self.sensor is not None:
                # NOTE(review): destroy() also sets _follow_car_event - confirm that a
                # respawn is meant to signal the spectator-follow event as well.
                self.destroy()
                self._surface = None
            self.sensor = cast(
                "carla.Sensor",
                CarlaDataProvider.get_world().spawn_actor(
                    self.sensors[index][-1],  # type: ignore
                    self._camera_transforms[self.transform_index][0],
                    attach_to=self._parent,
                    attachment_type=self._camera_transforms[self.transform_index][1],
                ),
            )
            # The callback reads self.index, which destroy() has reset to None.
            # Publish the new index before any measurement can be delivered.
            self.index = index

            # We need to pass the lambda a weak reference to
            # self to avoid circular reference.
            weak_self = weakref.ref(self)
            self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image))  # type: ignore[arg-type]
        if notify:
            self._hud.notification(self.sensors[index][2])
        self.index = index

    def next_sensor(self) -> None:
        """Switch to the next sensor, or to the first one if none is active."""
        self.set_sensor(self.index + 1 if self.index is not None else None)

    def toggle_recording(self) -> None:
        """
        Toggle recording on or off

        Note:
            Currently requires :py:attr:`.LaunchConfig.pygame` to be set to :code:`True`.

        .. deprecated:
            Superseded by WorldMode.toggle_recording

        :meta private:
        """
        self.recording = not self.recording
        self._hud.notification(f"Recording {'On' if self.recording else 'Off'}")

    def destroy(self) -> None:
        """
        Call the parent class' :code:`destroy`, set the follow-car event and
        reset :py:attr:`index` and the rendered surface.
        """
        super().destroy()
        _follow_car_event.set()
        self.index = None  # type: ignore
        self._surface = None  # type: ignore

    def render(self, display: pygame.surface.Surface) -> None:
        """Render the current camera image onto **display**, if an image is available."""
        if self._surface is not None:
            display.blit(self._surface, (0, 0))

    @staticmethod
    def _parse_image(weak_self: "weakref.ref[CameraManager]", image: carla.Image):
        """
        Sensor callback: convert the measurement into a pygame surface.

        Parameters:
            weak_self: Weak reference to the manager; nothing is done if it is gone.
            image: The measurement delivered by the sensor.
        """
        self = weak_self()
        if not self:
            return
        index = self.index
        if index is None:
            # No active sensor entry (not yet selected or already destroyed).
            return
        if self.sensors[index][0].startswith("sensor.lidar"):
            width, height = self._hud.dim[0], self._hud.dim[1]
            points = np.frombuffer(image.raw_data, dtype=np.dtype("f4"))
            points = np.reshape(points, (-1, 4))
            lidar_data = np.array(points[:, :2])  # copy, the buffer view is not modified
            lidar_data *= min(self._hud.dim) / 100.0
            lidar_data += (0.5 * width, 0.5 * height)
            lidar_data = np.fabs(lidar_data)  # pylint: disable=assignment-from-no-return
            lidar_data = lidar_data.astype(np.int32)
            lidar_data = np.reshape(lidar_data, (-1, 2))
            # Points on or beyond the image border would raise an IndexError below.
            inside = (lidar_data[:, 0] < width) & (lidar_data[:, 1] < height)
            lidar_data = lidar_data[inside]
            lidar_img = np.zeros((width, height, 3))
            lidar_img[tuple(lidar_data.T)] = (255, 255, 255)
            self._surface = pygame.surfarray.make_surface(lidar_img)
        else:
            image.convert(self.sensors[index][1])  # apply color converter
            array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
            array = np.reshape(array, (image.height, image.width, 4))
            array = array[:, :, :3]  # drop the fourth channel
            array = array[:, :, ::-1]  # reverse the channel order
            self._surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
        # Deprecated: Recording is done on the WorldModel
        if self.recording and (
            (image.frame % self._frame_interval) == 0 or self.current_frame + self._frame_interval < image.frame
        ):
            print("Saving image to disk", self.outpath % image.frame)
            image.save_to_disk(self.outpath % image.frame)
            self.current_frame = image.frame

    _spectator_thread: Thread
    """Thread for the spectator to follow the ego vehicle."""

    @class_or_instance_method
    def follow_actor(
        cls_or_self: "Self | type[Self]", actor: Optional[carla.Actor] = None, updater=spectator_follow_actor
    ) -> None:
        """
        Follows the actor with the spectator view.

        Starts a daemon thread running **updater** with the actor as its only argument.

        Parameters:
            actor: The actor to follow. Defaults to the **parent_actor**.
            updater: The function to update the camera view.
                Defaults to :py:func:`spectator_follow_actor`.

        Raises:
            ValueError: If no actor is passed and no parent actor is available.
        """
        actor = actor or getattr(cls_or_self, "_parent", None)
        if actor is None:
            raise ValueError("No actor to follow")
        logger.log(0, "Starting spectator thread")
        cls_or_self._spectator_thread = Thread(target=updater, args=(actor,), daemon=True)
        cls_or_self._spectator_thread.start()

    @staticmethod
    def stop_following_actor() -> None:
        """Set the follow-car event."""
        _follow_car_event.set()

    def stop(self) -> None:
        """Set the follow-car event, then delegate to the parent class' :code:`stop`."""
        self.stop_following_actor()
        return super().stop()
289
290
291# ==============================================================================