Source code for classes.worldmodel

   1"""
   2Interface classes between CARLA, the agent, and the user interface.
   3"""
   4# pyright: reportOptionalMemberAccess=warning
   5
   6from __future__ import annotations
   7
   8import contextlib
   9import os
  10from pathlib import Path
  11import subprocess
  12import sys
  13import time
  14import weakref
  15
  16from collections.abc import Mapping
  17from typing import TYPE_CHECKING, Any, ClassVar, Iterable, List, NoReturn, Optional, Sequence, Union, overload
  18from typing import cast as assure_type
  19
  20import carla
  21import hydra
  22from numpy import random as nprandom
  23import pygame
  24import random
  25from hydra.core.global_hydra import GlobalHydra
  26from hydra.core.hydra_config import HydraConfig
  27from hydra.core.utils import configure_log
  28from omegaconf import DictConfig, OmegaConf, open_dict
  29
  30from agents.tools.config_creation import LaunchConfig, RssLogLevel, RssRoadBoundariesMode, config_store
  31from classes import exceptions as _exceptions
  32from classes.camera_manager import CameraManager
  33from classes._sensor_interface import CustomSensorInterface
  34from classes.carla_originals.sensors import CollisionSensor, GnssSensor, IMUSensor, LaneInvasionSensor, RadarSensor
  35from classes.exceptions import AgentDoneException, ContinueLoopException
  36from classes.hud import HUD, get_actor_display_name
  37from classes.keyboard_controls import KeyboardControl, RSSKeyboardControl
  38from classes.rss_sensor import AD_RSS_AVAILABLE, RssSensor
  39from classes.rss_visualization import RssBoundingBoxVisualizer, RssUnstructuredSceneVisualizer
  40from classes.information_manager import InformationManager
  41from launch_tools import class_or_instance_method
  42
  43if TYPE_CHECKING:
  44    from hydra.conf import HydraConf
  45    from typing_extensions import Self
  46
  47    from agents.lunatic_agent import LunaticAgent
  48    from agents.navigation.global_route_planner import GlobalRoutePlanner
  49    from agents.tools.config_creation import LunaticAgentSettings, RssRoadBoundariesModeAlias
  50    from classes.type_protocols import ControllerClassT
  51    from classes.detection_matrix import DetectionMatrix
  52
  53from agents.tools.logs import logger
  54from launch_tools import CarlaDataProvider, Literal
  55from launch_tools.blueprint_helpers import get_actor_blueprints
  56
  57
