"""
Leaderboard_ 2.0 compatible version of the Lunatic Agent

Attention:
    Command line overrides are currently not supported for this agent,
    therefore this module defines some global constants that
    adjust the settings if they are not set to None (default).

    These global settings are only used if :py:meth:`LunaticChallenger.setup`
    is called with a string pointing to a configuration file. Passing a
    :py:class:`.LaunchConfig` directly will skip the Hydra_ setup and the global
    values will not be used.
"""
14
15from ast import literal_eval
16import contextlib
17import operator
18import os
19from pathlib import Path
20from typing import TYPE_CHECKING, Any, Dict, Sequence, Tuple, Union, cast
21
22import carla
23import pygame
24from hydra import compose, initialize_config_dir
25from hydra.core.utils import configure_log
26from hydra.core.hydra_config import HydraConfig
27from omegaconf import OmegaConf
28from typing_extensions import TypedDict
29
30from agents.tools.debug_drawing import draw_route
31from classes import MockDummy
32from classes.exceptions import UserInterruption
33from launch_tools import CarlaDataProvider, GameTime, singledispatchmethod
34
35try:
36 from leaderboard.autoagents.autonomous_agent import AutonomousAgent, Track # pyright: ignore[reportMissingImports]
37 from leaderboard.utils.route_manipulation import downsample_route # pyright: ignore[reportMissingImports]
38except ModuleNotFoundError:
39 # Leaderboard is not a submodule, cannot use it on readthedocs
40 if "READTHEDOCS" in os.environ and not TYPE_CHECKING:
41
42 class AutonomousAgent:
43 pass # noqa
44
45 else:
46 raise # noqa: E701
47
48from agents.lunatic_agent import LunaticAgent
49from agents.tools.config_creation import LaunchConfig, LunaticAgentSettings
50from agents.tools.logs import logger
51from classes.constants import READTHEDOCS, Phase
52from classes.ui.keyboard_controls import RSSKeyboardControl
53from classes.worldmodel import AD_RSS_AVAILABLE, GameFramework, WorldModel
54
55if TYPE_CHECKING:
56 # from scenario_runner.srunner.autoagents.sensor_interface import SensorInterface
57 from leaderboard.envs.sensor_interface import SensorInterface
58
59 from agents.navigation.local_planner import LocalPlanner
60 from classes.constants import RoadOption
61
62import logging
63
64
def get_entry_point():
    """Tell the Leaderboard which class of this module implements the agent.

    Returns:
        The name of the agent class as a string.
    """
    agent_class_name = "LunaticChallenger"
    return agent_class_name
68
69
# --- DEBUG OVERWRITES ---
# As the leaderboard agent cannot accept command line overrides these can be set here.


def _env_literal(name: str) -> Any:
    """Read the environment variable *name* and parse it as a Python literal.

    An unset or empty variable yields ``None``. The spellings ``true``, ``false``,
    ``none`` and ``null`` are accepted case-insensitively, so that shell-style values
    such as ``ENABLE_RSS=true`` do not abort the import of this module.
    Every other value is handed to :py:func:`ast.literal_eval` unchanged.

    Raises:
        ValueError, SyntaxError: If the value is not a valid Python literal.
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        return None
    keywords = {"true": "True", "false": "False", "none": "None", "null": "None"}
    return literal_eval(keywords.get(raw.lower(), raw))


DEBUG = False

WORLD_MODEL_DESTROY_SENSORS = True

# Evaluates to False (not None) when the RSS library is unavailable, i.e. RSS is then forced off.
ENABLE_RSS = AD_RSS_AVAILABLE and _env_literal("ENABLE_RSS")
"""If not :code:`None`, overwrites `.LunaticAgentSettings.rss.enabled`:py:attr:"""

ENABLE_DETECTION_MATRIX = _env_literal("ENABLE_DETECTION_MATRIX")
"""If not :code:`None`, overwrites `.LunaticAgentSettings.detection_matrix.enabled`:py:attr:"""

DATA_MATRIX_ASYNC = _env_literal("DATA_MATRIX_ASYNC")
"""Run the DetectionMatrix update in a separate thread; overwrites `.LunaticAgentSettings.detection_matrix.sync`:py:attr:."""

DATA_MATRIX_INTERVAL = _env_literal("DATA_MATRIX_INTERVAL")
"""
When running synchronously how many ticks should be between two updates. Overwrites `.LunaticAgentSettings.detection_matrix.sync_interval`:py:attr:
"""

CAMERA_SPECTATOR = _env_literal("CAMERA_SPECTATOR")
"""Whether to use the camera spectator; overwrites `camera.spectator<.CameraConfig.spectator>`:py:attr:"""

USE_OPEN_DRIVE_DATA = False

DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES = 5
"""
The smaller the value, the more exactly the agent will stick to the original route,
BUT ONLY IF the route is provided as a fine-grained route.

