Source code for classes.ui.keyboard_controls

  1import logging
  2import sys
  3import inspect
  4import signal
  5import weakref
  6from typing import TYPE_CHECKING, Any, Optional, Union
  7
  8import carla
  9import pygame
 10from pygame.locals import (
 11    K_BACKSPACE,
 12    K_DOWN,
 13    K_EQUALS,
 14    K_ESCAPE,
 15    K_F1,
 16    K_F2,
 17    K_F3,
 18    K_F4,
 19    K_F5,
 20    K_F6,
 21    K_LEFT,
 22    K_MINUS,
 23    K_RIGHT,
 24    K_SPACE,
 25    K_TAB,
 26    K_UP,
 27    KMOD_CTRL,
 28    KMOD_SHIFT,
 29    MOUSEBUTTONDOWN,
 30    MOUSEBUTTONUP,
 31    K_a,
 32    K_b,
 33    K_d,
 34    K_g,
 35    K_i,
 36    K_l,
 37    K_n,
 38    K_o,
 39    K_p,
 40    K_q,
 41    K_r,
 42    K_s,
 43    K_t,
 44    K_w,
 45    K_x,
 46    K_z,
 47)
 48from typing_extensions import Literal
 49
 50from classes import CanBeDummy
 51
 52if TYPE_CHECKING:
 53    from classes.worldmodel import WorldModel
 54
 55
 56# ==============================================================================
 57# -- KeyboardControl -----------------------------------------------------------
 58# ==============================================================================
 59
 60