[docs] 58class AccessCarlaMixin: 59 """ 60 Mixin class that delegates the attributes :py:attr:`client`, :py:attr:`map`, and :py:attr:`world` 61 to the :py:class:`.CarlaDataProvider` to keep them in sync. 62 63 Note: 64 This mixin only works for instances, they are not class attributes. 65 """ 66 67 @property 68 def client(self) -> carla.Client: 69 return CarlaDataProvider.get_client() 70 71 @client.setter 72 def client(self, value: carla.Client): 73 CarlaDataProvider.set_client(value) 74 75 @property 76 def world(self) -> carla.World: 77 return CarlaDataProvider.get_world() 78 79 @world.setter 80 def world(self, value: carla.World): 81 CarlaDataProvider.set_world(value) 82 83 @property 84 def map(self) -> carla.Map: 85 return CarlaDataProvider.get_map() 86 87 @map.setter 88 def map(self, value: carla.Map): 89 """ 90 Avoid setting the map directly. Use :py:meth:`.CarlaDataProvider.set_world` instead. 91 92 Raises: 93 ValueError: If the map is not the same as the one set in :py:attr:`.CarlaDataProvider.get_map`. 94 95 :meta private: 96 """ 97 if CarlaDataProvider.get_map() != value: 98 raise ValueError("CarlaDataProvider.get_map() and passed map are not the same.") 99 # Do nothing as map is set when using get_map or set_world 100
[docs] 101 @staticmethod 102 def get_blueprint_library() -> carla.BlueprintLibrary: 103 """ 104 Access to a cached version of the blueprint library 105 106 Raises: 107 ValueError: If the blueprint library is not set, which happens when the world is not set. 108 The world must be setup before (:py:class:`CarlaDataProvider.set_world()<.CarlaDataProvider>`). 109 """ 110 if CarlaDataProvider._blueprint_library is None: 111 raise ValueError("Blueprint Library not set. Call CarlaDataProvider.set_world() first.") 112 return CarlaDataProvider._blueprint_library
113 114# ============================================================================== 115# -- Game Framework --------------------------------------------------------------- 116# ============================================================================== 117 118
[docs] 119class GameFramework(AccessCarlaMixin, CarlaDataProvider): 120 """ 121 A utility class to setup CARLA, pygame, Hydra_, the configuration and parts of the user interface 122 and the agent. 123 124 A :py:class:`GameFramework` instance can be used to control the game loop and work as a 125 handler for the :py:mod:`exceptions` of this project. Furthermore can it manage the cooldown 126 of the :py:class:`.Rule` classes. 127 128 Note: 129 The GameFramework derives from the :py:class:`.CarlaDataProvider` which gives this class 130 more utility methods that currently are not included in this part of the documentation. 131 """ 132 133 clock: ClassVar[pygame.time.Clock] = None 134 display: ClassVar[pygame.Surface] = None 135 controller: weakref.ProxyType[RSSKeyboardControl] | RSSKeyboardControl 136 """Parser for keyboard events""" 137 138 traffic_manager: Optional[carla.TrafficManager] = None 139 140 @property 141 def launch_config(self) -> LaunchConfig: 142 """:py:class:`.LaunchConfig` object that was used for the initialization (**args**)""" 143 return self._args 144 145 @property 146 def agent_config(self) -> LunaticAgentSettings: 147 """ 148 The configuration of the attached :py:attr:`agent`, if it exists otherwise the 149 **agent** attribute of the stored :py:attr:`launch_config`. 150 """ 151 return self.agent.config if self.agent else self._args.agent 152 153 # ----- Init Functions ----- 154
[docs] 155 @classmethod 156 def quickstart(cls, launch_config: Optional[LaunchConfig] = None, *, logging: bool = False) -> "Self": 157 """ 158 Initializes Hydra_ in a limited way, i.e. does not allow for command line overrides. 159 160 Sets up the :py:class:`carla.Client` and related instances as well as pygame. 161 162 Note: 163 It is recommended that you use a :python:`@hydra.main` decorated main function instead 164 to make full use of the Hydra_ framework. 165 166 Parameters: 167 launch_config: The configuration to use. If :code:`None`, will use the default 168 configuration from :code:`./conf/launch_config.yaml`. 169 logging: If True, change the how logging is done by applying the logger settings from 170 :code:`./conf/config_extensions/job_logging.yaml`. 171 Default is :code:`False`. 172 173 Returns: 174 The initialized :py:class:`GameFramework` instance. 175 176 See Also: 177 This function uses: 178 - :py:meth:`.initialize_hydra` 179 - :py:meth:`.init_carla` 180 - :py:meth:`.init_pygame` 181 """ 182 if not launch_config: 183 launch_config = cls.initialize_hydra(logging=logging) 184 if not logging and AD_RSS_AVAILABLE: 185 launch_config.agent.rss.log_level = RssLogLevel.off 186 cls.init_carla(launch_config) 187 cls.init_pygame(launch_config) 188 return cls(launch_config)
189 190 # Hydra Tools 191 # TODO: this could be some launch_tools MixinClass
[docs] 192 @staticmethod 193 def initialize_hydra(config_dir: str = "./conf", 194 config_name: str = "launch_config", 195 version_base=None, *, 196 job_name="LunaticAgentJob", 197 logging=True, 198 structured=True) -> "LaunchConfig": 199 """ 200 Use this function only if no hydra.main is available. 201 202 Usage: 203 204 .. code-block:: python 205 206 args = GameFramework.initialize_hydra(config_dir=<abs_path_of_conf>, config_name="launch_config") 207 game_framework = GameFramework(args) 208 209 Args: 210 config_dir: The directory where the hydra configuration is stored. 211 config_name: The name of the configuration file. 212 version_base: The version base of hydra for the configuration. Default is None. 213 job_name: The name of the job. 214 logging: If True, will set up logging. 215 structured: If True will create the config based on :py:class:`.LaunchConfig`, 216 otherwise it will be based on a :python:`dict`. This is useful for runtime 217 type-checks and conversions. 218 If the configs :py:attr:`.LaunchConfig.strict_config` value is < 2, this 219 parameter is ignored. Disable if you experience problems. 220 Default is :python:`True`. 221 222 See Also: 223 Hydra functions: 224 - :py:func:`hydra.initialize_config_dir` 225 - :py:func:`hydra.compose` 226 """ 227 config_dir = os.path.abspath(config_dir) 228 hydra_initialized = GameFramework.hydra_initialized() 229 if not hydra_initialized: 230 # Not save-guarding this against multiple calls, expose the hydra error 231 # todo: low-prio check if config dir and the other parameters are the same. 232 hydra.initialize_config_dir(version_base=version_base, 233 config_dir=config_dir, 234 job_name=job_name) 235 236 dict_config = hydra.compose(config_name=config_name, 237 return_hydra_config=not hydra_initialized, 238 overrides=None) 239 if structured and dict_config.get("strict_config", 3) >= 2: 240 # Uses the correct dataclass schemas as values. 241 if config_name == "launch_config": 242 if LaunchConfig._config_path: 243 # Load defined schema from the config Store 244 cn = config_store.load(LaunchConfig._config_path) 245 schema: Optional[DictConfig] = cn.node # type: ignore[assignment] 246 with open_dict(schema): 247 schema.merge_with(dict_config) 248 config = assure_type(LaunchConfig, schema) 249 else: 250 schema = None 251 else: 252 schema = None 253 if schema is None: 254 logger.debug("No schema found for structured init file %s. Falling back to LaunchConfig", config_name) 255 launch_config = LaunchConfig(**dict_config) # type: ignore 256 config: LaunchConfig = OmegaConf.structured(launch_config, 257 flags={'allow_objects': True}) 258 else: 259 config = assure_type("LaunchConfig", dict_config) 260 261 if not hydra_initialized: 262 hydra_conf: HydraConfig = GameFramework.get_hydra_config(raw=True) 263 if OmegaConf.is_missing(config.hydra.runtime, "output_dir"): 264 config.hydra.runtime.output_dir = config.hydra.run.dir 265 hydra_conf.set_config(config) # type: ignore 266 Path(config.hydra.runtime.output_dir).mkdir(parents=True, exist_ok=True) 267 if logging: 268 # Assure that our logger works 269 configure_log(config.hydra.job_logging, logger.name) # type: ignore 270 with open_dict(config): 271 del config["hydra"] 272 config.agent._set_flag("allow_objects", True) 273 config.agent.__dict__["_parent"] = None # Remove parent from the config, i.e. make it a top-level config. 274 return config
275 276 # TODO: Maybe unify these settings; make overrides available in the config.
[docs] 277 @staticmethod 278 def load_hydra_config(config_name: str = "conf/launch_config") -> "LaunchConfig": 279 if GameFramework.hydra_initialized(): 280 return assure_type(LaunchConfig, hydra.compose(config_name=config_name)) 281 config_dir, config_name = os.path.split(config_name) 282 # Try to get job name from file in the stack. 283 import inspect # noqa 284 frame = inspect.stack()[-1] 285 module = inspect.getmodule(frame[0]) 286 name = module.__file__ if module and module.__file__ else "unknown" 287 return GameFramework.initialize_hydra(config_dir, config_name, job_name=name)
288
[docs] 289 def __init__(self, args: "LaunchConfig", 290 config: Optional[DictConfig] = None, 291 timeout: float = 10.0, 292 worker_threads: int = 0, 293 *, map_layers=carla.MapLayer.All): 294 """ 295 Parameters: 296 args: Configuration for the GameFramework. 297 config: Optional config for the agent (unused in this project) 298 timeout: Timeout for the :py:class:`carla.Client`. 299 worker_threads: See :py:class:`carla.Client`. 300 map_layers: See :py:meth:`carla.Client.load_world` 301 """ 302 if args.seed: 303 random.seed(args.seed) 304 nprandom.seed(args.seed) 305 self._args = args 306 self.world_settings: carla.WorldSettings = self.init_carla(args, timeout, worker_threads, map_layers=map_layers) 307 308 # These are class variables 309 GameFramework.clock, GameFramework.display = self.init_pygame(args) 310 311 self.config = config 312 self.agent = None 313 self.world_model = None 314 self.controller = None 315 316 self.debug = self.world.debug 317 self.continue_loop = True 318 self.traffic_manager: Optional[carla.TrafficManager] = self.init_traffic_manager() 319 320 # Import here to avoid circular imports 321 from classes.rule import BlockingRule, Rule # noqa: PLC0415,RUF100 322 self.cooldown_framework = Rule.CooldownFramework() # used in context manager. # NOTE: Currently can be constant 323 BlockingRule._gameframework = weakref.proxy(self)
324
[docs] 325 @class_or_instance_method 326 def init_pygame(cls_or_self: "Self | type[Self]", launch_config: Optional[LaunchConfig] = None, 327 recreate: bool = False) -> tuple[pygame.time.Clock, pygame.Surface]: 328 """ 329 Parameters: 330 launch_config: Will use the :py:attr:`width<.LaunchConfig.width>` and 331 :py:attr:`height<.LaunchConfig.height> attributes of this object if set the 332 :py:mod:`pygame` windows size. Otherwise will use :python:`(1280, 720)`. 333 Defaults to :code:`None`. 334 335 recreate: If :python:`True`, will reinitialize pygame a second time if this function is 336 called. 337 338 .. experimental; returns None, None if launch_config.pygame is False. 339 """ 340 if recreate or GameFramework.clock is None or GameFramework.display is None: 341 if launch_config is None: 342 launch_config = getattr(cls_or_self, "_args", None) # get from inst. 343 if getattr(launch_config, "pygame", True): 344 pygame.init() 345 pygame.font.init() 346 GameFramework.clock = pygame.time.Clock() 347 if getattr(launch_config, "pygame", True) and "READTHEDOCS" not in os.environ: 348 GameFramework.display = pygame.display.set_mode( 349 size=(launch_config.width, launch_config.height) 350 if launch_config else (1280, 720), 351 flags=pygame.HWSURFACE | pygame.DOUBLEBUF) 352 return GameFramework.clock, GameFramework.display
353
[docs] 354 @staticmethod 355 def init_carla(args: Optional[LaunchConfig] = None, 356 timeout: Optional[float] = None, 357 worker_threads: int = 0, *, 358 map_layers: carla.MapLayer = carla.MapLayer.All) -> carla.WorldSettings: 359 """ 360 Initializes the :py:class:`carla.Client` and the connects it to the simulator. 361 362 Parameters: 363 args: The configuration for the GameFramework. 364 timeout: The timeout for the :py:class:`carla.Client`. Default is 10.0. 365 366 .. deprecated:: _ 367 Use the :py:attr:`.LaunchConfig.timeout` attribute instead. 368 369 worker_threads: See :py:class:`carla.Client`. 370 map_layers: The map layers to load. Default is :py:attr:`carla.MapLayer.All`. 371 372 Returns: 373 The settings of the loaded world. 374 375 Note: 376 This function has to be called before :py:attr:`client`, :py:attr:`world` 377 or :py:attr:`map` can be accessed. 378 Alternatively the client and :py:class:`CarlaDataProvider` need to be setup in a 379 different way beforehand. 380 381 See Also: 382 - :py:class:`carla.Client` 383 - :py:meth:`carla.Client.load_world` 384 """ 385 # Note: This sets up the CarlaDataProvider 386 if args is None: 387 timeout = timeout or 10.0 388 GameFramework.setup_client_map_and_world(timeout=timeout, 389 worker_threads=worker_threads, 390 map_layers=map_layers) 391 else: 392 GameFramework.setup_client_map_and_world(args.map, 393 args.host, args.port, 394 timeout=timeout or args.timeout, 395 worker_threads=worker_threads, 396 map_layers=map_layers, 397 sync=args.sync, 398 fps=args.fps) 399 return CarlaDataProvider.get_world().get_settings()
400
[docs] 401 @staticmethod 402 def setup_client_map_and_world( 403 map_name: str = "Town04", 404 ip: str = "127.0.0.1", 405 port: int = 2000, *, 406 timeout: float = 10.0, 407 worker_threads: int = 0, 408 reload_world: bool = False, 409 reset_settings: bool = True, 410 map_layers: carla.MapLayer = carla.MapLayer.All, 411 sync: Union[bool, None] = True, 412 fps: int = 20 413 ) -> tuple[carla.Client, carla.World, carla.Map]: 414 """ 415 Loads the :py:class:`carla.Client`, the :py:class:`carla.World`, and the :py:class:`carla.Map`. 416 417 This is a subfunction of :py:meth:`.init_carla` 418 that can be used without a :py:class:`LaunchConfig`. 419 420 See Also: 421 :py:meth:`.init_carla` 422 :py:meth:`carla.Client.set_timeout` 423 :py:meth:`carla.Client.reload_world` 424 """ 425 client = CarlaDataProvider.get_client() 426 if client is None: 427 client = carla.Client(ip, port, worker_threads) 428 client.set_timeout(timeout) 429 CarlaDataProvider.set_client(client) 430 431 world = CarlaDataProvider.get_world() 432 if not world: 433 world = client.get_world() 434 _map = world.get_map() # CarlaDataProvider map not yet set -> set_world 435 436 if map_name and _map.name != "Carla/Maps/" + map_name: 437 world: carla.World = client.load_world(map_name, reset_settings, map_layers) 438 elif reload_world: 439 world = client.reload_world(reset_settings) 440 logger.info("Reloaded world - map_layers ignored.") 441 elif map_name is None: 442 logger.info("Provided map_name is None, skipped loading world. Assuming world is already loaded.") 443 else: 444 logger.info("skipped loading world '%s', already loaded - map_layers and reset_settings ignored.", _map.name) 445 446 world_settings = world.get_settings() 447 # Apply world settings 448 if sync is not None: 449 if sync: 450 logger.debug("Using synchronous mode.") 451 # apply synchronous mode if wanted 452 world_settings.synchronous_mode = True 453 world_settings.fixed_delta_seconds = 1 / fps # 0.05 454 world.apply_settings(world_settings) 455 else: 456 logger.debug("Using asynchronous mode.") 457 world_settings.synchronous_mode = False 458 world.apply_settings(world_settings) 459 logger.info("World Settings:\n%s", world_settings) 460 461 # Note: set_world loads multiple information. It has to be called after applying the world settings. 462 CarlaDataProvider.set_world(world) 463 464 return client, world, CarlaDataProvider.get_map()
465
[docs] 466 def init_traffic_manager(self, port: Optional[int] = None) -> carla.TrafficManager: 467 """ 468 Returns an instance of the :py:class:`carla.TrafficManager` 469 related to the specified port. If it does not exist, this will be created. 470 471 See Also: 472 :py:meth:`carla.Client.get_trafficmanager` 473 474 Parameters: 475 port: The port to use. If :code:`None`, will use the port from 476 :py:meth:`.CarlaDataProvider.get_traffic_manager_port`, which defaults to :code:`8000`. 477 """ 478 if port is None: 479 port = CarlaDataProvider.get_traffic_manager_port() 480 traffic_manager = self.client.get_trafficmanager(port) 481 if self._args.handle_ticks: 482 if self._args.sync: 483 traffic_manager.set_synchronous_mode(True) 484 traffic_manager.set_hybrid_physics_mode(True) # Note default 50m 485 traffic_manager.set_hybrid_physics_radius(50.0) # TODO: make a LaunchConfig config variable 486 self.traffic_manager = traffic_manager 487 return traffic_manager
488
[docs] 489 def init_agent_and_interface(self, 490 ego: Optional[carla.Vehicle], 491 agent_class: "type[LunaticAgent]", 492 config: Optional[LunaticAgentSettings] = None, 493 overwrites: Optional[dict[str, Any]] = None 494 ) -> "tuple[LunaticAgent, WorldModel, GlobalRoutePlanner, RSSKeyboardControl]": 495 """ 496 Quick setup for the agent and the world model. 497 498 Among others this executes: 499 - :py:meth:`.LunaticAgent.create_world_and_agent` 500 - :py:meth:`.GameFramework.make_controller` 501 - :py:meth:`.WorldModel.tick_server_world` 502 503 .. code-block:: python 504 505 from agents.lunatic_agent import LunaticAgent, LunaticAgentSettings 506 507 ego = world.spawn_actor(world.get_blueprint_library().find("vehicle.audi.tt")) 508 agent, world_model, global_planner, controller = ( 509 game_framework.init_agent_and_interface(ego, LunaticAgent) 510 ) 511 512 Arguments: 513 ego: The ego vehicle. Can be :code:`None` if the agent is set to use an 514 external actor (:py:attr:`.LaunchConfig.externalActor`). 515 agent_class: The agent class to instantiate. 516 config: The configuration of the agent. If :code:`None` the 517 :py:attr:`.agent_class.BASE_SETTINGS <.LunaticAgent.BASE_SETTINGS>` are used. 518 overwrites: Additional overwrites to the configuration. 519 """ 520 if ego is None and not self._args.externalActor: 521 raise ValueError("`ego` must be passed if ``externalActor` is not set.") 522 self.agent, self.world_model, self.global_planner \ 523 = agent_class.create_world_and_agent(self._args, 524 vehicle=ego, 525 sim_world=self.world, 526 agent_config=config, 527 overwrites=overwrites) 528 self.config = self.agent.config 529 controller = self.make_controller(self.world_model, RSSKeyboardControl, start_in_autopilot=False) # Note: stores weakref to controller 530 self.world_model.game_framework = weakref.proxy(self) 531 self.world_model.tick_server_world() 532 self.agent.verify_settings(strictness=-1) # NOTE: Here live info is already available and will throw some errors 533 return self.agent, self.world_model, self.global_planner, controller
534
[docs] 535 def make_world_model(self, config: "LunaticAgentSettings", 536 player: Optional[carla.Vehicle] = None) -> "WorldModel": 537 """ 538 Creates a :py:class:`WorldModel` with a backreference to the GameFramework. 539 """ 540 self.world_model = WorldModel(config, self._args, player=player) 541 self.world_model.game_framework = weakref.proxy(self) 542 return self.world_model
543
[docs] 544 def make_controller(self, 545 world_model: "WorldModel", 546 controller_class: type[ControllerClassT] = RSSKeyboardControl, 547 **kwargs): 548 """ 549 Creates a keyboard controller and attaches it to the world model. 550 551 Args: 552 world_model: The world model to attach the controller to. 553 controller_class: The controller class to instantiate. Defaults to :py:class:`.RSSKeyboardControl`. 554 **kwargs: Additional arguments to pass to the controller. 555 """ 556 controller = controller_class(world_model, 557 config=self.config, 558 clock=self.clock, 559 **kwargs) 560 self.controller = weakref.proxy(controller) 561 return controller # NOTE: does not return the proxy object.
562
[docs] 563 @staticmethod 564 def hydra_initialized() -> bool: 565 """ 566 Checks wether Hydra_ is initialized. This is normally only done 567 when the :py:func:`@hydra.main` decorator is used. 568 """ 569 return GlobalHydra.instance().is_initialized()
570 571 @overload 572 @staticmethod 573 def get_hydra_config(raw: Literal[False] = False) -> "HydraConf": ... 574 575 @overload 576 @staticmethod 577 def get_hydra_config(raw: Literal[True]) -> HydraConfig: ... 578
[docs] 579 @staticmethod 580 def get_hydra_config(raw: bool = False) -> "HydraConfig | HydraConf": 581 """ 582 Retrieves the Hydra_ configuration object. 583 584 Parameters: 585 raw: If :python:`True`, returns the :py:class:`hydra.conf.HydraConf` dataclass, otherwise the 586 :py:class:`hydra.core.hydra_config.HydraConfig` singleton. Default is :code:`False`. 587 588 Raises: 589 ValueError: If the HydraConfig was not set up yet and :python:`raw=True`. 590 """ 591 if raw: 592 return HydraConfig.instance() 593 return HydraConfig.get()
594 595 # ----- Setters ----- 596 597 def set_controller(self, controller: KeyboardControl) -> None: 598 """ 599 Set the :py:class:`KeyboardControl` 600 601 :meta private: 602 """ 603 self.controller = controller # type: ignore ; maybe use proxy 604 605 def set_config(self, config: DictConfig) -> None: 606 """ 607 Change the :py:attr:`config`. 608 609 :meta private: 610 """ 611 self.config = config 612 613 # ----- UI Functions ----- 614
[docs] 615 def parse_rss_controller_events(self, final_controls: carla.VehicleControl): 616 return self.controller.parse_events(final_controls)
617
[docs] 618 def render_everything(self): 619 """ 620 Update render and hud 621 622 Note: 623 This is the preferred method to update the world and render the camera. 624 """ 625 self.world_model.tick(self.clock) # NOTE: Ticks WorldMODEL not CARLA WORLD! # pyright: ignore[reportOptionalMemberAccess] 626 self.world_model.render(self.display, finalize=False) # pyright: ignore[reportOptionalMemberAccess] 627 self.controller.render(self.display) 628 # These two types must be in sync: 629 dm_render_conf: "DetectionMatrix.RenderOptions" = self._args.camera.hud.detection_matrix # type: ignore[assignment] 630 if dm_render_conf and self.agent: 631 self.agent.render_detection_matrix(self.display, **dm_render_conf) 632 self.world_model.finalize_render(self.display) # pyright: ignore[reportOptionalMemberAccess]
633
[docs] 634 @staticmethod 635 def skip_rest_of_loop(message="GameFramework.end_loop") -> NoReturn: 636 """ 637 Terminates the current iteration and exits the GameFramework by raising a :py:exc:`.ContinueLoopException`. 638 639 Note: 640 It is the users responsibility to manage the agent & local planner 641 before calling this function, i.e. that the agent has a :py:class:`carla.VehicleControl` set. 642 643 Raises: 644 ContinueLoopException: With the given **message**. 645 """ 646 # TODO: add option that still allows for rss. 647 raise ContinueLoopException(message)
648 649 # -------- Tools -------- 650
[docs] 651 @staticmethod 652 def destroy_actors(actors: Iterable[carla.Actor | CustomSensorInterface]): 653 """ 654 Destroys the given actors and customs sensors implemented in this package. 655 656 Removes destroyed actors from the :py:class:`.CarlaDataProvider` actor pool. 657 """ 658 batch: "list[carla.command.DestroyActor]" = [] 659 for actor in actors: 660 if isinstance(actor, (carla.Sensor, CustomSensorInterface)): 661 actor.stop() 662 if isinstance(actor, carla.Actor): 663 if actor.is_alive: 664 batch.append(carla.command.DestroyActor(actor)) 665 else: 666 actor.destroy() 667 668 if batch and CarlaDataProvider._client: 669 try: 670 CarlaDataProvider._client.apply_batch(batch) 671 except RuntimeError as e: 672 if "time-out" in str(e): 673 logger.warning("Timeout while destroying actors.") 674 else: 675 raise 676 else: 677 for command in batch: 678 if CarlaDataProvider.actor_id_exists(command.actor_id): 679 del CarlaDataProvider._carla_actor_pool[command.actor_id] # remove by batch and not by individual command 680 elif not CarlaDataProvider._client: 681 logger.error("No client available to destroy actors.")
682 683 # -------- Context Manager -------- 684 685 def __call__(self, agent: "LunaticAgent"): 686 """ 687 Use as context manager to handle the game loop. 688 Pass an agent if None is set yet. 689 """ 690 self.agent = agent 691 self.world_model = agent._world_model 692 try: 693 if self.world_model.controller: # weakref.proxy 694 self.controller = self.world_model.controller 695 except ReferenceError: 696 self.controller = None # type: ignore[assignment] 697 if not self.controller: 698 logger.debug("Creating new controller.") 699 self.controller = self.make_controller(self.world_model, start_in_autopilot=self._args.autopilot) # hard reference # type: ignore 700 self.world_model.controller = self.controller # hard instead of weak reference 701 self.agent._validate_phases = False 702 return self 703 704 def __enter__(self): 705 if self.agent is None: 706 raise ValueError("Agent not initialized.") 707 if self.world_model is None: 708 raise ValueError("World Model not initialized.") 709 if self.controller is None: 710 raise ValueError("Controller not initialized.") 711 712 self.clock.tick() # self.args.fps) 713 frame = None 714 if self._args.handle_ticks: # i.e. no scenario runner doing it for us 715 if CarlaDataProvider.is_sync_mode(): 716 frame = self.world_model.world.tick(self._args.timeout) 717 else: 718 frame = self.world_model.world.wait_for_tick(self._args.timeout).frame 719 CarlaDataProvider.on_carla_tick() 720 721 if CarlaDataProvider.is_sync_mode(): 722 # We do this only in sync mode as frames could pass between gathering this information 723 # and an agent calling InformationManager.tick(), which in turn calls global_tick 724 # with possibly a DIFFERENT frame wasting computation. 725 if frame is None: 726 frame = self.get_world().get_snapshot().frame 727 InformationManager.global_tick(frame) 728 return self 729 730 def __exit__(self, exc_type, exc_val, exc_tb): 731 self.cooldown_framework.__exit__(exc_type, exc_val, exc_tb) 732 if exc_val: 733 if isinstance(exc_val, AgentDoneException): 734 self.continue_loop = False 735 elif isinstance(exc_val, ContinueLoopException): 736 logger.error("ContinueLoopException(%s) should be thrown during `agent.run_step` but caught by GameFramework this should not happen. Skipping this step; no controls are applied!", exc_val) 737 else: 738 return # skip render and likely terminate. 739 self.render_everything() 740 pygame.display.flip() 741
[docs] 742 @class_or_instance_method 743 def cleanup(cls_or_self: "type[Self] | Self", 744 *, 745 disable_sync: bool = True, 746 quit_pygame: bool = True): 747 """ 748 Cleans up resources and actors. 749 750 Args: 751 disable_sync: If True, will disable synchronous mode. This will 752 prevent the freezing of the Unreal Editor. 753 Default is True. 754 quit_pygame: If True, will call :py:func:`pygame.quit`. Default is True. 755 756 Note: 757 - When called from an instance with an attached agent, 758 the :python:`agent.destroy()` method is called. 759 - Otherwise will call :py:obj:`CarlaDataProvider.cleanup() <.CarlaDataProvider>`. 760 """ 761 try: 762 # Should only work for instance version, but maybe future Singleton support 763 try: 764 if cls_or_self.agent: # pyright: ignore[reportAttributeAccessIssue] 765 cls_or_self.agent.destroy() # pyright: ignore[reportAttributeAccessIssue] 766 finally: 767 if cls_or_self.world_model: # pyright: ignore[reportAttributeAccessIssue] 768 cls_or_self.world_model.destroy() # pyright: ignore[reportAttributeAccessIssue] 769 if disable_sync and cls_or_self.traffic_manager: 770 with contextlib.suppress(Exception): 771 cls_or_self.traffic_manager.set_synchronous_mode(False) 772 finally: 773 # Prevent freezing of the editor 774 if disable_sync and CarlaDataProvider.get_world() is not None: 775 # Disable Synchronous Mode 776 world_settings = carla.WorldSettings(synchronous_mode=False, 777 fixed_delta_seconds=0.0) 778 cls_or_self._world.apply_settings(world_settings) 779 CarlaDataProvider.cleanup() 780 if quit_pygame: 781 pygame.quit()
782 783 # Include access to our exceptions here 784 exceptions = _exceptions 785 """ 786 shortcut to :py:mod:`.exceptions` module containing custom exceptions. 787 788 :meta hide-value: 789 """
790 791 792# ============================================================================== 793# -- World --------------------------------------------------------------- 794# ============================================================================== 795
[docs] 796class WorldModel(AccessCarlaMixin, CarlaDataProvider): 797 """ 798 Class representing the surrounding environment. 799 800 This class is the interface between the agent, the :external_py_class:`carla.World`, the 801 :py:class:`.HUD`, and the :py:class:`.KeyboardControl`. 802 It handles ticking of the simulator and rendering of the pygame interface. 803 804 If :py:attr:`.LaunchConfig.externalActor` is set, it will look for an actor with the role name 805 :py:attr:`.LaunchConfig.rolename`, if such an actor does not yet exist it will wait for its 806 creation until the calling script continues. 807 """ 808 809 controller: Optional[Union[RSSKeyboardControl, weakref.ProxyType[RSSKeyboardControl]]] = None 810 """ 811 Set when controller is created. Uses weakref.proxy as backreference. 812 This is not a :py:mod:`weakref` object, when 813 `with gameframework(agent) <GameFramework.__call__>`:py:meth: is used. 814 """ 815 816 game_framework: Optional["weakref.CallableProxyType[GameFramework]"] = None 817 """ 818 Set when world created via GameFramework. Uses weakref.proxy as backreference 819 820 Attention: 821 Currently not used and not initialized. 822 823 :meta private: 824 """ 825 826 player: carla.Vehicle 827 """ 828 The linked actor. If :py:attr:`.external_actor` is set this will be the first actor found 829 with that role name. 830 """ 831
[docs] 832 def __init__(self, config: "LunaticAgentSettings", 833 args: Union[LaunchConfig, Mapping[str, Any], "os.PathLike[str]", str] = "./conf/launch_config.yaml", 834 agent: Optional[LunaticAgent] = None, 835 *, 836 carla_world: Optional[carla.World] = None, 837 player: Optional[carla.Vehicle] = None, 838 map_inst: Optional[carla.Map] = None): 839 """Constructor method""" 840 # Set World 841 if self.get_world() is None: 842 if carla_world is None: 843 raise ValueError("CarlaDataProvider not available and `carla_world` not passed.") 844 self.world = carla_world 845 elif carla_world is not None and self.world != carla_world: 846 raise ValueError("CarlaDataProvider.get_world() and passed `carla_world` are not the same.") 847 848 self.world_settings = self.world.get_settings() 849 """Object containing some data about the simulation such as synchrony between client and server or rendering mode.""" 850 851 if agent: 852 agent._world_model = self # backreference, if needed; the LunaticAgent sets this as well. 853 854 if self.map is not None: # if this is set accesses CarlaDataProvider 855 if map_inst and self.map != map_inst: 856 raise ValueError("CarlaDataProvider.get_map() and passed map_inst are not the same.") 857 elif map_inst: 858 if isinstance(map_inst, carla.Map): 859 self.map = map_inst 860 else: 861 logger.warning("Warning: Ignoring the given map as it is not a 'carla.Map'") 862 if self.map is None: 863 try: 864 self.set_world(self.world) # CDP function 865 except RuntimeError as error: 866 print(f'RuntimeError: {error}') 867 print(' The server could not send the OpenDRIVE (.xodr) file:') 868 print(' Make sure it exists, has the same name of your town, and is correct.') 869 sys.exit(1) 870 871 self._config = config 872 if not isinstance(args, (Mapping, DictConfig, LaunchConfig)): # TODO: should rather check for string like 873 # Args is expected to be a string here 874 # NOTE: This does NOT INCLUDE CLI OVERWRITES 875 # When passed as path with directory 876 config_dir, config_name = os.path.split(args) 877 try: 878 args = GameFramework.initialize_hydra(config_dir, config_name) 879 except ValueError: 880 # Hydra already initialized 881 args = GameFramework.load_hydra_config(config_name) 882 except Exception: 883 print("Problem with", type(args), args) 884 raise 885 args.externalActor = not (player is not None or agent is not None) # TEMP: Remove to force clean config. 886 self._args: LaunchConfig = assure_type(LaunchConfig, args) 887 888 self.hud: HUD = HUD(self._args.width, self._args.height, self.world) 889 """The :py:class:`HUD` that is managed.""" 890 891 self.sync: Optional[bool] = self._args.sync 892 """Set from :py:attr:`.LaunchConfig.sync`""" 893 894 self.dim = (self._args.width, self._args.height) 895 896 self.external_actor: bool = self._args.externalActor 897 """Set from :py:attr:`.LaunchConfig.externalActor`""" 898 899 self.actor_role_name: Optional[str] = self._args.rolename 900 """Set from :py:attr:`.LaunchConfig.rolename`""" 901 902 self._actor_filter = self._args.filter 903 self._actor_generation: Literal[1, 2, "all"] = self._args.generation 904 self._gamma = self._args.camera.gamma 905 906 # TODO: Unify with CameraManager 907 self.recording = False 908 self._has_recorded = False 909 self._recording_dirs = [] 910 self.recording_frame_num = 0 911 self.recording_dir_num = 0 912 913 # From manual controls; used client.start_recorder() 914 # 915 self.recording_enabled: bool = False 916 """ 917 Indicator if :py:attr:`carla.Client` recording feature is on or off. 918 919 Experimental & Untested! 920 CTRL + R : toggle recording of simulation (replacing any previous) 921 CTRL + P : start replaying last recorded simulation 922 923 :meta private: 924 """ 925 926 self.recording_start = 0 927 """ 928 CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds) 929 CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds) 930 931 :meta private: 932 """ 933 934 if self.external_actor and (player is not None or agent is not None): 935 raise ValueError("External actor cannot be used with player or agent.") 936 if player is None and agent is not None: 937 self.player = agent._vehicle 938 elif player is not None and agent is not None: 939 if player != agent._vehicle: 940 raise ValueError("Passed `player` and `agent._vehicle` are not the same.") 941 self.player = player 942 else: 943 # If player is None here, will set in in restart() 944 self.player = player # pyright: ignore[reportAttributeAccessIssue] 945 946 assert self.player is not None or self.external_actor # Note: Former optional. Player set in restart 947 948 self.collision_sensor: CollisionSensor = None # type: ignore # set in restart 949 self.lane_invasion_sensor: LaneInvasionSensor = None # type: ignore # set in restart 950 self.gnss_sensor: Optional[GnssSensor] = None 951 self.imu_sensor: Optional[IMUSensor] = None # from interactive 952 self.radar_sensor: Optional[RadarSensor] = None # from interactive 953 self.camera_manager: CameraManager = None # type: ignore # set in restart 954 """ 955 Manages cameras for the user interface and :py:class:`.HUD`. 956 """ 957 958 self._weather_presets = CarlaDataProvider.find_weather_presets() 959 self._weather_index = 0 960 self.weather: str = "NotSet" 961 """ 962 Name of currently used weather preset. 963 See also: :py:class:`CarlaDataProvider.find_weather_presets()<CarlaDataProvider>` 964 965 :meta hide-value: 966 """ 967 968 self.actors: List[Union[carla.Actor, CustomSensorInterface]] = [] 969 """Actors attached to this instance for the user interface and :py:class:`.HUD`.""" 970 971 # From interactive: 972 self.constant_velocity_enabled = NotImplemented #: :meta private: 973 self.show_vehicle_telemetry = False #: :meta private: # enabled via KeyboardController 974 self.doors_are_open: bool = False 975 """ 976 Note: 977 Only set over the KeyboardController, does not query the actor or simulation 978 979 :meta private: 980 """ 981 982 self.current_map_layer = 0 983 self.map_layer_names = [ 984 carla.MapLayer.NONE, 985 carla.MapLayer.Buildings, 986 carla.MapLayer.Decals, 987 carla.MapLayer.Foliage, 988 carla.MapLayer.Ground, 989 carla.MapLayer.ParkedVehicles, 990 carla.MapLayer.Particles, 991 carla.MapLayer.Props, 992 carla.MapLayer.StreetLights, 993 carla.MapLayer.Walls, 994 carla.MapLayer.All 995 ] 996 # RSS 997 # set in restart 998 self.rss_sensor: Optional[RssSensor] = None 999 self.rss_unstructured_scene_visualizer: RssUnstructuredSceneVisualizer = None # type: ignore[assignment] 1000 self.rss_bounding_box_visualizer: RssBoundingBoxVisualizer = None # type: ignore[assignment] 1001 1002 if config.rss: 1003 if config.rss.enabled and not self._actor_filter.startswith("vehicle."): 1004 print('Error: RSS only supports vehicles as ego. Disable RSS or use a vehicle filter for the actor.') 1005 sys.exit(1) 1006 if AD_RSS_AVAILABLE and config.rss.enabled: 1007 self._restrictor = carla.RssRestrictor() 1008 else: 1009 self._restrictor = None 1010 else: 1011 self._restrictor = None 1012 1013 logger.info("Calling WorldModel.restart()") 1014 self._first_start = True 1015 """Indicate that restart was called for the first time.""" 1016 self.restart() 1017 assert self.player is not None 1018 self._vehicle_physics = self.player.get_physics_control() 1019 self.world_tick_id = self.world.on_tick(self.hud.on_world_tick) 1020 1021 if CarlaDataProvider._traffic_light_map is None: 1022 logger.error("Traffic light map not set at this point") # should not have happened 1023 CarlaDataProvider.set_world(self._world) 1024 elif not CarlaDataProvider._traffic_light_map: 1025 logger.error("Traffic light map is empty") # should not have happened 1026 CarlaDataProvider.prepare_map()
1027
[docs] 1028 def rss_set_road_boundaries_mode(self, 1029 road_boundaries_mode: Optional[Union['RssRoadBoundariesModeAlias', 1030 carla.RssRoadBoundariesMode, 1031 bool]] = None) -> None: 1032 """ 1033 Choose wether or not to use the RSS road boundaries feature. 1034 1035 Toggles: :py:attr:`.RssSettings.use_stay_on_road_feature` 1036 1037 Parameters: 1038 road_boundaries_mode: If :python:`None`, uses the value from the config. 1039 If :python:`True`, sets to :py:attr:`carla.RssRoadBoundariesMode.On`. 1040 If :python:`False`, sets to :py:attr:`carla.RssRoadBoundariesMode.Off`. 1041 1042 See Also: 1043 - :py:class:`carla.RssSensor` 1044 """ 1045 # Called from KeyboardControl 1046 if road_boundaries_mode is None: 1047 road_boundaries_mode = self._config.rss.use_stay_on_road_feature 1048 else: 1049 # Depending on AD_RSS_AVAILABLE this uses carla.RssRoadBoundariesMode or the alias 1050 if road_boundaries_mode: 1051 self._config.rss.use_stay_on_road_feature = RssRoadBoundariesMode.On 1052 else: 1053 self._config.rss.use_stay_on_road_feature = RssRoadBoundariesMode.Off 1054 if self.rss_sensor: 1055 self.rss_sensor.sensor.road_boundaries_mode = (carla.RssRoadBoundariesMode.On 1056 if road_boundaries_mode 1057 else carla.RssRoadBoundariesMode.Off) 1058 else: 1059 print("Warning: RSS Road Boundaries Mode not set. RSS sensor not found.")
1060 1061 def toggle_pause(self): 1062 """ 1063 Toggle pause_simulation from the KeyboardControls. 1064 1065 :meta private: 1066 """ 1067 settings = self.world.get_settings() 1068 self.pause_simulation(not settings.synchronous_mode) 1069
[docs] 1070 def pause_simulation(self, pause: bool): 1071 """ 1072 Pauses the simulation by setting the world to synchronous mode. 1073 1074 Attention: 1075 Only works reliable in **asynchronous mode** (:py:attr:`sync=False <.LaunchConfig.sync>`) 1076 and might lead to unexpected behavior in synchronous mode. 1077 """ 1078 if self._args.sync: 1079 logger.warning("Pause simulation only works in asynchronous mode.") 1080 1081 settings = self.world.get_settings() 1082 if pause and not settings.synchronous_mode: 1083 settings.synchronous_mode = True 1084 settings.fixed_delta_seconds = 0.05 1085 self.world.apply_settings(settings) 1086 elif not pause and settings.synchronous_mode: 1087 settings.synchronous_mode = False 1088 settings.fixed_delta_seconds = None 1089 self.world.apply_settings(settings)
1090 1091 @staticmethod 1092 def _find_external_actor(world: carla.World, 1093 role_name: str, 1094 actor_list: Optional[carla.ActorList] = None) -> Optional[carla.Actor]: 1095 """ 1096 Looks to find an actor with a matching **role_name**. 1097 """ 1098 player = None 1099 for actor in actor_list or world.get_actors(): 1100 if actor.attributes.get('role_name') == role_name: 1101 if player is not None: 1102 logger.error("Multiple actors with role_name `%s` found. id: %s. " 1103 "Returning the first one found.", role_name, actor.id) 1104 else: 1105 player = actor 1106 return player 1107 1108 def _wait_for_external_actor(self, timeout: float = 20, sleep: float = 3) -> carla.Actor: 1109 """ 1110 Does not resume the script until an external actor with the role name is found. 1111 1112 Raises: 1113 AssertionError: If :py:attr:`actor_role_name` is not set. 1114 SystemExit: If the actor is not found within the time period. 1115 """ 1116 assert self.actor_role_name 1117 self.tick_server_world() # Tick the world? 1118 start = time.time() 1119 t = start 1120 while t < start + timeout: 1121 player = self._find_external_actor(self.world, self.actor_role_name) 1122 if player is not None: 1123 return player 1124 logger.info("...External actor not found. Waiting to find external actor named `%s`", 1125 self.actor_role_name) 1126 time.sleep(sleep) # Note if on same thread, nothing will happen. Put function into thread? 1127 self.tick_server_world() # Tick the world? 1128 t = time.time() 1129 logger.error(f"External actor `{self.actor_role_name}` not found. Exiting...") 1130 print(f"External actor `{self.actor_role_name}` not found. Exiting...") 1131 sys.exit(1) 1132
[docs] 1133 def restart(self): 1134 """ 1135 Restart the world and sets up the :py:class:`.HUD` sensors. 1136 If :py:attr:`player` is not set or :py:attr:`external_actor` is set, 1137 looks for an actor with the role name, or spawns a new actor. 1138 1139 Note: 1140 Called during :py:meth:`__init__`. 1141 """ 1142 # Keep same camera config if the camera manager exists. 1143 cam_index = assure_type(int, self.camera_manager.index 1144 if self.camera_manager is not None 1145 else 0) 1146 cam_pos_id = (self.camera_manager.transform_index 1147 if self.camera_manager is not None 1148 else 0) 1149 if self.external_actor: 1150 # Check whether there is already an actor with defined role name 1151 if not self.actor_role_name: 1152 raise ValueError("When using external actor, rolename must be set.") 1153 actor_list = self.world.get_actors() # In sync mode the actor list could be empty 1154 external_actor = self._find_external_actor(self.world, self.actor_role_name, actor_list) 1155 if self.player is None: 1156 if external_actor: 1157 self.player = assure_type(carla.Vehicle, external_actor) 1158 else: 1159 self.player = assure_type(carla.Vehicle, 1160 self._wait_for_external_actor(timeout=self._args.timeout + 10)) 1161 elif external_actor and self.player.id != external_actor.id: # NOTE: even with same id different instances and hashes. 1162 logger.warning("External actor found with role_name `%s` but different id. " 1163 "Keeping the current actor (%s) and ignoring the external actor (%s)", 1164 self.actor_role_name, self.player.id, external_actor.id) 1165 if TYPE_CHECKING: 1166 self.player = assure_type(carla.Vehicle, self.player) 1167 1168 else: 1169 # Get a random blueprint. 1170 if self.player is None or self.camera_manager is not None: 1171 # First pass without a player or second pass -> new player 1172 blueprint: carla.ActorBlueprint = assure_type(carla.ActorBlueprint, 1173 random.choice(get_actor_blueprints(self._actor_filter, self._actor_generation))) 1174 blueprint.set_attribute('role_name', self.actor_role_name) # type: ignore[arg-type] 1175 if blueprint.has_attribute('color'): 1176 color = random.choice(blueprint.get_attribute('color').recommended_values) 1177 blueprint.set_attribute('color', color) 1178 # From Interactive: 1179 if blueprint.has_attribute('terramechanics'): # For Tire mechanics/Physics? # Todo is that needed? 1180 blueprint.set_attribute('terramechanics', 'false') 1181 if blueprint.has_attribute('driver_id'): 1182 driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values) 1183 blueprint.set_attribute('driver_id', driver_id) 1184 if blueprint.has_attribute('is_invincible'): 1185 blueprint.set_attribute('is_invincible', 'true') 1186 1187 # TODO: Make this a config option to choose automatically. 1188 # set the max speed 1189 #if blueprint.has_attribute('speed'): 1190 # self.player_max_speed = float(blueprint.get_attribute('speed').recommended_values[1]) 1191 # self.player_max_speed_fast = float(blueprint.get_attribute('speed').recommended_values[2]) 1192 1193 # Spawn the player. 1194 if self.player is not None: 1195 spawn_point = self.player.get_transform() 1196 spawn_point.location.z += 2.0 1197 spawn_point.rotation.roll = 0.0 1198 spawn_point.rotation.pitch = 0.0 1199 if self.camera_manager is not None: # None at first start; not None if player was already set before 1200 self.destroy() 1201 self.player = assure_type(carla.Vehicle, self.world.try_spawn_actor(blueprint, spawn_point)) 1202 self.modify_vehicle_physics(self.player) 1203 while self.player is None: 1204 if not self.map.get_spawn_points(): 1205 print('There are no spawn points available in your map/town.') 1206 print('Please add some Vehicle Spawn Point to your UE4 scene.') 1207 sys.exit(1) 1208 spawn_points = self.map.get_spawn_points() 1209 spawn_point: carla.Transform = random.choice(spawn_points) if spawn_points else carla.Transform() # type: ignore 1210 self.player = assure_type(carla.Vehicle, 1211 self.world.try_spawn_actor(blueprint, spawn_point)) 1212 # From Interactive: 1213 # See: https://carla.readthedocs.io/en/latest/tuto_G_control_vehicle_physics/ 1214 self.show_vehicle_telemetry = False 1215 self.modify_vehicle_physics(self.player) 1216 1217 # Clean external actors restarting a second time 1218 if self.external_actor and ( 1219 # None cleans only first time; False will never clean. 1220 (self._args.restart_clean_sensors is not False and not self._first_start) 1221 or self._args.restart_clean_sensors is True 1222 ): 1223 ego_sensors = [actor for actor in self.world.get_actors() if actor.parent == self.player] 1224 1225 # Remove all old sensors 1226 for ego_sensor in ego_sensors: 1227 if ego_sensor is not None: 1228 ego_sensor.destroy() 1229 1230 # Set up the sensors. 1231 self.collision_sensor = CollisionSensor(self.player, self.hud) 1232 self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud) 1233 self.gnss_sensor = None # GnssSensor(self.player) # TODO: make it optional 1234 self.imu_sensor = None # IMUSensor(self.player) 1235 self.actors.extend([ 1236 self.collision_sensor, 1237 self.lane_invasion_sensor, 1238 ]) 1239 if self.gnss_sensor: 1240 self.actors.append(self.gnss_sensor) 1241 if self.imu_sensor: 1242 self.actors.append(self.imu_sensor) 1243 1244 logger.log(0, "Setting up camera manager") 1245 self.camera_manager = CameraManager(self.player, self.hud, self._args) 1246 self.camera_manager.transform_index = cam_pos_id 1247 self.camera_manager.set_sensor(cam_index, notify=False) 1248 logger.log(0, "Camera Manager set up") 1249 1250 actor_type = get_actor_display_name(self.player) 1251 self.hud.notification(text=actor_type) 1252 1253 self.rss_unstructured_scene_visualizer = RssUnstructuredSceneVisualizer(self.player, 1254 self.world, 1255 self.dim, 1256 gamma_correction=self._gamma) # TODO: use args instead of gamma 1257 self.rss_bounding_box_visualizer = RssBoundingBoxVisualizer(self.dim, 1258 self.world, 1259 self.camera_manager.sensor) 1260 if AD_RSS_AVAILABLE and self._config.rss and self._config.rss.enabled: 1261 log_level = self._config.rss.log_level 1262 if not isinstance(log_level, carla.RssLogLevel): 1263 try: 1264 if isinstance(log_level, str): 1265 log_level = carla.RssLogLevel.names[log_level] 1266 else: 1267 log_level = carla.RssLogLevel(log_level) 1268 except Exception as e: 1269 msg = f"Could not convert '{log_level}' to RssLogLevel must be in {list(carla.RssLogLevel.names.keys())}" 1270 raise KeyError(msg) from e 1271 logger.info("Carla Log level was not a RssLogLevel") 1272 self.rss_sensor = RssSensor(self.player, 1273 self.rss_unstructured_scene_visualizer, 1274 self.rss_bounding_box_visualizer, 1275 self.hud.rss_state_visualizer, 1276 visualizer_mode=self._config.rss.debug_visualization_mode, 1277 log_level=log_level) 1278 self.rss_set_road_boundaries_mode(self._config.rss.use_stay_on_road_feature) 1279 else: 1280 self.rss_sensor = None 1281 self._first_start = False 1282 self.tick_server_world()
1283
[docs] 1284 def tick_server_world(self) -> "int | carla.WorldSnapshot | None": 1285 """ 1286 When :py:attr:`.LaunchConfig.handle_ticks` is :python:`True` 1287 uses :external_py_meth:`carla.World.tick` or :external_py_meth:`carla.World.wait_for_tick` 1288 depending on :py:attr:`.LaunchConfig.sync`. 1289 """ 1290 if self._args.handle_ticks: 1291 if self.sync: 1292 # if timeout is used it it might interfere with _wait_for_external_actor 1293 return self.world.tick(self._args.timeout) 1294 return self.world.wait_for_tick(self._args.timeout) 1295 return None
1296 1297 #def tick(self, clock): 1298 # self.hud.tick(self.player, clock) # RSS example. TODO: Check which has to be used! 1299
[docs] 1300 def tick(self, clock: "pygame.time.Clock"): 1301 """Method for every tick""" 1302 self.hud.tick(self, clock, InformationManager.obstacles)
1303
[docs] 1304 def next_weather(self, reverse: bool = False) -> None: 1305 """Get next weather setting""" 1306 self._weather_index += -1 if reverse else 1 1307 self._weather_index %= len(self._weather_presets) 1308 preset = self._weather_presets[self._weather_index] 1309 self.hud.notification(f'Weather: {preset[1]}') 1310 self.player.get_world().set_weather(preset[0]) 1311 self.weather = preset[1]
1312
[docs] 1313 def next_map_layer(self, reverse: bool = False) -> None: 1314 self.current_map_layer += -1 if reverse else 1 1315 self.current_map_layer %= len(self.map_layer_names) 1316 selected = self.map_layer_names[self.current_map_layer] 1317 self.hud.notification(f'LayerMap selected: {selected}')
1318
[docs] 1319 def load_map_layer(self, unload: bool = False): 1320 selected = self.map_layer_names[self.current_map_layer] 1321 if unload: 1322 self.hud.notification(f'Unloading map layer: {selected}') 1323 self.world.unload_map_layer(selected) 1324 else: 1325 self.hud.notification(f'Loading map layer: {selected}') 1326 self.world.load_map_layer(selected)
1327
[docs] 1328 def toggle_recording(self): 1329 """ 1330 Start recording images from the camera output. 1331 1332 Saved in 1333 :py:attr:`LaunchConfig.camera.recorder.output_path <.CameraConfig.RecorderSettings.output_path>` 1334 with the current frame number. 1335 """ 1336 if not self.recording: 1337 self._has_recorded = True 1338 dir_name, filename = os.path.split(self._args.camera.recorder.output_path) 1339 try: 1340 dir_name_formatted = dir_name % self.recording_dir_num 1341 except TypeError: 1342 dir_name += "%04d" 1343 dir_name_formatted = dir_name % self.recording_dir_num 1344 while os.path.exists(dir_name_formatted): # noqa: PTH110 1345 self.recording_dir_num += 1 1346 dir_name_formatted = dir_name % self.recording_dir_num 1347 self.recording_file_format = os.path.join(dir_name, filename) # keep unformatted. # noqa: PTH118 1348 self.recording_frame_num = 0 1349 Path(dir_name_formatted).mkdir(parents=True, exist_ok=True) 1350 self.hud.notification(f'Started recording (folder: {dir_name_formatted})') 1351 self._recording_dirs.append(dir_name_formatted) 1352 else: 1353 dir_name_formatted = os.path.split(self.recording_file_format)[0] % self.recording_dir_num 1354 self.hud.notification(f'Recording finished (folder: {dir_name_formatted})') 1355 1356 self.recording = not self.recording
1357
[docs] 1358 def toggle_radar(self): 1359 """Adds or destroys a radar sensor for the user interface""" 1360 if self.radar_sensor is None: 1361 self.radar_sensor = RadarSensor(self.player) 1362 elif self.radar_sensor.sensor is not None: 1363 self.radar_sensor.sensor.destroy() 1364 self.radar_sensor = None
1365
[docs] 1366 def modify_vehicle_physics(self, actor: carla.Vehicle): 1367 # If actor is not a vehicle, we cannot use the physics control 1368 try: 1369 physics_control = actor.get_physics_control() 1370 physics_control.use_sweep_wheel_collision = True 1371 actor.apply_physics_control(physics_control) 1372 except Exception: 1373 logger.warning("Could not modify vehicle physics for actor: %s", actor)
1374
[docs] 1375 def finalize_render(self, display: pygame.Surface): 1376 """ 1377 Draws the HUD and saves the image if recording is enabled. 1378 1379 Assumes render(..., finalize=False) was called before, 1380 use this function if you want to render something in between. 1381 """ 1382 self.hud.render(display) 1383 if self.recording: 1384 try: 1385 pygame.image.save(display, self.recording_file_format % (self.recording_dir_num, self.recording_frame_num)) 1386 except Exception as e: 1387 logger.error("Could not save image format: `%s` % (self.recording_dir_num, self.recording_frame_num): %s", self.recording_file_format, e) 1388 self.recording_frame_num += 1
1389
[docs] 1390 def render(self, display: pygame.Surface, finalize: bool = True): 1391 """ 1392 Render world 1393 1394 Recording should be done at the end of the render method, 1395 however camera_manager.render is called first to render the camera. 1396 1397 Call with finalize=False to only render the camera. 1398 1399 Afterwards applying other render features call `finalize_render` 1400 to draw the HUD and save the image if recording is enabled. 1401 """ 1402 self.camera_manager.render(display) 1403 self.rss_bounding_box_visualizer.render(display, self.camera_manager.current_frame) 1404 self.rss_unstructured_scene_visualizer.render(display) 1405 if finalize: 1406 self.finalize_render(display)
1407
[docs] 1408 def destroy_sensors(self): 1409 """Destroy sensors""" 1410 if self.rss_sensor: 1411 self.rss_sensor.destroy() 1412 self.rss_sensor = None 1413 if self.rss_unstructured_scene_visualizer: 1414 self.rss_unstructured_scene_visualizer.destroy() 1415 self.rss_unstructured_scene_visualizer = None # type: ignore[assignment] 1416 if self.camera_manager is not None: 1417 self.camera_manager.destroy() 1418 self.camera_manager = None # type: ignore[assignment] 1419 if self.radar_sensor is not None: 1420 self.toggle_radar() # destroys it if not None
1421
[docs] 1422 def destroy(self, destroy_ego: bool = False): 1423 """ 1424 Destroys all actors 1425 1426 Parameters: 1427 destroy_ego: If True, will destroy the :py:attr:`player` as well. Else assume that it 1428 is destroyed by someone else. 1429 """ 1430 # stop from ticking 1431 if self.world_tick_id and self.world: 1432 self.world.remove_on_tick(self.world_tick_id) 1433 self.destroy_sensors() 1434 if destroy_ego and self.player not in self.actors: # do not destroy external actors. 1435 logger.debug("Adding player to destruction list.") 1436 self.actors.append(self.player) 1437 elif not destroy_ego and self.player in self.actors: 1438 logger.warning("destroy_ego=False, but player is in actors list. " 1439 "Destroying the actor from within WorldModel.destroy.") 1440 1441 #logger.info("to destroy %s", list(map(str, self.actors))) 1442 # Batch destroy in one simulation step 1443 real_actors: Sequence[carla.Actor] = [actor for actor in self.actors 1444 if isinstance(actor, carla.Actor)] 1445 GameFramework.destroy_actors(real_actors) 1446 1447 # e.g. CustomSensorInterface 1448 other_actors = [actor for actor in self.actors 1449 if not isinstance(actor, carla.Actor)] 1450 while other_actors: 1451 actor = other_actors.pop(0) 1452 if actor is not None: 1453 print("destroying actor: " + str(actor), end=" destroyed=") 1454 try: 1455 actor.stop() 1456 except AttributeError as e: 1457 logger.debug("Error with actor {}: {}", actor, e) 1458 try: 1459 x = actor.destroy() # Non carla instances 1460 print(x) 1461 except RuntimeError: 1462 logger.warning("Could not destroy actor: " + str(actor)) 1463 self.actors.clear() 1464 if self._args.handle_ticks and self.get_world(): 1465 self.get_world().tick() 1466 1467 if self._has_recorded: 1468 runtime_dir = GameFramework.get_hydra_config().runtime.output_dir 1469 for dir_name_formatted in self._recording_dirs: 1470 dirname = os.path.split(dir_name_formatted)[1] 1471 subprocess.run(["ffmpeg", "-an", "-sn", "-i", f"{dir_name_formatted}/%08d.bmp", "-framerate", "1", "-vcodec", "mpeg4", "-r", "60", f"{Path(runtime_dir) / dirname}.avi"]) # noqa: E501 1472 # os.system(f'ffmpeg -an -sn -i "{dir_name_formatted}/%08d.bmp" -framerate 1 -vcodec mpeg4 -r 60 "{os.path.join(runtime_dir, dirname)}.avi"') # noqa: E501 1473 print(f"Recording saved in {Path(runtime_dir) / dirname}.avi")
1474
[docs] 1475 def rss_check_control(self, vehicle_control: carla.VehicleControl) -> Union[carla.VehicleControl, None]: 1476 """ 1477 Checks the vehicle control against the RSS restrictions and possibly proposes an alternative. 1478 """ 1479 self.hud.original_vehicle_control = vehicle_control 1480 self.hud.restricted_vehicle_control = vehicle_control 1481 if not AD_RSS_AVAILABLE or not self.rss_sensor: 1482 return None 1483 1484 if (self.rss_sensor.log_level <= carla.RssLogLevel.warn 1485 and self.rss_sensor.ego_dynamics_on_route 1486 and not self.rss_sensor.ego_dynamics_on_route.ego_center_within_route): 1487 logger.warning("RSS: Not on route! " + str(self.rss_sensor.ego_dynamics_on_route)[:97] + "...") 1488 # Is there a proper response? 1489 rss_proper_response = self.rss_sensor.proper_response if self.rss_sensor and self.rss_sensor.response_valid else None 1490 if rss_proper_response: 1491 # adjust the controls 1492 proposed_vehicle_control = self._restrictor.restrict_vehicle_control( # pyright: ignore[reportOptionalMemberAccess] 1493 vehicle_control, 1494 rss_proper_response, 1495 self.rss_sensor.ego_dynamics_on_route, 1496 self._vehicle_physics) 1497 self.hud.restricted_vehicle_control = proposed_vehicle_control 1498 self.hud.allowed_steering_ranges = self.rss_sensor.get_steering_ranges() 1499 return proposed_vehicle_control 1500 return None