Source code for AgentGameLoop

  1#!/usr/bin/env python
  2"""
  3Example of a game loop for the :py:class:`.LunaticAgent` class.
  4"""
  5
  6import random
  7from pprint import pprint
  8
  9import hydra
 10from omegaconf import OmegaConf
 11
 12# When you use an .egg file be sure to add it to your $PYTHONPATH
 13try:
 14    import carla
 15except ImportError:
 16    from launch_tools import carla
 17import pygame
 18
 19import launch_tools
 20from agents.lunatic_agent import LunaticAgent
 21from agents.rules import create_default_rules
 22from agents.tools.config_creation import AsDictConfig, LaunchConfig, LunaticAgentSettings
 23from agents.tools.debug_drawing import debug_drawing
 24from agents.tools.logs import logger
 25from classes import exceptions
 26from classes.constants import Phase
 27from classes.ui.keyboard_controls import RSSKeyboardControl  # Alternative: PassiveKeyboardControl
 28from classes.worldmodel import GameFramework, WorldModel
 29
 30# ==============================================================================
 31# Globals
 32# ==============================================================================
 33
# Count passed to the batch spawn request in game_loop; it also bounds the
# slice of CSV spawn points that is handed to the other vehicles.
AMOUNT_ACTORS: int = 12
"""How many other actors to spawn"""

# Index into the spawn points loaded from the CSV file. This entry is skipped
# for the other vehicles and used as the start transform of the ego vehicle.
EGO_SPAWN_IDX: int = 3
"""Changes the start position of the ego vehicle"""

# When True, game_loop pretty-prints agent.rules after the rules were added.
PRINT_RULES: bool = False
"""Print the rules to the console. This is messy."""
 42
 43# ==============================================================================
 44# -- Game Loop ---------------------------------------------------------
 45# ==============================================================================
 46
 47
