1#!/usr/bin/env python
2"""
3Example of a game loop for the :py:class:`.LunaticAgent` class.
4"""
5
6import random
7from pprint import pprint
8
9import hydra
10from omegaconf import OmegaConf
11
12# When you use an .egg file be sure to add it to your $PYTHONPATH
13try:
14 import carla
15except ImportError:
16 from launch_tools import carla
17import pygame
18
19import launch_tools
20from agents.lunatic_agent import LunaticAgent
21from agents.rules import create_default_rules
22from agents.tools.config_creation import AsDictConfig, LaunchConfig, LunaticAgentSettings
23from agents.tools.debug_drawing import debug_drawing
24from agents.tools.logs import logger
25from classes import exceptions
26from classes.constants import Phase
27from classes.ui.keyboard_controls import RSSKeyboardControl # Alternative: PassiveKeyboardControl
28from classes.worldmodel import GameFramework, WorldModel
29
30# ==============================================================================
31# Globals
32# ==============================================================================
33
# Passed to ``GameFramework.request_new_batch_actors`` in ``game_loop`` as the
# number of actors to request; it also bounds the slice of CSV spawn points
# that is considered for them.
AMOUNT_ACTORS: int = 12
"""How many other actors to spawn"""

# Index into the spawn-point list loaded in ``game_loop``. The transform at this
# index is used for the ego vehicle and is excluded from the spawn points handed
# to the other actors.
EGO_SPAWN_IDX: int = 3
"""Changes the start position of the ego vehicle"""

