Source code for classes.worldmodel

   1"""
   2Interface classes between CARLA, the agent, and the user interface.
   3"""
   4
   5# pyright: reportUnknownMemberType=false
   6
   7from __future__ import annotations
   8
   9import contextlib
  10import os
  11from pathlib import Path
  12import subprocess
  13import sys
  14import time
  15import weakref
  16
  17from collections.abc import Mapping
  18from typing import TYPE_CHECKING, Any, ClassVar, Iterable, List, NoReturn, Optional, Sequence, Union, overload
  19import typing
  20
  21import carla
  22import hydra
  23from numpy import random as nprandom
  24import pygame
  25import random
  26from hydra.core.global_hydra import GlobalHydra
  27from hydra.core.hydra_config import HydraConfig
  28from hydra.core.utils import configure_log
  29from omegaconf import DictConfig, OmegaConf, open_dict
  30
  31from agents.tools.config_creation import LaunchConfig, RssLogLevel, RssRoadBoundariesMode, config_store
  32from classes import MockDummy, exceptions as _exceptions
  33from classes.ui.camera_manager import CameraManager
  34from classes.sensors._sensor_interface import CustomSensorInterface
  35from classes.sensors.carla_originals import CollisionSensor, GnssSensor, IMUSensor, LaneInvasionSensor, RadarSensor
  36from classes.exceptions import AgentDoneException, ContinueLoopException
  37from classes.ui.hud import HUD, get_actor_display_name
  38from classes.ui.keyboard_controls import KeyboardControl, RSSKeyboardControl
  39from classes.sensors.rss_sensor import AD_RSS_AVAILABLE, RssSensor
  40from classes.sensors.rss_visualization import (
  41    RssBoundingBoxVisualizer,
  42    RssStateVisualizer,
  43    RssUnstructuredSceneVisualizer,
  44)
  45from classes.information_manager import InformationManager
  46from launch_tools import class_or_instance_method
  47
  48if TYPE_CHECKING:
  49    from hydra.conf import HydraConf
  50    from typing_extensions import Self
  51
  52    from agents.lunatic_agent import LunaticAgent
  53    from agents.navigation.global_route_planner import GlobalRoutePlanner
  54    from agents.tools.config_creation import LunaticAgentSettings, RssRoadBoundariesModeAlias
  55    from classes.type_protocols import ControllerClassT
  56    from classes.detection_matrix import DetectionMatrix
  57
  58from agents.tools.logs import logger
  59from launch_tools import CarlaDataProvider, Literal
  60from launch_tools.blueprint_helpers import get_actor_blueprints
  61
  62
