1#!/usr/bin/env python
2"""
3Example of a game loop for the :py:class:`.LunaticAgent` class.
4"""
5
6import random
7from pprint import pprint
8
9import hydra
10from omegaconf import OmegaConf
11
12# When you use an .egg file be sure to add it to your $PYTHONPATH
13try:
14 import carla
15except ImportError:
16 from launch_tools import carla
17
18import pygame
19
20import launch_tools
21from agents.lunatic_agent import LunaticAgent
22from agents.rules import create_default_rules
23from agents.tools.config_creation import AsDictConfig, LaunchConfig, LunaticAgentSettings
24from agents.tools.debug_drawing import debug_drawing
25from agents.tools.logs import logger
26from classes import exceptions
27from classes.constants import Phase
28from classes.keyboard_controls import RSSKeyboardControl # Alternative: PassiveKeyboardControl
29from classes.worldmodel import GameFramework, WorldModel
30
31# ==============================================================================
32# Globals
33# ==============================================================================
34
# Number of additional vehicles requested through
# GameFramework.request_new_batch_actors in game_loop. The first
# AMOUNT_ACTORS + 1 spawn points (without the ego's one) are offered to them.
AMOUNT_ACTORS: int = 12
"""How many other actors to spawn"""

# Index into the list of spawn points that game_loop loads from the CSV file.
# The ego vehicle is spawned at this entry and the entry is excluded from the
# spawn points handed to the other actors.
EGO_SPAWN_IDX: int = 3
"""Changes the start position of the ego vehicle"""

