Source code for classes.keyboard_controls

  1import logging
  2import sys
  3import inspect
  4import signal
  5import weakref
  6from typing import TYPE_CHECKING, Any, Optional, Union
  7
  8import carla
  9import pygame
 10from pygame.locals import (
 11    K_BACKSPACE,
 12    K_DOWN,
 13    K_EQUALS,
 14    K_ESCAPE,
 15    K_F1,
 16    K_F2,
 17    K_F3,
 18    K_F4,
 19    K_F5,
 20    K_F6,
 21    K_LEFT,
 22    K_MINUS,
 23    K_RIGHT,
 24    K_SPACE,
 25    K_TAB,
 26    K_UP,
 27    KMOD_CTRL,
 28    KMOD_SHIFT,
 29    MOUSEBUTTONDOWN,
 30    MOUSEBUTTONUP,
 31    K_a,
 32    K_b,
 33    K_d,
 34    K_g,
 35    K_i,
 36    K_l,
 37    K_n,
 38    K_o,
 39    K_p,
 40    K_q,
 41    K_r,
 42    K_s,
 43    K_t,
 44    K_w,
 45    K_x,
 46    K_z,
 47)
 48from typing_extensions import Literal
 49
 50if TYPE_CHECKING:
 51    from classes.worldmodel import WorldModel
 52
 53
 54# ==============================================================================
 55# -- KeyboardControl -----------------------------------------------------------
 56# ==============================================================================
 57
 58
