"""
Leaderboard_ 2.0 compatible version of the Lunatic Agent

Attention:
    Command line overrides are currently not supported for this agent,
    therefore this module allows defining some global constants
    that adjust the settings if they are not set to None (default).

    These global settings are only used if :py:meth:`LunaticChallenger.setup`
    is called with a string pointing to a configuration file. Passing a
    :py:class:`.LaunchConfig` directly will skip the Hydra_ setup and the global
    values will not be used.
"""
14from ast import literal_eval
15import contextlib
16import operator
17import os
18from pathlib import Path
19from typing import TYPE_CHECKING, Any, Dict, Sequence, Tuple, Union, cast
20
21import carla
22import pygame
23from hydra import compose, initialize_config_dir
24from hydra.core.utils import configure_log
25from hydra.core.hydra_config import HydraConfig
26from omegaconf import OmegaConf
27from typing_extensions import TypedDict
28
29from agents.tools.debug_drawing import draw_route
30from classes.exceptions import UserInterruption
31from launch_tools import CarlaDataProvider, GameTime, singledispatchmethod
32
33try:
34 from leaderboard.autoagents.autonomous_agent import AutonomousAgent, Track # pyright: ignore[reportMissingImports]
35 from leaderboard.utils.route_manipulation import downsample_route # pyright: ignore[reportMissingImports]
36except ModuleNotFoundError:
37 # Leaderboard is not a submodule, cannot use it on readthedocs
38 if "READTHEDOCS" in os.environ and not TYPE_CHECKING:
39 class AutonomousAgent: pass # noqa
40 else: raise # noqa: E701
41
42from agents.lunatic_agent import LunaticAgent
43from agents.tools.config_creation import LaunchConfig, LunaticAgentSettings
44from agents.tools.logs import logger
45from classes.constants import READTHEDOCS, Phase
46from classes.keyboard_controls import RSSKeyboardControl
47from classes.worldmodel import AD_RSS_AVAILABLE, GameFramework, WorldModel
48
49if TYPE_CHECKING:
50 #from scenario_runner.srunner.autoagents.sensor_interface import SensorInterface
51 from leaderboard.envs.sensor_interface import SensorInterface
52
53 from agents.navigation.local_planner import LocalPlanner
54 from classes.constants import RoadOption
55
56import logging
57
58
def get_entry_point():
    """
    Tell the caller which class of this module implements the agent.

    Returns:
        The class name as a string.
    """
    entry_point_class_name = "LunaticChallenger"
    return entry_point_class_name
62
63
# --- DEBUG OVERWRITES ---
# As the leaderboard agent cannot accept command line overrides these can be set here.
# The values read through `_env_literal` can also be set via an environment variable
# of the same name.

_ENV_KEYWORDS = {"": None, "none": None, "null": None, "true": True, "false": False}
"""Case-insensitive spellings accepted by :py:func:`_env_literal` in addition to Python literals."""


def _env_literal(name: str) -> Any:
    """
    Read the environment variable ``name`` and parse it as a Python literal.

    Besides everything :py:func:`ast.literal_eval` understands, the spellings listed in
    :py:data:`_ENV_KEYWORDS` are accepted case-insensitively, so that e.g. ``ENABLE_RSS=true``
    or an empty value do not abort the import of this module.

    Parameters:
        name: Name of the environment variable.

    Returns:
        :code:`None` if the variable is not set (or spelled as empty/none/null),
        otherwise the parsed literal.

    Raises:
        ValueError: If the value is neither a known keyword nor a valid Python literal.
    """
    raw = os.environ.get(name)
    if raw is None:
        return None
    text = raw.strip()
    lowered = text.lower()
    if lowered in _ENV_KEYWORDS:
        return _ENV_KEYWORDS[lowered]
    try:
        return literal_eval(text)
    except (ValueError, SyntaxError) as err:
        # Name the offending variable; the bare literal_eval error does not.
        raise ValueError(f"Environment variable {name}={raw!r} is not a valid Python literal.") from err