[docs] 63class AccessCarlaMixin: 64 """ 65 Mixin class that delegates the attributes :py:attr:`client`, :py:attr:`map`, and :py:attr:`world` 66 to the :py:class:`.CarlaDataProvider` to keep them in sync. 67 68 Note: 69 This mixin only works for instances, they are not class attributes. 70 """ 71 72 @property 73 def client(self) -> carla.Client: 74 return CarlaDataProvider.get_client() 75 76 @client.setter 77 def client(self, value: carla.Client): 78 CarlaDataProvider.set_client(value) 79 80 @property 81 def world(self) -> carla.World: 82 return CarlaDataProvider.get_world() 83 84 @world.setter 85 def world(self, value: carla.World): 86 CarlaDataProvider.set_world(value) 87 88 @property 89 def map(self) -> carla.Map: 90 return CarlaDataProvider.get_map() 91 92 @map.setter 93 def map(self, value: carla.Map): 94 """ 95 Avoid setting the map directly. Use :py:meth:`.CarlaDataProvider.set_world` instead. 96 97 Raises: 98 ValueError: If the map is not the same as the one set in :py:attr:`.CarlaDataProvider.get_map`. 99 100 :meta private: 101 """ 102 if CarlaDataProvider.get_map() != value: 103 raise ValueError("CarlaDataProvider.get_map() and passed map are not the same.") 104 # Do nothing as map is set when using get_map or set_world 105
[docs] 106 @staticmethod 107 def get_blueprint_library() -> carla.BlueprintLibrary: 108 """ 109 Access to a cached version of the blueprint library 110 111 Raises: 112 ValueError: If the blueprint library is not set, which happens when the world is not set. 113 The world must be setup before (:py:class:`CarlaDataProvider.set_world()<.CarlaDataProvider>`). 114 """ 115 if CarlaDataProvider._blueprint_library is None: 116 raise ValueError("Blueprint Library not set. Call CarlaDataProvider.set_world() first.") 117 return CarlaDataProvider._blueprint_library
118 119 120# ============================================================================== 121# -- Game Framework --------------------------------------------------------------- 122# ============================================================================== 123 124
[docs] 125class GameFramework(AccessCarlaMixin, CarlaDataProvider): 126 """ 127 A utility class to setup CARLA, pygame, Hydra_, the configuration and parts of the user interface 128 and the agent. 129 130 A :py:class:`GameFramework` instance can be used to control the game loop and work as a 131 handler for the :py:mod:`~classes.exceptions` of this project. Furthermore can it manage the cooldown 132 of the :py:class:`.Rule` classes. 133 134 Note: 135 The GameFramework derives from the :py:class:`.CarlaDataProvider` which gives this class 136 more utility methods that currently are not included in this part of the documentation. 137 """ 138 139 clock: ClassVar[pygame.time.Clock | None] = None 140 display: ClassVar[pygame.Surface | Literal[False] | None] = None 141 """ 142 The :py:mod:`pygame` surface to render on. 143 144 Is None before the initialization. :code:`False` if pygame is disabled. 145 """ 146 147 controller: weakref.ProxyType[RSSKeyboardControl] | RSSKeyboardControl 148 """Parser for keyboard events""" 149 150 traffic_manager: Optional[carla.TrafficManager] = None 151 152 @property 153 def launch_config(self) -> LaunchConfig: 154 """:py:class:`.LaunchConfig` object that was used for the initialization (**args**)""" 155 return self._launch_config 156 157 @property 158 def agent_config(self) -> LunaticAgentSettings: 159 """ 160 The configuration of the attached :py:attr:`agent`, if it exists otherwise the 161 **agent** attribute of the stored :py:attr:`launch_config`. 162 """ 163 return self.agent.config if self.agent else self._launch_config.agent 164 165 # ----- Init Functions ----- 166
[docs] 167 @classmethod 168 def quickstart(cls, launch_config: Optional[LaunchConfig] = None, *, logging: bool = False) -> "Self": 169 """ 170 Initializes Hydra_ in a limited way, i.e. does not allow for command line overrides. 171 172 Sets up the :py:class:`carla.Client` and related instances as well as pygame. 173 174 Note: 175 It is recommended that you use a :python:`@hydra.main` decorated main function instead 176 to make full use of the Hydra_ framework. 177 178 Parameters: 179 launch_config: The configuration to use. If :code:`None`, will use the default 180 configuration from :code:`./conf/launch_config.yaml`. 181 logging: If True, change the how logging is done by applying the logger settings from 182 :code:`./conf/config_extensions/job_logging.yaml`. 183 Default is :code:`False`. 184 185 Returns: 186 The initialized :py:class:`GameFramework` instance. 187 188 See Also: 189 This function uses: 190 - :py:meth:`.initialize_hydra` 191 - :py:meth:`.init_carla` 192 - :py:meth:`.init_pygame` 193 """ 194 if not launch_config: 195 launch_config = cls.initialize_hydra(logging=logging) 196 if not logging and AD_RSS_AVAILABLE: 197 launch_config.agent.rss.log_level = RssLogLevel.off 198 cls.init_carla(launch_config) 199 cls.init_pygame(launch_config) 200 if launch_config.pygame is False: 201 cls.display = False 202 pygame.quit() 203 return cls(launch_config)
204 205 # Hydra Tools 206 # TODO: this could be some launch_tools MixinClass
[docs] 207 @staticmethod 208 def initialize_hydra( 209 config_dir: str = "./conf", 210 config_name: str = "launch_config", 211 version_base=None, 212 *, 213 job_name="LunaticAgentJob", 214 logging=True, 215 structured=True, 216 ) -> "LaunchConfig": 217 """ 218 Use this function only if no hydra.main is available. 219 220 Usage: 221 222 .. code-block:: python 223 224 args = GameFramework.initialize_hydra(config_dir=<abs_path_of_conf>, config_name="launch_config") 225 game_framework = GameFramework(args) 226 227 Args: 228 config_dir: The directory where the hydra configuration is stored. 229 config_name: The name of the configuration file. 230 version_base: The version base of hydra for the configuration. Default is None. 231 job_name: The name of the job. 232 logging: If True, will set up logging. 233 structured: If True will create the config based on :py:class:`.LaunchConfig`, 234 otherwise it will be based on a :python:`dict`. This is useful for runtime 235 type-checks and conversions. 236 If the configs :py:attr:`.LaunchConfig.strict_config` value is < 2, this 237 parameter is ignored. Disable if you experience problems. 238 Default is :python:`True`. 239 240 See Also: 241 Hydra functions: 242 - :py:func:`hydra.initialize_config_dir` 243 - :py:func:`hydra.compose` 244 """ 245 config_dir = os.path.abspath(config_dir) 246 hydra_initialized = GameFramework.hydra_initialized() 247 if not hydra_initialized: 248 # Not save-guarding this against multiple calls, expose the hydra error 249 # todo: low-prio check if config dir and the other parameters are the same. 250 hydra.initialize_config_dir(version_base=version_base, config_dir=config_dir, job_name=job_name) 251 252 dict_config = hydra.compose(config_name=config_name, return_hydra_config=not hydra_initialized, overrides=None) 253 if structured and dict_config.get("strict_config", 3) >= 2: 254 # Uses the correct dataclass schemas as values. 255 if config_name == "launch_config": 256 if LaunchConfig._config_path: 257 # Load defined schema from the config Store 258 cn = config_store.load(LaunchConfig._config_path) 259 schema: Optional[DictConfig] = cn.node # type: ignore[assignment] 260 with open_dict(schema): 261 schema.merge_with(dict_config) 262 config = typing.cast("LaunchConfig", schema) 263 else: 264 schema = None 265 else: 266 schema = None 267 if schema is None: 268 logger.debug("No schema found for structured init file %s. Falling back to LaunchConfig", config_name) 269 launch_config = LaunchConfig(**dict_config) # type: ignore 270 config: LaunchConfig = OmegaConf.structured(launch_config, flags={"allow_objects": True}) 271 else: 272 config = typing.cast("LaunchConfig", dict_config) 273 assert config # pyright: ignore[reportPossiblyUnboundVariable] 274 if not hydra_initialized: 275 hydra_conf: HydraConfig = GameFramework.get_hydra_config(raw=True) 276 if OmegaConf.is_missing(config.hydra.runtime, "output_dir"): 277 config.hydra.runtime.output_dir = config.hydra.run.dir 278 hydra_conf.set_config(config) # type: ignore 279 Path(config.hydra.runtime.output_dir).mkdir(parents=True, exist_ok=True) 280 if logging: 281 # Assure that our logger works 282 configure_log(config.hydra.job_logging, logger.name) # type: ignore 283 with open_dict(config): 284 del config["hydra"] 285 config.agent._set_flag("allow_objects", True) 286 config.agent.__dict__["_parent"] = None # Remove parent from the config, i.e. make it a top-level config. 287 return config
288 289 # TODO: Maybe unify these settings; make overrides available in the config.
[docs] 290 @staticmethod 291 def load_hydra_config(config_name: str = "conf/launch_config") -> "LaunchConfig": 292 if GameFramework.hydra_initialized(): 293 return typing.cast("LaunchConfig", hydra.compose(config_name=config_name)) 294 config_dir, config_name = os.path.split(config_name) 295 # Try to get job name from file in the stack. 296 import inspect # noqa 297 298 frame = inspect.stack()[-1] 299 module = inspect.getmodule(frame[0]) 300 name = module.__file__ if module and module.__file__ else "unknown" 301 return GameFramework.initialize_hydra(config_dir, config_name, job_name=name)
302
[docs] 303 def __init__( 304 self, 305 args: "LaunchConfig", 306 config: Optional[DictConfig] = None, 307 timeout: float = 10.0, 308 worker_threads: int = 0, 309 *, 310 map_layers=carla.MapLayer.All, 311 ): 312 """ 313 Parameters: 314 args: Configuration for the GameFramework. 315 config: Optional config for the agent (unused in this project) 316 timeout: Timeout for the :py:class:`carla.Client`. 317 worker_threads: See :py:class:`carla.Client`. 318 map_layers: See :py:meth:`carla.Client.load_world` 319 """ 320 if args.seed: 321 random.seed(args.seed) 322 nprandom.seed(args.seed) 323 self._launch_config = args 324 self.world_settings: carla.WorldSettings = self.init_carla(args, timeout, worker_threads, map_layers=map_layers) 325 326 # These are class variables 327 GameFramework.clock, GameFramework.display = self.init_pygame(args) 328 if args.pygame is False: 329 GameFramework.display = False 330 pygame.quit() 331 332 self.config = config 333 self.agent = None 334 self.world_model = None 335 self.controller = None 336 337 self.debug = self.world.debug 338 self.continue_loop = True 339 self.traffic_manager: Optional[carla.TrafficManager] = self.init_traffic_manager() 340 341 # Import here to avoid circular imports 342 from classes.rule import BlockingRule, Rule # noqa: PLC0415,RUF100 343 344 self.cooldown_framework = Rule.CooldownFramework() # used in context manager. # NOTE: Currently can be constant 345 BlockingRule._gameframework = weakref.proxy(self)
346
[docs] 347 @class_or_instance_method 348 def init_pygame( 349 cls_or_self: "Self | type[Self]", launch_config: Optional[LaunchConfig] = None, recreate: bool = False 350 ) -> tuple[pygame.time.Clock, pygame.Surface | Literal[False] | None]: 351 """ 352 Parameters: 353 launch_config: Will use the :py:attr:`width<.LaunchConfig.width>` and 354 :py:attr:`height<.LaunchConfig.height> attributes of this object if set the 355 :py:mod:`pygame` windows size. Otherwise will use :python:`(1280, 720)`. 356 Defaults to :code:`None`. 357 358 recreate: If :python:`True`, will reinitialize pygame a second time if this function is 359 called. 360 361 .. experimental; returns None, None if launch_config.pygame is False. 362 """ 363 if recreate or GameFramework.clock is None or GameFramework.display is None: 364 if launch_config is None: 365 launch_config = getattr(cls_or_self, "_args", None) # get from inst. 366 # NOTE: launch_config could still be None, i.e. when called from a class. 367 if getattr(launch_config, "pygame", None): 368 pygame.init() # NOTE: this is currently not guaranteed anymore. 369 pygame.font.init() 370 if recreate or GameFramework.clock is None: 371 GameFramework.clock = pygame.time.Clock() 372 if recreate or GameFramework.display is None: 373 if getattr(launch_config, "pygame", None) and "READTHEDOCS" not in os.environ: 374 GameFramework.display = pygame.display.set_mode( 375 size=(launch_config.width, launch_config.height) if launch_config else (1280, 720), 376 flags=pygame.HWSURFACE | pygame.DOUBLEBUF, 377 ) 378 elif getattr(launch_config, "pygame", None) is False: 379 GameFramework.display = False 380 else: 381 GameFramework.display = None # Note sure if should initialize, try again next time. 382 return GameFramework.clock, GameFramework.display
383
[docs] 384 @staticmethod 385 def init_carla( 386 args: Optional[LaunchConfig] = None, 387 timeout: Optional[float] = None, 388 worker_threads: int = 0, 389 *, 390 map_layers: carla.MapLayer = carla.MapLayer.All, 391 ) -> carla.WorldSettings: 392 """ 393 Initializes the :py:class:`carla.Client` and the connects it to the simulator. 394 395 Parameters: 396 args: The configuration for the GameFramework. 397 timeout: The timeout for the :py:class:`carla.Client`. Default is 10.0. 398 399 .. deprecated:: _ 400 Use the :py:attr:`.LaunchConfig.timeout` attribute instead. 401 402 worker_threads: See :py:class:`carla.Client`. 403 map_layers: The map layers to load. Default is :py:attr:`carla.MapLayer.All`. 404 405 Returns: 406 The settings of the loaded world. 407 408 Note: 409 This function has to be called before :py:attr:`client`, :py:attr:`world` 410 or :py:attr:`map` can be accessed. 411 Alternatively the client and :py:class:`CarlaDataProvider` need to be setup in a 412 different way beforehand. 413 414 See Also: 415 - :py:class:`carla.Client` 416 - :py:meth:`carla.Client.load_world` 417 """ 418 # Note: This sets up the CarlaDataProvider 419 if args is None: 420 timeout = timeout or 10.0 421 GameFramework.setup_client_map_and_world( 422 timeout=timeout, worker_threads=worker_threads, map_layers=map_layers 423 ) 424 else: 425 GameFramework.setup_client_map_and_world( 426 args.map, 427 args.host, 428 args.port, 429 timeout=timeout or args.timeout, 430 worker_threads=worker_threads, 431 map_layers=map_layers, 432 sync=args.sync, 433 fps=args.fps, 434 ) 435 return CarlaDataProvider.get_world().get_settings()
436
[docs] 437 @staticmethod 438 def setup_client_map_and_world( 439 map_name: str = "Town04", 440 ip: str = "127.0.0.1", 441 port: int = 2000, 442 *, 443 timeout: float = 10.0, 444 worker_threads: int = 0, 445 reload_world: bool = False, 446 reset_settings: bool = True, 447 map_layers: carla.MapLayer = carla.MapLayer.All, 448 sync: Union[bool, None] = True, 449 fps: int = 20, 450 ) -> tuple[carla.Client, carla.World, carla.Map]: 451 """ 452 Loads the :py:class:`carla.Client`, the :py:class:`carla.World`, and the :py:class:`carla.Map`. 453 454 This is a subfunction of :py:meth:`.init_carla` 455 that can be used without a :py:class:`LaunchConfig`. 456 457 See Also: 458 :py:meth:`.init_carla` 459 :py:meth:`carla.Client.set_timeout` 460 :py:meth:`carla.Client.reload_world` 461 """ 462 client = CarlaDataProvider.get_client() 463 if client is None: 464 client = carla.Client(ip, port, worker_threads) 465 client.set_timeout(timeout) 466 CarlaDataProvider.set_client(client) 467 468 world = CarlaDataProvider.get_world() 469 if not world: 470 world = client.get_world() 471 map_ = world.get_map() # CarlaDataProvider map not yet set -> set_world 472 473 if map_name and map_.name != "Carla/Maps/" + map_name: 474 world: carla.World = client.load_world(map_name, reset_settings, map_layers) 475 elif reload_world: 476 world = client.reload_world(reset_settings) 477 logger.info("Reloaded world - map_layers ignored.") 478 elif map_name is None: 479 logger.info("Provided map_name is None, skipped loading world. Assuming world is already loaded.") 480 else: 481 logger.info( 482 "skipped loading world '%s', already loaded - map_layers and reset_settings ignored.", map_.name 483 ) 484 485 world_settings = world.get_settings() 486 # Apply world settings 487 if sync is not None: 488 if sync: 489 logger.debug("Using synchronous mode.") 490 # apply synchronous mode if wanted 491 world_settings.synchronous_mode = True 492 world_settings.fixed_delta_seconds = 1 / fps # 0.05 493 world.apply_settings(world_settings) 494 else: 495 logger.debug("Using asynchronous mode.") 496 world_settings.synchronous_mode = False 497 world.apply_settings(world_settings) 498 logger.info("World Settings:\n%s", world_settings) 499 500 # Note: set_world loads multiple information. It has to be called after applying the world settings. 501 CarlaDataProvider.set_world(world) 502 503 return client, world, CarlaDataProvider.get_map()
504
[docs] 505 def init_traffic_manager(self, port: Optional[int] = None) -> carla.TrafficManager: 506 """ 507 Returns an instance of the :py:class:`carla.TrafficManager` 508 related to the specified port. If it does not exist, this will be created. 509 510 See Also: 511 :py:meth:`carla.Client.get_trafficmanager` 512 513 Parameters: 514 port: The port to use. If :code:`None`, will use the port from 515 :py:meth:`.CarlaDataProvider.get_traffic_manager_port`, which defaults to :code:`8000`. 516 """ 517 if port is None: 518 port = CarlaDataProvider.get_traffic_manager_port() 519 traffic_manager = self.client.get_trafficmanager(port) 520 if self._launch_config.handle_ticks: 521 if self._launch_config.sync: 522 traffic_manager.set_synchronous_mode(True) 523 traffic_manager.set_hybrid_physics_mode(True) # Note default 50m 524 traffic_manager.set_hybrid_physics_radius(50.0) # TODO: make a LaunchConfig config variable 525 self.traffic_manager = traffic_manager 526 return traffic_manager
527
[docs] 528 def init_agent_and_interface( 529 self, 530 ego: Optional[carla.Vehicle], 531 agent_class: "type[LunaticAgent]", 532 config: Optional[LunaticAgentSettings] = None, 533 overwrites: Optional[dict[str, Any]] = None, 534 ) -> "tuple[LunaticAgent, WorldModel, GlobalRoutePlanner, RSSKeyboardControl]": 535 """ 536 Quick setup for the agent and the world model. 537 538 Among others this executes: 539 - :py:meth:`.LunaticAgent.create_world_and_agent` 540 - :py:meth:`.GameFramework.make_controller` 541 - :py:meth:`.WorldModel.tick_server_world` 542 543 .. code-block:: python 544 545 from agents.lunatic_agent import LunaticAgent, LunaticAgentSettings 546 547 ego = world.spawn_actor(world.get_blueprint_library().find("vehicle.audi.tt")) 548 agent, world_model, global_planner, controller = ( 549 game_framework.init_agent_and_interface(ego, LunaticAgent) 550 ) 551 552 Arguments: 553 ego: The ego vehicle. Can be :code:`None` if the agent is set to use an 554 external actor (:py:attr:`.LaunchConfig.externalActor`). 555 agent_class: The agent class to instantiate. 556 config: The configuration of the agent. If :code:`None` the 557 :py:attr:`.agent_class.BASE_SETTINGS <.LunaticAgent.BASE_SETTINGS>` are used. 558 overwrites: Additional overwrites to the configuration. 559 """ 560 if ego is None and not self._launch_config.externalActor: 561 raise ValueError("`ego` must be passed if ``externalActor` is not set.") 562 self.agent, self.world_model, self.global_planner = agent_class.create_world_and_agent( 563 self._launch_config, vehicle=ego, sim_world=self.world, agent_config=config, overwrites=overwrites 564 ) 565 self.config = self.agent.config 566 controller = self.make_controller( 567 self.world_model, RSSKeyboardControl, start_in_autopilot=False 568 ) # Note: stores weakref to controller 569 self.world_model.game_framework = weakref.proxy(self) 570 self.world_model.tick_server_world() 571 self.agent.verify_settings( 572 strictness=-1 573 ) # NOTE: Here live info is already available and will throw some errors 574 return self.agent, self.world_model, self.global_planner, controller
575
[docs] 576 def make_world_model(self, config: "LunaticAgentSettings", player: Optional[carla.Vehicle] = None) -> "WorldModel": 577 """ 578 Creates a :py:class:`WorldModel` with a backreference to the GameFramework. 579 """ 580 self.world_model = WorldModel(config, self._launch_config, player=player) 581 self.world_model.game_framework = weakref.proxy(self) 582 return self.world_model
583
[docs] 584 def make_controller( 585 self, world_model: "WorldModel", controller_class: type[ControllerClassT] = RSSKeyboardControl, **kwargs: Any 586 ) -> ControllerClassT: 587 """ 588 Creates a keyboard controller and attaches it to the world model. 589 590 Args: 591 world_model: The world model to attach the controller to. 592 controller_class: The controller class to instantiate. Defaults to :py:class:`.RSSKeyboardControl`. 593 **kwargs: Additional arguments to pass to the controller. 594 """ 595 controller = controller_class( 596 world_model, 597 # config=self.config, 598 clock=self.clock, 599 **kwargs, 600 ) 601 self.controller = weakref.proxy(controller) 602 if self._launch_config.pygame is False: 603 controller.enable(False) 604 return controller # NOTE: does not return the proxy object.
605
[docs] 606 @staticmethod 607 def hydra_initialized() -> bool: 608 """ 609 Checks wether Hydra_ is initialized. This is normally only done 610 when the :py:func:`@hydra.main` decorator is used. 611 """ 612 return GlobalHydra.instance().is_initialized()
613 614 @overload 615 @staticmethod 616 def get_hydra_config(raw: Literal[False] = False) -> "HydraConf": ... 617 618 @overload 619 @staticmethod 620 def get_hydra_config(raw: Literal[True]) -> HydraConfig: ... 621
[docs] 622 @staticmethod 623 def get_hydra_config(raw: bool = False) -> "HydraConfig | HydraConf": 624 """ 625 Retrieves the Hydra_ configuration object. 626 627 Parameters: 628 raw: If :python:`True`, returns the :py:class:`hydra.conf.HydraConf` dataclass, otherwise the 629 :py:class:`hydra.core.hydra_config.HydraConfig` singleton. Default is :code:`False`. 630 631 Raises: 632 ValueError: If the HydraConfig was not set up yet and :python:`raw=True`. 633 """ 634 if raw: 635 return HydraConfig.instance() 636 return HydraConfig.get()
637 638 # ----- Setters ----- 639 640 def set_controller(self, controller: KeyboardControl) -> None: 641 """ 642 Set the :py:class:`KeyboardControl` 643 644 :meta private: 645 """ 646 self.controller = controller # type: ignore # maybe use proxy here too 647 648 def set_config(self, config: DictConfig) -> None: 649 """ 650 Change the :py:attr:`config`. 651 652 :meta private: 653 """ 654 self.config = config 655 656 # ----- UI Functions ----- 657
[docs] 658 def parse_controller_events(self, final_controls: Optional[carla.VehicleControl]): 659 """ 660 Parses the keyboard events with the :py:attr:`controller`. 661 662 Raises: 663 AttributeError: If neither :py:attr:`control` nor :py:attr:`world_model.controller<world_model>` are set. 664 """ 665 try: 666 controller = self.controller if self.controller or not self.world_model else self.world_model.controller 667 except ReferenceError as e: 668 raise AttributeError("No controller set.") from e 669 if controller is None: 670 raise AttributeError("No controller set.") 671 return controller.parse_events(final_controls)
672
[docs] 673 def render_everything(self): 674 """ 675 Update render and hud 676 677 Note: 678 This is the preferred method to update the world and render the camera. 679 """ 680 self.world_model.tick( 681 self.clock 682 ) # NOTE: Ticks WorldMODEL not CARLA WORLD! # pyright: ignore[reportOptionalMemberAccess, reportArgumentType] 683 if self.display: 684 self.world_model.render(self.display, finalize=False) # pyright: ignore[reportOptionalMemberAccess] 685 self.controller.render(self.display) 686 # These two types must be in sync: 687 dm_render_conf: "DetectionMatrix.RenderOptions" = self._launch_config.camera.hud.detection_matrix # type: ignore[assignment] 688 if dm_render_conf and self.agent: 689 self.agent.render_detection_matrix(self.display, **dm_render_conf) 690 self.world_model.finalize_render(self.display) # pyright: ignore[reportOptionalMemberAccess]
691
[docs] 692 @staticmethod 693 def skip_rest_of_loop(message: str = "GameFramework.end_loop") -> NoReturn: 694 """ 695 Terminates the current iteration and exits the GameFramework by raising a :py:exc:`.ContinueLoopException`. 696 697 Note: 698 It is the users responsibility to manage the agent & local planner 699 before calling this function, i.e. that the agent has a :py:class:`carla.VehicleControl` set. 700 701 Raises: 702 ContinueLoopException: With the given **message**. 703 """ 704 # TODO: add option that still allows for rss. 705 raise ContinueLoopException(message)
706 707 # -------- Tools -------- 708
[docs] 709 @staticmethod 710 def destroy_actors(actors: Iterable[carla.Actor | CustomSensorInterface]): 711 """ 712 Destroys the given actors and customs sensors implemented in this package. 713 714 Removes destroyed actors from the :py:class:`.CarlaDataProvider` actor pool. 715 """ 716 batch: "list[carla.command.DestroyActor]" = [] 717 for actor in actors: 718 if isinstance(actor, (carla.Sensor, CustomSensorInterface)): 719 actor.stop() 720 if isinstance(actor, carla.Actor): 721 if actor.is_alive: 722 batch.append(carla.command.DestroyActor(actor)) 723 else: 724 actor.destroy() 725 726 if batch and CarlaDataProvider._client: 727 try: 728 CarlaDataProvider._client.apply_batch(batch) 729 except RuntimeError as e: 730 if "time-out" in str(e): 731 logger.warning("Timeout while destroying actors.") 732 else: 733 raise 734 else: 735 for command in batch: 736 if CarlaDataProvider.actor_id_exists(command.actor_id): 737 del CarlaDataProvider._carla_actor_pool[ 738 command.actor_id 739 ] # remove by batch and not by individual command 740 elif not CarlaDataProvider._client: 741 logger.error("No client available to destroy actors.")
742 743 # -------- Context Manager -------- 744 745 def __call__(self, agent: "LunaticAgent"): 746 """ 747 Use as context manager to handle the game loop. 748 Pass an agent if None is set yet. 749 """ 750 self.agent = agent 751 self.world_model = agent._world_model 752 try: 753 if self.world_model.controller: # weakref.proxy 754 self.controller = self.world_model.controller 755 except ReferenceError: 756 self.controller = None # type: ignore[assignment] 757 if not self.controller: 758 logger.debug("Creating new controller.") 759 self.controller = self.make_controller( 760 self.world_model, start_in_autopilot=self._launch_config.autopilot 761 ) # hard reference 762 self.world_model.controller = self.controller # hard instead of weak reference 763 self.agent._validate_phases = False 764 return self 765 766 def __enter__(self): 767 if self.agent is None: 768 raise ValueError("Agent not initialized.") 769 if self.world_model is None: 770 raise ValueError("World Model not initialized.") 771 if self.controller is None: 772 raise ValueError("Controller not initialized.") 773 774 self.clock.tick() # self.args.fps) # pyright: ignore[reportOptionalMemberAccess] 775 frame = None 776 if self._launch_config.handle_ticks: # i.e. no scenario runner doing it for us 777 if CarlaDataProvider.is_sync_mode(): 778 frame = self.world_model.world.tick(self._launch_config.timeout) 779 else: 780 frame = self.world_model.world.wait_for_tick(self._launch_config.timeout).frame 781 CarlaDataProvider.on_carla_tick() 782 783 if CarlaDataProvider.is_sync_mode(): 784 # We do this only in sync mode as frames could pass between gathering this information 785 # and an agent calling InformationManager.tick(), which in turn calls global_tick 786 # with possibly a DIFFERENT frame wasting computation. 787 if frame is None: 788 frame = self.get_world().get_snapshot().frame 789 InformationManager.global_tick(frame) 790 return self 791 792 def __exit__(self, exc_type, exc_val, exc_tb): 793 self.cooldown_framework.__exit__(exc_type, exc_val, exc_tb) 794 if exc_val: 795 if isinstance(exc_val, AgentDoneException): 796 self.continue_loop = False 797 elif isinstance(exc_val, ContinueLoopException): 798 logger.error( 799 "ContinueLoopException(%s) should be thrown during `agent.run_step` but caught by GameFramework this should not happen. Skipping this step; no controls are applied!", 800 exc_val, 801 ) 802 else: 803 return # skip render and likely terminate. 804 self.render_everything() 805 if self._launch_config.pygame: 806 pygame.display.flip() 807
[docs] 808 @class_or_instance_method 809 def cleanup(cls_or_self: "type[Self] | Self", *, disable_sync: bool = True, quit_pygame: bool = True): 810 """ 811 Cleans up resources and actors. 812 813 Args: 814 disable_sync: If True, will disable synchronous mode. This will 815 prevent the freezing of the Unreal Editor. 816 Default is True. 817 quit_pygame: If True, will call :py:func:`pygame.quit`. Default is True. 818 819 Note: 820 - When called from an instance with an attached agent, 821 the :python:`agent.destroy()` method is called. 822 - Otherwise will call :py:obj:`CarlaDataProvider.cleanup() <.CarlaDataProvider>`. 823 """ 824 try: 825 # Should only work for instance version, but maybe future Singleton support 826 try: 827 if cls_or_self.agent: # pyright: ignore[reportAttributeAccessIssue] 828 cls_or_self.agent.destroy() # pyright: ignore[reportAttributeAccessIssue] 829 finally: 830 if cls_or_self.world_model: # pyright: ignore[reportAttributeAccessIssue] 831 cls_or_self.world_model.destroy() # pyright: ignore[reportAttributeAccessIssue] 832 if disable_sync and cls_or_self.traffic_manager: 833 with contextlib.suppress(Exception): 834 cls_or_self.traffic_manager.set_synchronous_mode(False) 835 finally: 836 # Prevent freezing of the editor 837 if disable_sync and CarlaDataProvider.get_world() is not None: 838 # Disable Synchronous Mode 839 world_settings = carla.WorldSettings(synchronous_mode=False, fixed_delta_seconds=0.0) 840 cls_or_self._world.apply_settings(world_settings) 841 CarlaDataProvider.cleanup() 842 if quit_pygame: 843 pygame.quit()
844 845 # Include access to our exceptions here 846 exceptions = _exceptions 847 """ 848 shortcut to :py:mod:`.exceptions` module containing custom exceptions. 849 850 :meta hide-value: 851 """
852 853 854# ============================================================================== 855# -- World --------------------------------------------------------------- 856# ============================================================================== 857 858
[docs] 859class WorldModel(AccessCarlaMixin, CarlaDataProvider): 860 """ 861 Class representing the surrounding environment. 862 863 This class is the interface between the agent, the :external_py_class:`carla.World`, the 864 :py:class:`.HUD`, and the :py:class:`.KeyboardControl`. 865 It handles ticking of the simulator and rendering of the pygame interface. 866 867 If :py:attr:`.LaunchConfig.externalActor` is set, it will look for an actor with the role name 868 :py:attr:`.LaunchConfig.rolename`, if such an actor does not yet exist it will wait for its 869 creation until the calling script continues. 870 """ 871 872 controller: Optional[Union[RSSKeyboardControl, weakref.ProxyType[RSSKeyboardControl]]] = None 873 """ 874 Set when controller is created. Uses :py:func`weakref.proxy` as backreference. 875 This is not a :py:mod:`weakref` object, when 876 `with gameframework(agent) <GameFramework.__call__>`:py:meth: is used. 877 """ 878 879 game_framework: Optional["weakref.CallableProxyType[GameFramework]"] = None 880 """ 881 Set when the WorldModel is created via the :py:class:`GameFramework`. 882 Uses :py:func`weakref.proxy` as backreference. 883 884 Attention: 885 Currently not used only only set when using: 886 - py:meth:`GameFramework.init_agent_and_interface` 887 - py:meth:`GameFramework.make_world_model` 888 889 :meta private: 890 """ 891 892 player: carla.Vehicle 893 """ 894 The linked actor. If :py:attr:`.external_actor` is set this will be the first actor found 895 with that role name. 896 """ 897
[docs] 898 def __init__( 899 self, 900 config: "LunaticAgentSettings", 901 args: Union[LaunchConfig, Mapping[str, Any], "os.PathLike[str]", str] = "./conf/launch_config.yaml", 902 agent: Optional[LunaticAgent] = None, 903 *, 904 carla_world: Optional[carla.World] = None, 905 player: Optional[carla.Vehicle] = None, 906 map_inst: Optional[carla.Map] = None, 907 ): 908 """Constructor method""" 909 # Set World 910 if self.get_world() is None: 911 if carla_world is None: 912 raise ValueError("CarlaDataProvider not available and `carla_world` not passed.") 913 self.world = carla_world 914 elif carla_world is not None and self.world != carla_world: 915 raise ValueError("CarlaDataProvider.get_world() and passed `carla_world` are not the same.") 916 917 self.world_settings: carla.WorldSettings = self.world.get_settings() 918 """Object containing data about the simulation such as synchrony or rendering mode.""" 919 920 if agent: 921 agent._world_model = self # backreference, if needed; the LunaticAgent sets this as well. 922 923 if self.map is not None: # if this is set accesses CarlaDataProvider 924 if map_inst and self.map != map_inst: 925 raise ValueError("CarlaDataProvider.get_map() and passed map_inst are not the same.") 926 elif map_inst: 927 if isinstance(map_inst, carla.Map): 928 self.map = map_inst 929 else: 930 logger.warning("Warning: Ignoring the given map as it is not a 'carla.Map'") 931 if self.map is None: 932 try: 933 self.set_world(self.world) # CDP function 934 except RuntimeError as error: 935 print(f"RuntimeError: {error}") 936 print(" The server could not send the OpenDRIVE (.xodr) file:") 937 print(" Make sure it exists, has the same name of your town, and is correct.") 938 sys.exit(1) 939 940 self._config = config 941 if not isinstance(args, (Mapping, DictConfig, LaunchConfig)): # TODO: should rather check for string like 942 # Args is expected to be a string here 943 # NOTE: This does NOT INCLUDE CLI OVERWRITES 944 # When passed as path with directory 945 config_dir, config_name = os.path.split(args) 946 try: 947 args = GameFramework.initialize_hydra(config_dir, config_name) 948 except ValueError: 949 # Hydra already initialized 950 args = GameFramework.load_hydra_config(config_name) 951 except Exception: 952 print("Problem with", type(args), args) 953 raise 954 args.externalActor = not (player is not None or agent is not None) # TEMP: Remove to force clean config. 955 self._args: LaunchConfig = typing.cast("LaunchConfig", args) 956 957 self.hud: HUD 958 """The :py:class:`HUD` that is managed.""" 959 960 self.world_tick_id: Optional[int] 961 """The ID of the callback tick event for the :py:class:`HUD`.""" 962 963 if self._args.pygame: 964 self.hud = HUD(self._args.width, self._args.height, self.world) 965 self.world_tick_id = self.world.on_tick(self.hud.on_world_tick) 966 else: 967 self.hud = MockDummy.create_dummy(HUD) 968 self.world_tick_id = None 969 970 self.sync: Optional[bool] = self._args.sync 971 """Set from :py:attr:`.LaunchConfig.sync`""" 972 973 self.dim = (self._args.width, self._args.height) 974 975 self.external_actor: bool = self._args.externalActor 976 """Set from :py:attr:`.LaunchConfig.externalActor`""" 977 978 self.actor_role_name: Optional[str] = self._args.rolename 979 """Set from :py:attr:`.LaunchConfig.rolename`""" 980 981 self._actor_filter = self._args.filter 982 self._actor_generation: Literal[1, 2, "all"] = self._args.generation 983 self._gamma = self._args.camera.gamma 984 985 # TODO: Unify with CameraManager 986 self.recording = False 987 self._has_recorded = False 988 self._recording_dirs = [] 989 self.recording_frame_num = 0 990 self.recording_dir_num = 0 991 992 # From manual controls; used client.start_recorder() 993 # 994 self.recording_enabled: bool = False 995 """ 996 Indicator if :py:attr:`carla.Client` recording feature is on or off. 997 998 Experimental & Untested! 999 CTRL + R : toggle recording of simulation (replacing any previous) 1000 CTRL + P : start replaying last recorded simulation 1001 1002 :meta private: 1003 """ 1004 1005 self.recording_start = 0 1006 """ 1007 CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds) 1008 CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds) 1009 1010 :meta private: 1011 """ 1012 1013 if self.external_actor and (player is not None or agent is not None): 1014 raise ValueError("External actor cannot be used with player or agent.") 1015 if player is None and agent is not None: 1016 self.player = agent._vehicle 1017 elif player is not None and agent is not None: 1018 if player != agent._vehicle: 1019 raise ValueError("Passed `player` and `agent._vehicle` are not the same.") 1020 self.player = player 1021 else: 1022 # If player is None here, will set in in restart() 1023 self.player = player # pyright: ignore[reportAttributeAccessIssue] 1024 1025 assert self.player is not None or self.external_actor # Note: Former optional. Player set in restart 1026 1027 self.collision_sensor: CollisionSensor = None # type: ignore # set in restart 1028 self.lane_invasion_sensor: Optional[LaneInvasionSensor] = None # set in restart 1029 self.gnss_sensor: Optional[GnssSensor] = None 1030 self.imu_sensor: Optional[IMUSensor] = None # from interactive 1031 self.radar_sensor: Optional[RadarSensor] = None # from interactive 1032 self.camera_manager: CameraManager = None # type: ignore # set in restart 1033 """ 1034 Manages cameras for the user interface and :py:class:`.HUD`. 1035 """ 1036 1037 self._weather_presets = CarlaDataProvider.find_weather_presets() 1038 self._weather_index = 0 1039 self.weather: str = "NotSet" 1040 """ 1041 Name of currently used weather preset. 1042 See also: :py:class:`CarlaDataProvider.find_weather_presets()<CarlaDataProvider>` 1043 1044 :meta hide-value: 1045 """ 1046 1047 self.actors: List[Union[carla.Actor, CustomSensorInterface]] = [] 1048 """Actors attached to this instance for the user interface and :py:class:`.HUD`.""" 1049 1050 # From interactive: 1051 self.constant_velocity_enabled = NotImplemented #: :meta private: 1052 self.show_vehicle_telemetry = False #: :meta private: # enabled via KeyboardController 1053 self.doors_are_open: bool = False 1054 """ 1055 Note: 1056 Only set over the KeyboardController, does not query the actor or simulation 1057 1058 :meta private: 1059 """ 1060 1061 self.current_map_layer = 0 1062 self.map_layer_names = [ 1063 carla.MapLayer.NONE, 1064 carla.MapLayer.Buildings, 1065 carla.MapLayer.Decals, 1066 carla.MapLayer.Foliage, 1067 carla.MapLayer.Ground, 1068 carla.MapLayer.ParkedVehicles, 1069 carla.MapLayer.Particles, 1070 carla.MapLayer.Props, 1071 carla.MapLayer.StreetLights, 1072 carla.MapLayer.Walls, 1073 carla.MapLayer.All, 1074 ] 1075 # RSS 1076 # set in restart 1077 self.rss_sensor: Optional[RssSensor] = None 1078 self.rss_unstructured_scene_visualizer: RssUnstructuredSceneVisualizer = None # type: ignore[assignment] 1079 self.rss_bounding_box_visualizer: RssBoundingBoxVisualizer = None # type: ignore[assignment] 1080 1081 if config.rss: 1082 if config.rss.enabled and not self._actor_filter.startswith("vehicle."): 1083 print("Error: RSS only supports vehicles as ego. Disable RSS or use a vehicle filter for the actor.") 1084 sys.exit(1) 1085 if AD_RSS_AVAILABLE and config.rss.enabled: 1086 self._restrictor = carla.RssRestrictor() 1087 else: 1088 self._restrictor = None 1089 else: 1090 self._restrictor = None 1091 1092 logger.info("Calling WorldModel.restart()") 1093 self._first_start = True 1094 """Indicate that restart was called for the first time.""" 1095 self.restart() 1096 assert self.player is not None 1097 self._vehicle_physics = self.player.get_physics_control() 1098 1099 # Both of these cases should not happen; call_prepare map again. 1100 if CarlaDataProvider._traffic_light_map is None: 1101 logger.error("Traffic light map not set at this point on the ") 1102 CarlaDataProvider.set_world(self._world) 1103 elif not CarlaDataProvider._traffic_light_map: 1104 logger.warning("Traffic light map is empty. Are there no traffic lights in the map?. Checking again.") 1105 CarlaDataProvider.prepare_map()
1106
[docs] 1107 def rss_set_road_boundaries_mode( 1108 self, 1109 road_boundaries_mode: Optional[Union["RssRoadBoundariesModeAlias", carla.RssRoadBoundariesMode, bool]] = None, 1110 ) -> None: 1111 """ 1112 Choose wether or not to use the RSS road boundaries feature. 1113 1114 Toggles: :py:attr:`.RssSettings.use_stay_on_road_feature` 1115 1116 Parameters: 1117 road_boundaries_mode: If :python:`None`, uses the value from the config. 1118 If :python:`True`, sets to :py:attr:`carla.RssRoadBoundariesMode.On`. 1119 If :python:`False`, sets to :py:attr:`carla.RssRoadBoundariesMode.Off`. 1120 1121 See Also: 1122 - :py:class:`carla.RssSensor` 1123 """ 1124 # Called from KeyboardControl 1125 if road_boundaries_mode is None: 1126 road_boundaries_mode = self._config.rss.use_stay_on_road_feature 1127 else: 1128 # Depending on AD_RSS_AVAILABLE this uses carla.RssRoadBoundariesMode or the alias 1129 if road_boundaries_mode: 1130 self._config.rss.use_stay_on_road_feature = RssRoadBoundariesMode.On 1131 else: 1132 self._config.rss.use_stay_on_road_feature = RssRoadBoundariesMode.Off 1133 if self.rss_sensor: 1134 self.rss_sensor.sensor.road_boundaries_mode = ( 1135 carla.RssRoadBoundariesMode.On if road_boundaries_mode else carla.RssRoadBoundariesMode.Off 1136 ) 1137 else: 1138 print("Warning: RSS Road Boundaries Mode not set. RSS sensor not found.")
1139 1140 def toggle_pause(self): 1141 """ 1142 Toggle pause_simulation from the KeyboardControls. 1143 1144 :meta private: 1145 """ 1146 settings = self.world.get_settings() 1147 self.pause_simulation(not settings.synchronous_mode) 1148
[docs] 1149 def pause_simulation(self, pause: bool): 1150 """ 1151 Pauses the simulation by setting the world to synchronous mode. 1152 1153 Attention: 1154 Only works reliable in **asynchronous mode** (:py:attr:`sync=False <.LaunchConfig.sync>`) 1155 and might lead to unexpected behavior in synchronous mode. 1156 """ 1157 if self._args.sync: 1158 logger.warning("Pause simulation only works in asynchronous mode.") 1159 1160 settings = self.world.get_settings() 1161 if pause and not settings.synchronous_mode: 1162 settings.synchronous_mode = True 1163 settings.fixed_delta_seconds = 0.05 1164 self.world.apply_settings(settings) 1165 elif not pause and settings.synchronous_mode: 1166 settings.synchronous_mode = False 1167 settings.fixed_delta_seconds = None 1168 self.world.apply_settings(settings)
1169 1170 @staticmethod 1171 def _find_external_actor( 1172 world: carla.World, role_name: str, actor_list: Optional[carla.ActorList] = None 1173 ) -> Optional[carla.Actor]: 1174 """ 1175 Looks to find an actor with a matching **role_name**. 1176 """ 1177 player = None 1178 for actor in actor_list or world.get_actors(): 1179 if actor.attributes.get("role_name") == role_name: 1180 if player is not None: 1181 logger.error( 1182 "Multiple actors with role_name `%s` found. id: %s. Returning the first one found.", 1183 role_name, 1184 actor.id, 1185 ) 1186 else: 1187 player = actor 1188 return player 1189 1190 def _wait_for_external_actor(self, timeout: float = 20, sleep: float = 3) -> carla.Actor: 1191 """ 1192 Does not resume the script until an external actor with the role name is found. 1193 1194 Raises: 1195 AssertionError: If :py:attr:`actor_role_name` is not set. 1196 SystemExit: If the actor is not found within the time period. 1197 """ 1198 assert self.actor_role_name 1199 self.tick_server_world() # Tick the world? 1200 start = time.time() 1201 t = start 1202 while t < start + timeout: 1203 player = self._find_external_actor(self.world, self.actor_role_name) 1204 if player is not None: 1205 return player 1206 logger.info("...External actor not found. Waiting to find external actor named `%s`", self.actor_role_name) 1207 time.sleep(sleep) # Note if on same thread, nothing will happen. Put function into thread? 1208 self.tick_server_world() # Tick the world? 1209 t = time.time() 1210 logger.error(f"External actor `{self.actor_role_name}` not found. Exiting...") 1211 print(f"External actor `{self.actor_role_name}` not found. Exiting...") 1212 sys.exit(1) 1213
[docs] 1214 def restart(self): 1215 """ 1216 Restart the world and sets up the :py:class:`.HUD` sensors. 1217 If :py:attr:`player` is not set or :py:attr:`external_actor` is set, 1218 looks for an actor with the role name, or spawns a new actor. 1219 1220 Note: 1221 Called during :py:meth:`__init__`. 1222 """ 1223 # Keep same camera config if the camera manager exists. 1224 cam_index = typing.cast("int", self.camera_manager.index if self.camera_manager is not None else 0) 1225 cam_pos_id = self.camera_manager.transform_index if self.camera_manager is not None else 0 1226 if self.external_actor: 1227 # Check whether there is already an actor with defined role name 1228 if not self.actor_role_name: 1229 raise ValueError("When using external actor, rolename must be set.") 1230 actor_list = self.world.get_actors() # In sync mode the actor list could be empty 1231 external_actor = self._find_external_actor(self.world, self.actor_role_name, actor_list) 1232 if self.player is None: 1233 if external_actor: 1234 self.player = typing.cast("carla.Vehicle", external_actor) 1235 else: 1236 self.player = typing.cast( 1237 "carla.Vehicle", 1238 self._wait_for_external_actor(timeout=self._args.timeout + 10), 1239 ) 1240 elif ( 1241 external_actor and self.player.id != external_actor.id 1242 ): # NOTE: even with same id different instances and hashes. 1243 logger.warning( 1244 "External actor found with role_name `%s` but different id. " 1245 "Keeping the current actor (%s) and ignoring the external actor (%s)", 1246 self.actor_role_name, 1247 self.player.id, 1248 external_actor.id, 1249 ) 1250 if TYPE_CHECKING: 1251 self.player = typing.cast("carla.Vehicle", self.player) 1252 1253 else: 1254 # Get a random blueprint. 1255 if self.player is None or self.camera_manager is not None: 1256 # First pass without a player or second pass -> new player 1257 blueprint: carla.ActorBlueprint = typing.cast( 1258 "carla.ActorBlueprint", 1259 random.choice(get_actor_blueprints(self._actor_filter, self._actor_generation)), 1260 ) 1261 blueprint.set_attribute("role_name", self.actor_role_name) # type: ignore[arg-type] 1262 if blueprint.has_attribute("color"): 1263 color = random.choice(blueprint.get_attribute("color").recommended_values) 1264 blueprint.set_attribute("color", color) 1265 # From Interactive: 1266 if blueprint.has_attribute("terramechanics"): # For Tire mechanics/Physics? # Todo is that needed? 1267 blueprint.set_attribute("terramechanics", "false") 1268 if blueprint.has_attribute("driver_id"): 1269 driver_id = random.choice(blueprint.get_attribute("driver_id").recommended_values) 1270 blueprint.set_attribute("driver_id", driver_id) 1271 if blueprint.has_attribute("is_invincible"): 1272 blueprint.set_attribute("is_invincible", "true") 1273 1274 # TODO: Make this a config option to choose automatically. 1275 # set the max speed 1276 # if blueprint.has_attribute('speed'): 1277 # self.player_max_speed = float(blueprint.get_attribute('speed').recommended_values[1]) 1278 # self.player_max_speed_fast = float(blueprint.get_attribute('speed').recommended_values[2]) 1279 1280 # Spawn the player. 1281 if self.player is not None: 1282 spawn_point = self.player.get_transform() 1283 spawn_point.location.z += 2.0 1284 spawn_point.rotation.roll = 0.0 1285 spawn_point.rotation.pitch = 0.0 1286 if self.camera_manager is not None: # None at first start; not None if player was already set before 1287 self.destroy() 1288 self.player = typing.cast("carla.Vehicle", self.world.try_spawn_actor(blueprint, spawn_point)) # pyright: ignore[reportPossiblyUnboundVariable] 1289 self.modify_vehicle_physics(self.player) 1290 while self.player is None: 1291 if not self.map.get_spawn_points(): 1292 print("There are no spawn points available in your map/town.") 1293 print("Please add some Vehicle Spawn Point to your UE4 scene.") 1294 sys.exit(1) 1295 spawn_points = self.map.get_spawn_points() 1296 spawn_point: carla.Transform = random.choice(spawn_points) if spawn_points else carla.Transform() # type: ignore 1297 self.player = typing.cast("carla.Vehicle", self.world.try_spawn_actor(blueprint, spawn_point)) # pyright: ignore[reportPossiblyUnboundVariable] 1298 # From Interactive: 1299 # See: https://carla.readthedocs.io/en/latest/tuto_G_control_vehicle_physics/ 1300 self.show_vehicle_telemetry = False 1301 self.modify_vehicle_physics(self.player) 1302 1303 # Clean external actors restarting a second time 1304 if self.external_actor and ( 1305 # None cleans only first time; False will never clean. 1306 (self._args.restart_clean_sensors is not False and not self._first_start) 1307 or self._args.restart_clean_sensors is True 1308 ): 1309 ego_sensors = [actor for actor in self.world.get_actors() if actor.parent == self.player] 1310 1311 # Remove all old sensors 1312 for ego_sensor in ego_sensors: 1313 if ego_sensor is not None: 1314 ego_sensor.destroy() 1315 1316 # Set up the sensors. 1317 self.collision_sensor = CollisionSensor(self.player, self.hud) 1318 if not HUD.is_dummy(self.hud): 1319 self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud) 1320 self.actors.append(self.lane_invasion_sensor) 1321 else: 1322 self.lane_invasion_sensor = None 1323 self.gnss_sensor = None # GnssSensor(self.player) # TODO: make it optional 1324 self.imu_sensor = None # IMUSensor(self.player) 1325 self.actors.extend([ 1326 self.collision_sensor, 1327 ]) 1328 if self.gnss_sensor: 1329 self.actors.append(self.gnss_sensor) 1330 if self.imu_sensor: 1331 self.actors.append(self.imu_sensor) 1332 1333 logger.log(0, "Setting up camera manager") 1334 if self._args.pygame: 1335 assert isinstance(self.hud, HUD) 1336 self.camera_manager = CameraManager(self.player, self.hud, self._args) 1337 self.camera_manager.transform_index = cam_pos_id 1338 self.camera_manager.set_sensor(cam_index, notify=False) 1339 assert self.camera_manager.sensor 1340 logger.log(0, "Camera Manager set up") 1341 1342 actor_type = get_actor_display_name(self.player) 1343 self.hud.notification(text=actor_type) 1344 1345 self.rss_unstructured_scene_visualizer = RssUnstructuredSceneVisualizer( 1346 self.player, self.world, self.dim, gamma_correction=self._gamma 1347 ) # TODO: use args instead of gamma 1348 self.rss_bounding_box_visualizer = RssBoundingBoxVisualizer( 1349 self.dim, self.world, self.camera_manager.sensor 1350 ) 1351 rss_state_visualizer = self.hud.rss_state_visualizer 1352 else: 1353 if self._args.camera.spectator: 1354 # only use spectator 1355 CameraManager.follow_actor(self.player) 1356 self.camera_manager = MockDummy.create_dummy(CameraManager) # NOTE: Does not end thread 1357 self.rss_bounding_box_visualizer = MockDummy.create_dummy(RssBoundingBoxVisualizer) 1358 self.rss_unstructured_scene_visualizer = MockDummy.create_dummy(RssUnstructuredSceneVisualizer) 1359 rss_state_visualizer: "RssStateVisualizer" = MockDummy.create_dummy() 1360 if AD_RSS_AVAILABLE and self._config.rss and self._config.rss.enabled: 1361 log_level = self._config.rss.log_level 1362 # Assure correct log level type 1363 if not isinstance(log_level, carla.RssLogLevel): 1364 try: 1365 if isinstance(log_level, str): 1366 log_level = carla.RssLogLevel.names[log_level] 1367 else: 1368 log_level = carla.RssLogLevel(log_level) 1369 except Exception as e: 1370 msg = f"Could not convert '{log_level}' to RssLogLevel must be in {list(carla.RssLogLevel.names.keys())}" 1371 raise KeyError(msg) from e 1372 logger.debug("Carla Log level was not a RssLogLevel") 1373 self.rss_sensor = RssSensor( 1374 self.player, 1375 self.rss_unstructured_scene_visualizer, 1376 self.rss_bounding_box_visualizer, 1377 rss_state_visualizer, 1378 visualizer_mode=self._config.rss.debug_visualization_mode, 1379 log_level=log_level, 1380 ) 1381 self.rss_set_road_boundaries_mode(self._config.rss.use_stay_on_road_feature) 1382 else: 1383 self.rss_sensor = None 1384 self._first_start = False 1385 self.tick_server_world()
1386
[docs] 1387 def tick_server_world(self) -> "int | carla.WorldSnapshot | None": 1388 """ 1389 When :py:attr:`.LaunchConfig.handle_ticks` is :python:`True` 1390 uses :external_py_meth:`carla.World.tick` or :external_py_meth:`carla.World.wait_for_tick` 1391 depending on :py:attr:`.LaunchConfig.sync`. 1392 """ 1393 if self._args.handle_ticks: 1394 if self.sync: 1395 # if timeout is used it it might interfere with _wait_for_external_actor 1396 return self.world.tick(self._args.timeout) 1397 return self.world.wait_for_tick(self._args.timeout) 1398 return None
1399 1400 # def tick(self, clock): 1401 # self.hud.tick(self.player, clock) # RSS example. TODO: Check which has to be used! 1402
[docs] 1403 def tick(self, clock: "pygame.time.Clock"): 1404 """Method for every tick""" 1405 self.hud.tick(self, clock, InformationManager.obstacles)
1406
[docs] 1407 def next_weather(self, reverse: bool = False) -> None: 1408 """Get next weather setting""" 1409 self._weather_index += -1 if reverse else 1 1410 self._weather_index %= len(self._weather_presets) 1411 preset = self._weather_presets[self._weather_index] 1412 self.hud.notification(f"Weather: {preset[1]}") 1413 self.player.get_world().set_weather(preset[0]) 1414 self.weather = preset[1]
1415
[docs] 1416 def next_map_layer(self, reverse: bool = False) -> None: 1417 self.current_map_layer += -1 if reverse else 1 1418 self.current_map_layer %= len(self.map_layer_names) 1419 selected = self.map_layer_names[self.current_map_layer] 1420 self.hud.notification(f"LayerMap selected: {selected}")
1421
[docs] 1422 def load_map_layer(self, unload: bool = False): 1423 selected = self.map_layer_names[self.current_map_layer] 1424 if unload: 1425 self.hud.notification(f"Unloading map layer: {selected}") 1426 self.world.unload_map_layer(selected) 1427 else: 1428 self.hud.notification(f"Loading map layer: {selected}") 1429 self.world.load_map_layer(selected)
1430
[docs] 1431 def toggle_recording(self): 1432 """ 1433 Start recording images from the camera output. 1434 1435 Saved in 1436 :py:attr:`LaunchConfig.camera.recorder.output_path <.CameraConfig.RecorderSettings.output_path>` 1437 with the current frame number. 1438 """ 1439 if not self.recording: 1440 self._has_recorded = True 1441 dir_name, filename = os.path.split(self._args.camera.recorder.output_path) 1442 try: 1443 dir_name_formatted = dir_name % self.recording_dir_num 1444 except TypeError: 1445 dir_name += "%04d" 1446 dir_name_formatted = dir_name % self.recording_dir_num 1447 while os.path.exists(dir_name_formatted): # noqa: PTH110 1448 self.recording_dir_num += 1 1449 dir_name_formatted = dir_name % self.recording_dir_num 1450 self.recording_file_format = os.path.join(dir_name, filename) # keep unformatted. # noqa: PTH118 1451 self.recording_frame_num = 0 1452 Path(dir_name_formatted).mkdir(parents=True, exist_ok=True) 1453 self.hud.notification(f"Started recording (folder: {dir_name_formatted})") 1454 self._recording_dirs.append(dir_name_formatted) 1455 else: 1456 dir_name_formatted = os.path.split(self.recording_file_format)[0] % self.recording_dir_num 1457 self.hud.notification(f"Recording finished (folder: {dir_name_formatted})") 1458 1459 self.recording = not self.recording
1460
[docs] 1461 def toggle_radar(self): 1462 """Adds or destroys a radar sensor for the user interface""" 1463 if self.radar_sensor is None: 1464 self.radar_sensor = RadarSensor(self.player) 1465 elif self.radar_sensor.sensor is not None: 1466 self.radar_sensor.sensor.destroy() 1467 self.radar_sensor = None
1468
[docs] 1469 def modify_vehicle_physics(self, actor: carla.Vehicle): 1470 # If actor is not a vehicle, we cannot use the physics control 1471 try: 1472 physics_control = actor.get_physics_control() 1473 physics_control.use_sweep_wheel_collision = True 1474 actor.apply_physics_control(physics_control) 1475 except Exception: 1476 logger.warning("Could not modify vehicle physics for actor: %s", actor)
1477
[docs] 1478 def finalize_render(self, display: pygame.Surface): 1479 """ 1480 Draws the HUD and saves the image if recording is enabled. 1481 1482 Attention: 1483 Assumes that :py:meth:`render(..., finalize=False)<render>` was called before and 1484 something should be rendered in between. 1485 Use :py:meth:`finalize_render` as the very last step in the render process. 1486 """ 1487 self.hud.render(display) 1488 if self.recording: 1489 try: 1490 pygame.image.save( 1491 display, self.recording_file_format % (self.recording_dir_num, self.recording_frame_num) 1492 ) 1493 except Exception as e: 1494 logger.error( 1495 "Could not save image format: `%s` % (self.recording_dir_num, self.recording_frame_num): %s", 1496 self.recording_file_format, 1497 e, 1498 ) 1499 self.recording_frame_num += 1
1500
[docs] 1501 def render(self, display: pygame.Surface, finalize: bool = True): 1502 """ 1503 Render the world and draw it to the :py:class:`pygame.Surface`. 1504 This function is part of :py:meth:`.GameFramework.render_everything` **which is the 1505 recommended way to handle the rendering process.** 1506 1507 Hint: 1508 Recording of the fully rendered output should be done at the end of the render method, 1509 however :py:meth:`.CameraManager.render` is called first to render the camera. 1510 1511 Call with **finalize=False** to only render the camera but not the :py:class:`.HUD`. 1512 1513 Afterwards apply other render features and call :py:meth:`finalize_render` 1514 to draw the HUD and save the image if recording is enabled. 1515 1516 Note: 1517 This renders the :py:class:`.CameraManger` and the RSS bounding boxes. 1518 If **finalize** is :python:`True`, it will also render the :py:class:`HUD` by calling 1519 :py:meth:`finalize_render`. 1520 """ 1521 self.camera_manager.render(display) 1522 self.rss_bounding_box_visualizer.render(display, self.camera_manager.current_frame) 1523 self.rss_unstructured_scene_visualizer.render(display) 1524 if finalize: 1525 self.finalize_render(display)
1526
[docs] 1527 def destroy_sensors(self): 1528 """Destroy sensors""" 1529 if self.rss_sensor: 1530 self.rss_sensor.destroy() 1531 self.rss_sensor = None 1532 if self.rss_unstructured_scene_visualizer: 1533 self.rss_unstructured_scene_visualizer.destroy() 1534 self.rss_unstructured_scene_visualizer = None # type: ignore[assignment] 1535 if self.camera_manager is not None: 1536 self.camera_manager.destroy() 1537 self.camera_manager = None # type: ignore[assignment] 1538 if self.radar_sensor is not None: 1539 self.toggle_radar() # destroys it if not None
1540
[docs] 1541 def destroy(self, destroy_ego: bool = False): 1542 """ 1543 Destroys all actors 1544 1545 Parameters: 1546 destroy_ego: If True, will destroy the :py:attr:`player` as well. Else assume that it 1547 is destroyed by someone else. 1548 """ 1549 # stop from ticking 1550 if self.world_tick_id and self.world: 1551 self.world.remove_on_tick(self.world_tick_id) 1552 self.destroy_sensors() 1553 if destroy_ego and self.player not in self.actors: # do not destroy external actors. 1554 logger.debug("Adding player to destruction list.") 1555 self.actors.append(self.player) 1556 elif not destroy_ego and self.player in self.actors: 1557 logger.warning( 1558 "destroy_ego=False, but player is in actors list. Destroying the actor from within WorldModel.destroy." 1559 ) 1560 1561 # logger.info("to destroy %s", list(map(str, self.actors))) 1562 # Batch destroy in one simulation step 1563 real_actors: Sequence[carla.Actor] = [actor for actor in self.actors if isinstance(actor, carla.Actor)] 1564 GameFramework.destroy_actors(real_actors) 1565 1566 # e.g. CustomSensorInterface 1567 other_actors = [actor for actor in self.actors if actor and not isinstance(actor, carla.Actor)] 1568 while other_actors: 1569 actor = other_actors.pop(0) 1570 if actor is not None: 1571 print("destroying actor: " + str(actor), end=" destroyed=") 1572 try: 1573 actor.stop() 1574 except AttributeError as e: 1575 logger.debug("Error with actor {}: {}", actor, e) 1576 try: 1577 x = actor.destroy() # Non carla instances 1578 print(x) 1579 except RuntimeError: 1580 logger.warning("Could not destroy actor: " + str(actor)) 1581 self.actors.clear() 1582 if self._args.handle_ticks and self.get_world(): 1583 self.get_world().tick() 1584 1585 if self._has_recorded: 1586 runtime_dir = GameFramework.get_hydra_config().runtime.output_dir 1587 for dir_name_formatted in self._recording_dirs: 1588 dirname = os.path.split(dir_name_formatted)[1] 1589 subprocess.run([ 1590 "ffmpeg", 1591 "-an", 1592 "-sn", 1593 "-i", 1594 f"{dir_name_formatted}/%08d.bmp", 1595 "-framerate", 1596 "1", 1597 "-vcodec", 1598 "mpeg4", 1599 "-r", 1600 "60", 1601 f"{Path(runtime_dir) / dirname}.avi", 1602 ]) # noqa: E501 1603 # os.system(f'ffmpeg -an -sn -i "{dir_name_formatted}/%08d.bmp" -framerate 1 -vcodec mpeg4 -r 60 "{os.path.join(runtime_dir, dirname)}.avi"') # noqa: E501 1604 print(f"Recording saved in {Path(runtime_dir) / dirname}.avi")
1605
[docs] 1606 def rss_check_control(self, vehicle_control: carla.VehicleControl) -> carla.VehicleControl | None: 1607 """ 1608 Checks the vehicle control against the RSS restrictions and possibly proposes an alternative. 1609 1610 This is called during :py:attr:`.Phase.RSS_EVALUATION`. 1611 1612 Todo: 1613 This should be an agent method, but currently the :py:class:`~.rss_sensor.RssSensor` is 1614 tied to this class and not the agent. 1615 """ 1616 self.hud.original_vehicle_control = vehicle_control 1617 self.hud.restricted_vehicle_control = vehicle_control 1618 if not AD_RSS_AVAILABLE or not self.rss_sensor: 1619 return None 1620 1621 if ( 1622 self.rss_sensor.log_level <= carla.RssLogLevel.warn 1623 and self.rss_sensor.ego_dynamics_on_route 1624 and not self.rss_sensor.ego_dynamics_on_route.ego_center_within_route 1625 ): 1626 logger.warning("RSS: Not on route! " + str(self.rss_sensor.ego_dynamics_on_route)[:47] + "...") 1627 # Is there a proper response? 1628 rss_proper_response = ( 1629 self.rss_sensor.proper_response if self.rss_sensor and self.rss_sensor.response_valid else None 1630 ) 1631 if rss_proper_response: 1632 # adjust the controls 1633 proposed_vehicle_control = self._restrictor.restrict_vehicle_control( # pyright: ignore[reportOptionalMemberAccess] 1634 vehicle_control, 1635 rss_proper_response, 1636 self.rss_sensor.ego_dynamics_on_route, # type: ignore[arg-type] 1637 self._vehicle_physics, 1638 ) 1639 self.hud.restricted_vehicle_control = proposed_vehicle_control 1640 self.hud.allowed_steering_ranges = self.rss_sensor.get_steering_ranges() 1641 return proposed_vehicle_control 1642 return None