def game_loop(args: LaunchConfig):
    r"""
    Main loop of the simulation.

    It sets up the simulation, spawns the vehicles, and initializes the agent.
    In the :python:`while` loop, the agent calculates one `\`carla.VehicleControl\`:py:class:`:external-icon-parse:
    every iteration.

    The `.GameFramework`:py:class: context manager takes care of the simulation tick,
    and camera updates.

    Args:
        args : The launch configuration. This function reads ``args.agent``,
            ``args.externalActor`` and ``args.loop`` and passes the whole object
            on to `.GameFramework`:py:class:.

    Returns:
        None. The function also returns early when the user interrupts the
        simulation via the keyboard controls.

    Raises:
        Exception: Any exception raised during setup or inside the loop is logged
            and re-raised; the cleanup in the ``finally`` block runs in every case.
    """
    # Avoid name errors in final block
    game_framework: GameFramework = None
    world_model: WorldModel = None
    agent: LunaticAgent = None
    ego: carla.Vehicle = None

    # -- Load Settings Agent --

    print("--- Creating settings ---")
    # Validates and creates the agent settings. Sidenote: agent_config is a copy of args.agent,
    # you can still access args.agent as the original settings.
    # To not validate the config against LunaticAgentSettings use OmegaConf.create(args.agent)
    # to create a copy.

    agent_config: AsDictConfig[LunaticAgentSettings] = LunaticAgentSettings.create(settings=args.agent)

    try:
        logger.info("Creating Game Framework ...")
        game_framework = GameFramework(args)

        # -- Spawn Vehicles --
        spawn_points = launch_tools.csv_tools.csv_to_transformations("examples/highway_example_car_positions.csv")

        ego_bp, _car_bp = launch_tools.blueprint_helpers.get_contrasting_blueprints()

        # Spawn Others
        # Takes the first AMOUNT_ACTORS + 1 spawn points and leaves out the one reserved for the ego.
        GameFramework.request_new_batch_actors(
            "vehicle.tesla.model3",
            AMOUNT_ACTORS,
            spawn_points=[sp for i, sp in enumerate(spawn_points[: AMOUNT_ACTORS + 1]) if i != EGO_SPAWN_IDX],
            autopilot=True,
            tick=False,
        )

        # Spawn Ego
        start: carla.libcarla.Transform = spawn_points[EGO_SPAWN_IDX]
        ego = game_framework.spawn_actor(ego_bp, start, must_spawn=True)  # type: ignore[assignment]

        logger.info("Creating agent and WorldModel ...")

        agent, world_model, _global_planner, _controller = game_framework.init_agent_and_interface(
            ego=None if args.externalActor else ego,  # Test externalActor
            agent_class=LunaticAgent,
            config=agent_config,
        )
        logger.debug("Created agent and WorldModel.\n")

        # Add Rules:
        # agent.add_rules(create_default_rules(game_framework))

        # NOTE: the default rules can be added over the yaml interface. To add them in a functional way, use the following code:
        if not any("create_default_rules" in rule_config._target_ for rule_config in agent_config.rules):
            default_rules = create_default_rules()
            agent.add_rules(default_rules)

        if PRINT_RULES:  # NOTE: this can be a bit messy as some attributes have a long repr
            print("Lunatic Agent Rules")
            pprint(agent.rules)

        # -- Scenario --

        # Set initial destination
        wp_start = world_model.map.get_waypoint(start.location)

        # NOTE(review): next() yields an empty list when the lane ends within the
        # given distance; the CSV start positions are assumed to avoid that.
        next_wps: "list[carla.Waypoint]" = wp_start.next(50)
        last_wp = next_wps[-1]
        left_last_wp = last_wp.get_left_lane()
        if left_last_wp is not None:
            print(left_last_wp, world_model.map.get_waypoint(left_last_wp.transform.location))

            # destination = random.choice(all_spawn_points).location
            destination = left_last_wp.transform.location
        else:
            destination = last_wp.transform.location

        # Use the destination chosen above, so that the target the agent plans
        # towards is the same one that debug_drawing visualizes in the loop.
        agent.set_destination(destination)

        """
        An example how to construct a loop for the agent.
        """
        agent.verify_settings()  # Validate if the planner.dt is set up, should match the simulation time delta.

        """
        This is the main loop, raising for example a AgentDoneException inside the
        with game_framework context will set game_framework.continue_loop to False.
        """
        while game_framework.continue_loop:
            """
            The `with game_framework` context manager takes care of background tasks
            like world tick, HUD update, and is an exception handler for the agent.
            """
            with game_framework:
                # ------ Run step ------

                """
                The agent.run_step is the main method in which the agent
                calculates the next vehicle control object.
                """
                planned_control = agent.run_step(debug=True)  # noqa: F841

                # ------ Apply / Handle User Input ------

                """
                Afterwards the user can manipulate these controls.
                If this is not wanted set allow_user_updates=False.

                The user will then only be able to use the other keyboard
                hotkeys like quitting the game or changing other settings.

                See the documentation of the keyboard controls for more information.
                """
                try:
                    # -> Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN
                    agent.parse_keyboard_input(allow_user_updates=True)
                    # -> Phase.APPLY_MANUAL_CONTROLS | Phase.END
                except exceptions.UserInterruption:
                    # The finally block below still performs the cleanup.
                    return

                # ------ Apply Control ------

                """
                The final step is to apply the controls to the vehicle.
                Rules that execute at Phase.EXECUTION | Phase.BEGIN can
                still manipulate or exchange the final controls.
                """
                # > Phase.EXECUTION | Phase.BEGIN
                agent.apply_control()
                # > Phase.EXECUTION | Phase.END

                # ------ End of Loop ------

                """Draw route information and junctions"""
                if game_framework.launch_config.debug:
                    try:
                        debug_drawing(agent, game_framework, destination)
                    except Exception:
                        # Drawing is best-effort; a failure must not stop the simulation.
                        logger.debug("Error in debug drawing", exc_info=True)

            # -- Stop Loop or Continue when agent is done --

            """
            Here is checked if the agent has reached its destination and
            no rule or other function has done a replanning. A custom new destination
            can be set here.
            """
            if args.loop and not game_framework.continue_loop and agent.done():
                print("The target has been reached, searching for another target")
                world_model.hud.notification("Target reached", seconds=4.0)

                # Option 1 : Choose a random point anywhere
                # destination = random.choice(spawn_points).location

                # Option 2 : Choose a random point nearby.
                upcoming_wps = agent._current_waypoint.next(50)
                if upcoming_wps:
                    wp = upcoming_wps[-1]
                    next_wp = random.choice((wp, wp.get_left_lane(), wp.get_right_lane()))
                    if next_wp is None:
                        # The randomly chosen neighbouring lane does not exist.
                        next_wp = wp
                    destination = next_wp.transform.location
                else:
                    # No waypoint ahead (the lane ends): fall back to Option 1
                    # instead of failing with an IndexError.
                    destination = random.choice(spawn_points).location

                """
                After a new destination has been set, these steps should be executed
                to allow a smooth continuation of the agent.
                """
                agent.set_destination(destination)
                game_framework.continue_loop = True  # if not set to True we will exit the loop
                agent.execute_phase(Phase.DONE | Phase.END, prior_results=None)

        # Optional: final phase of agents lifetime; could be used for cleanup tasks.
        agent.execute_phase(Phase.TERMINATING | Phase.END, prior_results=None)

    # Adding exception block
    except Exception as e:
        print("ERROR, exception in game loop", e)
        logger.error("Exception in game loop", exc_info=True)
        raise
    finally:
        """Cleanup the simulation by removing the used actors"""
        print("Quitting. - Destroying actors and stopping world.")
        if agent is not None:
            agent.destroy()
        if world_model is not None:
            world_model.destroy(destroy_ego=False)
        if game_framework is not None:
            # save world for usage after CDP cleanup
            game_framework.cleanup()  # includes/or is CarlaDataProvider.cleanup()

        pygame.quit()
