Source code for AgentGameLoop

  1#!/usr/bin/env python
  2"""
  3Example of a game loop for the :py:class:`.LunaticAgent` class.
  4"""
  5
  6import random
  7from pprint import pprint
  8
  9import hydra
 10from omegaconf import OmegaConf
 11
 12# When you use an .egg file be sure to add it to your $PYTHONPATH
 13try:
 14    import carla
 15except ImportError:
 16    from launch_tools import carla
 17
 18import pygame
 19
 20import launch_tools
 21from agents.lunatic_agent import LunaticAgent
 22from agents.rules import create_default_rules
 23from agents.tools.config_creation import AsDictConfig, LaunchConfig, LunaticAgentSettings
 24from agents.tools.debug_drawing import debug_drawing
 25from agents.tools.logs import logger
 26from classes import exceptions
 27from classes.constants import Phase
 28from classes.keyboard_controls import RSSKeyboardControl  # Alternative: PassiveKeyboardControl
 29from classes.worldmodel import GameFramework, WorldModel
 30
 31# ==============================================================================
 32# Globals
 33# ==============================================================================
 34
# Passed to GameFramework.request_new_batch_actors in game_loop as the actor
# count; it also bounds the slice of spawn points considered for these actors.
AMOUNT_ACTORS: int = 12
"""How many other actors to spawn"""

# Index into the spawn-point list loaded in game_loop. That entry becomes the
# ego vehicle's start transform and is excluded from the other actors' points.
EGO_SPAWN_IDX: int = 3
"""Changes the start position of the ego vehicle"""

# When True, game_loop pretty-prints agent.rules once the rules are set up.
PRINT_RULES: bool = False
"""Print the rules to the console. This is messy."""
 43
 44# ==============================================================================
 45# -- Game Loop ---------------------------------------------------------
 46# ==============================================================================
 47
 48