DEBUG = False

WORLD_MODEL_DESTROY_SENSORS = True

# NOTE: evaluates to AD_RSS_AVAILABLE itself when that is falsy, i.e. RSS is then
# overwritten as disabled instead of being left untouched.
ENABLE_RSS = AD_RSS_AVAILABLE and _env_literal("ENABLE_RSS")
"""If not :code:`None`, overwrites `.LunaticAgentSettings.rss.enabled`:py:attr:"""

ENABLE_DETECTION_MATRIX = _env_literal("ENABLE_DETECTION_MATRIX")
"""If not :code:`None`, overwrites `.LunaticAgentSettings.detection_matrix.enabled`:py:attr:"""

DATA_MATRIX_ASYNC = _env_literal("DATA_MATRIX_ASYNC")
"""
Run the DetectionMatrix update in a separate thread. If not :code:`None`,
`.LunaticAgentSettings.detection_matrix.sync`:py:attr: is overwritten with the
**negated** value.
"""

DATA_MATRIX_INTERVAL = _env_literal("DATA_MATRIX_INTERVAL")
"""
When running synchronously how many ticks should be between two updates. Overwrites `.LunaticAgentSettings.detection_matrix.sync_interval`:py:attr:
"""

CAMERA_SPECTATOR = _env_literal("CAMERA_SPECTATOR")
"""Whether to use the camera spectator; overwrites `camera.spectator<.CameraConfig.spectator>`:py:attr:"""

USE_OPEN_DRIVE_DATA = False

DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES = 5
"""
The smaller the value the more exactly the agent will stick to the original route,
BUT ONLY IF the route is provided as a fine-grained route.

NOTE: We should NOT rely on the route to be available in a fine grained manner -> Should work with larger values

Larger values will make the agent cut corners and drive more straight lines.
Needs extra tools to stick to the road.
"""

