Source code for classes.ui.camera_manager

  1import weakref
  2from threading import Thread
  3from typing import TYPE_CHECKING, ClassVar, List, Optional, cast
  4
  5import carla
  6import numpy as np
  7import pygame
  8from carla import ColorConverter as cc
  9from carla import AttachmentType
 10from typing_extensions import Self, Final
 11
 12from agents.tools import logger
 13from agents.tools.hints import CameraBlueprint
 14from classes import MockDummy
 15from classes.sensors import CustomSensorInterface
 16from classes.ui import spectator_follow_actor
 17from launch_tools import CarlaDataProvider, class_or_instance_method
 18
 19from . import _follow_car_event
 20
 21if TYPE_CHECKING:
 22    from agents.tools.config_creation import LaunchConfig
 23    from classes.ui.hud import HUD
 24
 25__all__ = [
 26    "CameraBlueprints",
 27    "CameraBlueprintsSimple",
 28    "CameraManager",
 29]
 30
 31
 32CameraBlueprints: Final = {
 33    "Camera RGB": CameraBlueprint("sensor.camera.rgb", cc.Raw, "Camera RGB"),
 34    "Camera Depth (Raw)": CameraBlueprint("sensor.camera.depth", cc.Raw, "Camera Depth (Raw)"),
 35    "Camera Depth (Gray Scale)": CameraBlueprint("sensor.camera.depth", cc.Depth, "Camera Depth (Gray Scale)"),
 36    "Camera Depth (Logarithmic Gray Scale)": CameraBlueprint(
 37        "sensor.camera.depth", cc.LogarithmicDepth, "Camera Depth (Logarithmic Gray Scale)"
 38    ),
 39    "Camera Semantic Segmentation (Raw)": CameraBlueprint(
 40        "sensor.camera.semantic_segmentation", cc.Raw, "Camera Semantic Segmentation (Raw)"
 41    ),
 42    "Camera Semantic Segmentation (CityScapes Palette)": CameraBlueprint(
 43        "sensor.camera.semantic_segmentation", cc.CityScapesPalette, "Camera Semantic Segmentation (CityScapes Palette)"
 44    ),
 45    "Lidar (Ray-Cast)": CameraBlueprint("sensor.lidar.ray_cast", carla.ColorConverter.Raw, "Lidar (Ray-Cast)"),
 46}
 47"""
 48Camera blueprints used by the CARLA examples.
 49
 50Todo:
 51    Should be replaced by :py:class:`.CameraConfig.camera_blueprints`
 52"""
 53
 54CameraBlueprintsSimple: List[CameraBlueprint] = [CameraBlueprints["Camera RGB"]]
 55"""Just a single RGB camera. Default for :py:meth:`.CameraManager.sensors`"""
 56
 57
 58# ==============================================================================
 59# -- CameraManager -------------------------------------------------------------
 60# ==============================================================================
 61
 62