# When True, game_loop pretty-prints agent.rules once the rules have been added.
PRINT_RULES: bool = False
"""Print the rules to the console. This is messy."""
43
44# ==============================================================================
45# -- Game Loop ---------------------------------------------------------
46# ==============================================================================
47
48
[docs]
def game_loop(args: LaunchConfig):
    r"""
    Main loop of the simulation.

    It sets up the simulation, spawns the vehicles, and initializes the agent.
    In the :python:`while` loop, the agent calculates one `\`carla.VehicleControl\`:py:class:`:external-icon-parse:
    every iteration.

    The `.GameFramework`:py:class: context manager takes care of the simulation tick,
    and camera updates.

    Args:
        args : The launch configuration. This function reads :python:`args.agent`,
            :python:`args.externalActor` and :python:`args.loop` directly; the complete
            object is handed to the `.GameFramework`:py:class:.

    Returns:
        Always :python:`None`. The function returns early if the keyboard handling
        raises :py:exc:`.UserInterruption`.

    Raises:
        RuntimeError: If no waypoint exists ahead of the ego spawn point, i.e. no
            initial destination can be determined.
        Exception: Every other exception from the setup or the loop is printed, logged
            and re-raised. The cleanup in the :python:`finally` block runs in all cases.
    """
    # Avoid name errors in final block
    game_framework: GameFramework = None
    world_model: WorldModel = None
    agent: LunaticAgent = None
    ego: carla.Vehicle = None

    # -- Load Settings Agent --

    print("--- Creating settings ---")
    # Validates and creates the agent settings. Sidenote: agent_config is a copy of args.agent,
    # args.agent itself stays available as the original settings.
    # To not validate the config against LunaticAgentSettings use OmegaConf.create(args.agent)
    # to create a copy.

    agent_config: AsDictConfig[LunaticAgentSettings] = LunaticAgentSettings.create(settings=args.agent)

    try:
        logger.info("Creating Game Framework ...")
        game_framework = GameFramework(args)

        # -- Spawn Vehicles --
        spawn_points = launch_tools.csv_tools.csv_to_transformations("examples/highway_example_car_positions.csv")

        # Only the ego blueprint is needed here; the other actors are requested by model name below.
        ego_bp, _car_bp = launch_tools.blueprint_helpers.get_contrasting_blueprints()

        # Spawn Others
        # The ego's spawn point is left out so that it stays free for the ego vehicle.
        GameFramework.request_new_batch_actors("vehicle.tesla.model3",
                                               AMOUNT_ACTORS,
                                               spawn_points=[sp for i, sp in enumerate(spawn_points[:AMOUNT_ACTORS + 1])
                                                             if i != EGO_SPAWN_IDX],
                                               autopilot=True,
                                               tick=False)

        # Spawn Ego
        start: carla.libcarla.Transform = spawn_points[EGO_SPAWN_IDX]
        ego = game_framework.spawn_actor(ego_bp, start, must_spawn=True)  # type: ignore[assignment]

        logger.info("Creating agent and WorldModel ...")

        # The planner and controller are returned as well but are not needed in this example.
        agent, world_model, _global_planner, _controller \
            = game_framework.init_agent_and_interface(ego=None if args.externalActor else ego,  # Test externalActor
                                                      agent_class=LunaticAgent,
                                                      config=agent_config)
        logger.debug("Created agent and WorldModel.\n")

        # Add Rules:
        # agent.add_rules(create_default_rules(game_framework))

        # NOTE: the default rules can be added over the yaml interface.
        # To add them in a functional way, use the following code:
        if not any("create_default_rules" in rule_config._target_ for rule_config in agent_config.rules):
            default_rules = create_default_rules()
            agent.add_rules(default_rules)

        if PRINT_RULES:  # NOTE: this can be a bit messy as some attributes have a long repr
            print("Lunatic Agent Rules")
            pprint(agent.rules)

        # -- Scenario --

        # Set initial destination
        wp_start = world_model.map.get_waypoint(start.location)

        next_wps: "list[carla.Waypoint]" = wp_start.next(25)
        if not next_wps:
            # carla.Waypoint.next returns an empty list when the lane does not continue;
            # fail with a clear message instead of an IndexError.
            raise RuntimeError(
                f"No waypoint found ahead of spawn point {EGO_SPAWN_IDX}; "
                "choose a different EGO_SPAWN_IDX."
            )
        last_wp = next_wps[-1]

        # carla.Waypoint.get_left_lane returns None if there is no such lane.
        left_last_wp = last_wp.get_left_lane()
        if left_last_wp is None:
            logger.debug("No left lane at the initial target; using the target itself for debug drawing.")
            destination = last_wp.transform.location
        else:
            print(left_last_wp, world_model.map.get_waypoint(left_last_wp.transform.location))
            # destination = random.choice(all_spawn_points).location
            destination = left_last_wp.transform.location

        # NOTE: the agent is routed to `last_wp`. `destination` is only passed to
        # debug_drawing until a new target is chosen in the loop below.
        agent.set_destination(last_wp.transform.location)

        # An example how to construct a loop for the agent.
        agent.verify_settings()  # Validate if the planner.dt is set up, should match the simulation time delta.

        # This is the main loop, raising for example an AgentDoneException inside the
        # `with game_framework` context will set game_framework.continue_loop to False.
        while game_framework.continue_loop:

            # The `with game_framework` context manager takes care of background tasks
            # like world tick, HUD update, and is an exception handler for the agent.
            with game_framework:
                # ------ Run step ------

                # The agent.run_step is the main method in which the agent
                # calculates the next vehicle control object.
                planned_control = agent.run_step(debug=True)  # noqa: F841

                # ------ Apply / Handle User Input ------

                # Afterwards the user can manipulate these controls.
                # If this is not wanted set allow_user_updates=False.
                #
                # The user will then only be able to use the other keyboard
                # hotkeys like quitting the game or changing other settings.
                #
                # See the documentation of the keyboard controls for more information.
                try:
                    # -> Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN
                    agent.parse_keyboard_input(allow_user_updates=True)
                    # -> Phase.APPLY_MANUAL_CONTROLS | Phase.END
                except exceptions.UserInterruption:
                    # The `finally` block below still performs the cleanup.
                    return

                # ------ Apply Control ------

                # The final step is to apply the controls to the vehicle.
                # Rules that execute at Phase.EXECUTION | Phase.BEGIN can
                # still manipulate or exchange the final controls.

                # > Phase.EXECUTION | Phase.BEGIN
                agent.apply_control()
                # > Phase.EXECUTION | Phase.END

                # ------ End of Loop ------

                # Draw route information and junctions
                if game_framework._args.debug:
                    try:
                        debug_drawing(agent, game_framework, destination)
                    except Exception:
                        # Drawing is best effort; it must not stop the simulation.
                        logger.debug("Error in debug drawing", exc_info=True)

            # -- Stop Loop or Continue when agent is done --

            # Here it is checked whether the agent has reached its destination and
            # no rule or other function has done a replanning. A custom new destination
            # can be set here. The check runs after the `with` block, as that is where
            # continue_loop is set to False.
            if args.loop and not game_framework.continue_loop and agent.done():
                print("The target has been reached, searching for another target")
                world_model.hud.notification("Target reached", seconds=4.0)

                # Option 2 : Choose a random point nearby.
                waypoints_ahead = agent._current_waypoint.next(50)
                if waypoints_ahead:
                    wp = waypoints_ahead[-1]
                    # The neighbouring lanes can be None; fall back to the own lane then.
                    next_wp = random.choice((wp, wp.get_left_lane(), wp.get_right_lane()))
                    if next_wp is None:
                        next_wp = wp
                    destination = next_wp.transform.location
                else:
                    # Option 1 : Choose a random point anywhere.
                    # Used as fallback when the current lane has no waypoint ahead.
                    destination = random.choice(spawn_points).location

                # After a new destination has been set, these steps should be executed
                # to allow a smooth continuation of the agent.
                agent.set_destination(destination)
                game_framework.continue_loop = True  # if not set to True we will exit the loop
                agent.execute_phase(Phase.DONE | Phase.END, prior_results=None)

        # Optional: final phase of agents lifetime; could be used for cleanup tasks.
        agent.execute_phase(Phase.TERMINATING | Phase.END, prior_results=None)

    # Adding exception block
    except Exception as e:
        print("ERROR, exception in game loop", e)
        logger.error("Exception in game loop", exc_info=True)
        raise
    finally:
        # Cleanup the simulation by removing the used actors
        print("Quitting. - Destroying actors and stopping world.")
        if agent is not None:
            agent.destroy()
        if world_model is not None:
            world_model.destroy(destroy_ego=False)
        if game_framework is not None:
            # save world for usage after CDP cleanup
            game_framework.cleanup()  # includes/or is CarlaDataProvider.cleanup()

        pygame.quit()
