1"""
2Interface classes between CARLA, the agent, and the user interface.
3"""
4# pyright: reportOptionalMemberAccess=warning
5
6from __future__ import annotations
7
8import contextlib
9import os
10from pathlib import Path
11import subprocess
12import sys
13import time
14import weakref
15
16from collections.abc import Mapping
17from typing import TYPE_CHECKING, Any, ClassVar, Iterable, List, NoReturn, Optional, Sequence, Union, overload
18from typing import cast as assure_type
19
20import carla
21import hydra
22from numpy import random as nprandom
23import pygame
24import random
25from hydra.core.global_hydra import GlobalHydra
26from hydra.core.hydra_config import HydraConfig
27from hydra.core.utils import configure_log
28from omegaconf import DictConfig, OmegaConf, open_dict
29
30from agents.tools.config_creation import LaunchConfig, RssLogLevel, RssRoadBoundariesMode, config_store
31from classes import exceptions as _exceptions
32from classes.camera_manager import CameraManager
33from classes._sensor_interface import CustomSensorInterface
34from classes.carla_originals.sensors import CollisionSensor, GnssSensor, IMUSensor, LaneInvasionSensor, RadarSensor
35from classes.exceptions import AgentDoneException, ContinueLoopException
36from classes.hud import HUD, get_actor_display_name
37from classes.keyboard_controls import KeyboardControl, RSSKeyboardControl
38from classes.rss_sensor import AD_RSS_AVAILABLE, RssSensor
39from classes.rss_visualization import RssBoundingBoxVisualizer, RssUnstructuredSceneVisualizer
40from classes.information_manager import InformationManager
41from launch_tools import class_or_instance_method
42
43if TYPE_CHECKING:
44 from hydra.conf import HydraConf
45 from typing_extensions import Self
46
47 from agents.lunatic_agent import LunaticAgent
48 from agents.navigation.global_route_planner import GlobalRoutePlanner
49 from agents.tools.config_creation import LunaticAgentSettings, RssRoadBoundariesModeAlias
50 from classes.type_protocols import ControllerClassT
51 from classes.detection_matrix import DetectionMatrix
52
53from agents.tools.logs import logger
54from launch_tools import CarlaDataProvider, Literal
55from launch_tools.blueprint_helpers import get_actor_blueprints
56
57
[docs]
58class AccessCarlaMixin:
59 """
60 Mixin class that delegates the attributes :py:attr:`client`, :py:attr:`map`, and :py:attr:`world`
61 to the :py:class:`.CarlaDataProvider` to keep them in sync.
62
63 Note:
64 This mixin only works for instances, they are not class attributes.
65 """
66
67 @property
68 def client(self) -> carla.Client:
69 return CarlaDataProvider.get_client()
70
71 @client.setter
72 def client(self, value: carla.Client):
73 CarlaDataProvider.set_client(value)
74
75 @property
76 def world(self) -> carla.World:
77 return CarlaDataProvider.get_world()
78
79 @world.setter
80 def world(self, value: carla.World):
81 CarlaDataProvider.set_world(value)
82
83 @property
84 def map(self) -> carla.Map:
85 return CarlaDataProvider.get_map()
86
87 @map.setter
88 def map(self, value: carla.Map):
89 """
90 Avoid setting the map directly. Use :py:meth:`.CarlaDataProvider.set_world` instead.
91
92 Raises:
93 ValueError: If the map is not the same as the one set in :py:attr:`.CarlaDataProvider.get_map`.
94
95 :meta private:
96 """
97 if CarlaDataProvider.get_map() != value:
98 raise ValueError("CarlaDataProvider.get_map() and passed map are not the same.")
99 # Do nothing as map is set when using get_map or set_world
100
[docs]
101 @staticmethod
102 def get_blueprint_library() -> carla.BlueprintLibrary:
103 """
104 Access to a cached version of the blueprint library
105
106 Raises:
107 ValueError: If the blueprint library is not set, which happens when the world is not set.
108 The world must be setup before (:py:class:`CarlaDataProvider.set_world()<.CarlaDataProvider>`).
109 """
110 if CarlaDataProvider._blueprint_library is None:
111 raise ValueError("Blueprint Library not set. Call CarlaDataProvider.set_world() first.")
112 return CarlaDataProvider._blueprint_library
113
114# ==============================================================================
115# -- Game Framework ---------------------------------------------------------------
116# ==============================================================================
117
118
[docs]
119class GameFramework(AccessCarlaMixin, CarlaDataProvider):
120 """
121 A utility class to setup CARLA, pygame, Hydra_, the configuration and parts of the user interface
122 and the agent.
123
124 A :py:class:`GameFramework` instance can be used to control the game loop and work as a
125 handler for the :py:mod:`exceptions` of this project. Furthermore can it manage the cooldown
126 of the :py:class:`.Rule` classes.
127
128 Note:
129 The GameFramework derives from the :py:class:`.CarlaDataProvider` which gives this class
130 more utility methods that currently are not included in this part of the documentation.
131 """
132
133 clock: ClassVar[pygame.time.Clock] = None
134 display: ClassVar[pygame.Surface] = None
135 controller: weakref.ProxyType[RSSKeyboardControl] | RSSKeyboardControl
136 """Parser for keyboard events"""
137
138 traffic_manager: Optional[carla.TrafficManager] = None
139
140 @property
141 def launch_config(self) -> LaunchConfig:
142 """:py:class:`.LaunchConfig` object that was used for the initialization (**args**)"""
143 return self._args
144
145 @property
146 def agent_config(self) -> LunaticAgentSettings:
147 """
148 The configuration of the attached :py:attr:`agent`, if it exists otherwise the
149 **agent** attribute of the stored :py:attr:`launch_config`.
150 """
151 return self.agent.config if self.agent else self._args.agent
152
153 # ----- Init Functions -----
154
[docs]
155 @classmethod
156 def quickstart(cls, launch_config: Optional[LaunchConfig] = None, *, logging: bool = False) -> "Self":
157 """
158 Initializes Hydra_ in a limited way, i.e. does not allow for command line overrides.
159
160 Sets up the :py:class:`carla.Client` and related instances as well as pygame.
161
162 Note:
163 It is recommended that you use a :python:`@hydra.main` decorated main function instead
164 to make full use of the Hydra_ framework.
165
166 Parameters:
167 launch_config: The configuration to use. If :code:`None`, will use the default
168 configuration from :code:`./conf/launch_config.yaml`.
169 logging: If True, change the how logging is done by applying the logger settings from
170 :code:`./conf/config_extensions/job_logging.yaml`.
171 Default is :code:`False`.
172
173 Returns:
174 The initialized :py:class:`GameFramework` instance.
175
176 See Also:
177 This function uses:
178 - :py:meth:`.initialize_hydra`
179 - :py:meth:`.init_carla`
180 - :py:meth:`.init_pygame`
181 """
182 if not launch_config:
183 launch_config = cls.initialize_hydra(logging=logging)
184 if not logging and AD_RSS_AVAILABLE:
185 launch_config.agent.rss.log_level = RssLogLevel.off
186 cls.init_carla(launch_config)
187 cls.init_pygame(launch_config)
188 return cls(launch_config)
189
190 # Hydra Tools
191 # TODO: this could be some launch_tools MixinClass
[docs]
192 @staticmethod
193 def initialize_hydra(config_dir: str = "./conf",
194 config_name: str = "launch_config",
195 version_base=None, *,
196 job_name="LunaticAgentJob",
197 logging=True,
198 structured=True) -> "LaunchConfig":
199 """
200 Use this function only if no hydra.main is available.
201
202 Usage:
203
204 .. code-block:: python
205
206 args = GameFramework.initialize_hydra(config_dir=<abs_path_of_conf>, config_name="launch_config")
207 game_framework = GameFramework(args)
208
209 Args:
210 config_dir: The directory where the hydra configuration is stored.
211 config_name: The name of the configuration file.
212 version_base: The version base of hydra for the configuration. Default is None.
213 job_name: The name of the job.
214 logging: If True, will set up logging.
215 structured: If True will create the config based on :py:class:`.LaunchConfig`,
216 otherwise it will be based on a :python:`dict`. This is useful for runtime
217 type-checks and conversions.
218 If the configs :py:attr:`.LaunchConfig.strict_config` value is < 2, this
219 parameter is ignored. Disable if you experience problems.
220 Default is :python:`True`.
221
222 See Also:
223 Hydra functions:
224 - :py:func:`hydra.initialize_config_dir`
225 - :py:func:`hydra.compose`
226 """
227 config_dir = os.path.abspath(config_dir)
228 hydra_initialized = GameFramework.hydra_initialized()
229 if not hydra_initialized:
230 # Not save-guarding this against multiple calls, expose the hydra error
231 # todo: low-prio check if config dir and the other parameters are the same.
232 hydra.initialize_config_dir(version_base=version_base,
233 config_dir=config_dir,
234 job_name=job_name)
235
236 dict_config = hydra.compose(config_name=config_name,
237 return_hydra_config=not hydra_initialized,
238 overrides=None)
239 if structured and dict_config.get("strict_config", 3) >= 2:
240 # Uses the correct dataclass schemas as values.
241 if config_name == "launch_config":
242 if LaunchConfig._config_path:
243 # Load defined schema from the config Store
244 cn = config_store.load(LaunchConfig._config_path)
245 schema: Optional[DictConfig] = cn.node # type: ignore[assignment]
246 with open_dict(schema):
247 schema.merge_with(dict_config)
248 config = assure_type(LaunchConfig, schema)
249 else:
250 schema = None
251 else:
252 schema = None
253 if schema is None:
254 logger.debug("No schema found for structured init file %s. Falling back to LaunchConfig", config_name)
255 launch_config = LaunchConfig(**dict_config) # type: ignore
256 config: LaunchConfig = OmegaConf.structured(launch_config,
257 flags={'allow_objects': True})
258 else:
259 config = assure_type("LaunchConfig", dict_config)
260
261 if not hydra_initialized:
262 hydra_conf: HydraConfig = GameFramework.get_hydra_config(raw=True)
263 if OmegaConf.is_missing(config.hydra.runtime, "output_dir"):
264 config.hydra.runtime.output_dir = config.hydra.run.dir
265 hydra_conf.set_config(config) # type: ignore
266 Path(config.hydra.runtime.output_dir).mkdir(parents=True, exist_ok=True)
267 if logging:
268 # Assure that our logger works
269 configure_log(config.hydra.job_logging, logger.name) # type: ignore
270 with open_dict(config):
271 del config["hydra"]
272 config.agent._set_flag("allow_objects", True)
273 config.agent.__dict__["_parent"] = None # Remove parent from the config, i.e. make it a top-level config.
274 return config
275
276 # TODO: Maybe unify these settings; make overrides available in the config.
[docs]
277 @staticmethod
278 def load_hydra_config(config_name: str = "conf/launch_config") -> "LaunchConfig":
279 if GameFramework.hydra_initialized():
280 return assure_type(LaunchConfig, hydra.compose(config_name=config_name))
281 config_dir, config_name = os.path.split(config_name)
282 # Try to get job name from file in the stack.
283 import inspect # noqa
284 frame = inspect.stack()[-1]
285 module = inspect.getmodule(frame[0])
286 name = module.__file__ if module and module.__file__ else "unknown"
287 return GameFramework.initialize_hydra(config_dir, config_name, job_name=name)
288
[docs]
289 def __init__(self, args: "LaunchConfig",
290 config: Optional[DictConfig] = None,
291 timeout: float = 10.0,
292 worker_threads: int = 0,
293 *, map_layers=carla.MapLayer.All):
294 """
295 Parameters:
296 args: Configuration for the GameFramework.
297 config: Optional config for the agent (unused in this project)
298 timeout: Timeout for the :py:class:`carla.Client`.
299 worker_threads: See :py:class:`carla.Client`.
300 map_layers: See :py:meth:`carla.Client.load_world`
301 """
302 if args.seed:
303 random.seed(args.seed)
304 nprandom.seed(args.seed)
305 self._args = args
306 self.world_settings: carla.WorldSettings = self.init_carla(args, timeout, worker_threads, map_layers=map_layers)
307
308 # These are class variables
309 GameFramework.clock, GameFramework.display = self.init_pygame(args)
310
311 self.config = config
312 self.agent = None
313 self.world_model = None
314 self.controller = None
315
316 self.debug = self.world.debug
317 self.continue_loop = True
318 self.traffic_manager: Optional[carla.TrafficManager] = self.init_traffic_manager()
319
320 # Import here to avoid circular imports
321 from classes.rule import BlockingRule, Rule # noqa: PLC0415,RUF100
322 self.cooldown_framework = Rule.CooldownFramework() # used in context manager. # NOTE: Currently can be constant
323 BlockingRule._gameframework = weakref.proxy(self)
324
[docs]
325 @class_or_instance_method
326 def init_pygame(cls_or_self: "Self | type[Self]", launch_config: Optional[LaunchConfig] = None,
327 recreate: bool = False) -> tuple[pygame.time.Clock, pygame.Surface]:
328 """
329 Parameters:
330 launch_config: Will use the :py:attr:`width<.LaunchConfig.width>` and
331 :py:attr:`height<.LaunchConfig.height> attributes of this object if set the
332 :py:mod:`pygame` windows size. Otherwise will use :python:`(1280, 720)`.
333 Defaults to :code:`None`.
334
335 recreate: If :python:`True`, will reinitialize pygame a second time if this function is
336 called.
337
338 .. experimental; returns None, None if launch_config.pygame is False.
339 """
340 if recreate or GameFramework.clock is None or GameFramework.display is None:
341 if launch_config is None:
342 launch_config = getattr(cls_or_self, "_args", None) # get from inst.
343 if getattr(launch_config, "pygame", True):
344 pygame.init()
345 pygame.font.init()
346 GameFramework.clock = pygame.time.Clock()
347 if getattr(launch_config, "pygame", True) and "READTHEDOCS" not in os.environ:
348 GameFramework.display = pygame.display.set_mode(
349 size=(launch_config.width, launch_config.height)
350 if launch_config else (1280, 720),
351 flags=pygame.HWSURFACE | pygame.DOUBLEBUF)
352 return GameFramework.clock, GameFramework.display
353
[docs]
354 @staticmethod
355 def init_carla(args: Optional[LaunchConfig] = None,
356 timeout: Optional[float] = None,
357 worker_threads: int = 0, *,
358 map_layers: carla.MapLayer = carla.MapLayer.All) -> carla.WorldSettings:
359 """
360 Initializes the :py:class:`carla.Client` and the connects it to the simulator.
361
362 Parameters:
363 args: The configuration for the GameFramework.
364 timeout: The timeout for the :py:class:`carla.Client`. Default is 10.0.
365
366 .. deprecated:: _
367 Use the :py:attr:`.LaunchConfig.timeout` attribute instead.
368
369 worker_threads: See :py:class:`carla.Client`.
370 map_layers: The map layers to load. Default is :py:attr:`carla.MapLayer.All`.
371
372 Returns:
373 The settings of the loaded world.
374
375 Note:
376 This function has to be called before :py:attr:`client`, :py:attr:`world`
377 or :py:attr:`map` can be accessed.
378 Alternatively the client and :py:class:`CarlaDataProvider` need to be setup in a
379 different way beforehand.
380
381 See Also:
382 - :py:class:`carla.Client`
383 - :py:meth:`carla.Client.load_world`
384 """
385 # Note: This sets up the CarlaDataProvider
386 if args is None:
387 timeout = timeout or 10.0
388 GameFramework.setup_client_map_and_world(timeout=timeout,
389 worker_threads=worker_threads,
390 map_layers=map_layers)
391 else:
392 GameFramework.setup_client_map_and_world(args.map,
393 args.host, args.port,
394 timeout=timeout or args.timeout,
395 worker_threads=worker_threads,
396 map_layers=map_layers,
397 sync=args.sync,
398 fps=args.fps)
399 return CarlaDataProvider.get_world().get_settings()
400
[docs]
401 @staticmethod
402 def setup_client_map_and_world(
403 map_name: str = "Town04",
404 ip: str = "127.0.0.1",
405 port: int = 2000, *,
406 timeout: float = 10.0,
407 worker_threads: int = 0,
408 reload_world: bool = False,
409 reset_settings: bool = True,
410 map_layers: carla.MapLayer = carla.MapLayer.All,
411 sync: Union[bool, None] = True,
412 fps: int = 20
413 ) -> tuple[carla.Client, carla.World, carla.Map]:
414 """
415 Loads the :py:class:`carla.Client`, the :py:class:`carla.World`, and the :py:class:`carla.Map`.
416
417 This is a subfunction of :py:meth:`.init_carla`
418 that can be used without a :py:class:`LaunchConfig`.
419
420 See Also:
421 :py:meth:`.init_carla`
422 :py:meth:`carla.Client.set_timeout`
423 :py:meth:`carla.Client.reload_world`
424 """
425 client = CarlaDataProvider.get_client()
426 if client is None:
427 client = carla.Client(ip, port, worker_threads)
428 client.set_timeout(timeout)
429 CarlaDataProvider.set_client(client)
430
431 world = CarlaDataProvider.get_world()
432 if not world:
433 world = client.get_world()
434 _map = world.get_map() # CarlaDataProvider map not yet set -> set_world
435
436 if map_name and _map.name != "Carla/Maps/" + map_name:
437 world: carla.World = client.load_world(map_name, reset_settings, map_layers)
438 elif reload_world:
439 world = client.reload_world(reset_settings)
440 logger.info("Reloaded world - map_layers ignored.")
441 elif map_name is None:
442 logger.info("Provided map_name is None, skipped loading world. Assuming world is already loaded.")
443 else:
444 logger.info("skipped loading world '%s', already loaded - map_layers and reset_settings ignored.", _map.name)
445
446 world_settings = world.get_settings()
447 # Apply world settings
448 if sync is not None:
449 if sync:
450 logger.debug("Using synchronous mode.")
451 # apply synchronous mode if wanted
452 world_settings.synchronous_mode = True
453 world_settings.fixed_delta_seconds = 1 / fps # 0.05
454 world.apply_settings(world_settings)
455 else:
456 logger.debug("Using asynchronous mode.")
457 world_settings.synchronous_mode = False
458 world.apply_settings(world_settings)
459 logger.info("World Settings:\n%s", world_settings)
460
461 # Note: set_world loads multiple information. It has to be called after applying the world settings.
462 CarlaDataProvider.set_world(world)
463
464 return client, world, CarlaDataProvider.get_map()
465
[docs]
466 def init_traffic_manager(self, port: Optional[int] = None) -> carla.TrafficManager:
467 """
468 Returns an instance of the :py:class:`carla.TrafficManager`
469 related to the specified port. If it does not exist, this will be created.
470
471 See Also:
472 :py:meth:`carla.Client.get_trafficmanager`
473
474 Parameters:
475 port: The port to use. If :code:`None`, will use the port from
476 :py:meth:`.CarlaDataProvider.get_traffic_manager_port`, which defaults to :code:`8000`.
477 """
478 if port is None:
479 port = CarlaDataProvider.get_traffic_manager_port()
480 traffic_manager = self.client.get_trafficmanager(port)
481 if self._args.handle_ticks:
482 if self._args.sync:
483 traffic_manager.set_synchronous_mode(True)
484 traffic_manager.set_hybrid_physics_mode(True) # Note default 50m
485 traffic_manager.set_hybrid_physics_radius(50.0) # TODO: make a LaunchConfig config variable
486 self.traffic_manager = traffic_manager
487 return traffic_manager
488
[docs]
489 def init_agent_and_interface(self,
490 ego: Optional[carla.Vehicle],
491 agent_class: "type[LunaticAgent]",
492 config: Optional[LunaticAgentSettings] = None,
493 overwrites: Optional[dict[str, Any]] = None
494 ) -> "tuple[LunaticAgent, WorldModel, GlobalRoutePlanner, RSSKeyboardControl]":
495 """
496 Quick setup for the agent and the world model.
497
498 Among others this executes:
499 - :py:meth:`.LunaticAgent.create_world_and_agent`
500 - :py:meth:`.GameFramework.make_controller`
501 - :py:meth:`.WorldModel.tick_server_world`
502
503 .. code-block:: python
504
505 from agents.lunatic_agent import LunaticAgent, LunaticAgentSettings
506
507 ego = world.spawn_actor(world.get_blueprint_library().find("vehicle.audi.tt"))
508 agent, world_model, global_planner, controller = (
509 game_framework.init_agent_and_interface(ego, LunaticAgent)
510 )
511
512 Arguments:
513 ego: The ego vehicle. Can be :code:`None` if the agent is set to use an
514 external actor (:py:attr:`.LaunchConfig.externalActor`).
515 agent_class: The agent class to instantiate.
516 config: The configuration of the agent. If :code:`None` the
517 :py:attr:`.agent_class.BASE_SETTINGS <.LunaticAgent.BASE_SETTINGS>` are used.
518 overwrites: Additional overwrites to the configuration.
519 """
520 if ego is None and not self._args.externalActor:
521 raise ValueError("`ego` must be passed if ``externalActor` is not set.")
522 self.agent, self.world_model, self.global_planner \
523 = agent_class.create_world_and_agent(self._args,
524 vehicle=ego,
525 sim_world=self.world,
526 agent_config=config,
527 overwrites=overwrites)
528 self.config = self.agent.config
529 controller = self.make_controller(self.world_model, RSSKeyboardControl, start_in_autopilot=False) # Note: stores weakref to controller
530 self.world_model.game_framework = weakref.proxy(self)
531 self.world_model.tick_server_world()
532 self.agent.verify_settings(strictness=-1) # NOTE: Here live info is already available and will throw some errors
533 return self.agent, self.world_model, self.global_planner, controller
534
[docs]
535 def make_world_model(self, config: "LunaticAgentSettings",
536 player: Optional[carla.Vehicle] = None) -> "WorldModel":
537 """
538 Creates a :py:class:`WorldModel` with a backreference to the GameFramework.
539 """
540 self.world_model = WorldModel(config, self._args, player=player)
541 self.world_model.game_framework = weakref.proxy(self)
542 return self.world_model
543
[docs]
544 def make_controller(self,
545 world_model: "WorldModel",
546 controller_class: type[ControllerClassT] = RSSKeyboardControl,
547 **kwargs):
548 """
549 Creates a keyboard controller and attaches it to the world model.
550
551 Args:
552 world_model: The world model to attach the controller to.
553 controller_class: The controller class to instantiate. Defaults to :py:class:`.RSSKeyboardControl`.
554 **kwargs: Additional arguments to pass to the controller.
555 """
556 controller = controller_class(world_model,
557 config=self.config,
558 clock=self.clock,
559 **kwargs)
560 self.controller = weakref.proxy(controller)
561 return controller # NOTE: does not return the proxy object.
562
[docs]
563 @staticmethod
564 def hydra_initialized() -> bool:
565 """
566 Checks wether Hydra_ is initialized. This is normally only done
567 when the :py:func:`@hydra.main` decorator is used.
568 """
569 return GlobalHydra.instance().is_initialized()
570
571 @overload
572 @staticmethod
573 def get_hydra_config(raw: Literal[False] = False) -> "HydraConf": ...
574
575 @overload
576 @staticmethod
577 def get_hydra_config(raw: Literal[True]) -> HydraConfig: ...
578
[docs]
579 @staticmethod
580 def get_hydra_config(raw: bool = False) -> "HydraConfig | HydraConf":
581 """
582 Retrieves the Hydra_ configuration object.
583
584 Parameters:
585 raw: If :python:`True`, returns the :py:class:`hydra.conf.HydraConf` dataclass, otherwise the
586 :py:class:`hydra.core.hydra_config.HydraConfig` singleton. Default is :code:`False`.
587
588 Raises:
589 ValueError: If the HydraConfig was not set up yet and :python:`raw=True`.
590 """
591 if raw:
592 return HydraConfig.instance()
593 return HydraConfig.get()
594
595 # ----- Setters -----
596
597 def set_controller(self, controller: KeyboardControl) -> None:
598 """
599 Set the :py:class:`KeyboardControl`
600
601 :meta private:
602 """
603 self.controller = controller # type: ignore ; maybe use proxy
604
605 def set_config(self, config: DictConfig) -> None:
606 """
607 Change the :py:attr:`config`.
608
609 :meta private:
610 """
611 self.config = config
612
613 # ----- UI Functions -----
614
617
[docs]
618 def render_everything(self):
619 """
620 Update render and hud
621
622 Note:
623 This is the preferred method to update the world and render the camera.
624 """
625 self.world_model.tick(self.clock) # NOTE: Ticks WorldMODEL not CARLA WORLD! # pyright: ignore[reportOptionalMemberAccess]
626 self.world_model.render(self.display, finalize=False) # pyright: ignore[reportOptionalMemberAccess]
627 self.controller.render(self.display)
628 # These two types must be in sync:
629 dm_render_conf: "DetectionMatrix.RenderOptions" = self._args.camera.hud.detection_matrix # type: ignore[assignment]
630 if dm_render_conf and self.agent:
631 self.agent.render_detection_matrix(self.display, **dm_render_conf)
632 self.world_model.finalize_render(self.display) # pyright: ignore[reportOptionalMemberAccess]
633
[docs]
634 @staticmethod
635 def skip_rest_of_loop(message="GameFramework.end_loop") -> NoReturn:
636 """
637 Terminates the current iteration and exits the GameFramework by raising a :py:exc:`.ContinueLoopException`.
638
639 Note:
640 It is the users responsibility to manage the agent & local planner
641 before calling this function, i.e. that the agent has a :py:class:`carla.VehicleControl` set.
642
643 Raises:
644 ContinueLoopException: With the given **message**.
645 """
646 # TODO: add option that still allows for rss.
647 raise ContinueLoopException(message)
648
649 # -------- Tools --------
650
[docs]
651 @staticmethod
652 def destroy_actors(actors: Iterable[carla.Actor | CustomSensorInterface]):
653 """
654 Destroys the given actors and customs sensors implemented in this package.
655
656 Removes destroyed actors from the :py:class:`.CarlaDataProvider` actor pool.
657 """
658 batch: "list[carla.command.DestroyActor]" = []
659 for actor in actors:
660 if isinstance(actor, (carla.Sensor, CustomSensorInterface)):
661 actor.stop()
662 if isinstance(actor, carla.Actor):
663 if actor.is_alive:
664 batch.append(carla.command.DestroyActor(actor))
665 else:
666 actor.destroy()
667
668 if batch and CarlaDataProvider._client:
669 try:
670 CarlaDataProvider._client.apply_batch(batch)
671 except RuntimeError as e:
672 if "time-out" in str(e):
673 logger.warning("Timeout while destroying actors.")
674 else:
675 raise
676 else:
677 for command in batch:
678 if CarlaDataProvider.actor_id_exists(command.actor_id):
679 del CarlaDataProvider._carla_actor_pool[command.actor_id] # remove by batch and not by individual command
680 elif not CarlaDataProvider._client:
681 logger.error("No client available to destroy actors.")
682
683 # -------- Context Manager --------
684
685 def __call__(self, agent: "LunaticAgent"):
686 """
687 Use as context manager to handle the game loop.
688 Pass an agent if None is set yet.
689 """
690 self.agent = agent
691 self.world_model = agent._world_model
692 try:
693 if self.world_model.controller: # weakref.proxy
694 self.controller = self.world_model.controller
695 except ReferenceError:
696 self.controller = None # type: ignore[assignment]
697 if not self.controller:
698 logger.debug("Creating new controller.")
699 self.controller = self.make_controller(self.world_model, start_in_autopilot=self._args.autopilot) # hard reference # type: ignore
700 self.world_model.controller = self.controller # hard instead of weak reference
701 self.agent._validate_phases = False
702 return self
703
704 def __enter__(self):
705 if self.agent is None:
706 raise ValueError("Agent not initialized.")
707 if self.world_model is None:
708 raise ValueError("World Model not initialized.")
709 if self.controller is None:
710 raise ValueError("Controller not initialized.")
711
712 self.clock.tick() # self.args.fps)
713 frame = None
714 if self._args.handle_ticks: # i.e. no scenario runner doing it for us
715 if CarlaDataProvider.is_sync_mode():
716 frame = self.world_model.world.tick(self._args.timeout)
717 else:
718 frame = self.world_model.world.wait_for_tick(self._args.timeout).frame
719 CarlaDataProvider.on_carla_tick()
720
721 if CarlaDataProvider.is_sync_mode():
722 # We do this only in sync mode as frames could pass between gathering this information
723 # and an agent calling InformationManager.tick(), which in turn calls global_tick
724 # with possibly a DIFFERENT frame wasting computation.
725 if frame is None:
726 frame = self.get_world().get_snapshot().frame
727 InformationManager.global_tick(frame)
728 return self
729
730 def __exit__(self, exc_type, exc_val, exc_tb):
731 self.cooldown_framework.__exit__(exc_type, exc_val, exc_tb)
732 if exc_val:
733 if isinstance(exc_val, AgentDoneException):
734 self.continue_loop = False
735 elif isinstance(exc_val, ContinueLoopException):
736 logger.error("ContinueLoopException(%s) should be thrown during `agent.run_step` but caught by GameFramework this should not happen. Skipping this step; no controls are applied!", exc_val)
737 else:
738 return # skip render and likely terminate.
739 self.render_everything()
740 pygame.display.flip()
741
[docs]
742 @class_or_instance_method
743 def cleanup(cls_or_self: "type[Self] | Self",
744 *,
745 disable_sync: bool = True,
746 quit_pygame: bool = True):
747 """
748 Cleans up resources and actors.
749
750 Args:
751 disable_sync: If True, will disable synchronous mode. This will
752 prevent the freezing of the Unreal Editor.
753 Default is True.
754 quit_pygame: If True, will call :py:func:`pygame.quit`. Default is True.
755
756 Note:
757 - When called from an instance with an attached agent,
758 the :python:`agent.destroy()` method is called.
759 - Otherwise will call :py:obj:`CarlaDataProvider.cleanup() <.CarlaDataProvider>`.
760 """
761 try:
762 # Should only work for instance version, but maybe future Singleton support
763 try:
764 if cls_or_self.agent: # pyright: ignore[reportAttributeAccessIssue]
765 cls_or_self.agent.destroy() # pyright: ignore[reportAttributeAccessIssue]
766 finally:
767 if cls_or_self.world_model: # pyright: ignore[reportAttributeAccessIssue]
768 cls_or_self.world_model.destroy() # pyright: ignore[reportAttributeAccessIssue]
769 if disable_sync and cls_or_self.traffic_manager:
770 with contextlib.suppress(Exception):
771 cls_or_self.traffic_manager.set_synchronous_mode(False)
772 finally:
773 # Prevent freezing of the editor
774 if disable_sync and CarlaDataProvider.get_world() is not None:
775 # Disable Synchronous Mode
776 world_settings = carla.WorldSettings(synchronous_mode=False,
777 fixed_delta_seconds=0.0)
778 cls_or_self._world.apply_settings(world_settings)
779 CarlaDataProvider.cleanup()
780 if quit_pygame:
781 pygame.quit()
782
783 # Include access to our exceptions here
784 exceptions = _exceptions
785 """
786 shortcut to :py:mod:`.exceptions` module containing custom exceptions.
787
788 :meta hide-value:
789 """
790
791
792# ==============================================================================
793# -- World ---------------------------------------------------------------
794# ==============================================================================
795
[docs]
796class WorldModel(AccessCarlaMixin, CarlaDataProvider):
797 """
798 Class representing the surrounding environment.
799
800 This class is the interface between the agent, the :external_py_class:`carla.World`, the
801 :py:class:`.HUD`, and the :py:class:`.KeyboardControl`.
802 It handles ticking of the simulator and rendering of the pygame interface.
803
804 If :py:attr:`.LaunchConfig.externalActor` is set, it will look for an actor with the role name
805 :py:attr:`.LaunchConfig.rolename`, if such an actor does not yet exist it will wait for its
806 creation until the calling script continues.
807 """
808
809 controller: Optional[Union[RSSKeyboardControl, weakref.ProxyType[RSSKeyboardControl]]] = None
810 """
811 Set when controller is created. Uses weakref.proxy as backreference.
812 This is not a :py:mod:`weakref` object, when
813 `with gameframework(agent) <GameFramework.__call__>`:py:meth: is used.
814 """
815
816 game_framework: Optional["weakref.CallableProxyType[GameFramework]"] = None
817 """
818 Set when world created via GameFramework. Uses weakref.proxy as backreference
819
820 Attention:
821 Currently not used and not initialized.
822
823 :meta private:
824 """
825
826 player: carla.Vehicle
827 """
828 The linked actor. If :py:attr:`.external_actor` is set this will be the first actor found
829 with that role name.
830 """
831
[docs]
832 def __init__(self, config: "LunaticAgentSettings",
833 args: Union[LaunchConfig, Mapping[str, Any], "os.PathLike[str]", str] = "./conf/launch_config.yaml",
834 agent: Optional[LunaticAgent] = None,
835 *,
836 carla_world: Optional[carla.World] = None,
837 player: Optional[carla.Vehicle] = None,
838 map_inst: Optional[carla.Map] = None):
839 """Constructor method"""
840 # Set World
841 if self.get_world() is None:
842 if carla_world is None:
843 raise ValueError("CarlaDataProvider not available and `carla_world` not passed.")
844 self.world = carla_world
845 elif carla_world is not None and self.world != carla_world:
846 raise ValueError("CarlaDataProvider.get_world() and passed `carla_world` are not the same.")
847
848 self.world_settings = self.world.get_settings()
849 """Object containing some data about the simulation such as synchrony between client and server or rendering mode."""
850
851 if agent:
852 agent._world_model = self # backreference, if needed; the LunaticAgent sets this as well.
853
854 if self.map is not None: # if this is set accesses CarlaDataProvider
855 if map_inst and self.map != map_inst:
856 raise ValueError("CarlaDataProvider.get_map() and passed map_inst are not the same.")
857 elif map_inst:
858 if isinstance(map_inst, carla.Map):
859 self.map = map_inst
860 else:
861 logger.warning("Warning: Ignoring the given map as it is not a 'carla.Map'")
862 if self.map is None:
863 try:
864 self.set_world(self.world) # CDP function
865 except RuntimeError as error:
866 print(f'RuntimeError: {error}')
867 print(' The server could not send the OpenDRIVE (.xodr) file:')
868 print(' Make sure it exists, has the same name of your town, and is correct.')
869 sys.exit(1)
870
871 self._config = config
872 if not isinstance(args, (Mapping, DictConfig, LaunchConfig)): # TODO: should rather check for string like
873 # Args is expected to be a string here
874 # NOTE: This does NOT INCLUDE CLI OVERWRITES
875 # When passed as path with directory
876 config_dir, config_name = os.path.split(args)
877 try:
878 args = GameFramework.initialize_hydra(config_dir, config_name)
879 except ValueError:
880 # Hydra already initialized
881 args = GameFramework.load_hydra_config(config_name)
882 except Exception:
883 print("Problem with", type(args), args)
884 raise
885 args.externalActor = not (player is not None or agent is not None) # TEMP: Remove to force clean config.
886 self._args: LaunchConfig = assure_type(LaunchConfig, args)
887
888 self.hud: HUD = HUD(self._args.width, self._args.height, self.world)
889 """The :py:class:`HUD` that is managed."""
890
891 self.sync: Optional[bool] = self._args.sync
892 """Set from :py:attr:`.LaunchConfig.sync`"""
893
894 self.dim = (self._args.width, self._args.height)
895
896 self.external_actor: bool = self._args.externalActor
897 """Set from :py:attr:`.LaunchConfig.externalActor`"""
898
899 self.actor_role_name: Optional[str] = self._args.rolename
900 """Set from :py:attr:`.LaunchConfig.rolename`"""
901
902 self._actor_filter = self._args.filter
903 self._actor_generation: Literal[1, 2, "all"] = self._args.generation
904 self._gamma = self._args.camera.gamma
905
906 # TODO: Unify with CameraManager
907 self.recording = False
908 self._has_recorded = False
909 self._recording_dirs = []
910 self.recording_frame_num = 0
911 self.recording_dir_num = 0
912
913 # From manual controls; used client.start_recorder()
914 #
915 self.recording_enabled: bool = False
916 """
917 Indicator if :py:attr:`carla.Client` recording feature is on or off.
918
919 Experimental & Untested!
920 CTRL + R : toggle recording of simulation (replacing any previous)
921 CTRL + P : start replaying last recorded simulation
922
923 :meta private:
924 """
925
926 self.recording_start = 0
927 """
928 CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds)
929 CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds)
930
931 :meta private:
932 """
933
934 if self.external_actor and (player is not None or agent is not None):
935 raise ValueError("External actor cannot be used with player or agent.")
936 if player is None and agent is not None:
937 self.player = agent._vehicle
938 elif player is not None and agent is not None:
939 if player != agent._vehicle:
940 raise ValueError("Passed `player` and `agent._vehicle` are not the same.")
941 self.player = player
942 else:
943 # If player is None here, will set in in restart()
944 self.player = player # pyright: ignore[reportAttributeAccessIssue]
945
946 assert self.player is not None or self.external_actor # Note: Former optional. Player set in restart
947
948 self.collision_sensor: CollisionSensor = None # type: ignore # set in restart
949 self.lane_invasion_sensor: LaneInvasionSensor = None # type: ignore # set in restart
950 self.gnss_sensor: Optional[GnssSensor] = None
951 self.imu_sensor: Optional[IMUSensor] = None # from interactive
952 self.radar_sensor: Optional[RadarSensor] = None # from interactive
953 self.camera_manager: CameraManager = None # type: ignore # set in restart
954 """
955 Manages cameras for the user interface and :py:class:`.HUD`.
956 """
957
958 self._weather_presets = CarlaDataProvider.find_weather_presets()
959 self._weather_index = 0
960 self.weather: str = "NotSet"
961 """
962 Name of currently used weather preset.
963 See also: :py:class:`CarlaDataProvider.find_weather_presets()<CarlaDataProvider>`
964
965 :meta hide-value:
966 """
967
968 self.actors: List[Union[carla.Actor, CustomSensorInterface]] = []
969 """Actors attached to this instance for the user interface and :py:class:`.HUD`."""
970
971 # From interactive:
972 self.constant_velocity_enabled = NotImplemented #: :meta private:
973 self.show_vehicle_telemetry = False #: :meta private: # enabled via KeyboardController
974 self.doors_are_open: bool = False
975 """
976 Note:
977 Only set over the KeyboardController, does not query the actor or simulation
978
979 :meta private:
980 """
981
982 self.current_map_layer = 0
983 self.map_layer_names = [
984 carla.MapLayer.NONE,
985 carla.MapLayer.Buildings,
986 carla.MapLayer.Decals,
987 carla.MapLayer.Foliage,
988 carla.MapLayer.Ground,
989 carla.MapLayer.ParkedVehicles,
990 carla.MapLayer.Particles,
991 carla.MapLayer.Props,
992 carla.MapLayer.StreetLights,
993 carla.MapLayer.Walls,
994 carla.MapLayer.All
995 ]
996 # RSS
997 # set in restart
998 self.rss_sensor: Optional[RssSensor] = None
999 self.rss_unstructured_scene_visualizer: RssUnstructuredSceneVisualizer = None # type: ignore[assignment]
1000 self.rss_bounding_box_visualizer: RssBoundingBoxVisualizer = None # type: ignore[assignment]
1001
1002 if config.rss:
1003 if config.rss.enabled and not self._actor_filter.startswith("vehicle."):
1004 print('Error: RSS only supports vehicles as ego. Disable RSS or use a vehicle filter for the actor.')
1005 sys.exit(1)
1006 if AD_RSS_AVAILABLE and config.rss.enabled:
1007 self._restrictor = carla.RssRestrictor()
1008 else:
1009 self._restrictor = None
1010 else:
1011 self._restrictor = None
1012
1013 logger.info("Calling WorldModel.restart()")
1014 self._first_start = True
1015 """Indicate that restart was called for the first time."""
1016 self.restart()
1017 assert self.player is not None
1018 self._vehicle_physics = self.player.get_physics_control()
1019 self.world_tick_id = self.world.on_tick(self.hud.on_world_tick)
1020
1021 if CarlaDataProvider._traffic_light_map is None:
1022 logger.error("Traffic light map not set at this point") # should not have happened
1023 CarlaDataProvider.set_world(self._world)
1024 elif not CarlaDataProvider._traffic_light_map:
1025 logger.error("Traffic light map is empty") # should not have happened
1026 CarlaDataProvider.prepare_map()
1027
1060
1061 def toggle_pause(self):
1062 """
1063 Toggle pause_simulation from the KeyboardControls.
1064
1065 :meta private:
1066 """
1067 settings = self.world.get_settings()
1068 self.pause_simulation(not settings.synchronous_mode)
1069
[docs]
1070 def pause_simulation(self, pause: bool):
1071 """
1072 Pauses the simulation by setting the world to synchronous mode.
1073
1074 Attention:
1075 Only works reliable in **asynchronous mode** (:py:attr:`sync=False <.LaunchConfig.sync>`)
1076 and might lead to unexpected behavior in synchronous mode.
1077 """
1078 if self._args.sync:
1079 logger.warning("Pause simulation only works in asynchronous mode.")
1080
1081 settings = self.world.get_settings()
1082 if pause and not settings.synchronous_mode:
1083 settings.synchronous_mode = True
1084 settings.fixed_delta_seconds = 0.05
1085 self.world.apply_settings(settings)
1086 elif not pause and settings.synchronous_mode:
1087 settings.synchronous_mode = False
1088 settings.fixed_delta_seconds = None
1089 self.world.apply_settings(settings)
1090
1091 @staticmethod
1092 def _find_external_actor(world: carla.World,
1093 role_name: str,
1094 actor_list: Optional[carla.ActorList] = None) -> Optional[carla.Actor]:
1095 """
1096 Looks to find an actor with a matching **role_name**.
1097 """
1098 player = None
1099 for actor in actor_list or world.get_actors():
1100 if actor.attributes.get('role_name') == role_name:
1101 if player is not None:
1102 logger.error("Multiple actors with role_name `%s` found. id: %s. "
1103 "Returning the first one found.", role_name, actor.id)
1104 else:
1105 player = actor
1106 return player
1107
1108 def _wait_for_external_actor(self, timeout: float = 20, sleep: float = 3) -> carla.Actor:
1109 """
1110 Does not resume the script until an external actor with the role name is found.
1111
1112 Raises:
1113 AssertionError: If :py:attr:`actor_role_name` is not set.
1114 SystemExit: If the actor is not found within the time period.
1115 """
1116 assert self.actor_role_name
1117 self.tick_server_world() # Tick the world?
1118 start = time.time()
1119 t = start
1120 while t < start + timeout:
1121 player = self._find_external_actor(self.world, self.actor_role_name)
1122 if player is not None:
1123 return player
1124 logger.info("...External actor not found. Waiting to find external actor named `%s`",
1125 self.actor_role_name)
1126 time.sleep(sleep) # Note if on same thread, nothing will happen. Put function into thread?
1127 self.tick_server_world() # Tick the world?
1128 t = time.time()
1129 logger.error(f"External actor `{self.actor_role_name}` not found. Exiting...")
1130 print(f"External actor `{self.actor_role_name}` not found. Exiting...")
1131 sys.exit(1)
1132
[docs]
1133 def restart(self):
1134 """
1135 Restart the world and sets up the :py:class:`.HUD` sensors.
1136 If :py:attr:`player` is not set or :py:attr:`external_actor` is set,
1137 looks for an actor with the role name, or spawns a new actor.
1138
1139 Note:
1140 Called during :py:meth:`__init__`.
1141 """
1142 # Keep same camera config if the camera manager exists.
1143 cam_index = assure_type(int, self.camera_manager.index
1144 if self.camera_manager is not None
1145 else 0)
1146 cam_pos_id = (self.camera_manager.transform_index
1147 if self.camera_manager is not None
1148 else 0)
1149 if self.external_actor:
1150 # Check whether there is already an actor with defined role name
1151 if not self.actor_role_name:
1152 raise ValueError("When using external actor, rolename must be set.")
1153 actor_list = self.world.get_actors() # In sync mode the actor list could be empty
1154 external_actor = self._find_external_actor(self.world, self.actor_role_name, actor_list)
1155 if self.player is None:
1156 if external_actor:
1157 self.player = assure_type(carla.Vehicle, external_actor)
1158 else:
1159 self.player = assure_type(carla.Vehicle,
1160 self._wait_for_external_actor(timeout=self._args.timeout + 10))
1161 elif external_actor and self.player.id != external_actor.id: # NOTE: even with same id different instances and hashes.
1162 logger.warning("External actor found with role_name `%s` but different id. "
1163 "Keeping the current actor (%s) and ignoring the external actor (%s)",
1164 self.actor_role_name, self.player.id, external_actor.id)
1165 if TYPE_CHECKING:
1166 self.player = assure_type(carla.Vehicle, self.player)
1167
1168 else:
1169 # Get a random blueprint.
1170 if self.player is None or self.camera_manager is not None:
1171 # First pass without a player or second pass -> new player
1172 blueprint: carla.ActorBlueprint = assure_type(carla.ActorBlueprint,
1173 random.choice(get_actor_blueprints(self._actor_filter, self._actor_generation)))
1174 blueprint.set_attribute('role_name', self.actor_role_name) # type: ignore[arg-type]
1175 if blueprint.has_attribute('color'):
1176 color = random.choice(blueprint.get_attribute('color').recommended_values)
1177 blueprint.set_attribute('color', color)
1178 # From Interactive:
1179 if blueprint.has_attribute('terramechanics'): # For Tire mechanics/Physics? # Todo is that needed?
1180 blueprint.set_attribute('terramechanics', 'false')
1181 if blueprint.has_attribute('driver_id'):
1182 driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values)
1183 blueprint.set_attribute('driver_id', driver_id)
1184 if blueprint.has_attribute('is_invincible'):
1185 blueprint.set_attribute('is_invincible', 'true')
1186
1187 # TODO: Make this a config option to choose automatically.
1188 # set the max speed
1189 #if blueprint.has_attribute('speed'):
1190 # self.player_max_speed = float(blueprint.get_attribute('speed').recommended_values[1])
1191 # self.player_max_speed_fast = float(blueprint.get_attribute('speed').recommended_values[2])
1192
1193 # Spawn the player.
1194 if self.player is not None:
1195 spawn_point = self.player.get_transform()
1196 spawn_point.location.z += 2.0
1197 spawn_point.rotation.roll = 0.0
1198 spawn_point.rotation.pitch = 0.0
1199 if self.camera_manager is not None: # None at first start; not None if player was already set before
1200 self.destroy()
1201 self.player = assure_type(carla.Vehicle, self.world.try_spawn_actor(blueprint, spawn_point))
1202 self.modify_vehicle_physics(self.player)
1203 while self.player is None:
1204 if not self.map.get_spawn_points():
1205 print('There are no spawn points available in your map/town.')
1206 print('Please add some Vehicle Spawn Point to your UE4 scene.')
1207 sys.exit(1)
1208 spawn_points = self.map.get_spawn_points()
1209 spawn_point: carla.Transform = random.choice(spawn_points) if spawn_points else carla.Transform() # type: ignore
1210 self.player = assure_type(carla.Vehicle,
1211 self.world.try_spawn_actor(blueprint, spawn_point))
1212 # From Interactive:
1213 # See: https://carla.readthedocs.io/en/latest/tuto_G_control_vehicle_physics/
1214 self.show_vehicle_telemetry = False
1215 self.modify_vehicle_physics(self.player)
1216
1217 # Clean external actors restarting a second time
1218 if self.external_actor and (
1219 # None cleans only first time; False will never clean.
1220 (self._args.restart_clean_sensors is not False and not self._first_start)
1221 or self._args.restart_clean_sensors is True
1222 ):
1223 ego_sensors = [actor for actor in self.world.get_actors() if actor.parent == self.player]
1224
1225 # Remove all old sensors
1226 for ego_sensor in ego_sensors:
1227 if ego_sensor is not None:
1228 ego_sensor.destroy()
1229
1230 # Set up the sensors.
1231 self.collision_sensor = CollisionSensor(self.player, self.hud)
1232 self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud)
1233 self.gnss_sensor = None # GnssSensor(self.player) # TODO: make it optional
1234 self.imu_sensor = None # IMUSensor(self.player)
1235 self.actors.extend([
1236 self.collision_sensor,
1237 self.lane_invasion_sensor,
1238 ])
1239 if self.gnss_sensor:
1240 self.actors.append(self.gnss_sensor)
1241 if self.imu_sensor:
1242 self.actors.append(self.imu_sensor)
1243
1244 logger.log(0, "Setting up camera manager")
1245 self.camera_manager = CameraManager(self.player, self.hud, self._args)
1246 self.camera_manager.transform_index = cam_pos_id
1247 self.camera_manager.set_sensor(cam_index, notify=False)
1248 logger.log(0, "Camera Manager set up")
1249
1250 actor_type = get_actor_display_name(self.player)
1251 self.hud.notification(text=actor_type)
1252
1253 self.rss_unstructured_scene_visualizer = RssUnstructuredSceneVisualizer(self.player,
1254 self.world,
1255 self.dim,
1256 gamma_correction=self._gamma) # TODO: use args instead of gamma
1257 self.rss_bounding_box_visualizer = RssBoundingBoxVisualizer(self.dim,
1258 self.world,
1259 self.camera_manager.sensor)
1260 if AD_RSS_AVAILABLE and self._config.rss and self._config.rss.enabled:
1261 log_level = self._config.rss.log_level
1262 if not isinstance(log_level, carla.RssLogLevel):
1263 try:
1264 if isinstance(log_level, str):
1265 log_level = carla.RssLogLevel.names[log_level]
1266 else:
1267 log_level = carla.RssLogLevel(log_level)
1268 except Exception as e:
1269 msg = f"Could not convert '{log_level}' to RssLogLevel must be in {list(carla.RssLogLevel.names.keys())}"
1270 raise KeyError(msg) from e
1271 logger.info("Carla Log level was not a RssLogLevel")
1272 self.rss_sensor = RssSensor(self.player,
1273 self.rss_unstructured_scene_visualizer,
1274 self.rss_bounding_box_visualizer,
1275 self.hud.rss_state_visualizer,
1276 visualizer_mode=self._config.rss.debug_visualization_mode,
1277 log_level=log_level)
1278 self.rss_set_road_boundaries_mode(self._config.rss.use_stay_on_road_feature)
1279 else:
1280 self.rss_sensor = None
1281 self._first_start = False
1282 self.tick_server_world()
1283
[docs]
1284 def tick_server_world(self) -> "int | carla.WorldSnapshot | None":
1285 """
1286 When :py:attr:`.LaunchConfig.handle_ticks` is :python:`True`
1287 uses :external_py_meth:`carla.World.tick` or :external_py_meth:`carla.World.wait_for_tick`
1288 depending on :py:attr:`.LaunchConfig.sync`.
1289 """
1290 if self._args.handle_ticks:
1291 if self.sync:
1292 # if timeout is used it it might interfere with _wait_for_external_actor
1293 return self.world.tick(self._args.timeout)
1294 return self.world.wait_for_tick(self._args.timeout)
1295 return None
1296
1297 #def tick(self, clock):
1298 # self.hud.tick(self.player, clock) # RSS example. TODO: Check which has to be used!
1299
[docs]
1300 def tick(self, clock: "pygame.time.Clock"):
1301 """Method for every tick"""
1302 self.hud.tick(self, clock, InformationManager.obstacles)
1303
[docs]
1304 def next_weather(self, reverse: bool = False) -> None:
1305 """Get next weather setting"""
1306 self._weather_index += -1 if reverse else 1
1307 self._weather_index %= len(self._weather_presets)
1308 preset = self._weather_presets[self._weather_index]
1309 self.hud.notification(f'Weather: {preset[1]}')
1310 self.player.get_world().set_weather(preset[0])
1311 self.weather = preset[1]
1312
[docs]
1313 def next_map_layer(self, reverse: bool = False) -> None:
1314 self.current_map_layer += -1 if reverse else 1
1315 self.current_map_layer %= len(self.map_layer_names)
1316 selected = self.map_layer_names[self.current_map_layer]
1317 self.hud.notification(f'LayerMap selected: {selected}')
1318
[docs]
1319 def load_map_layer(self, unload: bool = False):
1320 selected = self.map_layer_names[self.current_map_layer]
1321 if unload:
1322 self.hud.notification(f'Unloading map layer: {selected}')
1323 self.world.unload_map_layer(selected)
1324 else:
1325 self.hud.notification(f'Loading map layer: {selected}')
1326 self.world.load_map_layer(selected)
1327
[docs]
1328 def toggle_recording(self):
1329 """
1330 Start recording images from the camera output.
1331
1332 Saved in
1333 :py:attr:`LaunchConfig.camera.recorder.output_path <.CameraConfig.RecorderSettings.output_path>`
1334 with the current frame number.
1335 """
1336 if not self.recording:
1337 self._has_recorded = True
1338 dir_name, filename = os.path.split(self._args.camera.recorder.output_path)
1339 try:
1340 dir_name_formatted = dir_name % self.recording_dir_num
1341 except TypeError:
1342 dir_name += "%04d"
1343 dir_name_formatted = dir_name % self.recording_dir_num
1344 while os.path.exists(dir_name_formatted): # noqa: PTH110
1345 self.recording_dir_num += 1
1346 dir_name_formatted = dir_name % self.recording_dir_num
1347 self.recording_file_format = os.path.join(dir_name, filename) # keep unformatted. # noqa: PTH118
1348 self.recording_frame_num = 0
1349 Path(dir_name_formatted).mkdir(parents=True, exist_ok=True)
1350 self.hud.notification(f'Started recording (folder: {dir_name_formatted})')
1351 self._recording_dirs.append(dir_name_formatted)
1352 else:
1353 dir_name_formatted = os.path.split(self.recording_file_format)[0] % self.recording_dir_num
1354 self.hud.notification(f'Recording finished (folder: {dir_name_formatted})')
1355
1356 self.recording = not self.recording
1357
[docs]
1358 def toggle_radar(self):
1359 """Adds or destroys a radar sensor for the user interface"""
1360 if self.radar_sensor is None:
1361 self.radar_sensor = RadarSensor(self.player)
1362 elif self.radar_sensor.sensor is not None:
1363 self.radar_sensor.sensor.destroy()
1364 self.radar_sensor = None
1365
[docs]
1366 def modify_vehicle_physics(self, actor: carla.Vehicle):
1367 # If actor is not a vehicle, we cannot use the physics control
1368 try:
1369 physics_control = actor.get_physics_control()
1370 physics_control.use_sweep_wheel_collision = True
1371 actor.apply_physics_control(physics_control)
1372 except Exception:
1373 logger.warning("Could not modify vehicle physics for actor: %s", actor)
1374
[docs]
1375 def finalize_render(self, display: pygame.Surface):
1376 """
1377 Draws the HUD and saves the image if recording is enabled.
1378
1379 Assumes render(..., finalize=False) was called before,
1380 use this function if you want to render something in between.
1381 """
1382 self.hud.render(display)
1383 if self.recording:
1384 try:
1385 pygame.image.save(display, self.recording_file_format % (self.recording_dir_num, self.recording_frame_num))
1386 except Exception as e:
1387 logger.error("Could not save image format: `%s` % (self.recording_dir_num, self.recording_frame_num): %s", self.recording_file_format, e)
1388 self.recording_frame_num += 1
1389
[docs]
1390 def render(self, display: pygame.Surface, finalize: bool = True):
1391 """
1392 Render world
1393
1394 Recording should be done at the end of the render method,
1395 however camera_manager.render is called first to render the camera.
1396
1397 Call with finalize=False to only render the camera.
1398
1399 Afterwards applying other render features call `finalize_render`
1400 to draw the HUD and save the image if recording is enabled.
1401 """
1402 self.camera_manager.render(display)
1403 self.rss_bounding_box_visualizer.render(display, self.camera_manager.current_frame)
1404 self.rss_unstructured_scene_visualizer.render(display)
1405 if finalize:
1406 self.finalize_render(display)
1407
[docs]
1408 def destroy_sensors(self):
1409 """Destroy sensors"""
1410 if self.rss_sensor:
1411 self.rss_sensor.destroy()
1412 self.rss_sensor = None
1413 if self.rss_unstructured_scene_visualizer:
1414 self.rss_unstructured_scene_visualizer.destroy()
1415 self.rss_unstructured_scene_visualizer = None # type: ignore[assignment]
1416 if self.camera_manager is not None:
1417 self.camera_manager.destroy()
1418 self.camera_manager = None # type: ignore[assignment]
1419 if self.radar_sensor is not None:
1420 self.toggle_radar() # destroys it if not None
1421
[docs]
1422 def destroy(self, destroy_ego: bool = False):
1423 """
1424 Destroys all actors
1425
1426 Parameters:
1427 destroy_ego: If True, will destroy the :py:attr:`player` as well. Else assume that it
1428 is destroyed by someone else.
1429 """
1430 # stop from ticking
1431 if self.world_tick_id and self.world:
1432 self.world.remove_on_tick(self.world_tick_id)
1433 self.destroy_sensors()
1434 if destroy_ego and self.player not in self.actors: # do not destroy external actors.
1435 logger.debug("Adding player to destruction list.")
1436 self.actors.append(self.player)
1437 elif not destroy_ego and self.player in self.actors:
1438 logger.warning("destroy_ego=False, but player is in actors list. "
1439 "Destroying the actor from within WorldModel.destroy.")
1440
1441 #logger.info("to destroy %s", list(map(str, self.actors)))
1442 # Batch destroy in one simulation step
1443 real_actors: Sequence[carla.Actor] = [actor for actor in self.actors
1444 if isinstance(actor, carla.Actor)]
1445 GameFramework.destroy_actors(real_actors)
1446
1447 # e.g. CustomSensorInterface
1448 other_actors = [actor for actor in self.actors
1449 if not isinstance(actor, carla.Actor)]
1450 while other_actors:
1451 actor = other_actors.pop(0)
1452 if actor is not None:
1453 print("destroying actor: " + str(actor), end=" destroyed=")
1454 try:
1455 actor.stop()
1456 except AttributeError as e:
1457 logger.debug("Error with actor {}: {}", actor, e)
1458 try:
1459 x = actor.destroy() # Non carla instances
1460 print(x)
1461 except RuntimeError:
1462 logger.warning("Could not destroy actor: " + str(actor))
1463 self.actors.clear()
1464 if self._args.handle_ticks and self.get_world():
1465 self.get_world().tick()
1466
1467 if self._has_recorded:
1468 runtime_dir = GameFramework.get_hydra_config().runtime.output_dir
1469 for dir_name_formatted in self._recording_dirs:
1470 dirname = os.path.split(dir_name_formatted)[1]
1471 subprocess.run(["ffmpeg", "-an", "-sn", "-i", f"{dir_name_formatted}/%08d.bmp", "-framerate", "1", "-vcodec", "mpeg4", "-r", "60", f"{Path(runtime_dir) / dirname}.avi"]) # noqa: E501
1472 # os.system(f'ffmpeg -an -sn -i "{dir_name_formatted}/%08d.bmp" -framerate 1 -vcodec mpeg4 -r 60 "{os.path.join(runtime_dir, dirname)}.avi"') # noqa: E501
1473 print(f"Recording saved in {Path(runtime_dir) / dirname}.avi")
1474