def game_loop(args: LaunchConfig):
    r"""
    Main loop of the simulation.

    It sets up the simulation, spawns the vehicles, and initializes the agent.
    In the :python:`while` loop, the agent calculates one `\`carla.VehicleControl\`:py:class:`:external-icon-parse:
    every iteration.

    The `.GameFramework`:py:class: context manager takes care of the simulation tick,
    and camera updates.

    Args:
        args: The launch configuration. This function reads ``args.agent``,
            ``args.externalActor`` and ``args.loop`` and hands the whole object
            to `.GameFramework`:py:class:.

    Returns:
        None. The function returns early when the keyboard handling raises
        ``exceptions.UserInterruption``.

    Raises:
        Exception: Any exception raised during setup or inside the loop is
            printed, logged and re-raised. The cleanup in the ``finally``
            block runs in every case.
    """
    # Function-scope import: only the cleanup in the ``finally`` block needs it.
    import contextlib

    # Avoid name errors in final block
    game_framework: GameFramework = None
    world_model: WorldModel = None
    agent: LunaticAgent = None
    ego: carla.Vehicle = None

    # -- Load Settings Agent --

    print("--- Creating settings ---")
    # Validates and creates the agent settings. Sidenote: agent_config is a copy of args.agent,
    # you can access args.agent as copy of the original settings.
    # To not validate the config against LunaticAgentSettings use OmegaConf.create(args.agent)
    # to create a copy.

    agent_config: AsDictConfig[LunaticAgentSettings] = LunaticAgentSettings.create(settings=args.agent)

    try:
        logger.info("Creating Game Framework ...")
        game_framework = GameFramework(args)

        # -- Spawn Vehicles --
        spawn_points = launch_tools.csv_tools.csv_to_transformations("examples/highway_example_car_positions.csv")

        # NOTE(review): car_bp is not used below; the other actors are
        # requested by blueprint name instead.
        ego_bp, car_bp = launch_tools.blueprint_helpers.get_contrasting_blueprints()

        # Spawn Others: every spawn point up to AMOUNT_ACTORS except the ego's.
        GameFramework.request_new_batch_actors("vehicle.tesla.model3",
                                               AMOUNT_ACTORS,
                                               spawn_points=[sp for i, sp in enumerate(spawn_points[:AMOUNT_ACTORS + 1])
                                                             if i != EGO_SPAWN_IDX],
                                               autopilot=True,
                                               tick=False)

        # Spawn Ego
        start: carla.libcarla.Transform = spawn_points[EGO_SPAWN_IDX]
        ego = game_framework.spawn_actor(ego_bp, start, must_spawn=True)  # type: ignore[assignment]

        logger.info("Creating agent and WorldModel ...")

        agent, world_model, global_planner, controller \
            = game_framework.init_agent_and_interface(ego=None if args.externalActor else ego,  # Test externalActor
                                                      agent_class=LunaticAgent,
                                                      config=agent_config)
        logger.debug("Created agent and WorldModel.\n")

        # Add Rules:
        #agent.add_rules(create_default_rules(game_framework))

        # NOTE: the default rules can be added over the yaml interface.
        # To add them in a functional way, use the following code:
        if not any("create_default_rules" in rule_config._target_ for rule_config in agent_config.rules):
            default_rules = create_default_rules()
            agent.add_rules(default_rules)

        if PRINT_RULES:  # NOTE: this can be a bit messy as some attributes have a long repr
            print("Lunatic Agent Rules")
            pprint(agent.rules)

        # -- Scenario --

        # Set initial destination
        wp_start = world_model.map.get_waypoint(start.location)

        next_wps: "list[carla.Waypoint]" = wp_start.next(25)
        if next_wps:
            last_wp = next_wps[-1]
        else:
            # Waypoint.next() yields an empty list when the lane does not
            # continue; stay on the start waypoint instead of failing on [-1].
            logger.info("No waypoint found ahead of the start position; using the start waypoint as target.")
            last_wp = wp_start

        left_last_wp = last_wp.get_left_lane()
        if left_last_wp is None:
            # get_left_lane() returns None when there is no left lane.
            logger.info("No left lane next to the initial target; using the target's own lane instead.")
            left_last_wp = last_wp
        print(left_last_wp, world_model.map.get_waypoint(left_last_wp.transform.location))

        # destination = random.choice(all_spawn_points).location
        destination = left_last_wp.transform.location

        # NOTE(review): the agent is routed to `last_wp`, while `destination`
        # (the left-lane location) is only handed to debug_drawing until a
        # new target is chosen in the loop - confirm this is intended.
        agent.set_destination(last_wp.transform.location)

        # An example how to construct a loop for the agent.
        agent.verify_settings()  # Validate if the planner.dt is set up, should match the simulation time delta.

        # This is the main loop; raising for example an AgentDoneException inside
        # the `with game_framework` context will set game_framework.continue_loop
        # to False.
        while game_framework.continue_loop:

            # The `with game_framework` context manager takes care of background
            # tasks like world tick, HUD update, and is an exception handler for
            # the agent.
            with game_framework:
                # ------ Run step ------

                # agent.run_step is the main method in which the agent
                # calculates the next vehicle control object.
                planned_control = agent.run_step(debug=True)  # noqa: F841

                # ------ Apply / Handle User Input ------

                # Afterwards the user can manipulate these controls.
                # If this is not wanted set allow_user_updates=False.
                #
                # The user will then only be able to use the other keyboard
                # hotkeys like quitting the game or changing other settings.
                #
                # See the documentation of the keyboard controls for more information.
                try:
                    # -> Phase.APPLY_MANUAL_CONTROLS | Phase.BEGIN
                    agent.parse_keyboard_input(allow_user_updates=True)
                    # -> Phase.APPLY_MANUAL_CONTROLS | Phase.END
                except exceptions.UserInterruption:
                    return

                # ------ Apply Control ------

                # The final step is to apply the controls to the vehicle.
                # Rules that execute at Phase.EXECUTION | Phase.BEGIN can
                # still manipulate or exchange the final controls.
                # > Phase.EXECUTION | Phase.BEGIN
                agent.apply_control()
                # > Phase.EXECUTION | Phase.END

                # ------ End of Loop ------

                # Draw route information and junctions. Drawing is best effort:
                # a failure is logged and must not stop the simulation.
                if game_framework._args.debug:
                    try:
                        debug_drawing(agent, game_framework, destination)
                    except Exception:
                        logger.debug("Error in debug drawing", exc_info=True)

            # -- Stop Loop or Continue when agent is done --

            # Here it is checked whether the agent has reached its destination
            # and no rule or other function has done a replanning. A custom new
            # destination can be set here.
            if args.loop and not game_framework.continue_loop and agent.done():
                print("The target has been reached, searching for another target")
                world_model.hud.notification("Target reached", seconds=4.0)

                # Option 1 : Choose a random point anywhere
                #destination = random.choice(spawn_points).location

                # Option 2 : Choose a random point nearby.
                upcoming_wps = agent._current_waypoint.next(50)
                if not upcoming_wps:
                    # The lane ends here, so there is nothing to drive to;
                    # continue_loop is already False, leave the loop cleanly.
                    logger.info("No waypoint ahead of the agent; cannot choose a new target.")
                    break
                wp = upcoming_wps[-1]
                # A missing neighbouring lane is reported as None; fall back to
                # the waypoint on the current lane in that case.
                next_wp = random.choice((wp, wp.get_left_lane(), wp.get_right_lane()))
                if next_wp is None:
                    next_wp = wp
                destination = next_wp.transform.location

                # After a new destination has been set, these steps should be
                # executed to allow a smooth continuation of the agent.
                agent.set_destination(destination)
                game_framework.continue_loop = True  # if not set to True we will exit the loop
                agent.execute_phase(Phase.DONE | Phase.END, prior_results=None)

        # Optional: final phase of agents lifetime; could be used for cleanup tasks.
        agent.execute_phase(Phase.TERMINATING | Phase.END, prior_results=None)

    # Adding exception block
    except Exception as e:
        print("ERROR, exception in game loop", e)
        logger.error("Exception in game loop", exc_info=True)
        raise
    finally:
        # Cleanup the simulation by removing the used actors.
        print("Quitting. - Destroying actors and stopping world.")
        # ExitStack runs every registered callback even if an earlier one
        # raises, so one failing step cannot leave actors or the pygame window
        # behind; an error is still propagated afterwards.
        # Callbacks run in reverse registration order, hence the inverted
        # listing: agent -> world model -> game framework -> pygame.
        with contextlib.ExitStack() as cleanup:
            cleanup.callback(pygame.quit)
            if game_framework is not None:
                # save world for usage after CDP cleanup
                cleanup.callback(game_framework.cleanup)  # includes/or is CarlaDataProvider.cleanup()
            if world_model is not None:
                cleanup.callback(world_model.destroy, destroy_ego=False)
            if agent is not None:
                cleanup.callback(agent.destroy)