NOTE: We should NOT rely on the route to be available in a fine-grained manner -> Should work with larger values

Larger values will make the agent cut corners and drive more straight lines.
Needs extra tools to stick to the road.
"""

args: LaunchConfig
"""Global access to the launch config; set in :py:meth:`LunaticChallenger.setup`"""
108
109
[docs]
class LunaticChallenger(AutonomousAgent, LunaticAgent):
    """
    Variant of the :py:class:`.LunaticAgent` that is compatible with the
    `Leaderboard 2.0 <https://leaderboard.carla.org/>`_ interface.

    Attention:
        If the :py:class:`LunaticChallenger` is used without the Leaderboard 2.0 framework
        the :py:meth:`__call__` method should be used instead of :py:meth:`run_step`
        to acquire the next control.
    """

    sensor_interface: "SensorInterface"  #: :meta private:

    _global_plan: "list[tuple[_GPSDataDict, RoadOption]]" = None  # type: ignore[assignment]
    _global_plan_world_coord: "list[tuple[carla.Transform, RoadOption]]" = None  # type: ignore[assignment]
    _global_plan_waypoints: "list[tuple[carla.Waypoint, RoadOption]]" = None  # type: ignore[assignment]

    def __init__(self, carla_host, carla_port, debug=False):
        """
        Initializes the LunaticChallenger. This does not yet load the config or calls :py:meth:`LunaticAgent.__init__ <agents.lunatic_agent.LunaticAgent.__init__>`.

        Further initialization is done in :py:meth:`LunaticChallenger.setup`.
        """
        print("Initializing LunaticChallenger")
        # Correct types are set at setup
        self.world_model: WorldModel = None  # type: ignore[assignment]
        self.game_framework: GameFramework = None  # type: ignore[assignment]
        self._destroyed = False
        # AutonomousAgent init
        super().__init__(carla_host, carla_port, debug)
        self.track = Track.MAP
        self._opendrive_data = None
        self._local_planner: "LocalPlanner" = None  # type: ignore[assignment]

    def _get_launch_config(self) -> "Union[LaunchConfig, None]":
        """Return the launch config of this instance, falling back to the module-level ``args``.

        ``self.args`` is assigned on every path through :py:meth:`setup`, whereas the
        module-level ``args`` is only assigned when a config file path was passed.

        Returns:
            The launch config or :code:`None` if neither is available yet.
        """
        launch_config = getattr(self, "args", None)
        if launch_config is None:
            launch_config = globals().get("args")
        return launch_config

    def setup(self, path_to_conf_file: Union[str, LaunchConfig]):
        """
        Initializes the underlying :py:class:`.LunaticAgent` as well as instances of :py:class:`.GameFramework` and :py:class:`.WorldModel`.

        To some extent initializes the Hydra_ framework and loads the configuration.

        Parameters:
            path_to_conf_file:
                Can either be a string pointing to a configuration file to load a
                :py:class:`.LaunchConfig` or a :py:class:`.LaunchConfig` to be used directly.

        Note:
            If a :py:class:`.LaunchConfig` is passed directly the Hydra_ setup will be skipped.
        """
        self._destroyed = False
        self.track = Track.MAP
        if isinstance(path_to_conf_file, str):
            print("Setup with conf file", path_to_conf_file)
            logger.info("Setup with conf file %s", path_to_conf_file)
            config_dir, config_name = os.path.split(path_to_conf_file)
            # TODO: Maybe move to init so its available during set_global_plan
            global args  # module-level access to the composed launch config
            overrides = ["agent=leaderboard"]
            if not GameFramework.hydra_initialized():
                initialize_config_dir(
                    version_base=None, config_dir=os.path.abspath(config_dir), job_name="LeaderboardAgent"
                )
                if ENABLE_DETECTION_MATRIX is not None:
                    overrides.append("agent.detection_matrix.enabled=" + str(ENABLE_DETECTION_MATRIX).lower())
                # Only override `sync` when explicitly requested; an unset value would otherwise
                # be passed as the string "none". `sync` is the inverse of "async", matching the
                # assignment after compose below.
                if DATA_MATRIX_ASYNC is not None:
                    overrides.append("agent.detection_matrix.sync=" + str(not DATA_MATRIX_ASYNC).lower())
                if ENABLE_RSS is not None:
                    overrides.append("agent.rss.enabled=" + str(ENABLE_RSS).lower())
                if CAMERA_SPECTATOR is not None:
                    overrides.append("camera.spectator=" + str(CAMERA_SPECTATOR).lower())
                args = cast(
                    "LaunchConfig",
                    compose(
                        config_name=config_name, return_hydra_config=True, overrides=overrides
                    ),  # uses conf/agent/leaderboard
                )
                args.debug = DEBUG
                # Let scenario manager decide
                if args.map:
                    logger.warning(
                        "Map should be set by scenario manager and be None in the config file found map is %s.",
                        args.map,
                    )
                if args.handle_ticks:
                    logger.warning("When using the leaderboard agent, `handle_ticks` should be False.")
                    args.handle_ticks = False
                if args.sync is not None:
                    logger.warning("When using the leaderboard agent, `sync` should be None.")
                    args.sync = None

                # Setup Hydra
                if OmegaConf.is_missing(args.hydra.runtime, "output_dir"):
                    args.hydra.runtime.output_dir = args.hydra.run.dir
                HydraConfig.instance().set_config(args)  # type: ignore[arg-type]
                Path(args.hydra.runtime.output_dir).mkdir(parents=True, exist_ok=True)
                # Assure that our logger works
                configure_log(args.hydra.job_logging, logger.name)  # type: ignore[arg-type]

                if DATA_MATRIX_ASYNC is not None:
                    args.agent.detection_matrix.sync = not DATA_MATRIX_ASYNC
                if DATA_MATRIX_INTERVAL is not None:
                    args.agent.detection_matrix.sync_interval = DATA_MATRIX_INTERVAL
                logger.info(OmegaConf.to_yaml(args))
            else:
                # NOTE(review): when Hydra is already initialized the global overrides and
                # the launch adjustments above are not applied - confirm this is intended.
                args = cast(
                    "LaunchConfig",
                    compose(
                        config_name=config_name, return_hydra_config=True, overrides=overrides
                    ),  # uses conf/agent/leaderboard
                )
            logger.setLevel(logging.DEBUG)
            self.args = args

            config = LunaticAgentSettings.create(self.args.agent, assure_copy=True, as_dictconfig=True)
        else:
            self.args = path_to_conf_file
            config = self.args.agent
            # NOTE(review): nesting reconstructed from a whitespace-mangled source; assumed
            # to belong to the direct-LaunchConfig path only - confirm.
            if OmegaConf.is_missing(config.planner, "dt"):
                config.planner.dt = 1 / 20  # TODO: maybe get from somewhere else

        self.game_framework = GameFramework(self.args, config)
        print("Game framework setup")
        # TODO: How to make args optional
        self.world_model = WorldModel(config, args=self.args)
        self.game_framework.world_model = self.world_model
        print("World Model setup")
        if self.args.pygame:
            self.controller = self.game_framework.make_controller(
                self.world_model, RSSKeyboardControl, start_in_autopilot=False
            )  # Note: stores weakref to controller
        else:
            self.controller = MockDummy.create_dummy(RSSKeyboardControl)
        print("Initializing agent")
        LunaticAgent.__init__(self=self, settings=config, world_model=self.world_model)
        print("LunaticAgent initialized")

        from agents.rules.lane_changes.random_changes import RandomLaneChangeRule  # noqa: PLC0415

        # Random lane changes are switched off for the leaderboard agent
        for rules in self.rules.values():
            for rule in rules:
                if isinstance(rule, RandomLaneChangeRule):
                    rule.enabled = False

        # Set plan; a route may already have been provided through set_global_plan
        if self._global_plan_waypoints:
            self._local_planner_set_plan(self._global_plan_waypoints)

        self.game_framework.agent = self  # TODO: Remove this circular reference
        self.agent_engaged = False
        # Print controller docs
        with contextlib.suppress(AttributeError):
            if not isinstance(self.controller, MockDummy):
                print(self.controller.get_docstring())

    def sensors(self) -> "list[dict]":
        """
        Define the sensor suite required by the agent

        Returns:
            A list containing the required sensors in the following format

            .. code-block:: python

                [
                    {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': -0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                    'width': 300, 'height': 200, 'fov': 100, 'id': 'Left'},

                    {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': 0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                    'width': 300, 'height': 200, 'fov': 100, 'id': 'Right'},

                    {'type': 'sensor.lidar.ray_cast', 'x': 0.7, 'y': 0.0, 'z': 1.60, 'yaw': 0.0, 'pitch': 0.0, 'roll': 0.0,
                    'id': 'LIDAR'}
                ]

        Note:
            The LunaticChallenger does not use any sensors; the usage of 'sensor.opendrive_map' is experimental, however
            there is yet no parsing done for the data.
        """
        sensors: list = super().sensors()  # This should be empty

        # Use the config of this instance; the module-level `args` is not assigned when
        # setup() received a LaunchConfig directly.
        launch_config = self._get_launch_config()
        try:
            configured_sensors = launch_config.leaderboard.sensors
        except AttributeError:
            # No launch config yet, or it has no leaderboard sensor section
            configured_sensors = None

        if configured_sensors is not None:
            # temp; remove
            try:
                i = [x["type"] for x in configured_sensors].index("sensor.opendrive_map")
            except (ValueError, AttributeError):
                pass
            else:
                configured_sensors[i].use = USE_OPEN_DRIVE_DATA

            # add sensors if they have the use flag in the config
            try:
                sensors.extend(filter(operator.itemgetter("use"), configured_sensors))
            except Exception:
                # Best effort: keep going with what was collected, but do not hide the problem
                logger.warning("Could not add all sensors from the launch config.", exc_info=True)
        logger.info("Using sensors: %s", sensors)
        return sensors

    @staticmethod
    def _print_input_data(input_data: Dict[str, Tuple[int, Any]]):
        """
        Debug method to view the input data

        Returns:
            :code:`None` if ``input_data`` is empty, otherwise :code:`True` after printing
            one line per entry.
        """
        if not input_data:
            return None
        print("=====================>")
        for key, val in input_data.items():
            if hasattr(val[1], "shape"):
                shape = val[1].shape
                print(f"[{key} -- {val[0]:06d}] with shape {shape}")
            else:
                print(f"[{key} -- {val[0]:06d}] ")
        print("<=====================")
        return True

    # This allows BlockingRules to pick up the correct function
    @singledispatchmethod
    def run_step(self, debug: bool = False, second_pass=False) -> carla.VehicleControl:  # noqa: ARG002
        """
        Attention:
            Use :py:meth:`__call__` instead of this method!
        """
        # TODO: Possibly singledispatch to __call__ instead
        # `debug` is ignored on purpose; the value from the launch config is used.
        return super(AutonomousAgent, self).run_step(debug=self.args.debug, second_pass=second_pass)

    @run_step.register(dict)
    def _(self, input_data: Dict[str, Tuple[int, Any]], timestamp: float = -1) -> carla.VehicleControl:  # noqa: ARG002
        """Function that is called by leaderboard framework

        Runs one agent step, lets the controller handle manual input and returns the
        resulting control. On any exception the agent is destroyed before re-raising.

        Raises:
            UserInterruption: If the controller reports that the user wants to exit.
        """
        try:
            if self._print_input_data(input_data) and "OpenDRIVE" in input_data:
                frame, data_value = input_data["OpenDRIVE"]
                data: str = data_value["opendrive"]
                if self._opendrive_data != data:
                    self._opendrive_data = data
                    print(frame, data[:5000])
                    Path("opendrive.xml").write_text(data, encoding="utf-8")
                else:
                    print("OpenDRIVE data unchanged")
            # NOTE(review): nesting reconstructed from a whitespace-mangled source; assumed
            # to be set on every step - confirm.
            self.agent_engaged = True  # remove this, if not used

            # NOTE(review): extent of this `with` block reconstructed from a
            # whitespace-mangled source - confirm that the phases belong inside.
            with self.game_framework:
                control = self.run_step(self.args.debug)  # Call Lunatic Agent run_step
                # Handle render updates

                self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN, prior_results=control)
                if self.controller.parse_events(self.get_control()):
                    print("Exiting by user input.")
                    raise UserInterruption("Exiting by user input.")  # noqa: TRY301
                self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.END, prior_results=None)

                self.execute_phase(Phase.EXECUTION | Phase.BEGIN, prior_results=control)
                final_controls: carla.VehicleControl = self.get_control()  # type: ignore[assignment]

            # TODO: Update HUD controls info
        except Exception as e:
            if not isinstance(e, UserInterruption):
                logger.exception("Error in LunaticChallenger.run_step:")
            self.destroy()
            raise
        else:
            return final_controls

    # NOTE: to update the doc use
    # run_step.func.__doc__ += "\n" + LunaticAgent.run_step.__doc__

    @staticmethod
    def _transform_to_waypoint(
        transform: "carla.Transform", project_to_road=True, lane_type=carla.LaneType.Driving
    ) -> "carla.Waypoint":
        """Look up the map waypoint for the location of ``transform``."""
        # maybe move to misc / tools
        return CarlaDataProvider.get_map().get_waypoint(
            transform.location, project_to_road=project_to_road, lane_type=lane_type
        )  # pyright: ignore[reportReturnType]

    def _local_planner_set_plan(self, plan):
        """Hand ``plan`` to the :py:class:`.LunaticAgent` and draw it when debugging is enabled."""
        # Skips AutonomousAgent in the MRO so that the LunaticAgent implementation is used
        super(AutonomousAgent, self).set_global_plan(plan, stop_waypoint_creation=True, clean_queue=True)
        if self.game_framework.launch_config.debug:
            draw_route(
                CarlaDataProvider.get_world(), plan, vertical_shift=0.5, size=0.15, downsample=1, life_time=1000.0
            )

    def set_global_plan(
        self,
        global_plan_gps: Sequence["tuple[_GPSDataDict, RoadOption]"],
        global_plan_world_coord: Sequence["tuple[carla.Transform, RoadOption]"],
    ):
        """
        Set the plan (route) for the agent

        The route is downsampled with :py:data:`DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES`
        and stored as GPS, world-coordinate and waypoint variants. If a local planner
        already exists the waypoint plan is passed on immediately.
        """
        ds_ids: "list[int]" = downsample_route(
            global_plan_world_coord, DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES
        )  # Downsample to less distance. TODO: should increase this

        # Reduce the global plan to the downsampled ids
        self._global_plan_world_coord = [(global_plan_world_coord[x][0], global_plan_world_coord[x][1]) for x in ds_ids]
        assert self._global_plan_world_coord == [global_plan_world_coord[x] for x in ds_ids]
        self._global_plan = [global_plan_gps[x] for x in ds_ids]
        self._global_plan_waypoints = [
            (self._transform_to_waypoint(transform), road_option)
            for transform, road_option in self._global_plan_world_coord
        ]
        if self._local_planner is not None:
            # TODO: maybe waypoints is not necessary as we extract locations
            self._local_planner_set_plan(self._global_plan_waypoints)

    def __call__(self) -> carla.VehicleControl:
        """
        Executes the next step and returns the control for the vehicle.

        Attention:
            Use this function instead of :py:meth:`run_step`!
        """
        input_data = self.sensor_interface.get_data(GameTime.get_frame())

        timestamp = GameTime.get_time()

        # Use the config of this instance; the module-level `args` is not assigned when
        # setup() received a LaunchConfig directly.
        launch_config = self._get_launch_config()
        if launch_config is not None and launch_config.leaderboard.print_time_info:
            if not self.wallclock_t0:
                self.wallclock_t0 = GameTime.get_wallclocktime()
            wallclock = GameTime.get_wallclocktime()
            wallclock_diff = (wallclock - self.wallclock_t0).total_seconds()
            sim_ratio = 0 if wallclock_diff == 0 else timestamp / wallclock_diff

            print(
                f"=== [Agent] -- Wallclock = {str(wallclock)[:-3]} -- System time = {wallclock_diff:.3f}"
                f" -- Game time = {timestamp:.3f} -- Ratio = {sim_ratio:.3f}x"
            )

        control = self.run_step(input_data, timestamp)
        control.manual_gear_shift = False

        return control

    def destroy(self):
        """Stop the detection matrix, destroy the world model and shut down pygame.

        Safe to call on an instance whose :py:meth:`setup` did not run or did not finish.
        """
        self._destroyed = True
        print("Destroying Lunatic Challenger")
        # The attribute may not exist if setup() never reached LunaticAgent.__init__
        detection_matrix = getattr(self, "_detection_matrix", None)
        if detection_matrix is not None:
            detection_matrix.stop()
            self._detection_matrix = None
        super().destroy()
        if getattr(self, "world_model", None):
            if not WORLD_MODEL_DESTROY_SENSORS:
                # Forget the actors first so that the world model does not destroy them
                self.world_model.actors.clear()
            self.world_model.destroy()
            self.world_model = None  # type: ignore[assignment]
        if getattr(self, "game_framework", None):
            self.game_framework.agent = None
            self.game_framework = None  # type: ignore[assignment]
        pygame.quit()
        print("Destroyed", self)

    def __del__(self):
        # `_destroyed` is missing if __init__ failed early; nothing to clean up in that case
        if not getattr(self, "_destroyed", True):
            self.destroy()
474
475# --- Type hints ---
[docs]
class _GPSDataDict(object if READTHEDOCS and not TYPE_CHECKING else TypedDict):
    """
    Bases: :py:class:`typing.TypedDict`

    Shape of one GPS entry of the global plan that is passed to
    :py:meth:`.LunaticChallenger.set_global_plan`.

    :meta public:
    """

    # The base falls back to `object` only for documentation builds on readthedocs.
    lat: float
    lon: float
    z: float