# ==============================================================================
# -- main() --------------------------------------------------------------
# ==============================================================================


@hydra.main(version_base=None, config_path="./conf", config_name="launch_config")
def main(args: LaunchConfig):
    """
    Entry point of the example, wrapped by `@hydra <https://hydra.cc/docs/intro/>`_
    which merges the configuration and sets up logging.

    The function prints the module documentation, the keyboard controls and the
    launch arguments, validates the configuration and then runs :py:func:`game_loop`.

    Args:
        args : The configuration object that is created by Hydra_, it
            contains all the settings from the YAML files merged with the command line
            arguments. From a high-level perspective it is a dictionary that also allows
            dot access to its keys, e.g. :python:`args.host` instead of :python:`args["host"]`.

    Returns:
        The result of :py:func:`game_loop`, or ``None`` if the run was cancelled
        with a keyboard interrupt.

    See Also:
        - https://hydra.cc/
        - `Hydra Config Intro <https://hydra.cc/docs/configure_hydra/intro/>`_
        - `Default List - How config files are merged <https://hydra.cc/docs/advanced/defaults_list/>`_
        - `Override Grammar <https://hydra.cc/docs/advanced/override_grammar/basic/>`_
        - `Hydra Tab Completion <https://hydra.cc/docs/tutorials/basic/running_your_app/tab_completion/>`_
    """
    logger.info("listening to server %s:%s", args.host, args.port)

    # Show the usage information before the simulation starts.
    print(__doc__)
    print(RSSKeyboardControl.get_docstring())
    print("Launch Arguments:\n", OmegaConf.to_yaml(args), sep="")

    # Validate the config for keys; the level defaults to 3 if "strict_config" is not set.
    strictness = args.get("strict_config", 3)
    args = LaunchConfig.check_config(args, strictness, as_dict_config=True)

    try:
        # Optional: If you return a result object it will be logged by Hydra's on_job_end callback
        return game_loop(args)
    except KeyboardInterrupt:
        print("\nCancelled by user. Bye!")
    return None


# pyright: reportAssignmentType=none
if __name__ == "__main__":
    main()