# When True, ``game_loop`` pretty-prints ``agent.rules`` after the rules were added.
PRINT_RULES: bool = False
"""Print the rules to the console. This is messy."""
42
43# ==============================================================================
44# -- Game Loop ---------------------------------------------------------
45# ==============================================================================
46
47
def game_loop(args: LaunchConfig):
    r"""
    Main loop of the simulation.

    It sets up the simulation, spawns the vehicles, and initializes the agent.
    In the :python:`while` loop, the agent calculates one `\`carla.VehicleControl\`:py:class:`:external-icon-parse:
    every iteration.

    The `.GameFramework`:py:class: context manager takes care of the simulation tick,
    and camera updates.

    Args:
        args: The launch configuration. This function reads ``args.agent``
            (agent settings), ``args.externalActor`` and ``args.loop`` directly and
            passes the whole object on to `.GameFramework`:py:class:.

    Returns:
        None. When the user interrupts via the keyboard controls the function returns
        early from inside the loop; the ``Phase.TERMINATING`` phase is skipped in
        that case, the cleanup in the ``finally`` block is still executed.

    Raises:
        Exception: Any exception raised during setup or inside the loop is printed,
            logged with its traceback and re-raised after the cleanup.
    """
    # Pre-declare everything the ``finally`` block inspects, so that the cleanup
    # does not hit a NameError when the setup fails half-way.
    game_framework: GameFramework = None
    world_model: WorldModel = None
    agent: LunaticAgent = None
    ego: carla.Vehicle = None

    # -- Load Settings Agent --

    print("--- Creating settings ---")
    # Validates and creates the agent settings. Sidenote: agent_config is a copy of
    # args.agent; args.agent itself still holds the original settings.
    # To create a copy without validating the config against LunaticAgentSettings
    # use OmegaConf.create(args.agent) instead.
    agent_config: AsDictConfig[LunaticAgentSettings] = LunaticAgentSettings.create(settings=args.agent)

    try:
        logger.info("Creating Game Framework ...")
        game_framework = GameFramework(args)

        # -- Spawn Vehicles --
        spawn_points = launch_tools.csv_tools.csv_to_transformations("examples/highway_example_car_positions.csv")

        ego_bp, _car_bp = launch_tools.blueprint_helpers.get_contrasting_blueprints()

        # Spawn Others: use the first spawn points, but keep the ego's slot free.
        GameFramework.request_new_batch_actors(
            "vehicle.tesla.model3",
            AMOUNT_ACTORS,
            spawn_points=[sp for i, sp in enumerate(spawn_points[: AMOUNT_ACTORS + 1]) if i != EGO_SPAWN_IDX],
            autopilot=True,
            tick=False,
        )

        # Spawn Ego
        start: carla.libcarla.Transform = spawn_points[EGO_SPAWN_IDX]
        ego = game_framework.spawn_actor(ego_bp, start, must_spawn=True)  # type: ignore[assignment]

        logger.info("Creating agent and WorldModel ...")

        agent, world_model, _global_planner, _controller = game_framework.init_agent_and_interface(
            ego=None if args.externalActor else ego,  # Test externalActor
            agent_class=LunaticAgent,
            config=agent_config,
        )
        logger.debug("Created agent and WorldModel.\n")

        # -- Add Rules --

        # NOTE: the default rules can be added over the yaml interface.
        # They are only added here in a functional way if the config does not
        # already reference ``create_default_rules``.
        if not any("create_default_rules" in rule_config._target_ for rule_config in agent_config.rules):
            default_rules = create_default_rules()
            agent.add_rules(default_rules)

        if PRINT_RULES:  # NOTE: this can be a bit messy as some attributes have a long repr
            print("Lunatic Agent Rules")
            pprint(agent.rules)

        # -- Scenario --

        # Set initial destination: a waypoint ahead of the start position,
        # preferably on the lane to the left of it.
        wp_start = world_model.map.get_waypoint(start.location)

        next_wps: "list[carla.Waypoint]" = wp_start.next(50)
        last_wp = next_wps[-1]
        left_last_wp = last_wp.get_left_lane()
        if left_last_wp is not None:
            print(left_last_wp, world_model.map.get_waypoint(left_last_wp.transform.location))
            destination = left_last_wp.transform.location
        else:
            destination = last_wp.transform.location

        # Route the agent to the very same location that is handed to
        # ``debug_drawing`` below. Previously ``last_wp`` was always used here,
        # which made the left-lane selection above ineffective and let the
        # drawn destination differ from the planned one.
        agent.set_destination(destination)

        # -- An example how to construct a loop for the agent --

        agent.verify_settings()  # Validate if the planner.dt is set up, should match the simulation time delta.

        # This is the main loop, raising for example a AgentDoneException inside the
        # `with game_framework` context will set game_framework.continue_loop to False.
        while game_framework.continue_loop:
            # The `with game_framework` context manager takes care of background tasks
            # like world tick, HUD update, and is an exception handler for the agent.
            with game_framework:
                # ------ Run step ------

                # The agent.run_step is the main method in which the agent
                # calculates the next vehicle control object.
                planned_control = agent.run_step(debug=True)  # noqa: F841

                # ------ Apply / Handle User Input ------

                # Afterwards the user can manipulate these controls.
                # If this is not wanted set allow_user_updates=False.
                #
                # The user will then only be able to use the other keyboard
                # hotkeys like quitting the game or changing other settings.
                #
                # See the documentation of the keyboard controls for more information.
                try:
                    # -> Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN
                    agent.parse_keyboard_input(allow_user_updates=True)
                    # -> Phase.APPLY_MANUAL_CONTROLS | Phase.END
                except exceptions.UserInterruption:
                    # The user asked to quit; the ``finally`` block cleans up.
                    return

                # ------ Apply Control ------

                # The final step is to apply the controls to the vehicle.
                # Rules that execute at Phase.EXECUTION | Phase.BEGIN can
                # still manipulate or exchange the final controls.

                # > Phase.EXECUTION | Phase.BEGIN
                agent.apply_control()
                # > Phase.EXECUTION | Phase.END

                # ------ End of Loop ------

                # Draw route information and junctions.
                if game_framework.launch_config.debug:
                    try:
                        debug_drawing(agent, game_framework, destination)
                    except Exception:
                        # Drawing is best effort and must not stop the simulation.
                        logger.debug("Error in debug drawing", exc_info=True)

            # -- Stop Loop or Continue when agent is done --

            # Here it is checked whether the agent has reached its destination and
            # no rule or other function has done a replanning. A custom new
            # destination can be set here.
            if args.loop and not game_framework.continue_loop and agent.done():
                print("The target has been reached, searching for another target")
                world_model.hud.notification("Target reached", seconds=4.0)

                # Option 1 : Choose a random point anywhere
                # destination = random.choice(spawn_points).location

                # Option 2 : Choose a random point nearby.
                wp = agent._current_waypoint.next(50)[-1]
                next_wp = random.choice((wp, wp.get_left_lane(), wp.get_right_lane()))
                if next_wp is None:
                    # The chosen neighbouring lane does not exist, stay on the current one.
                    next_wp = wp
                destination = next_wp.transform.location

                # After a new destination has been set, these steps should be executed
                # to allow a smooth continuation of the agent.
                agent.set_destination(destination)
                game_framework.continue_loop = True  # if not set to True we will exit the loop
                agent.execute_phase(Phase.DONE | Phase.END, prior_results=None)

        # Optional: final phase of agents lifetime; could be used for cleanup tasks.
        agent.execute_phase(Phase.TERMINATING | Phase.END, prior_results=None)

    except Exception as e:
        # Report the problem on the console and in the log, then let it propagate.
        print("ERROR, exception in game loop", e)
        logger.error("Exception in game loop", exc_info=True)
        raise
    finally:
        # Cleanup the simulation by removing the used actors.
        print("Quitting. - Destroying actors and stopping world.")
        if agent is not None:
            agent.destroy()
        if world_model is not None:
            world_model.destroy(destroy_ego=False)
        if game_framework is not None:
            # save world for usage after CDP cleanup
            game_framework.cleanup()  # includes/or is CarlaDataProvider.cleanup()

        pygame.quit()