[docs] 61class KeyboardControl: 62 """ 63 Primitive base for keyboard control classes. 64 65 H/? : toggle help 66 """ 67 68 _world_model: "WorldModel" 69 """Reference to the world model that controls the interface.""" 70 71 # COMMENT I think this only allows to end the script
[docs] 72 def __init__(self, world_model: "WorldModel", *, show_help_notice=True, clock: Optional[pygame.time.Clock] = None): 73 """ 74 Parameters: 75 world_model : WorldModel 76 show_help_notice : bool 77 Show a notice about the help keys. 78 clock : To track the game time. 79 """ 80 self.enabled = True 81 self._world_model = world_model 82 # Controller should register itself on the World Model 83 world_model.controller = weakref.proxy(self) 84 if world_model.hud.help.surface is None: 85 world_model.hud.help.create_surface(self.get_docstring()) 86 if show_help_notice: 87 world_model.hud.notification("Press 'H' or '?' for help.", seconds=4.0) 88 self._clock = clock
89
[docs] 90 def enable(self, value: bool = True): 91 self.enabled = value
92
[docs] 93 @classmethod 94 def get_docstring(cls): 95 """Return the docstring of the class""" 96 97 doc = cls.__doc__ 98 # Get doc from parent 99 if doc is None: 100 doc = inspect.getdoc(cls) 101 if doc is not None and doc != object.__doc__: 102 cls.__doc__ = doc 103 else: 104 cls.__doc__ = doc = "No docstring available" 105 if doc == "No docstring available": 106 return "No docstring available" 107 108 return "======== Controls ===========\n" + doc + "\n============================\n"
109
[docs] 110 def parse_events(self, events=None) -> Union[Literal[True], Any]: 111 """ 112 Parse the input events and return True if the loop should end. 113 """ 114 if not self.enabled: 115 return None 116 events = events if events is not None else pygame.event.get() 117 for event in events: # pylint: disable=unused-variable 118 self._check_help_event(event) 119 return None
120 121 @staticmethod 122 def _is_quit_shortcut(key: int): 123 """Shortcut for quitting""" 124 return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL) 125 126 def _check_help_event(self, event: pygame.event.Event): 127 """Check if the event is a help event and displays it.""" 128 if not hasattr(event, "unicode"): # No KEYUP/DOWN event 129 return None 130 if event.unicode.lower() in ("h", "?"): # type: ignore[attr-defined] 131 self._world_model.hud.help.toggle() 132 return True 133 return False
134 135
[docs] 136class PassiveKeyboardControl(KeyboardControl): 137 """ 138 Does not allow to control the vehicle. Only allows to 139 quit the simulation. 140 141 | ESC : quit 142 | H/? : toggle help 143 """ 144
[docs] 145 def parse_events(self) -> "Literal[True] | None": 146 if not self.enabled: 147 return None 148 for event in pygame.event.get(): 149 if event.type == pygame.QUIT: 150 return True 151 if event.type == pygame.KEYUP: 152 if self._is_quit_shortcut(event.key): 153 return True 154 if self._check_help_event(event): 155 pass 156 return None
157 158
[docs] 159class RSSKeyboardControl(CanBeDummy, KeyboardControl): 160 """ 161 Use ARROWS, WASD keys or mouse click and drag for control. 162 163 | W : throttle 164 | S : brake 165 | AD : steer 166 | Q : toggle reverse 167 | Space : hand-brake 168 | P : toggle autopilot (depends on setup) 169 170 | TAB : change view 171 | Backspace : change vehicle (will unset externalActor; experimental) 172 173 | R : toggle recording images to disk 174 175 | F2 : toggle RSS visualization mode 176 | F3 : increase log level 177 | F4 : decrease log level 178 | F5 : increase map log level 179 | F6 : decrease map log level 180 | B : toggle RSS Road Boundaries Mode 181 | G : RSS check drop current route (experimental) 182 | S : toggle RSS (NotImplemented) 183 | T : toggle vehicle's telemetry visualization 184 | O : open/close vehicle's doors 185 | N : pause simulation (not in sync mode) 186 187 | -- Experimental, recording -- 188 | 189 | CTRL + R : toggle recording of simulation (replacing any previous) 190 | CTRL + P : start replaying last recorded simulation (untested) 191 | CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds) 192 | CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds) 193 194 | F1 : toggle HUD 195 | H/? : toggle help 196 | ESC : quit 197 """ 198 199 MOUSE_STEERING_RANGE = 150 200 """ 201 Controls the size of steering area when using the mouse. 202 """ 203 204 # TODO: should move this to GameFramework; remnant of original carla code 205 signal_received: "bool | int" = False 206 """ 207 Got a signal to stop the simulation. No more events will be parsed if True. 208 209 :meta private: 210 """ 211
[docs] 212 def __init__( 213 self, 214 world_model: "WorldModel", 215 *, 216 clock: pygame.time.Clock, 217 start_in_autopilot: bool = False, 218 agent_controlled: bool = True, 219 config=None, 220 ): 221 """ 222 Parameters: 223 world_model : WorldModel 224 The world model that controls the interface. 225 clock : pygame.time.Clock 226 The clock to get the time from. 227 start_in_autopilot : bool 228 If CARLA's autopilot is used; or some other system. 229 May not be True if agent_controlled is True. 230 Defaults to :code:`False`. 231 agent_controlled : bool 232 May not be True if start_in_autopilot is True. 233 Defaults to :python:`True`. 234 config : Any 235 Configuration data. Currently unused. 236 237 Note: 238 Creating a controller registers :py:attr:`.WorldModel.controller` to this instance. 239 """ 240 if start_in_autopilot and agent_controlled: 241 raise ValueError("Agent controlled and autopilot cannot be active at the same time.") 242 super().__init__(world_model=world_model, clock=clock) 243 if self._clock is None: 244 raise ValueError("Clock must be provided.") 245 elif TYPE_CHECKING: 246 self._clock = self._clock 247 248 self._world_model = world_model 249 self._config = config # Note: currently unused, could use this to disable the controller 250 self._autopilot_enabled = start_in_autopilot 251 self._agent_controlled = agent_controlled 252 self._control: carla.VehicleControl = None # type: ignore[assignment] 253 # self._control = carla.VehicleControl() 254 self._vehicle_physics = world_model.player.get_physics_control() 255 self._lights = carla.VehicleLightState.NONE 256 world_model.player.set_light_state(self._lights) 257 self._steer_cache = 0.0 258 self._mouse_steering_center = None 259 260 self._surface = pygame.Surface((self.MOUSE_STEERING_RANGE * 2, self.MOUSE_STEERING_RANGE * 2)) 261 self._surface.set_colorkey(pygame.Color("black")) 262 self._surface.set_alpha(60) 263 264 line_width = 2 265 pygame.draw.polygon( 266 self._surface, 267 (0, 0, 255), 268 [ 269 (0, 0), 270 (0, self.MOUSE_STEERING_RANGE * 2 - line_width), 271 (self.MOUSE_STEERING_RANGE * 2 - line_width, self.MOUSE_STEERING_RANGE * 2 - line_width), 272 (self.MOUSE_STEERING_RANGE * 2 - line_width, 0), 273 (0, 0), 274 ], 275 line_width, 276 ) 277 pygame.draw.polygon( 278 self._surface, 279 (0, 0, 255), 280 [(0, self.MOUSE_STEERING_RANGE), (self.MOUSE_STEERING_RANGE * 2, self.MOUSE_STEERING_RANGE)], 281 line_width, 282 ) 283 pygame.draw.polygon( 284 self._surface, 285 (0, 0, 255), 286 [(self.MOUSE_STEERING_RANGE, 0), (self.MOUSE_STEERING_RANGE, self.MOUSE_STEERING_RANGE * 2)], 287 line_width, 288 )
289 290 @property 291 def controlled_externally(self) -> bool: 292 """ 293 Returns True if the vehicle is controlled by some system, i.e not manually by the user. 294 295 """ 296 return self._autopilot_enabled or self._agent_controlled 297
[docs] 298 def render(self, display: pygame.Surface) -> None: 299 if self._mouse_steering_center: 300 display.blit( 301 self._surface, 302 ( 303 self._mouse_steering_center[0] - self.MOUSE_STEERING_RANGE, 304 self._mouse_steering_center[1] - self.MOUSE_STEERING_RANGE, 305 ), 306 )
307 308 @staticmethod 309 def _signal_handler(signum, _): 310 """ 311 Signal handler for stopping the simulation, e.g. when pressing Ctrl+C 312 in the terminal. 313 314 Note: 315 If DetectionMatrix is used this signal handler will be overwritten, 316 but still executed. 317 318 :meta private: 319 """ 320 if not RSSKeyboardControl.signal_received: 321 print(f"\nReceived signal {signum}. Trigger stopping... In case the program freezes trigger twice more.") 322 RSSKeyboardControl.signal_received = True 323 return 324 # Did not yet terminate 325 if RSSKeyboardControl.signal_received is True: 326 print(f"\nReceived signal {signum}. Abort a 3rd time to terminate the program immediately") 327 RSSKeyboardControl.signal_received = 2 328 return 329 sys.exit(1) 330
[docs] 331 def parse_events( 332 self, control: "Optional[carla.VehicleControl]" = None 333 ) -> "Literal[True] | None": 334 if control: 335 self._control = control # Note this might be the rss updated controls 336 if RSSKeyboardControl.signal_received: 337 print("\nAccepted signal. Stopping loop...") 338 return True 339 if not self.enabled: 340 return None 341 if isinstance(self._control, carla.VehicleControl): 342 current_lights = self._lights 343 can_modify_controls = True 344 else: 345 can_modify_controls = False 346 for event in pygame.event.get(): 347 if event.type == pygame.QUIT: 348 return True 349 if event.type == pygame.KEYUP: 350 if self._is_quit_shortcut(event.key): 351 return True 352 353 if event.key == K_BACKSPACE: 354 self._world_model.external_actor = False # delete and respawn 355 if self._autopilot_enabled: 356 self._world_model.player.set_autopilot(False) 357 self._world_model.restart() 358 self._world_model.player.set_autopilot(True) 359 else: 360 self._world_model.restart() 361 elif event.key == K_F1: 362 self._world_model.hud.toggle_info() 363 elif self._check_help_event(event): 364 pass 365 elif event.key == K_TAB: 366 self._world_model.rss_unstructured_scene_visualizer.toggle_camera() 367 elif event.key == K_n: 368 self._world_model.toggle_pause() 369 elif event.key == K_r: 370 self._world_model.toggle_recording() 371 elif event.key == K_F2: 372 if self._world_model and self._world_model.rss_sensor: 373 self._world_model.rss_sensor.toggle_debug_visualization_mode() 374 newmode = self._world_model.rss_sensor.debug_visualizer._visualization_mode 375 self._world_model.hud.notification(f"RSS Debug Visualization Mode: {newmode}") 376 elif event.key == K_F3: 377 if self._world_model and self._world_model.rss_sensor: 378 if not self._world_model._restrictor: 379 print("WARNING: No restrictor available") 380 else: 381 self._world_model.rss_sensor.decrease_log_level() 382 assert isinstance(self._world_model.rss_sensor.log_level, (int, carla.RssLogLevel)) 383 self._world_model._restrictor.set_log_level(self._world_model.rss_sensor.log_level) # pyright: ignore[reportArgumentType] 384 elif event.key == K_F4: 385 if self._world_model and self._world_model.rss_sensor: 386 if not self._world_model._restrictor: 387 print("WARNING: No restrictor available") 388 else: 389 self._world_model.rss_sensor.increase_log_level() 390 assert isinstance(self._world_model.rss_sensor.log_level, (int, carla.RssLogLevel)) 391 self._world_model._restrictor.set_log_level(self._world_model.rss_sensor.log_level) # pyright: ignore[reportArgumentType] 392 elif event.key == K_F5: 393 if self._world_model and self._world_model.rss_sensor: 394 self._world_model.rss_sensor.decrease_map_log_level() 395 elif event.key == K_F6: 396 if self._world_model and self._world_model.rss_sensor: 397 self._world_model.rss_sensor.increase_map_log_level() 398 elif event.key == K_b: 399 if self._world_model and self._world_model.rss_sensor: 400 if self._world_model.rss_sensor.sensor.road_boundaries_mode == carla.RssRoadBoundariesMode.Off: 401 self._world_model.rss_set_road_boundaries_mode(carla.RssRoadBoundariesMode.On) 402 print("carla.RssRoadBoundariesMode.On") 403 self._world_model.hud.notification("RSS Road Boundaries Mode: On") 404 else: 405 self._world_model.rss_set_road_boundaries_mode(carla.RssRoadBoundariesMode.Off) 406 print("carla.RssRoadBoundariesMode.Off") 407 self._world_model.hud.notification("RSS Road Boundaries Mode: Off") 408 elif event.key == K_g: 409 if self._world_model and self._world_model.rss_sensor: 410 self._world_model.rss_sensor.drop_route() 411 # --- Experimental, copied from other locations --- 412 elif event.key == K_t: 413 if self._world_model.show_vehicle_telemetry: 414 self._world_model.player.show_debug_telemetry(False) 415 self._world_model.show_vehicle_telemetry = False 416 self._world_model.hud.notification("Disabled Vehicle Telemetry") 417 else: 418 try: 419 self._world_model.player.show_debug_telemetry(True) 420 self._world_model.show_vehicle_telemetry = True 421 self._world_model.hud.notification("Enabled Vehicle Telemetry") 422 except Exception: 423 logging.debug("Could not enable vehicle telemetry") 424 elif event.key == K_o: 425 try: 426 # TODO: Should be set on the agent 427 if self._world_model.doors_are_open: 428 self._world_model.hud.notification("Closing Doors") 429 self._world_model.doors_are_open = False 430 self._world_model.player.close_door(carla.VehicleDoor.All) 431 else: 432 self._world_model.hud.notification("Opening doors") 433 self._world_model.doors_are_open = True 434 self._world_model.player.open_door(carla.VehicleDoor.All) 435 except Exception: 436 logging.warning("Could not open/close doors") 437 # --- Experimental, recording ---- 438 elif event.key == K_r and (pygame.key.get_mods() & KMOD_CTRL): 439 if self._world_model.recording_enabled: 440 self._world_model.get_client().stop_recorder() 441 self._world_model.recording_enabled = False 442 self._world_model.hud.notification("Recorder is OFF") 443 else: 444 self._world_model.get_client().start_recorder("manual_recording.rec") 445 self._world_model.recording_enabled = True 446 self._world_model.hud.notification("Recorder is ON") 447 elif event.key == K_p and (pygame.key.get_mods() & KMOD_CTRL): 448 # stop recorder 449 self._world_model.get_client().stop_recorder() 450 self._world_model.recording_enabled = False 451 # work around to fix camera at start of replaying 452 current_index = self._world_model.camera_manager.index 453 self._world_model.destroy_sensors() 454 # disable autopilot 455 self._autopilot_enabled = False 456 self._world_model.player.set_autopilot(self._autopilot_enabled) 457 self._world_model.hud.notification("Replaying file 'manual_recording.rec'") 458 # replayer 459 # TODO: This likely needs more cleanup! 460 replay_logs = self._world_model.get_client().replay_file( 461 "manual_recording.rec", self._world_model.recording_start, 0, 0 462 ) 463 print("------ Replay logs -------\n", replay_logs) 464 self._world_model.camera_manager.set_sensor(current_index) 465 elif event.key == K_MINUS and (pygame.key.get_mods() & KMOD_CTRL): 466 if pygame.key.get_mods() & KMOD_SHIFT: 467 self._world_model.recording_start -= 10 468 else: 469 self._world_model.recording_start -= 1 470 self._world_model.hud.notification(f"Recording start time is {self._world_model.recording_start}") 471 elif event.key == K_EQUALS and (pygame.key.get_mods() & KMOD_CTRL): 472 if pygame.key.get_mods() & KMOD_SHIFT: 473 self._world_model.recording_start += 10 474 else: 475 self._world_model.recording_start += 1 476 self._world_model.hud.notification(f"Recording start time is {self._world_model.recording_start}") 477 478 # Modify controls 479 if can_modify_controls: 480 if event.key == K_q: 481 self._control.gear = 1 if self._control.reverse else -1 482 elif event.key == K_p and not pygame.key.get_mods() & KMOD_CTRL: 483 # TODO: should be a toggle between None, Autopilot, Agent 484 self._autopilot_enabled = not self._autopilot_enabled 485 self._world_model.player.set_autopilot(self._autopilot_enabled) 486 self._world_model.hud.notification( 487 "Autopilot %s" % ("On" if self._autopilot_enabled else "Off") 488 ) 489 elif event.key == K_l and pygame.key.get_mods() & KMOD_CTRL: 490 current_lights ^= carla.VehicleLightState.Special1 491 elif event.key == K_l and pygame.key.get_mods() & KMOD_SHIFT: 492 current_lights ^= carla.VehicleLightState.HighBeam 493 elif event.key == K_l: 494 # Use 'L' key to switch between lights: 495 # closed -> position -> low beam -> fog 496 if not self._lights & carla.VehicleLightState.Position: 497 self._world_model.hud.notification("Position lights") 498 current_lights |= carla.VehicleLightState.Position 499 else: 500 self._world_model.hud.notification("Low beam lights") 501 current_lights |= carla.VehicleLightState.LowBeam 502 if self._lights & carla.VehicleLightState.LowBeam: 503 self._world_model.hud.notification("Fog lights") 504 current_lights |= carla.VehicleLightState.Fog 505 if self._lights & carla.VehicleLightState.Fog: 506 self._world_model.hud.notification("Lights off") 507 current_lights ^= carla.VehicleLightState.Position 508 current_lights ^= carla.VehicleLightState.LowBeam 509 current_lights ^= carla.VehicleLightState.Fog 510 elif event.key == K_i: 511 current_lights ^= carla.VehicleLightState.Interior 512 elif event.key == K_z: 513 current_lights ^= carla.VehicleLightState.LeftBlinker 514 elif event.key == K_x: 515 current_lights ^= carla.VehicleLightState.RightBlinker 516 elif event.type == MOUSEBUTTONDOWN: 517 # store current mouse position for mouse-steering 518 if event.button == 1: 519 self._mouse_steering_center = event.pos 520 elif event.type == MOUSEBUTTONUP: 521 if event.button == 1: 522 self._mouse_steering_center = None 523 524 if not self._autopilot_enabled: 525 # Prase mouse events 526 if pygame.mouse.get_pressed()[0]: 527 self._parse_mouse(pygame.mouse.get_pos()) 528 if not self._control: 529 self._control = carla.VehicleControl() 530 # prev_steer_cache = self._steer_cache # NOTE: Not used anymore 531 self._parse_vehicle_keys(pygame.key.get_pressed(), self._clock.get_time()) 532 self._control.reverse = self._control.gear < 0 533 return None 534 # Moved Code from Carla example to WorldModel 535 return None
536 537 def _parse_vehicle_keys(self, keys, milliseconds): 538 """Handles manual vehicle controls via keyboard.""" 539 if keys[K_UP] or keys[K_w]: 540 self._control.throttle = min(self._control.throttle + 0.2, 1) 541 self._control.brake = 0 542 # else: 543 # self._control.throttle = max(self._control.throttle - 0.2, 0) 544 545 if keys[K_DOWN] or keys[K_s]: 546 self._control.brake = min(self._control.brake + 0.2, 1) 547 self._control.throttle = 0 548 # else: 549 # self._control.brake = max(self._control.brake - 0.2, 0) 550 551 self._steer_cache = self._control.steer 552 steer_increment = 5e-4 * milliseconds 553 if keys[K_LEFT] or keys[K_a]: 554 if self._steer_cache > 0: 555 self._steer_cache = 0 556 else: 557 self._steer_cache -= steer_increment 558 elif keys[K_RIGHT] or keys[K_d]: 559 if self._steer_cache < 0: 560 self._steer_cache = 0 561 else: 562 self._steer_cache += steer_increment 563 elif self._steer_cache > 0: 564 self._steer_cache = max(self._steer_cache - steer_increment, 0.0) 565 elif self._steer_cache < 0: 566 self._steer_cache = min(self._steer_cache + steer_increment, 0.0) 567 else: 568 self._steer_cache = 0 569 570 self._steer_cache = min(1.0, max(-1.0, self._steer_cache)) 571 self._control.steer = round(self._steer_cache, 1) 572 self._control.hand_brake = keys[K_SPACE] 573 574 def _parse_mouse(self, pos): 575 """Handles steering and throttle/break control via mouse.""" 576 if not self._mouse_steering_center: 577 return 578 579 lateral = float(pos[0] - self._mouse_steering_center[0]) 580 longitudinal = float(pos[1] - self._mouse_steering_center[1]) 581 max_val = self.MOUSE_STEERING_RANGE 582 lateral = -max_val if lateral < -max_val else min(lateral, max_val) 583 longitudinal = -max_val if longitudinal < -max_val else min(longitudinal, max_val) 584 self._control.steer = lateral / max_val 585 if longitudinal < 0.0: 586 self._control.throttle = -longitudinal / max_val 587 self._control.brake = 0.0 588 elif longitudinal > 0.0: 589 self._control.throttle = 0.0 590 self._control.brake = longitudinal / max_val
591 592 593# Stops RSS and allows hard kills if the script is stuck 594signal.signal(signal.SIGINT, RSSKeyboardControl._signal_handler)