[docs] 59class KeyboardControl: 60 """ 61 Primitive base for keyboard control classes. 62 63 H/? : toggle help 64 """ 65 66 _world_model: "WorldModel" 67 """Reference to the world model that controls the interface.""" 68 69 # COMMENT I think this only allows to end the script
[docs] 70 def __init__(self, world: "WorldModel", help_notice=True): 71 """ 72 Parameters: 73 world : WorldModel 74 help_notice : bool 75 Show a notice about the help keys. 76 """ 77 self._world_model = world 78 if world.hud.help.surface is None: 79 world.hud.help.create_surface(self.get_docstring()) 80 if help_notice: 81 world.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
82
[docs] 83 @classmethod 84 def get_docstring(cls): 85 """Return the docstring of the class""" 86 87 doc = cls.__doc__ 88 # Get doc from parent 89 if doc is None: 90 doc = inspect.getdoc(cls) 91 if doc is not None and doc != object.__doc__: 92 cls.__doc__ = doc 93 else: 94 cls.__doc__ = doc = "No docstring available" 95 if doc == "No docstring available": 96 return "No docstring available" 97 98 return "======== Controls ===========\n" + doc + "\n============================\n"
99
[docs] 100 def parse_events(self, events=None) -> Union[Literal[True], Any]: 101 """ 102 Parse the input events and return True if the loop should end. 103 """ 104 events = events if events is not None else pygame.event.get() 105 for event in events: # pylint: disable=unused-variable 106 self._check_help_event(event)
107 108 @staticmethod 109 def _is_quit_shortcut(key: int): 110 """Shortcut for quitting""" 111 return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL) 112 113 def _check_help_event(self, event: pygame.event.Event): 114 """Check if the event is a help event""" 115 if not hasattr(event, 'unicode'): # No KEYUP/DOWN event 116 return None 117 if event.unicode.lower() in ('h', '?'): # type: ignore[attr-defined] 118 self._world_model.hud.help.toggle() 119 return True 120 return False
121 122
[docs] 123class PassiveKeyboardControl(KeyboardControl): 124 """ 125 Does not allow to control the vehicle. Only allows to 126 quit the simulation. 127 128 | ESC : quit 129 | H/? : toggle help 130 """ 131
[docs] 132 def parse_events(self) -> "None | Literal[True]": 133 for event in pygame.event.get(): 134 if event.type == pygame.QUIT: 135 return True 136 if event.type == pygame.KEYUP: 137 if self._is_quit_shortcut(event.key): 138 return True 139 if self._check_help_event(event): 140 pass 141 return None
142 143
[docs] 144class RSSKeyboardControl(KeyboardControl): 145 """ 146 Use ARROWS, WASD keys or mouse click and drag for control. 147 148 | W : throttle 149 | S : brake 150 | AD : steer 151 | Q : toggle reverse 152 | Space : hand-brake 153 | P : toggle autopilot (depends on setup) 154 155 | TAB : change view 156 | Backspace : change vehicle (will unset externalActor; experimental) 157 158 | R : toggle recording images to disk 159 160 | F2 : toggle RSS visualization mode 161 | F3 : increase log level 162 | F4 : decrease log level 163 | F5 : increase map log level 164 | F6 : decrease map log level 165 | B : toggle RSS Road Boundaries Mode 166 | G : RSS check drop current route (experimental) 167 | S : toggle RSS (NotImplemented) 168 | T : toggle vehicle's telemetry visualization 169 | O : open/close vehicle's doors 170 | N : pause simulation (not in sync mode) 171 172 | -- Experimental, recording -- 173 | 174 | CTRL + R : toggle recording of simulation (replacing any previous) 175 | CTRL + P : start replaying last recorded simulation (untested) 176 | CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds) 177 | CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds) 178 179 | F1 : toggle HUD 180 | H/? : toggle help 181 | ESC : quit 182 """ 183 184 MOUSE_STEERING_RANGE = 150 185 """ 186 Controls the size of steering area when using the mouse. 187 """ 188 189 signal_received: "bool | int" = False 190 """ 191 Got a signal to stop the simulation. No more events will be parsed if True. 192 193 :meta private: 194 """ 195 196 # TODO: should be a toggle between None, Autopilot, Agent 197
[docs] 198 def __init__(self, world_model: "WorldModel", start_in_autopilot: bool, agent_controlled: bool = True, clock: pygame.time.Clock = None, config=None): 199 if start_in_autopilot and agent_controlled: 200 raise ValueError("Agent controlled and autopilot cannot be active at the same time.") 201 super().__init__(world_model) 202 203 self._world_model = world_model 204 self._config = config # Note: currently unused 205 self._autopilot_enabled = start_in_autopilot 206 self._agent_controlled = agent_controlled 207 world_model.controller = weakref.proxy(self) 208 self._control: carla.VehicleControl = None 209 #self._control = carla.VehicleControl() 210 self._lights = carla.VehicleLightState.NONE 211 #self._restrictor = carla.RssRestrictor() # Moved to worldmodel 212 self._restrictor: carla.RssRestrictor = None 213 self._vehicle_physics = world_model.player.get_physics_control() 214 world_model.player.set_light_state(self._lights) 215 self._steer_cache = 0.0 216 self._mouse_steering_center = None 217 218 self._surface = pygame.Surface((self.MOUSE_STEERING_RANGE * 2, self.MOUSE_STEERING_RANGE * 2)) 219 self._surface.set_colorkey(pygame.Color('black')) 220 self._surface.set_alpha(60) 221 assert clock is not None, "Clock must be provided." 222 self._clock = clock 223 224 line_width = 2 225 pygame.draw.polygon(self._surface, 226 (0, 0, 255), 227 [ 228 (0, 0), 229 (0, self.MOUSE_STEERING_RANGE * 2 - line_width), 230 (self.MOUSE_STEERING_RANGE * 2 - line_width, 231 self.MOUSE_STEERING_RANGE * 2 - line_width), 232 (self.MOUSE_STEERING_RANGE * 2 - line_width, 0), 233 (0, 0) 234 ], line_width) 235 pygame.draw.polygon(self._surface, 236 (0, 0, 255), 237 [ 238 (0, self.MOUSE_STEERING_RANGE), 239 (self.MOUSE_STEERING_RANGE * 2, self.MOUSE_STEERING_RANGE) 240 ], line_width) 241 pygame.draw.polygon(self._surface, 242 (0, 0, 255), 243 [ 244 (self.MOUSE_STEERING_RANGE, 0), 245 (self.MOUSE_STEERING_RANGE, self.MOUSE_STEERING_RANGE * 2) 246 ], line_width) 247 248 world_model.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
249 250 @property 251 def controlled_externally(self): 252 return self._autopilot_enabled or self._agent_controlled 253
[docs] 254 def render(self, display: pygame.Surface) -> None: 255 if self._mouse_steering_center: 256 display.blit( 257 self._surface, (self._mouse_steering_center[0] - self.MOUSE_STEERING_RANGE, self._mouse_steering_center[1] - self.MOUSE_STEERING_RANGE))
258 259 @staticmethod 260 def _signal_handler(signum, _): 261 """ 262 Signal handler for stopping the simulation, e.g. when pressing Ctrl+C 263 in the terminal. 264 265 Note: 266 If DetectionMatrix is used this signal handler will be overwritten, 267 but still executed. 268 269 :meta private: 270 """ 271 if not RSSKeyboardControl.signal_received: 272 print(f'\nReceived signal {signum}. Trigger stopping... In case the program freezes trigger twice more.') 273 RSSKeyboardControl.signal_received = True 274 return 275 # Did not yet terminate 276 if RSSKeyboardControl.signal_received is True: 277 print(f'\nReceived signal {signum}. Abort a 3rd time to terminate the program immediately') 278 RSSKeyboardControl.signal_received = 2 279 return 280 sys.exit(1) 281
[docs] 282 def parse_events(self, control: "Optional[carla.VehicleControl]" = None) -> "None | Literal[True]": 283 if control: 284 self._control = control # Note this might be the rss updated controls 285 if RSSKeyboardControl.signal_received: 286 print('\nAccepted signal. Stopping loop...') 287 return True 288 if isinstance(self._control, carla.VehicleControl): 289 current_lights = self._lights 290 for event in pygame.event.get(): 291 if event.type == pygame.QUIT: 292 return True 293 if event.type == pygame.KEYUP: 294 if self._is_quit_shortcut(event.key): 295 return True 296 297 if event.key == K_BACKSPACE: 298 self._world_model.external_actor = None 299 if self._autopilot_enabled: 300 self._world_model.player.set_autopilot(False) 301 self._world_model.restart() 302 self._world_model.player.set_autopilot(True) 303 else: 304 self._world_model.restart() 305 elif event.key == K_F1: 306 self._world_model.hud.toggle_info() 307 elif self._check_help_event(event): 308 pass 309 elif event.key == K_TAB: 310 self._world_model.rss_unstructured_scene_visualizer.toggle_camera() 311 elif event.key == K_n: 312 self._world_model.toggle_pause() 313 elif event.key == K_r: 314 self._world_model.toggle_recording() 315 elif event.key == K_F2: 316 if self._world_model and self._world_model.rss_sensor: 317 self._world_model.rss_sensor.toggle_debug_visualization_mode() 318 _newmode = self._world_model.rss_sensor.debug_visualizer._visualization_mode 319 self._world_model.hud.notification(f"RSS Debug Visualization Mode: {_newmode}") 320 elif event.key == K_F3: 321 if self._world_model and self._world_model.rss_sensor: 322 if not self._world_model._restrictor: 323 print("WARNING: No restrictor available") 324 else: 325 self._world_model.rss_sensor.decrease_log_level() 326 assert isinstance(self._world_model.rss_sensor.log_level, (int, carla.RssLogLevel)) 327 self._world_model._restrictor.set_log_level(self._world_model.rss_sensor.log_level) # pyright: ignore[reportArgumentType] 328 elif event.key == K_F4: 329 if self._world_model and self._world_model.rss_sensor: 330 if not self._world_model._restrictor: 331 print("WARNING: No restrictor available") 332 else: 333 self._world_model.rss_sensor.increase_log_level() 334 assert isinstance(self._world_model.rss_sensor.log_level, (int, carla.RssLogLevel)) 335 self._world_model._restrictor.set_log_level(self._world_model.rss_sensor.log_level) # pyright: ignore[reportArgumentType] 336 elif event.key == K_F5: 337 if self._world_model and self._world_model.rss_sensor: 338 self._world_model.rss_sensor.decrease_map_log_level() 339 elif event.key == K_F6: 340 if self._world_model and self._world_model.rss_sensor: 341 self._world_model.rss_sensor.increase_map_log_level() 342 elif event.key == K_b: 343 if self._world_model and self._world_model.rss_sensor: 344 if self._world_model.rss_sensor.sensor.road_boundaries_mode == carla.RssRoadBoundariesMode.Off: 345 self._world_model.rss_set_road_boundaries_mode(carla.RssRoadBoundariesMode.On) 346 print("carla.RssRoadBoundariesMode.On") 347 self._world_model.hud.notification("RSS Road Boundaries Mode: On") 348 else: 349 self._world_model.rss_set_road_boundaries_mode(carla.RssRoadBoundariesMode.Off) 350 print("carla.RssRoadBoundariesMode.Off") 351 self._world_model.hud.notification("RSS Road Boundaries Mode: Off") 352 elif event.key == K_g: 353 if self._world_model and self._world_model.rss_sensor: 354 self._world_model.rss_sensor.drop_route() 355 # --- Experimental, copied from other locations --- 356 elif event.key == K_t: 357 if self._world_model.show_vehicle_telemetry: 358 self._world_model.player.show_debug_telemetry(False) 359 self._world_model.show_vehicle_telemetry = False 360 self._world_model.hud.notification("Disabled Vehicle Telemetry") 361 else: 362 try: 363 self._world_model.player.show_debug_telemetry(True) 364 self._world_model.show_vehicle_telemetry = True 365 self._world_model.hud.notification("Enabled Vehicle Telemetry") 366 except Exception: 367 logging.debug("Could not enable vehicle telemetry") 368 elif event.key == K_o: 369 try: 370 # TODO: Should be set on the agent 371 if self._world_model.doors_are_open: 372 self._world_model.hud.notification("Closing Doors") 373 self._world_model.doors_are_open = False 374 self._world_model.player.close_door(carla.VehicleDoor.All) 375 else: 376 self._world_model.hud.notification("Opening doors") 377 self._world_model.doors_are_open = True 378 self._world_model.player.open_door(carla.VehicleDoor.All) 379 except Exception: 380 logging.warning("Could not open/close doors") 381 # --- Experimental, recording ---- 382 elif event.key == K_r and (pygame.key.get_mods() & KMOD_CTRL): 383 if (self._world_model.recording_enabled): 384 self._world_model.get_client().stop_recorder() 385 self._world_model.recording_enabled = False 386 self._world_model.hud.notification("Recorder is OFF") 387 else: 388 self._world_model.get_client().start_recorder("manual_recording.rec") 389 self._world_model.recording_enabled = True 390 self._world_model.hud.notification("Recorder is ON") 391 elif event.key == K_p and (pygame.key.get_mods() & KMOD_CTRL): 392 # stop recorder 393 self._world_model.get_client().stop_recorder() 394 self._world_model.recording_enabled = False 395 # work around to fix camera at start of replaying 396 current_index = self._world_model.camera_manager.index 397 self._world_model.destroy_sensors() 398 # disable autopilot 399 self._autopilot_enabled = False 400 self._world_model.player.set_autopilot(self._autopilot_enabled) 401 self._world_model.hud.notification("Replaying file 'manual_recording.rec'") 402 # replayer 403 # TODO: This likely needs more cleanup! 404 replay_logs = self._world_model.get_client().replay_file("manual_recording.rec", self._world_model.recording_start, 0, 0) 405 print("------ Replay logs -------\n", replay_logs) 406 self._world_model.camera_manager.set_sensor(current_index) 407 elif event.key == K_MINUS and (pygame.key.get_mods() & KMOD_CTRL): 408 if pygame.key.get_mods() & KMOD_SHIFT: 409 self._world_model.recording_start -= 10 410 else: 411 self._world_model.recording_start -= 1 412 self._world_model.hud.notification(f"Recording start time is {self._world_model.recording_start}") 413 elif event.key == K_EQUALS and (pygame.key.get_mods() & KMOD_CTRL): 414 if pygame.key.get_mods() & KMOD_SHIFT: 415 self._world_model.recording_start += 10 416 else: 417 self._world_model.recording_start += 1 418 self._world_model.hud.notification(f"Recording start time is {self._world_model.recording_start}") 419 420 # Modify controls 421 if isinstance(self._control, carla.VehicleControl): 422 if event.key == K_q: 423 self._control.gear = 1 if self._control.reverse else -1 424 elif event.key == K_p and not pygame.key.get_mods() & KMOD_CTRL: 425 # TODO: should be a toggle between None, Autopilot, Agent 426 self._autopilot_enabled = not self._autopilot_enabled 427 self._world_model.player.set_autopilot(self._autopilot_enabled) 428 self._world_model.hud.notification( 429 'Autopilot %s' % ('On' if self._autopilot_enabled else 'Off')) 430 elif event.key == K_l and pygame.key.get_mods() & KMOD_CTRL: 431 current_lights ^= carla.VehicleLightState.Special1 432 elif event.key == K_l and pygame.key.get_mods() & KMOD_SHIFT: 433 current_lights ^= carla.VehicleLightState.HighBeam 434 elif event.key == K_l: 435 # Use 'L' key to switch between lights: 436 # closed -> position -> low beam -> fog 437 if not self._lights & carla.VehicleLightState.Position: 438 self._world_model.hud.notification("Position lights") 439 current_lights |= carla.VehicleLightState.Position 440 else: 441 self._world_model.hud.notification("Low beam lights") 442 current_lights |= carla.VehicleLightState.LowBeam 443 if self._lights & carla.VehicleLightState.LowBeam: 444 self._world_model.hud.notification("Fog lights") 445 current_lights |= carla.VehicleLightState.Fog 446 if self._lights & carla.VehicleLightState.Fog: 447 self._world_model.hud.notification("Lights off") 448 current_lights ^= carla.VehicleLightState.Position 449 current_lights ^= carla.VehicleLightState.LowBeam 450 current_lights ^= carla.VehicleLightState.Fog 451 elif event.key == K_i: 452 current_lights ^= carla.VehicleLightState.Interior 453 elif event.key == K_z: 454 current_lights ^= carla.VehicleLightState.LeftBlinker 455 elif event.key == K_x: 456 current_lights ^= carla.VehicleLightState.RightBlinker 457 elif event.type == MOUSEBUTTONDOWN: 458 # store current mouse position for mouse-steering 459 if event.button == 1: 460 self._mouse_steering_center = event.pos 461 elif event.type == MOUSEBUTTONUP: 462 if event.button == 1: 463 self._mouse_steering_center = None 464 465 if not self._autopilot_enabled: 466 #prev_steer_cache = self._steer_cache # NOTE: Not used anymore 467 self._parse_vehicle_keys(pygame.key.get_pressed(), self._clock.get_time()) 468 if pygame.mouse.get_pressed()[0]: 469 self._parse_mouse(pygame.mouse.get_pos()) 470 self._control.reverse = self._control.gear < 0 471 return None 472 # Moved Code from Carla example to WorldModel 473 return None
474 475 def _parse_vehicle_keys(self, keys, milliseconds): 476 """Handles manual vehicle controls via keyboard.""" 477 if keys[K_UP] or keys[K_w]: 478 self._control.throttle = min(self._control.throttle + 0.2, 1) 479 self._control.brake = 0 480 #else: 481 # self._control.throttle = max(self._control.throttle - 0.2, 0) 482 483 if keys[K_DOWN] or keys[K_s]: 484 self._control.brake = min(self._control.brake + 0.2, 1) 485 self._control.throttle = 0 486 #else: 487 # self._control.brake = max(self._control.brake - 0.2, 0) 488 489 self._steer_cache = self._control.steer 490 steer_increment = 5e-4 * milliseconds 491 if keys[K_LEFT] or keys[K_a]: 492 if self._steer_cache > 0: 493 self._steer_cache = 0 494 else: 495 self._steer_cache -= steer_increment 496 elif keys[K_RIGHT] or keys[K_d]: 497 if self._steer_cache < 0: 498 self._steer_cache = 0 499 else: 500 self._steer_cache += steer_increment 501 elif self._steer_cache > 0: 502 self._steer_cache = max(self._steer_cache - steer_increment, 0.0) 503 elif self._steer_cache < 0: 504 self._steer_cache = min(self._steer_cache + steer_increment, 0.0) 505 else: 506 self._steer_cache = 0 507 508 self._steer_cache = min(1.0, max(-1.0, self._steer_cache)) 509 self._control.steer = round(self._steer_cache, 1) 510 self._control.hand_brake = keys[K_SPACE] 511 512 def _parse_mouse(self, pos): 513 """Handles steering and throttle/break control via mouse.""" 514 if not self._mouse_steering_center: 515 return 516 517 lateral = float(pos[0] - self._mouse_steering_center[0]) 518 longitudinal = float(pos[1] - self._mouse_steering_center[1]) 519 max_val = self.MOUSE_STEERING_RANGE 520 lateral = -max_val if lateral < -max_val else min(lateral, max_val) 521 longitudinal = -max_val if longitudinal < -max_val else min(longitudinal, max_val) 522 self._control.steer = lateral / max_val 523 if longitudinal < 0.0: 524 self._control.throttle = -longitudinal / max_val 525 self._control.brake = 0.0 526 elif longitudinal > 0.0: 527 self._control.throttle = 0.0 528 self._control.brake = longitudinal / max_val
529 530 531# Stops RSS and allows hard kills if the script is stuck 532signal.signal(signal.SIGINT, RSSKeyboardControl._signal_handler)