246
247
248# ==============================================================================
249# -- main() --------------------------------------------------------------
250# ==============================================================================
251
252
@hydra.main(version_base=None, config_path="./conf", config_name="launch_config")
def main(args: LaunchConfig):
    """
    Entry point of the example, wrapped by `@hydra <https://hydra.cc/docs/intro/>`_ which takes care
    of merging the configuration and sets up logging.

    The function prints the module docstring, the keyboard help and the launch
    arguments, validates the configuration and then hands over to :py:func:`game_loop`.

    Args:
        args : The configuration object that is created by Hydra_, it
            contains all the settings from the YAML files merged with the command line
            arguments. From a high-level perspective is it a dictionary that also allows
            dot access to its keys, e.g. :python:`args.host` instead of :python:`args["host"]`.

    Returns:
        Whatever :py:func:`game_loop` returns, or :python:`None` if the run was
        cancelled with a :py:exc:`KeyboardInterrupt`.

    See Also:
        - https://hydra.cc/
        - `Hydra Config Intro <https://hydra.cc/docs/configure_hydra/intro/>`_
        - `Default List - How config files are merged <https://hydra.cc/docs/advanced/defaults_list/>`_
        - `Override Grammar <https://hydra.cc/docs/advanced/override_grammar/basic/>`_
        - `Hydra Tab Completion <https://hydra.cc/docs/tutorials/basic/running_your_app/tab_completion/>`_
    """
    logger.info("listening to server %s:%s", args.host, args.port)

    # Show the usage information before anything is started.
    print(__doc__)
    print(RSSKeyboardControl.get_docstring())
    print("Launch Arguments:\n", OmegaConf.to_yaml(args), sep="")

    # Validate the config for keys; falls back to 3 when "strict_config" is not set.
    # NOTE(review): the meaning of the numeric level is defined by LaunchConfig.check_config.
    strictness = args.get("strict_config", 3)
    args = LaunchConfig.check_config(args, strictness, as_dict_config=True)

    try:
        # Optional: If you return a result object it will be logged by Hydra's on_job_end callback
        return game_loop(args)
    except KeyboardInterrupt:
        print("\nCancelled by user. Bye!")
        return None
288
289
# File-level pyright setting.
# pyright: reportAssignmentType=none
# Script entry point: ``main`` is called without arguments, the ``hydra.main``
# decorator applied to it supplies the configuration object.
if __name__ == "__main__":
    main()