243
244# ==============================================================================
245# -- main() --------------------------------------------------------------
246# ==============================================================================
247
248
@hydra.main(version_base=None, config_path="./conf", config_name="launch_config")
def main(args: LaunchConfig):
    """
    Entry point of the example, wrapped by `@hydra <https://hydra.cc/docs/intro/>`_ which merges
    the configuration and sets up logging before this function is called.

    The function prints the module help, the keyboard controls and the launch
    arguments, validates the configuration and then starts :py:func:`game_loop`.

    Args:
        args : The configuration object created by Hydra_. It holds the settings of the
            YAML files merged with the command line arguments. It behaves like a dictionary
            that additionally supports dot access,
            e.g. :python:`args.host` instead of :python:`args["host"]`.

    Returns:
        The result of :py:func:`game_loop`, or :python:`None` if the run was cancelled
        with a :py:exc:`KeyboardInterrupt`.

    See Also:
        - https://hydra.cc/
        - `Hydra Config Intro <https://hydra.cc/docs/configure_hydra/intro/>`_
        - `Default List - How config files are merged <https://hydra.cc/docs/advanced/defaults_list/>`_
        - `Override Grammar <https://hydra.cc/docs/advanced/override_grammar/basic/>`_
        - `Hydra Tab Completion <https://hydra.cc/docs/tutorials/basic/running_your_app/tab_completion/>`_
    """
    logger.info('listening to server %s:%s', args.host, args.port)

    # Usage information first, then the effective configuration.
    print(__doc__)
    keyboard_help = RSSKeyboardControl.get_docstring()
    print(keyboard_help)
    launch_yaml = OmegaConf.to_yaml(args)
    print("Launch Arguments:\n", launch_yaml, sep="")

    # Validate the config for keys
    strictness = args.get("strict_config", 3)
    validated_args = LaunchConfig.check_config(args, strictness, as_dict_config=True)

    try:
        # Optional: If you return a result object it will be logged by Hydra's on_job_end callback
        return game_loop(validated_args)
    except KeyboardInterrupt:
        print('\nCancelled by user. Bye!')
        return None
285
286
287# pyright: reportAssignmentType=none
# Start the Hydra-wrapped entry point only when this file is executed as a
# script; importing the module does not launch the simulation.
if __name__ == '__main__':
    main()