[docs] 63class CameraManager(MockDummy.CanBeDummy, CustomSensorInterface): 64 """Class for camera management""" 65 66 default_blueprints: ClassVar[List[CameraBlueprint]] = list(CameraBlueprints.values()) 67 """ 68 Cameras are attached to the ego vehicle by default if **sensors** is not set or an empty list. 69 70 Attentions: 71 The default argument for **sensors** is a list with just a simple RGB camera. 72 73 :meta private: 74 """ 75
[docs] 76 def __init__( 77 self, 78 parent_actor: carla.Actor, 79 hud: "HUD", 80 args: "LaunchConfig", 81 sensors: Optional[List[CameraBlueprint]] = CameraBlueprintsSimple, 82 ): 83 """ 84 Constructor method. 85 86 :py:meth:`set_sensor` should be called after init to set :py:attr:`sensor` 87 and :py:attr:`index` to a valid value. 88 89 Parameters: 90 parent_actor: The actor to which the camera is attached. 91 sensors: The sensors for the user interface to be used with the :py:class:`.HUD`. 92 Defaults to one RGB camera. 93 """ 94 self.sensor: Optional[carla.Sensor] = None # Needs call to set_sensor 95 self.index: Optional[int] = None # Needs call to set_sensor 96 97 self._surface: Optional[pygame.Surface] = None # set on _parse_image, # type: ignore 98 self._parent = parent_actor 99 self._hud: "HUD" = hud 100 self.current_frame = -1 101 self.recording = False 102 self._args = args 103 self._frame_interval = args.camera.recorder.frame_interval # todo freeze 104 self.outpath = args.camera.recorder.output_path # todo freeze 105 bound_x = 0.5 + self._parent.bounding_box.extent.x 106 bound_y = 0.5 + self._parent.bounding_box.extent.y 107 bound_z = 0.5 + self._parent.bounding_box.extent.z 108 109 # Maybe use args.camera.camera_blueprints 110 self._camera_transforms = [ 111 ( 112 carla.Transform( 113 carla.Location(x=-2.0 * bound_x, y=+0.0 * bound_y, z=2.0 * bound_z), carla.Rotation(pitch=8.0) 114 ), 115 AttachmentType.SpringArmGhost, 116 ), 117 ( 118 carla.Transform(carla.Location(x=+0.8 * bound_x, y=+0.0 * bound_y, z=1.3 * bound_z)), 119 AttachmentType.Rigid, 120 ), 121 ( 122 carla.Transform(carla.Location(x=+1.9 * bound_x, y=+1.0 * bound_y, z=1.2 * bound_z)), 123 AttachmentType.SpringArmGhost, 124 ), 125 ( 126 carla.Transform( 127 carla.Location(x=-2.8 * bound_x, y=+0.0 * bound_y, z=4.6 * bound_z), carla.Rotation(pitch=6.0) 128 ), 129 AttachmentType.SpringArmGhost, 130 ), 131 (carla.Transform(carla.Location(x=-1.0, y=-1.0 * bound_y, z=0.4 * bound_z)), AttachmentType.Rigid), 132 ] 133 134 self.transform_index = 1 135 # NOTE: These are remnants from the original code, for our purpose most sensors are not relevant 136 # -> Move to globals or some config which should be used (also saves resources) 137 self.sensors = sensors if sensors is not None else self.default_blueprints.copy() 138 bp_library = CarlaDataProvider._blueprint_library 139 for i, item in enumerate(self.sensors): 140 try: 141 if item.actual_blueprint is not None: 142 continue 143 except AttributeError: # not a named tuple 144 pass 145 blp = bp_library.find(item[0]) 146 if item[0].startswith("sensor.camera"): 147 blp.set_attribute("image_size_x", str(hud.dim[0])) 148 blp.set_attribute("image_size_y", str(hud.dim[1])) 149 if blp.has_attribute("gamma"): 150 blp.set_attribute("gamma", str(args.camera.gamma)) 151 elif item[0].startswith("sensor.lidar"): 152 blp.set_attribute("range", "50") 153 try: 154 # Named tuple 155 self.sensors[i] = item._replace(actual_blueprint=blp) # update with actual blueprint added 156 except AttributeError: 157 self.sensors[i] = CameraBlueprint(item[0], item[1], item[2], blp) 158 159 # Update spectator in UE editor. 160 if args.camera.spectator: 161 self.follow_actor(self._parent)
162
[docs] 163 def toggle_camera(self) -> None: 164 """Activate a camera""" 165 self.transform_index = (self.transform_index + 1) % len(self._camera_transforms) 166 self.set_sensor(self.index if self.index is not None else 0, notify=False, force_respawn=True)
167
[docs] 168 def set_sensor(self, index: Optional[int], notify=True, force_respawn=False) -> None: 169 """Set the sensor that should be used for the camera output""" 170 index = index or 0 171 index = index % len(self.sensors) 172 needs_respawn = ( 173 True if self.index is None else (force_respawn or (self.sensors[index][0] != self.sensors[self.index][0])) 174 ) 175 if needs_respawn: 176 if self.sensor is not None: 177 self.destroy() 178 self._surface = None 179 self.sensor = cast( 180 "carla.Sensor", 181 CarlaDataProvider.get_world().spawn_actor( 182 self.sensors[index][-1], # type: ignore 183 self._camera_transforms[self.transform_index][0], 184 attach_to=self._parent, 185 attachment_type=self._camera_transforms[self.transform_index][1], 186 ), 187 ) 188 189 # We need to pass the lambda a weak reference to 190 # self to avoid circular reference. 191 weak_self = weakref.ref(self) 192 self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image)) # type: ignore[arg-type] 193 if notify: 194 self._hud.notification(self.sensors[index][2]) 195 self.index = index
196
[docs] 197 def next_sensor(self) -> None: 198 """Get the next sensor""" 199 self.set_sensor(self.index + 1 if self.index is not None else None)
200 201 def toggle_recording(self) -> None: 202 """ 203 Toggle recording on or off 204 205 Note: 206 Currently requires :py:attr:`.LaunchConfig.pygame` to be set to :code:`True`. 207 208 .. deprecated: 209 Superseded by WorldMode.toggle_recording 210 211 :meta private: 212 """ 213 self.recording = not self.recording 214 self._hud.notification("Recording %s" % ("On" if self.recording else "Off")) 215
[docs] 216 def destroy(self) -> None: 217 super().destroy() 218 _follow_car_event.set() 219 self.index = None # type: ignore 220 self._surface = None # type: ignore
221
[docs] 222 def render(self, display: pygame.surface.Surface) -> None: 223 """Renders method the current camera image""" 224 if self._surface is not None: 225 display.blit(self._surface, (0, 0))
226 227 @staticmethod 228 def _parse_image(weak_self: "weakref.ref[CameraManager]", image: carla.Image): 229 self = weak_self() 230 if not self: 231 return 232 index: int = self.index # type: ignore[assignment] 233 if self.sensors[index][0].startswith("sensor.lidar"): 234 points = np.frombuffer(image.raw_data, dtype=np.dtype("f4")) 235 points = np.reshape(points, (int(points.shape[0] / 4), 4)) 236 lidar_data = np.array(points[:, :2]) 237 lidar_data *= min(self._hud.dim) / 100.0 238 lidar_data += (0.5 * self._hud.dim[0], 0.5 * self._hud.dim[1]) 239 lidar_data = np.fabs(lidar_data) # pylint: disable=assignment-from-no-return 240 lidar_data = lidar_data.astype(np.int32) 241 lidar_data = np.reshape(lidar_data, (-1, 2)) 242 lidar_img_size = (self._hud.dim[0], self._hud.dim[1], 3) 243 lidar_img = np.zeros(lidar_img_size) 244 lidar_img[tuple(lidar_data.T)] = (255, 255, 255) 245 self._surface = pygame.surfarray.make_surface(lidar_img) 246 else: 247 image.convert(self.sensors[index][1]) # apply color converter 248 array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) 249 array = np.reshape(array, (image.height, image.width, 4)) 250 array = array[:, :, :3] 251 array = array[:, :, ::-1] 252 self._surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) 253 # Deprecated: Recording is done on the WorldModel 254 if self.recording and ( 255 (image.frame % self._frame_interval) == 0 or self.current_frame + self._frame_interval < image.frame 256 ): 257 print("Saving image to disk", self.outpath % image.frame) 258 image.save_to_disk(self.outpath % image.frame) 259 self.current_frame = image.frame 260 261 _spectator_thread: Thread 262 """Thread for the spectator to follow the ego vehicle.""" 263
[docs] 264 @class_or_instance_method 265 def follow_actor( 266 cls_or_self: "Self | type[Self]", actor: Optional[carla.Actor] = None, updater=spectator_follow_actor 267 ) -> None: 268 """ 269 Follows the actor with the spectator view. 270 271 Parameters: 272 actor: The actor to follow. Defaults to the **parent_actor**. 273 updater: The function to update the camera view. Defaults to :py:func:`camera_follow_actor`. 274 """ 275 actor = actor or getattr(cls_or_self, "_parent", None) 276 if actor is None: 277 raise ValueError("No actor to follow") 278 logger.log(0, "Starting spectator thread") 279 cls_or_self._spectator_thread = Thread(target=updater, args=(actor,), daemon=True) 280 cls_or_self._spectator_thread.start()
281
[docs] 282 @staticmethod 283 def stop_following_actor() -> None: 284 _follow_car_event.set()
285
[docs] 286 def stop(self) -> None: 287 self.stop_following_actor() 288 return super().stop()
289 290 291# ==============================================================================