1"""
2Interface classes between CARLA, the agent, and the user interface.
3"""
4
5# pyright: reportUnknownMemberType=false
6
7from __future__ import annotations
8
9import contextlib
10import os
11from pathlib import Path
12import subprocess
13import sys
14import time
15import weakref
16
17from collections.abc import Mapping
18from typing import TYPE_CHECKING, Any, ClassVar, Iterable, List, NoReturn, Optional, Sequence, Union, overload
19import typing
20
21import carla
22import hydra
23from numpy import random as nprandom
24import pygame
25import random
26from hydra.core.global_hydra import GlobalHydra
27from hydra.core.hydra_config import HydraConfig
28from hydra.core.utils import configure_log
29from omegaconf import DictConfig, OmegaConf, open_dict
30
31from agents.tools.config_creation import LaunchConfig, RssLogLevel, RssRoadBoundariesMode, config_store
32from classes import MockDummy, exceptions as _exceptions
33from classes.ui.camera_manager import CameraManager
34from classes.sensors._sensor_interface import CustomSensorInterface
35from classes.sensors.carla_originals import CollisionSensor, GnssSensor, IMUSensor, LaneInvasionSensor, RadarSensor
36from classes.exceptions import AgentDoneException, ContinueLoopException
37from classes.ui.hud import HUD, get_actor_display_name
38from classes.ui.keyboard_controls import KeyboardControl, RSSKeyboardControl
39from classes.sensors.rss_sensor import AD_RSS_AVAILABLE, RssSensor
40from classes.sensors.rss_visualization import (
41 RssBoundingBoxVisualizer,
42 RssStateVisualizer,
43 RssUnstructuredSceneVisualizer,
44)
45from classes.information_manager import InformationManager
46from launch_tools import class_or_instance_method
47
48if TYPE_CHECKING:
49 from hydra.conf import HydraConf
50 from typing_extensions import Self
51
52 from agents.lunatic_agent import LunaticAgent
53 from agents.navigation.global_route_planner import GlobalRoutePlanner
54 from agents.tools.config_creation import LunaticAgentSettings, RssRoadBoundariesModeAlias
55 from classes.type_protocols import ControllerClassT
56 from classes.detection_matrix import DetectionMatrix
57
58from agents.tools.logs import logger
59from launch_tools import CarlaDataProvider, Literal
60from launch_tools.blueprint_helpers import get_actor_blueprints
61
62
[docs]
63class AccessCarlaMixin:
64 """
65 Mixin class that delegates the attributes :py:attr:`client`, :py:attr:`map`, and :py:attr:`world`
66 to the :py:class:`.CarlaDataProvider` to keep them in sync.
67
68 Note:
69 This mixin only works for instances, they are not class attributes.
70 """
71
72 @property
73 def client(self) -> carla.Client:
74 return CarlaDataProvider.get_client()
75
76 @client.setter
77 def client(self, value: carla.Client):
78 CarlaDataProvider.set_client(value)
79
80 @property
81 def world(self) -> carla.World:
82 return CarlaDataProvider.get_world()
83
84 @world.setter
85 def world(self, value: carla.World):
86 CarlaDataProvider.set_world(value)
87
88 @property
89 def map(self) -> carla.Map:
90 return CarlaDataProvider.get_map()
91
92 @map.setter
93 def map(self, value: carla.Map):
94 """
95 Avoid setting the map directly. Use :py:meth:`.CarlaDataProvider.set_world` instead.
96
97 Raises:
98 ValueError: If the map is not the same as the one set in :py:attr:`.CarlaDataProvider.get_map`.
99
100 :meta private:
101 """
102 if CarlaDataProvider.get_map() != value:
103 raise ValueError("CarlaDataProvider.get_map() and passed map are not the same.")
104 # Do nothing as map is set when using get_map or set_world
105
[docs]
106 @staticmethod
107 def get_blueprint_library() -> carla.BlueprintLibrary:
108 """
109 Access to a cached version of the blueprint library
110
111 Raises:
112 ValueError: If the blueprint library is not set, which happens when the world is not set.
113 The world must be setup before (:py:class:`CarlaDataProvider.set_world()<.CarlaDataProvider>`).
114 """
115 if CarlaDataProvider._blueprint_library is None:
116 raise ValueError("Blueprint Library not set. Call CarlaDataProvider.set_world() first.")
117 return CarlaDataProvider._blueprint_library
118
119
120# ==============================================================================
121# -- Game Framework ---------------------------------------------------------------
122# ==============================================================================
123
124
[docs]
125class GameFramework(AccessCarlaMixin, CarlaDataProvider):
126 """
127 A utility class to setup CARLA, pygame, Hydra_, the configuration and parts of the user interface
128 and the agent.
129
130 A :py:class:`GameFramework` instance can be used to control the game loop and work as a
131 handler for the :py:mod:`~classes.exceptions` of this project. Furthermore can it manage the cooldown
132 of the :py:class:`.Rule` classes.
133
134 Note:
135 The GameFramework derives from the :py:class:`.CarlaDataProvider` which gives this class
136 more utility methods that currently are not included in this part of the documentation.
137 """
138
139 clock: ClassVar[pygame.time.Clock | None] = None
140 display: ClassVar[pygame.Surface | Literal[False] | None] = None
141 """
142 The :py:mod:`pygame` surface to render on.
143
144 Is None before the initialization. :code:`False` if pygame is disabled.
145 """
146
147 controller: weakref.ProxyType[RSSKeyboardControl] | RSSKeyboardControl
148 """Parser for keyboard events"""
149
150 traffic_manager: Optional[carla.TrafficManager] = None
151
152 @property
153 def launch_config(self) -> LaunchConfig:
154 """:py:class:`.LaunchConfig` object that was used for the initialization (**args**)"""
155 return self._launch_config
156
157 @property
158 def agent_config(self) -> LunaticAgentSettings:
159 """
160 The configuration of the attached :py:attr:`agent`, if it exists otherwise the
161 **agent** attribute of the stored :py:attr:`launch_config`.
162 """
163 return self.agent.config if self.agent else self._launch_config.agent
164
165 # ----- Init Functions -----
166
[docs]
167 @classmethod
168 def quickstart(cls, launch_config: Optional[LaunchConfig] = None, *, logging: bool = False) -> "Self":
169 """
170 Initializes Hydra_ in a limited way, i.e. does not allow for command line overrides.
171
172 Sets up the :py:class:`carla.Client` and related instances as well as pygame.
173
174 Note:
175 It is recommended that you use a :python:`@hydra.main` decorated main function instead
176 to make full use of the Hydra_ framework.
177
178 Parameters:
179 launch_config: The configuration to use. If :code:`None`, will use the default
180 configuration from :code:`./conf/launch_config.yaml`.
181 logging: If True, change the how logging is done by applying the logger settings from
182 :code:`./conf/config_extensions/job_logging.yaml`.
183 Default is :code:`False`.
184
185 Returns:
186 The initialized :py:class:`GameFramework` instance.
187
188 See Also:
189 This function uses:
190 - :py:meth:`.initialize_hydra`
191 - :py:meth:`.init_carla`
192 - :py:meth:`.init_pygame`
193 """
194 if not launch_config:
195 launch_config = cls.initialize_hydra(logging=logging)
196 if not logging and AD_RSS_AVAILABLE:
197 launch_config.agent.rss.log_level = RssLogLevel.off
198 cls.init_carla(launch_config)
199 cls.init_pygame(launch_config)
200 if launch_config.pygame is False:
201 cls.display = False
202 pygame.quit()
203 return cls(launch_config)
204
205 # Hydra Tools
206 # TODO: this could be some launch_tools MixinClass
[docs]
207 @staticmethod
208 def initialize_hydra(
209 config_dir: str = "./conf",
210 config_name: str = "launch_config",
211 version_base=None,
212 *,
213 job_name="LunaticAgentJob",
214 logging=True,
215 structured=True,
216 ) -> "LaunchConfig":
217 """
218 Use this function only if no hydra.main is available.
219
220 Usage:
221
222 .. code-block:: python
223
224 args = GameFramework.initialize_hydra(config_dir=<abs_path_of_conf>, config_name="launch_config")
225 game_framework = GameFramework(args)
226
227 Args:
228 config_dir: The directory where the hydra configuration is stored.
229 config_name: The name of the configuration file.
230 version_base: The version base of hydra for the configuration. Default is None.
231 job_name: The name of the job.
232 logging: If True, will set up logging.
233 structured: If True will create the config based on :py:class:`.LaunchConfig`,
234 otherwise it will be based on a :python:`dict`. This is useful for runtime
235 type-checks and conversions.
236 If the configs :py:attr:`.LaunchConfig.strict_config` value is < 2, this
237 parameter is ignored. Disable if you experience problems.
238 Default is :python:`True`.
239
240 See Also:
241 Hydra functions:
242 - :py:func:`hydra.initialize_config_dir`
243 - :py:func:`hydra.compose`
244 """
245 config_dir = os.path.abspath(config_dir)
246 hydra_initialized = GameFramework.hydra_initialized()
247 if not hydra_initialized:
248 # Not save-guarding this against multiple calls, expose the hydra error
249 # todo: low-prio check if config dir and the other parameters are the same.
250 hydra.initialize_config_dir(version_base=version_base, config_dir=config_dir, job_name=job_name)
251
252 dict_config = hydra.compose(config_name=config_name, return_hydra_config=not hydra_initialized, overrides=None)
253 if structured and dict_config.get("strict_config", 3) >= 2:
254 # Uses the correct dataclass schemas as values.
255 if config_name == "launch_config":
256 if LaunchConfig._config_path:
257 # Load defined schema from the config Store
258 cn = config_store.load(LaunchConfig._config_path)
259 schema: Optional[DictConfig] = cn.node # type: ignore[assignment]
260 with open_dict(schema):
261 schema.merge_with(dict_config)
262 config = typing.cast("LaunchConfig", schema)
263 else:
264 schema = None
265 else:
266 schema = None
267 if schema is None:
268 logger.debug("No schema found for structured init file %s. Falling back to LaunchConfig", config_name)
269 launch_config = LaunchConfig(**dict_config) # type: ignore
270 config: LaunchConfig = OmegaConf.structured(launch_config, flags={"allow_objects": True})
271 else:
272 config = typing.cast("LaunchConfig", dict_config)
273 assert config # pyright: ignore[reportPossiblyUnboundVariable]
274 if not hydra_initialized:
275 hydra_conf: HydraConfig = GameFramework.get_hydra_config(raw=True)
276 if OmegaConf.is_missing(config.hydra.runtime, "output_dir"):
277 config.hydra.runtime.output_dir = config.hydra.run.dir
278 hydra_conf.set_config(config) # type: ignore
279 Path(config.hydra.runtime.output_dir).mkdir(parents=True, exist_ok=True)
280 if logging:
281 # Assure that our logger works
282 configure_log(config.hydra.job_logging, logger.name) # type: ignore
283 with open_dict(config):
284 del config["hydra"]
285 config.agent._set_flag("allow_objects", True)
286 config.agent.__dict__["_parent"] = None # Remove parent from the config, i.e. make it a top-level config.
287 return config
288
289 # TODO: Maybe unify these settings; make overrides available in the config.
[docs]
290 @staticmethod
291 def load_hydra_config(config_name: str = "conf/launch_config") -> "LaunchConfig":
292 if GameFramework.hydra_initialized():
293 return typing.cast("LaunchConfig", hydra.compose(config_name=config_name))
294 config_dir, config_name = os.path.split(config_name)
295 # Try to get job name from file in the stack.
296 import inspect # noqa
297
298 frame = inspect.stack()[-1]
299 module = inspect.getmodule(frame[0])
300 name = module.__file__ if module and module.__file__ else "unknown"
301 return GameFramework.initialize_hydra(config_dir, config_name, job_name=name)
302
[docs]
303 def __init__(
304 self,
305 args: "LaunchConfig",
306 config: Optional[DictConfig] = None,
307 timeout: float = 10.0,
308 worker_threads: int = 0,
309 *,
310 map_layers=carla.MapLayer.All,
311 ):
312 """
313 Parameters:
314 args: Configuration for the GameFramework.
315 config: Optional config for the agent (unused in this project)
316 timeout: Timeout for the :py:class:`carla.Client`.
317 worker_threads: See :py:class:`carla.Client`.
318 map_layers: See :py:meth:`carla.Client.load_world`
319 """
320 if args.seed:
321 random.seed(args.seed)
322 nprandom.seed(args.seed)
323 self._launch_config = args
324 self.world_settings: carla.WorldSettings = self.init_carla(args, timeout, worker_threads, map_layers=map_layers)
325
326 # These are class variables
327 GameFramework.clock, GameFramework.display = self.init_pygame(args)
328 if args.pygame is False:
329 GameFramework.display = False
330 pygame.quit()
331
332 self.config = config
333 self.agent = None
334 self.world_model = None
335 self.controller = None
336
337 self.debug = self.world.debug
338 self.continue_loop = True
339 self.traffic_manager: Optional[carla.TrafficManager] = self.init_traffic_manager()
340
341 # Import here to avoid circular imports
342 from classes.rule import BlockingRule, Rule # noqa: PLC0415,RUF100
343
344 self.cooldown_framework = Rule.CooldownFramework() # used in context manager. # NOTE: Currently can be constant
345 BlockingRule._gameframework = weakref.proxy(self)
346
[docs]
347 @class_or_instance_method
348 def init_pygame(
349 cls_or_self: "Self | type[Self]", launch_config: Optional[LaunchConfig] = None, recreate: bool = False
350 ) -> tuple[pygame.time.Clock, pygame.Surface | Literal[False] | None]:
351 """
352 Parameters:
353 launch_config: Will use the :py:attr:`width<.LaunchConfig.width>` and
354 :py:attr:`height<.LaunchConfig.height> attributes of this object if set the
355 :py:mod:`pygame` windows size. Otherwise will use :python:`(1280, 720)`.
356 Defaults to :code:`None`.
357
358 recreate: If :python:`True`, will reinitialize pygame a second time if this function is
359 called.
360
361 .. experimental; returns None, None if launch_config.pygame is False.
362 """
363 if recreate or GameFramework.clock is None or GameFramework.display is None:
364 if launch_config is None:
365 launch_config = getattr(cls_or_self, "_args", None) # get from inst.
366 # NOTE: launch_config could still be None, i.e. when called from a class.
367 if getattr(launch_config, "pygame", None):
368 pygame.init() # NOTE: this is currently not guaranteed anymore.
369 pygame.font.init()
370 if recreate or GameFramework.clock is None:
371 GameFramework.clock = pygame.time.Clock()
372 if recreate or GameFramework.display is None:
373 if getattr(launch_config, "pygame", None) and "READTHEDOCS" not in os.environ:
374 GameFramework.display = pygame.display.set_mode(
375 size=(launch_config.width, launch_config.height) if launch_config else (1280, 720),
376 flags=pygame.HWSURFACE | pygame.DOUBLEBUF,
377 )
378 elif getattr(launch_config, "pygame", None) is False:
379 GameFramework.display = False
380 else:
381 GameFramework.display = None # Note sure if should initialize, try again next time.
382 return GameFramework.clock, GameFramework.display
383
[docs]
384 @staticmethod
385 def init_carla(
386 args: Optional[LaunchConfig] = None,
387 timeout: Optional[float] = None,
388 worker_threads: int = 0,
389 *,
390 map_layers: carla.MapLayer = carla.MapLayer.All,
391 ) -> carla.WorldSettings:
392 """
393 Initializes the :py:class:`carla.Client` and the connects it to the simulator.
394
395 Parameters:
396 args: The configuration for the GameFramework.
397 timeout: The timeout for the :py:class:`carla.Client`. Default is 10.0.
398
399 .. deprecated:: _
400 Use the :py:attr:`.LaunchConfig.timeout` attribute instead.
401
402 worker_threads: See :py:class:`carla.Client`.
403 map_layers: The map layers to load. Default is :py:attr:`carla.MapLayer.All`.
404
405 Returns:
406 The settings of the loaded world.
407
408 Note:
409 This function has to be called before :py:attr:`client`, :py:attr:`world`
410 or :py:attr:`map` can be accessed.
411 Alternatively the client and :py:class:`CarlaDataProvider` need to be setup in a
412 different way beforehand.
413
414 See Also:
415 - :py:class:`carla.Client`
416 - :py:meth:`carla.Client.load_world`
417 """
418 # Note: This sets up the CarlaDataProvider
419 if args is None:
420 timeout = timeout or 10.0
421 GameFramework.setup_client_map_and_world(
422 timeout=timeout, worker_threads=worker_threads, map_layers=map_layers
423 )
424 else:
425 GameFramework.setup_client_map_and_world(
426 args.map,
427 args.host,
428 args.port,
429 timeout=timeout or args.timeout,
430 worker_threads=worker_threads,
431 map_layers=map_layers,
432 sync=args.sync,
433 fps=args.fps,
434 )
435 return CarlaDataProvider.get_world().get_settings()
436
[docs]
437 @staticmethod
438 def setup_client_map_and_world(
439 map_name: str = "Town04",
440 ip: str = "127.0.0.1",
441 port: int = 2000,
442 *,
443 timeout: float = 10.0,
444 worker_threads: int = 0,
445 reload_world: bool = False,
446 reset_settings: bool = True,
447 map_layers: carla.MapLayer = carla.MapLayer.All,
448 sync: Union[bool, None] = True,
449 fps: int = 20,
450 ) -> tuple[carla.Client, carla.World, carla.Map]:
451 """
452 Loads the :py:class:`carla.Client`, the :py:class:`carla.World`, and the :py:class:`carla.Map`.
453
454 This is a subfunction of :py:meth:`.init_carla`
455 that can be used without a :py:class:`LaunchConfig`.
456
457 See Also:
458 :py:meth:`.init_carla`
459 :py:meth:`carla.Client.set_timeout`
460 :py:meth:`carla.Client.reload_world`
461 """
462 client = CarlaDataProvider.get_client()
463 if client is None:
464 client = carla.Client(ip, port, worker_threads)
465 client.set_timeout(timeout)
466 CarlaDataProvider.set_client(client)
467
468 world = CarlaDataProvider.get_world()
469 if not world:
470 world = client.get_world()
471 map_ = world.get_map() # CarlaDataProvider map not yet set -> set_world
472
473 if map_name and map_.name != "Carla/Maps/" + map_name:
474 world: carla.World = client.load_world(map_name, reset_settings, map_layers)
475 elif reload_world:
476 world = client.reload_world(reset_settings)
477 logger.info("Reloaded world - map_layers ignored.")
478 elif map_name is None:
479 logger.info("Provided map_name is None, skipped loading world. Assuming world is already loaded.")
480 else:
481 logger.info(
482 "skipped loading world '%s', already loaded - map_layers and reset_settings ignored.", map_.name
483 )
484
485 world_settings = world.get_settings()
486 # Apply world settings
487 if sync is not None:
488 if sync:
489 logger.debug("Using synchronous mode.")
490 # apply synchronous mode if wanted
491 world_settings.synchronous_mode = True
492 world_settings.fixed_delta_seconds = 1 / fps # 0.05
493 world.apply_settings(world_settings)
494 else:
495 logger.debug("Using asynchronous mode.")
496 world_settings.synchronous_mode = False
497 world.apply_settings(world_settings)
498 logger.info("World Settings:\n%s", world_settings)
499
500 # Note: set_world loads multiple information. It has to be called after applying the world settings.
501 CarlaDataProvider.set_world(world)
502
503 return client, world, CarlaDataProvider.get_map()
504
[docs]
505 def init_traffic_manager(self, port: Optional[int] = None) -> carla.TrafficManager:
506 """
507 Returns an instance of the :py:class:`carla.TrafficManager`
508 related to the specified port. If it does not exist, this will be created.
509
510 See Also:
511 :py:meth:`carla.Client.get_trafficmanager`
512
513 Parameters:
514 port: The port to use. If :code:`None`, will use the port from
515 :py:meth:`.CarlaDataProvider.get_traffic_manager_port`, which defaults to :code:`8000`.
516 """
517 if port is None:
518 port = CarlaDataProvider.get_traffic_manager_port()
519 traffic_manager = self.client.get_trafficmanager(port)
520 if self._launch_config.handle_ticks:
521 if self._launch_config.sync:
522 traffic_manager.set_synchronous_mode(True)
523 traffic_manager.set_hybrid_physics_mode(True) # Note default 50m
524 traffic_manager.set_hybrid_physics_radius(50.0) # TODO: make a LaunchConfig config variable
525 self.traffic_manager = traffic_manager
526 return traffic_manager
527
[docs]
528 def init_agent_and_interface(
529 self,
530 ego: Optional[carla.Vehicle],
531 agent_class: "type[LunaticAgent]",
532 config: Optional[LunaticAgentSettings] = None,
533 overwrites: Optional[dict[str, Any]] = None,
534 ) -> "tuple[LunaticAgent, WorldModel, GlobalRoutePlanner, RSSKeyboardControl]":
535 """
536 Quick setup for the agent and the world model.
537
538 Among others this executes:
539 - :py:meth:`.LunaticAgent.create_world_and_agent`
540 - :py:meth:`.GameFramework.make_controller`
541 - :py:meth:`.WorldModel.tick_server_world`
542
543 .. code-block:: python
544
545 from agents.lunatic_agent import LunaticAgent, LunaticAgentSettings
546
547 ego = world.spawn_actor(world.get_blueprint_library().find("vehicle.audi.tt"))
548 agent, world_model, global_planner, controller = (
549 game_framework.init_agent_and_interface(ego, LunaticAgent)
550 )
551
552 Arguments:
553 ego: The ego vehicle. Can be :code:`None` if the agent is set to use an
554 external actor (:py:attr:`.LaunchConfig.externalActor`).
555 agent_class: The agent class to instantiate.
556 config: The configuration of the agent. If :code:`None` the
557 :py:attr:`.agent_class.BASE_SETTINGS <.LunaticAgent.BASE_SETTINGS>` are used.
558 overwrites: Additional overwrites to the configuration.
559 """
560 if ego is None and not self._launch_config.externalActor:
561 raise ValueError("`ego` must be passed if ``externalActor` is not set.")
562 self.agent, self.world_model, self.global_planner = agent_class.create_world_and_agent(
563 self._launch_config, vehicle=ego, sim_world=self.world, agent_config=config, overwrites=overwrites
564 )
565 self.config = self.agent.config
566 controller = self.make_controller(
567 self.world_model, RSSKeyboardControl, start_in_autopilot=False
568 ) # Note: stores weakref to controller
569 self.world_model.game_framework = weakref.proxy(self)
570 self.world_model.tick_server_world()
571 self.agent.verify_settings(
572 strictness=-1
573 ) # NOTE: Here live info is already available and will throw some errors
574 return self.agent, self.world_model, self.global_planner, controller
575
[docs]
576 def make_world_model(self, config: "LunaticAgentSettings", player: Optional[carla.Vehicle] = None) -> "WorldModel":
577 """
578 Creates a :py:class:`WorldModel` with a backreference to the GameFramework.
579 """
580 self.world_model = WorldModel(config, self._launch_config, player=player)
581 self.world_model.game_framework = weakref.proxy(self)
582 return self.world_model
583
[docs]
584 def make_controller(
585 self, world_model: "WorldModel", controller_class: type[ControllerClassT] = RSSKeyboardControl, **kwargs: Any
586 ) -> ControllerClassT:
587 """
588 Creates a keyboard controller and attaches it to the world model.
589
590 Args:
591 world_model: The world model to attach the controller to.
592 controller_class: The controller class to instantiate. Defaults to :py:class:`.RSSKeyboardControl`.
593 **kwargs: Additional arguments to pass to the controller.
594 """
595 controller = controller_class(
596 world_model,
597 # config=self.config,
598 clock=self.clock,
599 **kwargs,
600 )
601 self.controller = weakref.proxy(controller)
602 if self._launch_config.pygame is False:
603 controller.enable(False)
604 return controller # NOTE: does not return the proxy object.
605
[docs]
606 @staticmethod
607 def hydra_initialized() -> bool:
608 """
609 Checks wether Hydra_ is initialized. This is normally only done
610 when the :py:func:`@hydra.main` decorator is used.
611 """
612 return GlobalHydra.instance().is_initialized()
613
614 @overload
615 @staticmethod
616 def get_hydra_config(raw: Literal[False] = False) -> "HydraConf": ...
617
618 @overload
619 @staticmethod
620 def get_hydra_config(raw: Literal[True]) -> HydraConfig: ...
621
[docs]
622 @staticmethod
623 def get_hydra_config(raw: bool = False) -> "HydraConfig | HydraConf":
624 """
625 Retrieves the Hydra_ configuration object.
626
627 Parameters:
628 raw: If :python:`True`, returns the :py:class:`hydra.conf.HydraConf` dataclass, otherwise the
629 :py:class:`hydra.core.hydra_config.HydraConfig` singleton. Default is :code:`False`.
630
631 Raises:
632 ValueError: If the HydraConfig was not set up yet and :python:`raw=True`.
633 """
634 if raw:
635 return HydraConfig.instance()
636 return HydraConfig.get()
637
638 # ----- Setters -----
639
640 def set_controller(self, controller: KeyboardControl) -> None:
641 """
642 Set the :py:class:`KeyboardControl`
643
644 :meta private:
645 """
646 self.controller = controller # type: ignore # maybe use proxy here too
647
648 def set_config(self, config: DictConfig) -> None:
649 """
650 Change the :py:attr:`config`.
651
652 :meta private:
653 """
654 self.config = config
655
656 # ----- UI Functions -----
657
[docs]
658 def parse_controller_events(self, final_controls: Optional[carla.VehicleControl]):
659 """
660 Parses the keyboard events with the :py:attr:`controller`.
661
662 Raises:
663 AttributeError: If neither :py:attr:`control` nor :py:attr:`world_model.controller<world_model>` are set.
664 """
665 try:
666 controller = self.controller if self.controller or not self.world_model else self.world_model.controller
667 except ReferenceError as e:
668 raise AttributeError("No controller set.") from e
669 if controller is None:
670 raise AttributeError("No controller set.")
671 return controller.parse_events(final_controls)
672
[docs]
673 def render_everything(self):
674 """
675 Update render and hud
676
677 Note:
678 This is the preferred method to update the world and render the camera.
679 """
680 self.world_model.tick(
681 self.clock
682 ) # NOTE: Ticks WorldMODEL not CARLA WORLD! # pyright: ignore[reportOptionalMemberAccess, reportArgumentType]
683 if self.display:
684 self.world_model.render(self.display, finalize=False) # pyright: ignore[reportOptionalMemberAccess]
685 self.controller.render(self.display)
686 # These two types must be in sync:
687 dm_render_conf: "DetectionMatrix.RenderOptions" = self._launch_config.camera.hud.detection_matrix # type: ignore[assignment]
688 if dm_render_conf and self.agent:
689 self.agent.render_detection_matrix(self.display, **dm_render_conf)
690 self.world_model.finalize_render(self.display) # pyright: ignore[reportOptionalMemberAccess]
691
[docs]
692 @staticmethod
693 def skip_rest_of_loop(message: str = "GameFramework.end_loop") -> NoReturn:
694 """
695 Terminates the current iteration and exits the GameFramework by raising a :py:exc:`.ContinueLoopException`.
696
697 Note:
698 It is the users responsibility to manage the agent & local planner
699 before calling this function, i.e. that the agent has a :py:class:`carla.VehicleControl` set.
700
701 Raises:
702 ContinueLoopException: With the given **message**.
703 """
704 # TODO: add option that still allows for rss.
705 raise ContinueLoopException(message)
706
707 # -------- Tools --------
708
[docs]
709 @staticmethod
710 def destroy_actors(actors: Iterable[carla.Actor | CustomSensorInterface]):
711 """
712 Destroys the given actors and customs sensors implemented in this package.
713
714 Removes destroyed actors from the :py:class:`.CarlaDataProvider` actor pool.
715 """
716 batch: "list[carla.command.DestroyActor]" = []
717 for actor in actors:
718 if isinstance(actor, (carla.Sensor, CustomSensorInterface)):
719 actor.stop()
720 if isinstance(actor, carla.Actor):
721 if actor.is_alive:
722 batch.append(carla.command.DestroyActor(actor))
723 else:
724 actor.destroy()
725
726 if batch and CarlaDataProvider._client:
727 try:
728 CarlaDataProvider._client.apply_batch(batch)
729 except RuntimeError as e:
730 if "time-out" in str(e):
731 logger.warning("Timeout while destroying actors.")
732 else:
733 raise
734 else:
735 for command in batch:
736 if CarlaDataProvider.actor_id_exists(command.actor_id):
737 del CarlaDataProvider._carla_actor_pool[
738 command.actor_id
739 ] # remove by batch and not by individual command
740 elif not CarlaDataProvider._client:
741 logger.error("No client available to destroy actors.")
742
743 # -------- Context Manager --------
744
745 def __call__(self, agent: "LunaticAgent"):
746 """
747 Use as context manager to handle the game loop.
748 Pass an agent if None is set yet.
749 """
750 self.agent = agent
751 self.world_model = agent._world_model
752 try:
753 if self.world_model.controller: # weakref.proxy
754 self.controller = self.world_model.controller
755 except ReferenceError:
756 self.controller = None # type: ignore[assignment]
757 if not self.controller:
758 logger.debug("Creating new controller.")
759 self.controller = self.make_controller(
760 self.world_model, start_in_autopilot=self._launch_config.autopilot
761 ) # hard reference
762 self.world_model.controller = self.controller # hard instead of weak reference
763 self.agent._validate_phases = False
764 return self
765
766 def __enter__(self):
767 if self.agent is None:
768 raise ValueError("Agent not initialized.")
769 if self.world_model is None:
770 raise ValueError("World Model not initialized.")
771 if self.controller is None:
772 raise ValueError("Controller not initialized.")
773
774 self.clock.tick() # self.args.fps) # pyright: ignore[reportOptionalMemberAccess]
775 frame = None
776 if self._launch_config.handle_ticks: # i.e. no scenario runner doing it for us
777 if CarlaDataProvider.is_sync_mode():
778 frame = self.world_model.world.tick(self._launch_config.timeout)
779 else:
780 frame = self.world_model.world.wait_for_tick(self._launch_config.timeout).frame
781 CarlaDataProvider.on_carla_tick()
782
783 if CarlaDataProvider.is_sync_mode():
784 # We do this only in sync mode as frames could pass between gathering this information
785 # and an agent calling InformationManager.tick(), which in turn calls global_tick
786 # with possibly a DIFFERENT frame wasting computation.
787 if frame is None:
788 frame = self.get_world().get_snapshot().frame
789 InformationManager.global_tick(frame)
790 return self
791
792 def __exit__(self, exc_type, exc_val, exc_tb):
793 self.cooldown_framework.__exit__(exc_type, exc_val, exc_tb)
794 if exc_val:
795 if isinstance(exc_val, AgentDoneException):
796 self.continue_loop = False
797 elif isinstance(exc_val, ContinueLoopException):
798 logger.error(
799 "ContinueLoopException(%s) should be thrown during `agent.run_step` but caught by GameFramework this should not happen. Skipping this step; no controls are applied!",
800 exc_val,
801 )
802 else:
803 return # skip render and likely terminate.
804 self.render_everything()
805 if self._launch_config.pygame:
806 pygame.display.flip()
807
[docs]
808 @class_or_instance_method
809 def cleanup(cls_or_self: "type[Self] | Self", *, disable_sync: bool = True, quit_pygame: bool = True):
810 """
811 Cleans up resources and actors.
812
813 Args:
814 disable_sync: If True, will disable synchronous mode. This will
815 prevent the freezing of the Unreal Editor.
816 Default is True.
817 quit_pygame: If True, will call :py:func:`pygame.quit`. Default is True.
818
819 Note:
820 - When called from an instance with an attached agent,
821 the :python:`agent.destroy()` method is called.
822 - Otherwise will call :py:obj:`CarlaDataProvider.cleanup() <.CarlaDataProvider>`.
823 """
824 try:
825 # Should only work for instance version, but maybe future Singleton support
826 try:
827 if cls_or_self.agent: # pyright: ignore[reportAttributeAccessIssue]
828 cls_or_self.agent.destroy() # pyright: ignore[reportAttributeAccessIssue]
829 finally:
830 if cls_or_self.world_model: # pyright: ignore[reportAttributeAccessIssue]
831 cls_or_self.world_model.destroy() # pyright: ignore[reportAttributeAccessIssue]
832 if disable_sync and cls_or_self.traffic_manager:
833 with contextlib.suppress(Exception):
834 cls_or_self.traffic_manager.set_synchronous_mode(False)
835 finally:
836 # Prevent freezing of the editor
837 if disable_sync and CarlaDataProvider.get_world() is not None:
838 # Disable Synchronous Mode
839 world_settings = carla.WorldSettings(synchronous_mode=False, fixed_delta_seconds=0.0)
840 cls_or_self._world.apply_settings(world_settings)
841 CarlaDataProvider.cleanup()
842 if quit_pygame:
843 pygame.quit()
844
845 # Include access to our exceptions here
846 exceptions = _exceptions
847 """
848 shortcut to :py:mod:`.exceptions` module containing custom exceptions.
849
850 :meta hide-value:
851 """
852
853
854# ==============================================================================
855# -- World ---------------------------------------------------------------
856# ==============================================================================
857
858
[docs]
859class WorldModel(AccessCarlaMixin, CarlaDataProvider):
860 """
861 Class representing the surrounding environment.
862
863 This class is the interface between the agent, the :external_py_class:`carla.World`, the
864 :py:class:`.HUD`, and the :py:class:`.KeyboardControl`.
865 It handles ticking of the simulator and rendering of the pygame interface.
866
867 If :py:attr:`.LaunchConfig.externalActor` is set, it will look for an actor with the role name
868 :py:attr:`.LaunchConfig.rolename`, if such an actor does not yet exist it will wait for its
869 creation until the calling script continues.
870 """
871
872 controller: Optional[Union[RSSKeyboardControl, weakref.ProxyType[RSSKeyboardControl]]] = None
873 """
874 Set when controller is created. Uses :py:func`weakref.proxy` as backreference.
875 This is not a :py:mod:`weakref` object, when
876 `with gameframework(agent) <GameFramework.__call__>`:py:meth: is used.
877 """
878
879 game_framework: Optional["weakref.CallableProxyType[GameFramework]"] = None
880 """
881 Set when the WorldModel is created via the :py:class:`GameFramework`.
882 Uses :py:func`weakref.proxy` as backreference.
883
884 Attention:
885 Currently not used only only set when using:
886 - py:meth:`GameFramework.init_agent_and_interface`
887 - py:meth:`GameFramework.make_world_model`
888
889 :meta private:
890 """
891
892 player: carla.Vehicle
893 """
894 The linked actor. If :py:attr:`.external_actor` is set this will be the first actor found
895 with that role name.
896 """
897
[docs]
898 def __init__(
899 self,
900 config: "LunaticAgentSettings",
901 args: Union[LaunchConfig, Mapping[str, Any], "os.PathLike[str]", str] = "./conf/launch_config.yaml",
902 agent: Optional[LunaticAgent] = None,
903 *,
904 carla_world: Optional[carla.World] = None,
905 player: Optional[carla.Vehicle] = None,
906 map_inst: Optional[carla.Map] = None,
907 ):
908 """Constructor method"""
909 # Set World
910 if self.get_world() is None:
911 if carla_world is None:
912 raise ValueError("CarlaDataProvider not available and `carla_world` not passed.")
913 self.world = carla_world
914 elif carla_world is not None and self.world != carla_world:
915 raise ValueError("CarlaDataProvider.get_world() and passed `carla_world` are not the same.")
916
917 self.world_settings: carla.WorldSettings = self.world.get_settings()
918 """Object containing data about the simulation such as synchrony or rendering mode."""
919
920 if agent:
921 agent._world_model = self # backreference, if needed; the LunaticAgent sets this as well.
922
923 if self.map is not None: # if this is set accesses CarlaDataProvider
924 if map_inst and self.map != map_inst:
925 raise ValueError("CarlaDataProvider.get_map() and passed map_inst are not the same.")
926 elif map_inst:
927 if isinstance(map_inst, carla.Map):
928 self.map = map_inst
929 else:
930 logger.warning("Warning: Ignoring the given map as it is not a 'carla.Map'")
931 if self.map is None:
932 try:
933 self.set_world(self.world) # CDP function
934 except RuntimeError as error:
935 print(f"RuntimeError: {error}")
936 print(" The server could not send the OpenDRIVE (.xodr) file:")
937 print(" Make sure it exists, has the same name of your town, and is correct.")
938 sys.exit(1)
939
940 self._config = config
941 if not isinstance(args, (Mapping, DictConfig, LaunchConfig)): # TODO: should rather check for string like
942 # Args is expected to be a string here
943 # NOTE: This does NOT INCLUDE CLI OVERWRITES
944 # When passed as path with directory
945 config_dir, config_name = os.path.split(args)
946 try:
947 args = GameFramework.initialize_hydra(config_dir, config_name)
948 except ValueError:
949 # Hydra already initialized
950 args = GameFramework.load_hydra_config(config_name)
951 except Exception:
952 print("Problem with", type(args), args)
953 raise
954 args.externalActor = not (player is not None or agent is not None) # TEMP: Remove to force clean config.
955 self._args: LaunchConfig = typing.cast("LaunchConfig", args)
956
957 self.hud: HUD
958 """The :py:class:`HUD` that is managed."""
959
960 self.world_tick_id: Optional[int]
961 """The ID of the callback tick event for the :py:class:`HUD`."""
962
963 if self._args.pygame:
964 self.hud = HUD(self._args.width, self._args.height, self.world)
965 self.world_tick_id = self.world.on_tick(self.hud.on_world_tick)
966 else:
967 self.hud = MockDummy.create_dummy(HUD)
968 self.world_tick_id = None
969
970 self.sync: Optional[bool] = self._args.sync
971 """Set from :py:attr:`.LaunchConfig.sync`"""
972
973 self.dim = (self._args.width, self._args.height)
974
975 self.external_actor: bool = self._args.externalActor
976 """Set from :py:attr:`.LaunchConfig.externalActor`"""
977
978 self.actor_role_name: Optional[str] = self._args.rolename
979 """Set from :py:attr:`.LaunchConfig.rolename`"""
980
981 self._actor_filter = self._args.filter
982 self._actor_generation: Literal[1, 2, "all"] = self._args.generation
983 self._gamma = self._args.camera.gamma
984
985 # TODO: Unify with CameraManager
986 self.recording = False
987 self._has_recorded = False
988 self._recording_dirs = []
989 self.recording_frame_num = 0
990 self.recording_dir_num = 0
991
992 # From manual controls; used client.start_recorder()
993 #
994 self.recording_enabled: bool = False
995 """
996 Indicator if :py:attr:`carla.Client` recording feature is on or off.
997
998 Experimental & Untested!
999 CTRL + R : toggle recording of simulation (replacing any previous)
1000 CTRL + P : start replaying last recorded simulation
1001
1002 :meta private:
1003 """
1004
1005 self.recording_start = 0
1006 """
1007 CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds)
1008 CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds)
1009
1010 :meta private:
1011 """
1012
1013 if self.external_actor and (player is not None or agent is not None):
1014 raise ValueError("External actor cannot be used with player or agent.")
1015 if player is None and agent is not None:
1016 self.player = agent._vehicle
1017 elif player is not None and agent is not None:
1018 if player != agent._vehicle:
1019 raise ValueError("Passed `player` and `agent._vehicle` are not the same.")
1020 self.player = player
1021 else:
1022 # If player is None here, will set in in restart()
1023 self.player = player # pyright: ignore[reportAttributeAccessIssue]
1024
1025 assert self.player is not None or self.external_actor # Note: Former optional. Player set in restart
1026
1027 self.collision_sensor: CollisionSensor = None # type: ignore # set in restart
1028 self.lane_invasion_sensor: Optional[LaneInvasionSensor] = None # set in restart
1029 self.gnss_sensor: Optional[GnssSensor] = None
1030 self.imu_sensor: Optional[IMUSensor] = None # from interactive
1031 self.radar_sensor: Optional[RadarSensor] = None # from interactive
1032 self.camera_manager: CameraManager = None # type: ignore # set in restart
1033 """
1034 Manages cameras for the user interface and :py:class:`.HUD`.
1035 """
1036
1037 self._weather_presets = CarlaDataProvider.find_weather_presets()
1038 self._weather_index = 0
1039 self.weather: str = "NotSet"
1040 """
1041 Name of currently used weather preset.
1042 See also: :py:class:`CarlaDataProvider.find_weather_presets()<CarlaDataProvider>`
1043
1044 :meta hide-value:
1045 """
1046
1047 self.actors: List[Union[carla.Actor, CustomSensorInterface]] = []
1048 """Actors attached to this instance for the user interface and :py:class:`.HUD`."""
1049
1050 # From interactive:
1051 self.constant_velocity_enabled = NotImplemented #: :meta private:
1052 self.show_vehicle_telemetry = False #: :meta private: # enabled via KeyboardController
1053 self.doors_are_open: bool = False
1054 """
1055 Note:
1056 Only set over the KeyboardController, does not query the actor or simulation
1057
1058 :meta private:
1059 """
1060
1061 self.current_map_layer = 0
1062 self.map_layer_names = [
1063 carla.MapLayer.NONE,
1064 carla.MapLayer.Buildings,
1065 carla.MapLayer.Decals,
1066 carla.MapLayer.Foliage,
1067 carla.MapLayer.Ground,
1068 carla.MapLayer.ParkedVehicles,
1069 carla.MapLayer.Particles,
1070 carla.MapLayer.Props,
1071 carla.MapLayer.StreetLights,
1072 carla.MapLayer.Walls,
1073 carla.MapLayer.All,
1074 ]
1075 # RSS
1076 # set in restart
1077 self.rss_sensor: Optional[RssSensor] = None
1078 self.rss_unstructured_scene_visualizer: RssUnstructuredSceneVisualizer = None # type: ignore[assignment]
1079 self.rss_bounding_box_visualizer: RssBoundingBoxVisualizer = None # type: ignore[assignment]
1080
1081 if config.rss:
1082 if config.rss.enabled and not self._actor_filter.startswith("vehicle."):
1083 print("Error: RSS only supports vehicles as ego. Disable RSS or use a vehicle filter for the actor.")
1084 sys.exit(1)
1085 if AD_RSS_AVAILABLE and config.rss.enabled:
1086 self._restrictor = carla.RssRestrictor()
1087 else:
1088 self._restrictor = None
1089 else:
1090 self._restrictor = None
1091
1092 logger.info("Calling WorldModel.restart()")
1093 self._first_start = True
1094 """Indicate that restart was called for the first time."""
1095 self.restart()
1096 assert self.player is not None
1097 self._vehicle_physics = self.player.get_physics_control()
1098
1099 # Both of these cases should not happen; call_prepare map again.
1100 if CarlaDataProvider._traffic_light_map is None:
1101 logger.error("Traffic light map not set at this point on the ")
1102 CarlaDataProvider.set_world(self._world)
1103 elif not CarlaDataProvider._traffic_light_map:
1104 logger.warning("Traffic light map is empty. Are there no traffic lights in the map?. Checking again.")
1105 CarlaDataProvider.prepare_map()
1106
1139
1140 def toggle_pause(self):
1141 """
1142 Toggle pause_simulation from the KeyboardControls.
1143
1144 :meta private:
1145 """
1146 settings = self.world.get_settings()
1147 self.pause_simulation(not settings.synchronous_mode)
1148
[docs]
1149 def pause_simulation(self, pause: bool):
1150 """
1151 Pauses the simulation by setting the world to synchronous mode.
1152
1153 Attention:
1154 Only works reliable in **asynchronous mode** (:py:attr:`sync=False <.LaunchConfig.sync>`)
1155 and might lead to unexpected behavior in synchronous mode.
1156 """
1157 if self._args.sync:
1158 logger.warning("Pause simulation only works in asynchronous mode.")
1159
1160 settings = self.world.get_settings()
1161 if pause and not settings.synchronous_mode:
1162 settings.synchronous_mode = True
1163 settings.fixed_delta_seconds = 0.05
1164 self.world.apply_settings(settings)
1165 elif not pause and settings.synchronous_mode:
1166 settings.synchronous_mode = False
1167 settings.fixed_delta_seconds = None
1168 self.world.apply_settings(settings)
1169
1170 @staticmethod
1171 def _find_external_actor(
1172 world: carla.World, role_name: str, actor_list: Optional[carla.ActorList] = None
1173 ) -> Optional[carla.Actor]:
1174 """
1175 Looks to find an actor with a matching **role_name**.
1176 """
1177 player = None
1178 for actor in actor_list or world.get_actors():
1179 if actor.attributes.get("role_name") == role_name:
1180 if player is not None:
1181 logger.error(
1182 "Multiple actors with role_name `%s` found. id: %s. Returning the first one found.",
1183 role_name,
1184 actor.id,
1185 )
1186 else:
1187 player = actor
1188 return player
1189
1190 def _wait_for_external_actor(self, timeout: float = 20, sleep: float = 3) -> carla.Actor:
1191 """
1192 Does not resume the script until an external actor with the role name is found.
1193
1194 Raises:
1195 AssertionError: If :py:attr:`actor_role_name` is not set.
1196 SystemExit: If the actor is not found within the time period.
1197 """
1198 assert self.actor_role_name
1199 self.tick_server_world() # Tick the world?
1200 start = time.time()
1201 t = start
1202 while t < start + timeout:
1203 player = self._find_external_actor(self.world, self.actor_role_name)
1204 if player is not None:
1205 return player
1206 logger.info("...External actor not found. Waiting to find external actor named `%s`", self.actor_role_name)
1207 time.sleep(sleep) # Note if on same thread, nothing will happen. Put function into thread?
1208 self.tick_server_world() # Tick the world?
1209 t = time.time()
1210 logger.error(f"External actor `{self.actor_role_name}` not found. Exiting...")
1211 print(f"External actor `{self.actor_role_name}` not found. Exiting...")
1212 sys.exit(1)
1213
[docs]
1214 def restart(self):
1215 """
1216 Restart the world and sets up the :py:class:`.HUD` sensors.
1217 If :py:attr:`player` is not set or :py:attr:`external_actor` is set,
1218 looks for an actor with the role name, or spawns a new actor.
1219
1220 Note:
1221 Called during :py:meth:`__init__`.
1222 """
1223 # Keep same camera config if the camera manager exists.
1224 cam_index = typing.cast("int", self.camera_manager.index if self.camera_manager is not None else 0)
1225 cam_pos_id = self.camera_manager.transform_index if self.camera_manager is not None else 0
1226 if self.external_actor:
1227 # Check whether there is already an actor with defined role name
1228 if not self.actor_role_name:
1229 raise ValueError("When using external actor, rolename must be set.")
1230 actor_list = self.world.get_actors() # In sync mode the actor list could be empty
1231 external_actor = self._find_external_actor(self.world, self.actor_role_name, actor_list)
1232 if self.player is None:
1233 if external_actor:
1234 self.player = typing.cast("carla.Vehicle", external_actor)
1235 else:
1236 self.player = typing.cast(
1237 "carla.Vehicle",
1238 self._wait_for_external_actor(timeout=self._args.timeout + 10),
1239 )
1240 elif (
1241 external_actor and self.player.id != external_actor.id
1242 ): # NOTE: even with same id different instances and hashes.
1243 logger.warning(
1244 "External actor found with role_name `%s` but different id. "
1245 "Keeping the current actor (%s) and ignoring the external actor (%s)",
1246 self.actor_role_name,
1247 self.player.id,
1248 external_actor.id,
1249 )
1250 if TYPE_CHECKING:
1251 self.player = typing.cast("carla.Vehicle", self.player)
1252
1253 else:
1254 # Get a random blueprint.
1255 if self.player is None or self.camera_manager is not None:
1256 # First pass without a player or second pass -> new player
1257 blueprint: carla.ActorBlueprint = typing.cast(
1258 "carla.ActorBlueprint",
1259 random.choice(get_actor_blueprints(self._actor_filter, self._actor_generation)),
1260 )
1261 blueprint.set_attribute("role_name", self.actor_role_name) # type: ignore[arg-type]
1262 if blueprint.has_attribute("color"):
1263 color = random.choice(blueprint.get_attribute("color").recommended_values)
1264 blueprint.set_attribute("color", color)
1265 # From Interactive:
1266 if blueprint.has_attribute("terramechanics"): # For Tire mechanics/Physics? # Todo is that needed?
1267 blueprint.set_attribute("terramechanics", "false")
1268 if blueprint.has_attribute("driver_id"):
1269 driver_id = random.choice(blueprint.get_attribute("driver_id").recommended_values)
1270 blueprint.set_attribute("driver_id", driver_id)
1271 if blueprint.has_attribute("is_invincible"):
1272 blueprint.set_attribute("is_invincible", "true")
1273
1274 # TODO: Make this a config option to choose automatically.
1275 # set the max speed
1276 # if blueprint.has_attribute('speed'):
1277 # self.player_max_speed = float(blueprint.get_attribute('speed').recommended_values[1])
1278 # self.player_max_speed_fast = float(blueprint.get_attribute('speed').recommended_values[2])
1279
1280 # Spawn the player.
1281 if self.player is not None:
1282 spawn_point = self.player.get_transform()
1283 spawn_point.location.z += 2.0
1284 spawn_point.rotation.roll = 0.0
1285 spawn_point.rotation.pitch = 0.0
1286 if self.camera_manager is not None: # None at first start; not None if player was already set before
1287 self.destroy()
1288 self.player = typing.cast("carla.Vehicle", self.world.try_spawn_actor(blueprint, spawn_point)) # pyright: ignore[reportPossiblyUnboundVariable]
1289 self.modify_vehicle_physics(self.player)
1290 while self.player is None:
1291 if not self.map.get_spawn_points():
1292 print("There are no spawn points available in your map/town.")
1293 print("Please add some Vehicle Spawn Point to your UE4 scene.")
1294 sys.exit(1)
1295 spawn_points = self.map.get_spawn_points()
1296 spawn_point: carla.Transform = random.choice(spawn_points) if spawn_points else carla.Transform() # type: ignore
1297 self.player = typing.cast("carla.Vehicle", self.world.try_spawn_actor(blueprint, spawn_point)) # pyright: ignore[reportPossiblyUnboundVariable]
1298 # From Interactive:
1299 # See: https://carla.readthedocs.io/en/latest/tuto_G_control_vehicle_physics/
1300 self.show_vehicle_telemetry = False
1301 self.modify_vehicle_physics(self.player)
1302
1303 # Clean external actors restarting a second time
1304 if self.external_actor and (
1305 # None cleans only first time; False will never clean.
1306 (self._args.restart_clean_sensors is not False and not self._first_start)
1307 or self._args.restart_clean_sensors is True
1308 ):
1309 ego_sensors = [actor for actor in self.world.get_actors() if actor.parent == self.player]
1310
1311 # Remove all old sensors
1312 for ego_sensor in ego_sensors:
1313 if ego_sensor is not None:
1314 ego_sensor.destroy()
1315
1316 # Set up the sensors.
1317 self.collision_sensor = CollisionSensor(self.player, self.hud)
1318 if not HUD.is_dummy(self.hud):
1319 self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud)
1320 self.actors.append(self.lane_invasion_sensor)
1321 else:
1322 self.lane_invasion_sensor = None
1323 self.gnss_sensor = None # GnssSensor(self.player) # TODO: make it optional
1324 self.imu_sensor = None # IMUSensor(self.player)
1325 self.actors.extend([
1326 self.collision_sensor,
1327 ])
1328 if self.gnss_sensor:
1329 self.actors.append(self.gnss_sensor)
1330 if self.imu_sensor:
1331 self.actors.append(self.imu_sensor)
1332
1333 logger.log(0, "Setting up camera manager")
1334 if self._args.pygame:
1335 assert isinstance(self.hud, HUD)
1336 self.camera_manager = CameraManager(self.player, self.hud, self._args)
1337 self.camera_manager.transform_index = cam_pos_id
1338 self.camera_manager.set_sensor(cam_index, notify=False)
1339 assert self.camera_manager.sensor
1340 logger.log(0, "Camera Manager set up")
1341
1342 actor_type = get_actor_display_name(self.player)
1343 self.hud.notification(text=actor_type)
1344
1345 self.rss_unstructured_scene_visualizer = RssUnstructuredSceneVisualizer(
1346 self.player, self.world, self.dim, gamma_correction=self._gamma
1347 ) # TODO: use args instead of gamma
1348 self.rss_bounding_box_visualizer = RssBoundingBoxVisualizer(
1349 self.dim, self.world, self.camera_manager.sensor
1350 )
1351 rss_state_visualizer = self.hud.rss_state_visualizer
1352 else:
1353 if self._args.camera.spectator:
1354 # only use spectator
1355 CameraManager.follow_actor(self.player)
1356 self.camera_manager = MockDummy.create_dummy(CameraManager) # NOTE: Does not end thread
1357 self.rss_bounding_box_visualizer = MockDummy.create_dummy(RssBoundingBoxVisualizer)
1358 self.rss_unstructured_scene_visualizer = MockDummy.create_dummy(RssUnstructuredSceneVisualizer)
1359 rss_state_visualizer: "RssStateVisualizer" = MockDummy.create_dummy()
1360 if AD_RSS_AVAILABLE and self._config.rss and self._config.rss.enabled:
1361 log_level = self._config.rss.log_level
1362 # Assure correct log level type
1363 if not isinstance(log_level, carla.RssLogLevel):
1364 try:
1365 if isinstance(log_level, str):
1366 log_level = carla.RssLogLevel.names[log_level]
1367 else:
1368 log_level = carla.RssLogLevel(log_level)
1369 except Exception as e:
1370 msg = f"Could not convert '{log_level}' to RssLogLevel must be in {list(carla.RssLogLevel.names.keys())}"
1371 raise KeyError(msg) from e
1372 logger.debug("Carla Log level was not a RssLogLevel")
1373 self.rss_sensor = RssSensor(
1374 self.player,
1375 self.rss_unstructured_scene_visualizer,
1376 self.rss_bounding_box_visualizer,
1377 rss_state_visualizer,
1378 visualizer_mode=self._config.rss.debug_visualization_mode,
1379 log_level=log_level,
1380 )
1381 self.rss_set_road_boundaries_mode(self._config.rss.use_stay_on_road_feature)
1382 else:
1383 self.rss_sensor = None
1384 self._first_start = False
1385 self.tick_server_world()
1386
[docs]
1387 def tick_server_world(self) -> "int | carla.WorldSnapshot | None":
1388 """
1389 When :py:attr:`.LaunchConfig.handle_ticks` is :python:`True`
1390 uses :external_py_meth:`carla.World.tick` or :external_py_meth:`carla.World.wait_for_tick`
1391 depending on :py:attr:`.LaunchConfig.sync`.
1392 """
1393 if self._args.handle_ticks:
1394 if self.sync:
1395 # if timeout is used it it might interfere with _wait_for_external_actor
1396 return self.world.tick(self._args.timeout)
1397 return self.world.wait_for_tick(self._args.timeout)
1398 return None
1399
1400 # def tick(self, clock):
1401 # self.hud.tick(self.player, clock) # RSS example. TODO: Check which has to be used!
1402
[docs]
1403 def tick(self, clock: "pygame.time.Clock"):
1404 """Method for every tick"""
1405 self.hud.tick(self, clock, InformationManager.obstacles)
1406
[docs]
1407 def next_weather(self, reverse: bool = False) -> None:
1408 """Get next weather setting"""
1409 self._weather_index += -1 if reverse else 1
1410 self._weather_index %= len(self._weather_presets)
1411 preset = self._weather_presets[self._weather_index]
1412 self.hud.notification(f"Weather: {preset[1]}")
1413 self.player.get_world().set_weather(preset[0])
1414 self.weather = preset[1]
1415
[docs]
1416 def next_map_layer(self, reverse: bool = False) -> None:
1417 self.current_map_layer += -1 if reverse else 1
1418 self.current_map_layer %= len(self.map_layer_names)
1419 selected = self.map_layer_names[self.current_map_layer]
1420 self.hud.notification(f"LayerMap selected: {selected}")
1421
[docs]
1422 def load_map_layer(self, unload: bool = False):
1423 selected = self.map_layer_names[self.current_map_layer]
1424 if unload:
1425 self.hud.notification(f"Unloading map layer: {selected}")
1426 self.world.unload_map_layer(selected)
1427 else:
1428 self.hud.notification(f"Loading map layer: {selected}")
1429 self.world.load_map_layer(selected)
1430
[docs]
1431 def toggle_recording(self):
1432 """
1433 Start recording images from the camera output.
1434
1435 Saved in
1436 :py:attr:`LaunchConfig.camera.recorder.output_path <.CameraConfig.RecorderSettings.output_path>`
1437 with the current frame number.
1438 """
1439 if not self.recording:
1440 self._has_recorded = True
1441 dir_name, filename = os.path.split(self._args.camera.recorder.output_path)
1442 try:
1443 dir_name_formatted = dir_name % self.recording_dir_num
1444 except TypeError:
1445 dir_name += "%04d"
1446 dir_name_formatted = dir_name % self.recording_dir_num
1447 while os.path.exists(dir_name_formatted): # noqa: PTH110
1448 self.recording_dir_num += 1
1449 dir_name_formatted = dir_name % self.recording_dir_num
1450 self.recording_file_format = os.path.join(dir_name, filename) # keep unformatted. # noqa: PTH118
1451 self.recording_frame_num = 0
1452 Path(dir_name_formatted).mkdir(parents=True, exist_ok=True)
1453 self.hud.notification(f"Started recording (folder: {dir_name_formatted})")
1454 self._recording_dirs.append(dir_name_formatted)
1455 else:
1456 dir_name_formatted = os.path.split(self.recording_file_format)[0] % self.recording_dir_num
1457 self.hud.notification(f"Recording finished (folder: {dir_name_formatted})")
1458
1459 self.recording = not self.recording
1460
[docs]
1461 def toggle_radar(self):
1462 """Adds or destroys a radar sensor for the user interface"""
1463 if self.radar_sensor is None:
1464 self.radar_sensor = RadarSensor(self.player)
1465 elif self.radar_sensor.sensor is not None:
1466 self.radar_sensor.sensor.destroy()
1467 self.radar_sensor = None
1468
[docs]
1469 def modify_vehicle_physics(self, actor: carla.Vehicle):
1470 # If actor is not a vehicle, we cannot use the physics control
1471 try:
1472 physics_control = actor.get_physics_control()
1473 physics_control.use_sweep_wheel_collision = True
1474 actor.apply_physics_control(physics_control)
1475 except Exception:
1476 logger.warning("Could not modify vehicle physics for actor: %s", actor)
1477
[docs]
1478 def finalize_render(self, display: pygame.Surface):
1479 """
1480 Draws the HUD and saves the image if recording is enabled.
1481
1482 Attention:
1483 Assumes that :py:meth:`render(..., finalize=False)<render>` was called before and
1484 something should be rendered in between.
1485 Use :py:meth:`finalize_render` as the very last step in the render process.
1486 """
1487 self.hud.render(display)
1488 if self.recording:
1489 try:
1490 pygame.image.save(
1491 display, self.recording_file_format % (self.recording_dir_num, self.recording_frame_num)
1492 )
1493 except Exception as e:
1494 logger.error(
1495 "Could not save image format: `%s` % (self.recording_dir_num, self.recording_frame_num): %s",
1496 self.recording_file_format,
1497 e,
1498 )
1499 self.recording_frame_num += 1
1500
[docs]
1501 def render(self, display: pygame.Surface, finalize: bool = True):
1502 """
1503 Render the world and draw it to the :py:class:`pygame.Surface`.
1504 This function is part of :py:meth:`.GameFramework.render_everything` **which is the
1505 recommended way to handle the rendering process.**
1506
1507 Hint:
1508 Recording of the fully rendered output should be done at the end of the render method,
1509 however :py:meth:`.CameraManager.render` is called first to render the camera.
1510
1511 Call with **finalize=False** to only render the camera but not the :py:class:`.HUD`.
1512
1513 Afterwards apply other render features and call :py:meth:`finalize_render`
1514 to draw the HUD and save the image if recording is enabled.
1515
1516 Note:
1517 This renders the :py:class:`.CameraManger` and the RSS bounding boxes.
1518 If **finalize** is :python:`True`, it will also render the :py:class:`HUD` by calling
1519 :py:meth:`finalize_render`.
1520 """
1521 self.camera_manager.render(display)
1522 self.rss_bounding_box_visualizer.render(display, self.camera_manager.current_frame)
1523 self.rss_unstructured_scene_visualizer.render(display)
1524 if finalize:
1525 self.finalize_render(display)
1526
[docs]
1527 def destroy_sensors(self):
1528 """Destroy sensors"""
1529 if self.rss_sensor:
1530 self.rss_sensor.destroy()
1531 self.rss_sensor = None
1532 if self.rss_unstructured_scene_visualizer:
1533 self.rss_unstructured_scene_visualizer.destroy()
1534 self.rss_unstructured_scene_visualizer = None # type: ignore[assignment]
1535 if self.camera_manager is not None:
1536 self.camera_manager.destroy()
1537 self.camera_manager = None # type: ignore[assignment]
1538 if self.radar_sensor is not None:
1539 self.toggle_radar() # destroys it if not None
1540
[docs]
1541 def destroy(self, destroy_ego: bool = False):
1542 """
1543 Destroys all actors
1544
1545 Parameters:
1546 destroy_ego: If True, will destroy the :py:attr:`player` as well. Else assume that it
1547 is destroyed by someone else.
1548 """
1549 # stop from ticking
1550 if self.world_tick_id and self.world:
1551 self.world.remove_on_tick(self.world_tick_id)
1552 self.destroy_sensors()
1553 if destroy_ego and self.player not in self.actors: # do not destroy external actors.
1554 logger.debug("Adding player to destruction list.")
1555 self.actors.append(self.player)
1556 elif not destroy_ego and self.player in self.actors:
1557 logger.warning(
1558 "destroy_ego=False, but player is in actors list. Destroying the actor from within WorldModel.destroy."
1559 )
1560
1561 # logger.info("to destroy %s", list(map(str, self.actors)))
1562 # Batch destroy in one simulation step
1563 real_actors: Sequence[carla.Actor] = [actor for actor in self.actors if isinstance(actor, carla.Actor)]
1564 GameFramework.destroy_actors(real_actors)
1565
1566 # e.g. CustomSensorInterface
1567 other_actors = [actor for actor in self.actors if actor and not isinstance(actor, carla.Actor)]
1568 while other_actors:
1569 actor = other_actors.pop(0)
1570 if actor is not None:
1571 print("destroying actor: " + str(actor), end=" destroyed=")
1572 try:
1573 actor.stop()
1574 except AttributeError as e:
1575 logger.debug("Error with actor {}: {}", actor, e)
1576 try:
1577 x = actor.destroy() # Non carla instances
1578 print(x)
1579 except RuntimeError:
1580 logger.warning("Could not destroy actor: " + str(actor))
1581 self.actors.clear()
1582 if self._args.handle_ticks and self.get_world():
1583 self.get_world().tick()
1584
1585 if self._has_recorded:
1586 runtime_dir = GameFramework.get_hydra_config().runtime.output_dir
1587 for dir_name_formatted in self._recording_dirs:
1588 dirname = os.path.split(dir_name_formatted)[1]
1589 subprocess.run([
1590 "ffmpeg",
1591 "-an",
1592 "-sn",
1593 "-i",
1594 f"{dir_name_formatted}/%08d.bmp",
1595 "-framerate",
1596 "1",
1597 "-vcodec",
1598 "mpeg4",
1599 "-r",
1600 "60",
1601 f"{Path(runtime_dir) / dirname}.avi",
1602 ]) # noqa: E501
1603 # os.system(f'ffmpeg -an -sn -i "{dir_name_formatted}/%08d.bmp" -framerate 1 -vcodec mpeg4 -r 60 "{os.path.join(runtime_dir, dirname)}.avi"') # noqa: E501
1604 print(f"Recording saved in {Path(runtime_dir) / dirname}.avi")
1605