# ==============================================================================
# -- main() --------------------------------------------------------------
# ==============================================================================


@hydra.main(version_base=None, config_path="./conf", config_name="launch_config")
def main(args: LaunchConfig):
    """
    Entry point, wrapped with the `@hydra <https://hydra.cc/docs/intro/>`_ decorator that takes care of the
    configuration merge and sets up logging.

    The function prints the module docstring, the keyboard help and the launch
    arguments, validates the configuration and then runs :py:func:`game_loop`.

    Args:
        args : The configuration object that is created by Hydra_, it
            contains all the settings from the YAML files merged with the command line
            arguments. From a high-level perspective it is a dictionary that also allows
            dot access to its keys, e.g. :python:`args.host` instead of :python:`args["host"]`.

    Returns:
        The result of :py:func:`game_loop`, or :python:`None` when the run was
        cancelled with a keyboard interrupt.

    See Also:
        - https://hydra.cc/
        - `Hydra Config Intro <https://hydra.cc/docs/configure_hydra/intro/>`_
        - `Default List - How config files are merged <https://hydra.cc/docs/advanced/defaults_list/>`_
        - `Override Grammar <https://hydra.cc/docs/advanced/override_grammar/basic/>`_
        - `Hydra Tab Completion <https://hydra.cc/docs/tutorials/basic/running_your_app/tab_completion/>`_
    """
    logger.info('listening to server %s:%s', args.host, args.port)

    # Startup banner: module help, keyboard help, then the merged configuration.
    print(__doc__)
    print(RSSKeyboardControl.get_docstring())
    launch_yaml = OmegaConf.to_yaml(args)
    print("Launch Arguments:\n", launch_yaml, sep="")

    # Validate the config for keys; the strictness level defaults to 3.
    strictness = args.get("strict_config", 3)
    args = LaunchConfig.check_config(args, strictness, as_dict_config=True)

    try:
        # Optional: If you return a result object it will be logged by Hydra's on_job_end callback
        return game_loop(args)
    except KeyboardInterrupt:
        print('\nCancelled by user. Bye!')
    return None


# pyright: reportAssignmentType=none
if __name__ == '__main__':
    main()