args: LaunchConfig
"""Global access to the launch config; set in :py:meth:`LunaticChallenger.setup`"""
102
103
class LunaticChallenger(AutonomousAgent, LunaticAgent):
    """
    Variant of the :py:class:`.LunaticAgent` that is compatible with the
    `Leaderboard 2.0 <https://leaderboard.carla.org/>`_ interface.

    Attention:
        If the :py:class:`LunaticChallenger` is used without the Leaderboard 2.0 framework
        the :py:meth:`__call__` method should be used instead of :py:meth:`run_step`
        to acquire the next control.
    """

    sensor_interface: "SensorInterface"  #: :meta private:

    # The route in its three representations; filled by `set_global_plan`.
    _global_plan: "list[tuple[_GPSDataDict, RoadOption]]" = None  # type: ignore[assignment]
    _global_plan_world_coord: "list[tuple[carla.Transform, RoadOption]]" = None  # type: ignore[assignment]
    _global_plan_waypoints: "list[tuple[carla.Waypoint, RoadOption]]" = None  # type: ignore[assignment]

    def __init__(self, carla_host, carla_port, debug=False):
        """
        Initializes the LunaticChallenger. This does not yet load the config or calls :py:meth:`LunaticAgent.__init__ <agents.lunatic_agent.LunaticAgent.__init__>`.

        Further initialization is done in :py:meth:`LunaticChallenger.setup`.
        """
        print("Initializing LunaticChallenger")
        # Correct types are set at setup
        self.world_model: WorldModel = None  # type: ignore[assignment]
        self.game_framework: GameFramework = None  # type: ignore[assignment]
        self._destroyed = False
        # AutonomousAgent init
        super().__init__(carla_host, carla_port, debug)
        self.track = Track.MAP
        self._opendrive_data = None
        self._local_planner: "LocalPlanner" = None  # type: ignore[assignment]

    def setup(self, path_to_conf_file: Union[str, LaunchConfig]):
        """
        Initializes the underlying :py:class:`.LunaticAgent` as well as instances of :py:class:`.GameFramework` and :py:class:`.WorldModel`.

        To some extent initializes the Hydra_ framework and loads the configuration.

        Parameters:
            path_to_conf_file:
                Can either be a string pointing to a configuration file to load a
                :py:class:`.LaunchConfig` or a :py:class:`.LaunchConfig` to be used directly.

        Note:
            If a :py:class:`.LaunchConfig` is passed directly the Hydra_ setup will be skipped
            and the module level override constants are not applied.
        """
        self._destroyed = False
        self.track = Track.MAP
        if isinstance(path_to_conf_file, str):
            print("Setup with conf file", path_to_conf_file)
            logger.info("Setup with conf file %s", path_to_conf_file)
            config_dir, config_name = os.path.split(path_to_conf_file)
            # TODO: Maybe move to init so its available during set_global_plan
            global args  # kept for backwards compatibility; the class itself uses self.args
            overrides = ["agent=leaderboard"]
            if not GameFramework.hydra_initialized():
                initialize_config_dir(version_base=None,
                                      config_dir=os.path.abspath(config_dir),
                                      job_name="LeaderboardAgent")
                if ENABLE_DETECTION_MATRIX is not None:
                    overrides.append("agent.detection_matrix.enabled=" + str(ENABLE_DETECTION_MATRIX).lower())
                if DATA_MATRIX_ASYNC is not None:
                    # `sync` is the inverse of the async flag. Only emit the override when the
                    # flag is actually set, otherwise "sync=none" would be passed to Hydra.
                    overrides.append("agent.detection_matrix.sync=" + str(not DATA_MATRIX_ASYNC).lower())
                if ENABLE_RSS is not None:
                    overrides.append("agent.rss.enabled=" + str(ENABLE_RSS).lower())
                if CAMERA_SPECTATOR is not None:
                    overrides.append("camera.spectator=" + str(CAMERA_SPECTATOR).lower())
                args = cast(LaunchConfig,
                            compose(config_name=config_name,
                                    return_hydra_config=True,
                                    overrides=overrides)  # uses conf/agent/leaderboard
                            )
                args.debug = DEBUG
                # Let scenario manager decide
                if args.map:
                    logger.warning("Map should be set by scenario manager and be None in the config file found map is %s.", args.map)
                if args.handle_ticks:
                    logger.warning("When using the leaderboard agent, `handle_ticks` should be False.")
                    args.handle_ticks = False
                if args.sync is not None:
                    logger.warning("When using the leaderboard agent, `sync` should be None.")
                    args.sync = None

                # Setup Hydra
                if OmegaConf.is_missing(args.hydra.runtime, "output_dir"):
                    args.hydra.runtime.output_dir = args.hydra.run.dir
                HydraConfig.instance().set_config(args)  # type: ignore[arg-type]
                Path(args.hydra.runtime.output_dir).mkdir(parents=True, exist_ok=True)
                # Assure that our logger works
                configure_log(args.hydra.job_logging, logger.name)  # type: ignore[arg-type]

                if DATA_MATRIX_ASYNC is not None:
                    args.agent.detection_matrix.sync = not DATA_MATRIX_ASYNC
                if DATA_MATRIX_INTERVAL is not None:
                    args.agent.detection_matrix.sync_interval = DATA_MATRIX_INTERVAL
                logger.info(OmegaConf.to_yaml(args))
            else:
                # Hydra is already initialized: only compose again, the adjustments
                # of the branch above are not repeated.
                args = cast(LaunchConfig,
                            compose(config_name=config_name,
                                    return_hydra_config=True,
                                    overrides=overrides)  # uses conf/agent/leaderboard
                            )
            logger.setLevel(logging.DEBUG)
            self.args = args

            config = LunaticAgentSettings.create(self.args.agent, assure_copy=True, as_dictconfig=True)
        else:
            self.args = path_to_conf_file
            config = self.args.agent
            if OmegaConf.is_missing(config.planner, "dt"):
                config.planner.dt = 1 / 20  # TODO: maybe get from somewhere else

        self.game_framework = GameFramework(self.args, config)
        print("Game framework setup")
        # TODO: How to make args optional
        self.world_model = WorldModel(config, args=self.args)
        self.game_framework.world_model = self.world_model
        print("World Model setup")
        self.controller = self.game_framework.make_controller(self.world_model, RSSKeyboardControl, start_in_autopilot=False)  # Note: stores weakref to controller
        print("Initializing agent")
        LunaticAgent.__init__(self=self, settings=config, world_model=self.world_model)
        # super(AutonomousAgent, self).__init__(self, config, self.world_model)
        print("LunaticAgent initialized")

        # Disable every RandomLaneChangeRule instance for the leaderboard agent.
        from agents.rules.lane_changes.random_changes import RandomLaneChangeRule  # noqa: PLC0415
        for rules in self.rules.values():
            for rule in rules:
                if isinstance(rule, RandomLaneChangeRule):
                    rule.enabled = False

        # Set plan; a route may already have been stored by set_global_plan
        if self._global_plan_waypoints:
            self._local_planner_set_plan(self._global_plan_waypoints)

        self.game_framework.agent = self  # TODO: Remove this circular reference
        self.agent_engaged = False
        # Print controller docs
        with contextlib.suppress(AttributeError):
            print(self.controller.get_docstring())

    def sensors(self) -> "list[dict]":
        """
        Define the sensor suite required by the agent

        Returns:
            A list containing the required sensors in the following format

            .. code-block:: python

                [
                    {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': -0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                    'width': 300, 'height': 200, 'fov': 100, 'id': 'Left'},

                    {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': 0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
                    'width': 300, 'height': 200, 'fov': 100, 'id': 'Right'},

                    {'type': 'sensor.lidar.ray_cast', 'x': 0.7, 'y': 0.0, 'z': 1.60, 'yaw': 0.0, 'pitch': 0.0, 'roll': 0.0,
                    'id': 'LIDAR'}
                ]

        Note:
            The LunaticChallenger does not use any sensors; the usage of 'sensor.opendrive_map' is experimental, however
            there is yet no parsing done for the data.
        """
        sensors: list = super().sensors()  # This should be empty

        # temp; remove
        # self.args is used instead of the module global so that this also works
        # when setup() received a LaunchConfig directly (the global is unset then).
        try:
            i = [x['type'] for x in self.args.leaderboard.sensors].index('sensor.opendrive_map')
        except (ValueError, AttributeError):
            pass
        else:
            self.args.leaderboard.sensors[i].use = USE_OPEN_DRIVE_DATA

        # add sensors if they have the use flag in the config
        try:
            sensors.extend(filter(operator.itemgetter('use'), self.args.leaderboard.sensors))
        except Exception:  # noqa: BLE001 - best effort; a broken sensor config must not abort the run
            logger.debug("Could not add sensors from the configuration.", exc_info=True)
        logger.info("Using sensors: %s", sensors)
        return sensors

    @staticmethod
    def _print_input_data(input_data: Dict[str, Tuple[int, Any]]) -> bool:
        """
        Debug method to view the input data

        Returns:
            :code:`False` if ``input_data`` is empty (nothing is printed), otherwise :code:`True`.
        """
        if not input_data:
            return False
        print("=====================>")
        for key, val in input_data.items():
            if hasattr(val[1], 'shape'):
                shape = val[1].shape
                print(f"[{key} -- {val[0]:06d}] with shape {shape}")
            else:
                print(f"[{key} -- {val[0]:06d}] ")
        print("<=====================")
        return True

    # This allows BlockingRules to pick up the correct function
    @singledispatchmethod
    def run_step(self, debug: bool = False, second_pass=False) -> carla.VehicleControl:  # noqa: ARG002
        """
        Attention:
            Use :py:meth:`__call__` instead of this method!
        """
        # NOTE: the `debug` argument is ignored, the value from the launch config is used.
        # TODO: Possibly singledispatch to __call__ instead
        return super(AutonomousAgent, self).run_step(debug=self.args.debug, second_pass=second_pass)

    @run_step.register(dict)
    def _(self, input_data: Dict[str, Tuple[int, Any]], timestamp: float = -1) -> carla.VehicleControl:  # noqa: ARG002
        """
        Function that is called by leaderboard framework

        Runs the agent's own ``run_step``, the manual control and execution phases
        and returns the resulting control. On any exception the agent is destroyed
        before the exception is re-raised.
        """
        try:
            if self._print_input_data(input_data) and "OpenDRIVE" in input_data:
                frame, data_value = input_data["OpenDRIVE"]
                data: str = data_value["opendrive"]
                if self._opendrive_data != data:
                    self._opendrive_data = data
                    print(frame, data[:5000])
                    # Explicit encoding so the dump does not depend on the locale.
                    Path("opendrive.xml").write_text(data, encoding="utf-8")
                else:
                    print("OpenDRIVE data unchanged")
            self.agent_engaged = True  # remove this, if not used

            with self.game_framework:
                control = self.run_step(self.args.debug)  # Call Lunatic Agent run_step
                # Handle render updates

                self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN, prior_results=control)
                if self.controller.parse_events(self.get_control()):
                    print("Exiting by user input.")
                    raise UserInterruption("Exiting by user input.")  # noqa: TRY301
                self.execute_phase(Phase.APPLY_MANUAL_CONTROLS | Phase.END, prior_results=None)

                self.execute_phase(Phase.EXECUTION | Phase.BEGIN, prior_results=control)
                final_controls: carla.VehicleControl = self.get_control()  # type: ignore[assignment]

            # TODO: Update HUD controls info
        except Exception as e:
            # A user requested exit is not an error and therefore not logged as one.
            if not isinstance(e, UserInterruption):
                logger.exception("Error in LunaticChallenger.run_step:")
            self.destroy()
            raise
        else:
            return final_controls

    # NOTE: to update the doc use
    #run_step.func.__doc__ += "\n" + LunaticAgent.run_step.__doc__

    @staticmethod
    def _transform_to_waypoint(transform: "carla.Transform", project_to_road=True, lane_type=carla.LaneType.Driving) -> "carla.Waypoint":
        """Look up the waypoint for the location of ``transform`` on the map of the :py:class:`CarlaDataProvider`."""
        # maybe move to misc / tools
        return CarlaDataProvider.get_map().get_waypoint(transform.location, project_to_road=project_to_road, lane_type=lane_type)  # pyright: ignore[reportReturnType]

    def _local_planner_set_plan(self, plan):
        """Pass ``plan`` to the :py:class:`.LunaticAgent` implementation of ``set_global_plan`` and draw the route in debug mode."""
        # super(AutonomousAgent, self) skips AutonomousAgent in the MRO
        super(AutonomousAgent, self).set_global_plan(plan, stop_waypoint_creation=True, clean_queue=True)
        if self.game_framework._args.debug:
            draw_route(CarlaDataProvider.get_world(), plan, vertical_shift=0.5, size=0.15, downsample=1, life_time=1000.0)

    def set_global_plan(self, global_plan_gps: Sequence["tuple[_GPSDataDict, RoadOption]"], global_plan_world_coord: Sequence["tuple[carla.Transform, RoadOption]"]):
        """
        Set the plan (route) for the agent

        The route is downsampled by :py:data:`DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES` and stored
        in GPS, world and waypoint representation. It is only forwarded to the local planner
        if that one already exists; otherwise :py:meth:`setup` picks up the stored plan.
        """
        # old:
        # super().set_global_plan(global_plan_gps, global_plan_world_coord)
        # new:
        #print("==============Road updated============")
        #print("Plan GPS", global_plan_gps[:10])
        #print("Plan World Coord", global_plan_world_coord[:10])

        ds_ids: "list[int]" = downsample_route(global_plan_world_coord, DOWNSAMPLING_FACTOR_OF_ROUTE_COORDINATES)  # Downsample to less distance. TODO: should increase this
        #print("Downsampled ids", ds_ids)

        # Reduce the global plan to the downsampled ids
        self._global_plan_world_coord = [(global_plan_world_coord[x][0], global_plan_world_coord[x][1]) for x in ds_ids]
        # Sanity check: the entries are expected to be (transform, road_option) tuples already
        assert self._global_plan_world_coord == [global_plan_world_coord[x] for x in ds_ids]
        self._global_plan = [global_plan_gps[x] for x in ds_ids]
        self._global_plan_waypoints = [(self._transform_to_waypoint(transform), road_option) for transform, road_option in self._global_plan_world_coord]
        if self._local_planner is not None:
            # TODO: maybe waypoints is not necessary as we extract locations
            self._local_planner_set_plan(self._global_plan_waypoints)

    def __call__(self) -> carla.VehicleControl:
        """
        Executes the next step and returns the control for the vehicle.

        Attention:
            Use this function instead of :py:meth:`run_step`!
        """
        input_data = self.sensor_interface.get_data(GameTime.get_frame())

        timestamp = GameTime.get_time()

        # self.args instead of the module global: the global is only bound when
        # setup() was called with a path to a configuration file.
        if self.args.leaderboard.print_time_info:
            if not self.wallclock_t0:
                self.wallclock_t0 = GameTime.get_wallclocktime()
            wallclock = GameTime.get_wallclocktime()
            wallclock_diff = (wallclock - self.wallclock_t0).total_seconds()
            sim_ratio = 0 if wallclock_diff == 0 else timestamp / wallclock_diff

            print(f"=== [Agent] -- Wallclock = {str(wallclock)[:-3]} -- System time = {wallclock_diff:.3f}"
                  f" -- Game time = {timestamp:.3f} -- Ratio = {sim_ratio:.3f}x")

        control = self.run_step(input_data, timestamp)
        control.manual_gear_shift = False

        return control

    def destroy(self):
        """
        Stops the detection matrix, destroys the world model, detaches the
        game framework and quits pygame.

        Can be called on an instance for which :py:meth:`setup` did not (fully) run.
        """
        self._destroyed = True
        print("Destroying Lunatic Challenger")
        # getattr: the attribute might not exist if setup() did not get far enough.
        detection_matrix = getattr(self, "_detection_matrix", None)
        if detection_matrix is not None:
            detection_matrix.stop()
            self._detection_matrix = None
        super().destroy()
        if self.world_model:
            if not WORLD_MODEL_DESTROY_SENSORS:
                # Remove the actors from the list before the world model is destroyed
                self.world_model.actors.clear()
            self.world_model.destroy()
            self.world_model = None  # type: ignore[assignment]
        if self.game_framework:
            self.game_framework.agent = None
            self.game_framework = None  # type: ignore[assignment]
        pygame.quit()
        print("Destroyed", self)

    def __del__(self):
        # Treat an instance whose __init__ never ran as already destroyed.
        if not getattr(self, "_destroyed", True):
            self.destroy()
436
437# --- Type hints ---
class _GPSDataDict(object if READTHEDOCS and not TYPE_CHECKING else TypedDict):
    """
    Bases: :py:class:`typing.TypedDict`

    Describes one GPS entry of the global plan that is passed to
    :py:meth:`.LunaticChallenger.set_global_plan`.

    Note:
        On a documentation build (``READTHEDOCS``) outside of type checking
        the class derives from :py:class:`object` instead.

    :meta public:
    """
    lat: float
    lon: float
    z: float