Source code for classes.ui.keyboard_controls
1import logging
2import sys
3import inspect
4import signal
5import weakref
6from typing import TYPE_CHECKING, Any, Optional, Union
7
8import carla
9import pygame
10from pygame.locals import (
11 K_BACKSPACE,
12 K_DOWN,
13 K_EQUALS,
14 K_ESCAPE,
15 K_F1,
16 K_F2,
17 K_F3,
18 K_F4,
19 K_F5,
20 K_F6,
21 K_LEFT,
22 K_MINUS,
23 K_RIGHT,
24 K_SPACE,
25 K_TAB,
26 K_UP,
27 KMOD_CTRL,
28 KMOD_SHIFT,
29 MOUSEBUTTONDOWN,
30 MOUSEBUTTONUP,
31 K_a,
32 K_b,
33 K_d,
34 K_g,
35 K_i,
36 K_l,
37 K_n,
38 K_o,
39 K_p,
40 K_q,
41 K_r,
42 K_s,
43 K_t,
44 K_w,
45 K_x,
46 K_z,
47)
48from typing_extensions import Literal
49
50from classes import CanBeDummy
51
52if TYPE_CHECKING:
53 from classes.worldmodel import WorldModel
54
55
56# ==============================================================================
57# -- KeyboardControl -----------------------------------------------------------
58# ==============================================================================
59
60
[docs]
61class KeyboardControl:
62 """
63 Primitive base for keyboard control classes.
64
65 H/? : toggle help
66 """
67
68 _world_model: "WorldModel"
69 """Reference to the world model that controls the interface."""
70
71 # COMMENT I think this only allows to end the script
[docs]
72 def __init__(self, world_model: "WorldModel", *, show_help_notice=True, clock: Optional[pygame.time.Clock] = None):
73 """
74 Parameters:
75 world_model : WorldModel
76 show_help_notice : bool
77 Show a notice about the help keys.
78 clock : To track the game time.
79 """
80 self.enabled = True
81 self._world_model = world_model
82 # Controller should register itself on the World Model
83 world_model.controller = weakref.proxy(self)
84 if world_model.hud.help.surface is None:
85 world_model.hud.help.create_surface(self.get_docstring())
86 if show_help_notice:
87 world_model.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
88 self._clock = clock
89
92
[docs]
93 @classmethod
94 def get_docstring(cls):
95 """Return the docstring of the class"""
96
97 doc = cls.__doc__
98 # Get doc from parent
99 if doc is None:
100 doc = inspect.getdoc(cls)
101 if doc is not None and doc != object.__doc__:
102 cls.__doc__ = doc
103 else:
104 cls.__doc__ = doc = "No docstring available"
105 if doc == "No docstring available":
106 return "No docstring available"
107
108 return "======== Controls ===========\n" + doc + "\n============================\n"
109
[docs]
110 def parse_events(self, events=None) -> Union[Literal[True], Any]:
111 """
112 Parse the input events and return True if the loop should end.
113 """
114 if not self.enabled:
115 return None
116 events = events if events is not None else pygame.event.get()
117 for event in events: # pylint: disable=unused-variable
118 self._check_help_event(event)
119 return None
120
121 @staticmethod
122 def _is_quit_shortcut(key: int):
123 """Shortcut for quitting"""
124 return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL)
125
126 def _check_help_event(self, event: pygame.event.Event):
127 """Check if the event is a help event and displays it."""
128 if not hasattr(event, "unicode"): # No KEYUP/DOWN event
129 return None
130 if event.unicode.lower() in ("h", "?"): # type: ignore[attr-defined]
131 self._world_model.hud.help.toggle()
132 return True
133 return False
134
135
[docs]
136class PassiveKeyboardControl(KeyboardControl):
137 """
138 Does not allow to control the vehicle. Only allows to
139 quit the simulation.
140
141 | ESC : quit
142 | H/? : toggle help
143 """
144
[docs]
145 def parse_events(self) -> "Literal[True] | None":
146 if not self.enabled:
147 return None
148 for event in pygame.event.get():
149 if event.type == pygame.QUIT:
150 return True
151 if event.type == pygame.KEYUP:
152 if self._is_quit_shortcut(event.key):
153 return True
154 if self._check_help_event(event):
155 pass
156 return None
157
158
[docs]
159class RSSKeyboardControl(CanBeDummy, KeyboardControl):
160 """
161 Use ARROWS, WASD keys or mouse click and drag for control.
162
163 | W : throttle
164 | S : brake
165 | AD : steer
166 | Q : toggle reverse
167 | Space : hand-brake
168 | P : toggle autopilot (depends on setup)
169
170 | TAB : change view
171 | Backspace : change vehicle (will unset externalActor; experimental)
172
173 | R : toggle recording images to disk
174
175 | F2 : toggle RSS visualization mode
176 | F3 : increase log level
177 | F4 : decrease log level
178 | F5 : increase map log level
179 | F6 : decrease map log level
180 | B : toggle RSS Road Boundaries Mode
181 | G : RSS check drop current route (experimental)
182 | S : toggle RSS (NotImplemented)
183 | T : toggle vehicle's telemetry visualization
184 | O : open/close vehicle's doors
185 | N : pause simulation (not in sync mode)
186
187 | -- Experimental, recording --
188 |
189 | CTRL + R : toggle recording of simulation (replacing any previous)
190 | CTRL + P : start replaying last recorded simulation (untested)
191 | CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds)
192 | CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds)
193
194 | F1 : toggle HUD
195 | H/? : toggle help
196 | ESC : quit
197 """
198
199 MOUSE_STEERING_RANGE = 150
200 """
201 Controls the size of steering area when using the mouse.
202 """
203
204 # TODO: should move this to GameFramework; remnant of original carla code
205 signal_received: "bool | int" = False
206 """
207 Got a signal to stop the simulation. No more events will be parsed if True.
208
209 :meta private:
210 """
211
[docs]
212 def __init__(
213 self,
214 world_model: "WorldModel",
215 *,
216 clock: pygame.time.Clock,
217 start_in_autopilot: bool = False,
218 agent_controlled: bool = True,
219 config=None,
220 ):
221 """
222 Parameters:
223 world_model : WorldModel
224 The world model that controls the interface.
225 clock : pygame.time.Clock
226 The clock to get the time from.
227 start_in_autopilot : bool
228 If CARLA's autopilot is used; or some other system.
229 May not be True if agent_controlled is True.
230 Defaults to :code:`False`.
231 agent_controlled : bool
232 May not be True if start_in_autopilot is True.
233 Defaults to :python:`True`.
234 config : Any
235 Configuration data. Currently unused.
236
237 Note:
238 Creating a controller registers :py:attr:`.WorldModel.controller` to this instance.
239 """
240 if start_in_autopilot and agent_controlled:
241 raise ValueError("Agent controlled and autopilot cannot be active at the same time.")
242 super().__init__(world_model=world_model, clock=clock)
243 if self._clock is None:
244 raise ValueError("Clock must be provided.")
245 elif TYPE_CHECKING:
246 self._clock = self._clock
247
248 self._world_model = world_model
249 self._config = config # Note: currently unused, could use this to disable the controller
250 self._autopilot_enabled = start_in_autopilot
251 self._agent_controlled = agent_controlled
252 self._control: carla.VehicleControl = None # type: ignore[assignment]
253 # self._control = carla.VehicleControl()
254 self._vehicle_physics = world_model.player.get_physics_control()
255 self._lights = carla.VehicleLightState.NONE
256 world_model.player.set_light_state(self._lights)
257 self._steer_cache = 0.0
258 self._mouse_steering_center = None
259
260 self._surface = pygame.Surface((self.MOUSE_STEERING_RANGE * 2, self.MOUSE_STEERING_RANGE * 2))
261 self._surface.set_colorkey(pygame.Color("black"))
262 self._surface.set_alpha(60)
263
264 line_width = 2
265 pygame.draw.polygon(
266 self._surface,
267 (0, 0, 255),
268 [
269 (0, 0),
270 (0, self.MOUSE_STEERING_RANGE * 2 - line_width),
271 (self.MOUSE_STEERING_RANGE * 2 - line_width, self.MOUSE_STEERING_RANGE * 2 - line_width),
272 (self.MOUSE_STEERING_RANGE * 2 - line_width, 0),
273 (0, 0),
274 ],
275 line_width,
276 )
277 pygame.draw.polygon(
278 self._surface,
279 (0, 0, 255),
280 [(0, self.MOUSE_STEERING_RANGE), (self.MOUSE_STEERING_RANGE * 2, self.MOUSE_STEERING_RANGE)],
281 line_width,
282 )
283 pygame.draw.polygon(
284 self._surface,
285 (0, 0, 255),
286 [(self.MOUSE_STEERING_RANGE, 0), (self.MOUSE_STEERING_RANGE, self.MOUSE_STEERING_RANGE * 2)],
287 line_width,
288 )
289
290 @property
291 def controlled_externally(self) -> bool:
292 """
293 Returns True if the vehicle is controlled by some system, i.e not manually by the user.
294
295 """
296 return self._autopilot_enabled or self._agent_controlled
297
[docs]
298 def render(self, display: pygame.Surface) -> None:
299 if self._mouse_steering_center:
300 display.blit(
301 self._surface,
302 (
303 self._mouse_steering_center[0] - self.MOUSE_STEERING_RANGE,
304 self._mouse_steering_center[1] - self.MOUSE_STEERING_RANGE,
305 ),
306 )
307
308 @staticmethod
309 def _signal_handler(signum, _):
310 """
311 Signal handler for stopping the simulation, e.g. when pressing Ctrl+C
312 in the terminal.
313
314 Note:
315 If DetectionMatrix is used this signal handler will be overwritten,
316 but still executed.
317
318 :meta private:
319 """
320 if not RSSKeyboardControl.signal_received:
321 print(f"\nReceived signal {signum}. Trigger stopping... In case the program freezes trigger twice more.")
322 RSSKeyboardControl.signal_received = True
323 return
324 # Did not yet terminate
325 if RSSKeyboardControl.signal_received is True:
326 print(f"\nReceived signal {signum}. Abort a 3rd time to terminate the program immediately")
327 RSSKeyboardControl.signal_received = 2
328 return
329 sys.exit(1)
330
[docs]
331 def parse_events(
332 self, control: "Optional[carla.VehicleControl]" = None
333 ) -> "Literal[True] | None":
334 if control:
335 self._control = control # Note this might be the rss updated controls
336 if RSSKeyboardControl.signal_received:
337 print("\nAccepted signal. Stopping loop...")
338 return True
339 if not self.enabled:
340 return None
341 if isinstance(self._control, carla.VehicleControl):
342 current_lights = self._lights
343 can_modify_controls = True
344 else:
345 can_modify_controls = False
346 for event in pygame.event.get():
347 if event.type == pygame.QUIT:
348 return True
349 if event.type == pygame.KEYUP:
350 if self._is_quit_shortcut(event.key):
351 return True
352
353 if event.key == K_BACKSPACE:
354 self._world_model.external_actor = False # delete and respawn
355 if self._autopilot_enabled:
356 self._world_model.player.set_autopilot(False)
357 self._world_model.restart()
358 self._world_model.player.set_autopilot(True)
359 else:
360 self._world_model.restart()
361 elif event.key == K_F1:
362 self._world_model.hud.toggle_info()
363 elif self._check_help_event(event):
364 pass
365 elif event.key == K_TAB:
366 self._world_model.rss_unstructured_scene_visualizer.toggle_camera()
367 elif event.key == K_n:
368 self._world_model.toggle_pause()
369 elif event.key == K_r:
370 self._world_model.toggle_recording()
371 elif event.key == K_F2:
372 if self._world_model and self._world_model.rss_sensor:
373 self._world_model.rss_sensor.toggle_debug_visualization_mode()
374 newmode = self._world_model.rss_sensor.debug_visualizer._visualization_mode
375 self._world_model.hud.notification(f"RSS Debug Visualization Mode: {newmode}")
376 elif event.key == K_F3:
377 if self._world_model and self._world_model.rss_sensor:
378 if not self._world_model._restrictor:
379 print("WARNING: No restrictor available")
380 else:
381 self._world_model.rss_sensor.decrease_log_level()
382 assert isinstance(self._world_model.rss_sensor.log_level, (int, carla.RssLogLevel))
383 self._world_model._restrictor.set_log_level(self._world_model.rss_sensor.log_level) # pyright: ignore[reportArgumentType]
384 elif event.key == K_F4:
385 if self._world_model and self._world_model.rss_sensor:
386 if not self._world_model._restrictor:
387 print("WARNING: No restrictor available")
388 else:
389 self._world_model.rss_sensor.increase_log_level()
390 assert isinstance(self._world_model.rss_sensor.log_level, (int, carla.RssLogLevel))
391 self._world_model._restrictor.set_log_level(self._world_model.rss_sensor.log_level) # pyright: ignore[reportArgumentType]
392 elif event.key == K_F5:
393 if self._world_model and self._world_model.rss_sensor:
394 self._world_model.rss_sensor.decrease_map_log_level()
395 elif event.key == K_F6:
396 if self._world_model and self._world_model.rss_sensor:
397 self._world_model.rss_sensor.increase_map_log_level()
398 elif event.key == K_b:
399 if self._world_model and self._world_model.rss_sensor:
400 if self._world_model.rss_sensor.sensor.road_boundaries_mode == carla.RssRoadBoundariesMode.Off:
401 self._world_model.rss_set_road_boundaries_mode(carla.RssRoadBoundariesMode.On)
402 print("carla.RssRoadBoundariesMode.On")
403 self._world_model.hud.notification("RSS Road Boundaries Mode: On")
404 else:
405 self._world_model.rss_set_road_boundaries_mode(carla.RssRoadBoundariesMode.Off)
406 print("carla.RssRoadBoundariesMode.Off")
407 self._world_model.hud.notification("RSS Road Boundaries Mode: Off")
408 elif event.key == K_g:
409 if self._world_model and self._world_model.rss_sensor:
410 self._world_model.rss_sensor.drop_route()
411 # --- Experimental, copied from other locations ---
412 elif event.key == K_t:
413 if self._world_model.show_vehicle_telemetry:
414 self._world_model.player.show_debug_telemetry(False)
415 self._world_model.show_vehicle_telemetry = False
416 self._world_model.hud.notification("Disabled Vehicle Telemetry")
417 else:
418 try:
419 self._world_model.player.show_debug_telemetry(True)
420 self._world_model.show_vehicle_telemetry = True
421 self._world_model.hud.notification("Enabled Vehicle Telemetry")
422 except Exception:
423 logging.debug("Could not enable vehicle telemetry")
424 elif event.key == K_o:
425 try:
426 # TODO: Should be set on the agent
427 if self._world_model.doors_are_open:
428 self._world_model.hud.notification("Closing Doors")
429 self._world_model.doors_are_open = False
430 self._world_model.player.close_door(carla.VehicleDoor.All)
431 else:
432 self._world_model.hud.notification("Opening doors")
433 self._world_model.doors_are_open = True
434 self._world_model.player.open_door(carla.VehicleDoor.All)
435 except Exception:
436 logging.warning("Could not open/close doors")
437 # --- Experimental, recording ----
438 elif event.key == K_r and (pygame.key.get_mods() & KMOD_CTRL):
439 if self._world_model.recording_enabled:
440 self._world_model.get_client().stop_recorder()
441 self._world_model.recording_enabled = False
442 self._world_model.hud.notification("Recorder is OFF")
443 else:
444 self._world_model.get_client().start_recorder("manual_recording.rec")
445 self._world_model.recording_enabled = True
446 self._world_model.hud.notification("Recorder is ON")
447 elif event.key == K_p and (pygame.key.get_mods() & KMOD_CTRL):
448 # stop recorder
449 self._world_model.get_client().stop_recorder()
450 self._world_model.recording_enabled = False
451 # work around to fix camera at start of replaying
452 current_index = self._world_model.camera_manager.index
453 self._world_model.destroy_sensors()
454 # disable autopilot
455 self._autopilot_enabled = False
456 self._world_model.player.set_autopilot(self._autopilot_enabled)
457 self._world_model.hud.notification("Replaying file 'manual_recording.rec'")
458 # replayer
459 # TODO: This likely needs more cleanup!
460 replay_logs = self._world_model.get_client().replay_file(
461 "manual_recording.rec", self._world_model.recording_start, 0, 0
462 )
463 print("------ Replay logs -------\n", replay_logs)
464 self._world_model.camera_manager.set_sensor(current_index)
465 elif event.key == K_MINUS and (pygame.key.get_mods() & KMOD_CTRL):
466 if pygame.key.get_mods() & KMOD_SHIFT:
467 self._world_model.recording_start -= 10
468 else:
469 self._world_model.recording_start -= 1
470 self._world_model.hud.notification(f"Recording start time is {self._world_model.recording_start}")
471 elif event.key == K_EQUALS and (pygame.key.get_mods() & KMOD_CTRL):
472 if pygame.key.get_mods() & KMOD_SHIFT:
473 self._world_model.recording_start += 10
474 else:
475 self._world_model.recording_start += 1
476 self._world_model.hud.notification(f"Recording start time is {self._world_model.recording_start}")
477
478 # Modify controls
479 if can_modify_controls:
480 if event.key == K_q:
481 self._control.gear = 1 if self._control.reverse else -1
482 elif event.key == K_p and not pygame.key.get_mods() & KMOD_CTRL:
483 # TODO: should be a toggle between None, Autopilot, Agent
484 self._autopilot_enabled = not self._autopilot_enabled
485 self._world_model.player.set_autopilot(self._autopilot_enabled)
486 self._world_model.hud.notification(
487 "Autopilot %s" % ("On" if self._autopilot_enabled else "Off")
488 )
489 elif event.key == K_l and pygame.key.get_mods() & KMOD_CTRL:
490 current_lights ^= carla.VehicleLightState.Special1
491 elif event.key == K_l and pygame.key.get_mods() & KMOD_SHIFT:
492 current_lights ^= carla.VehicleLightState.HighBeam
493 elif event.key == K_l:
494 # Use 'L' key to switch between lights:
495 # closed -> position -> low beam -> fog
496 if not self._lights & carla.VehicleLightState.Position:
497 self._world_model.hud.notification("Position lights")
498 current_lights |= carla.VehicleLightState.Position
499 else:
500 self._world_model.hud.notification("Low beam lights")
501 current_lights |= carla.VehicleLightState.LowBeam
502 if self._lights & carla.VehicleLightState.LowBeam:
503 self._world_model.hud.notification("Fog lights")
504 current_lights |= carla.VehicleLightState.Fog
505 if self._lights & carla.VehicleLightState.Fog:
506 self._world_model.hud.notification("Lights off")
507 current_lights ^= carla.VehicleLightState.Position
508 current_lights ^= carla.VehicleLightState.LowBeam
509 current_lights ^= carla.VehicleLightState.Fog
510 elif event.key == K_i:
511 current_lights ^= carla.VehicleLightState.Interior
512 elif event.key == K_z:
513 current_lights ^= carla.VehicleLightState.LeftBlinker
514 elif event.key == K_x:
515 current_lights ^= carla.VehicleLightState.RightBlinker
516 elif event.type == MOUSEBUTTONDOWN:
517 # store current mouse position for mouse-steering
518 if event.button == 1:
519 self._mouse_steering_center = event.pos
520 elif event.type == MOUSEBUTTONUP:
521 if event.button == 1:
522 self._mouse_steering_center = None
523
524 if not self._autopilot_enabled:
525 # Prase mouse events
526 if pygame.mouse.get_pressed()[0]:
527 self._parse_mouse(pygame.mouse.get_pos())
528 if not self._control:
529 self._control = carla.VehicleControl()
530 # prev_steer_cache = self._steer_cache # NOTE: Not used anymore
531 self._parse_vehicle_keys(pygame.key.get_pressed(), self._clock.get_time())
532 self._control.reverse = self._control.gear < 0
533 return None
534 # Moved Code from Carla example to WorldModel
535 return None
536
537 def _parse_vehicle_keys(self, keys, milliseconds):
538 """Handles manual vehicle controls via keyboard."""
539 if keys[K_UP] or keys[K_w]:
540 self._control.throttle = min(self._control.throttle + 0.2, 1)
541 self._control.brake = 0
542 # else:
543 # self._control.throttle = max(self._control.throttle - 0.2, 0)
544
545 if keys[K_DOWN] or keys[K_s]:
546 self._control.brake = min(self._control.brake + 0.2, 1)
547 self._control.throttle = 0
548 # else:
549 # self._control.brake = max(self._control.brake - 0.2, 0)
550
551 self._steer_cache = self._control.steer
552 steer_increment = 5e-4 * milliseconds
553 if keys[K_LEFT] or keys[K_a]:
554 if self._steer_cache > 0:
555 self._steer_cache = 0
556 else:
557 self._steer_cache -= steer_increment
558 elif keys[K_RIGHT] or keys[K_d]:
559 if self._steer_cache < 0:
560 self._steer_cache = 0
561 else:
562 self._steer_cache += steer_increment
563 elif self._steer_cache > 0:
564 self._steer_cache = max(self._steer_cache - steer_increment, 0.0)
565 elif self._steer_cache < 0:
566 self._steer_cache = min(self._steer_cache + steer_increment, 0.0)
567 else:
568 self._steer_cache = 0
569
570 self._steer_cache = min(1.0, max(-1.0, self._steer_cache))
571 self._control.steer = round(self._steer_cache, 1)
572 self._control.hand_brake = keys[K_SPACE]
573
574 def _parse_mouse(self, pos):
575 """Handles steering and throttle/break control via mouse."""
576 if not self._mouse_steering_center:
577 return
578
579 lateral = float(pos[0] - self._mouse_steering_center[0])
580 longitudinal = float(pos[1] - self._mouse_steering_center[1])
581 max_val = self.MOUSE_STEERING_RANGE
582 lateral = -max_val if lateral < -max_val else min(lateral, max_val)
583 longitudinal = -max_val if longitudinal < -max_val else min(longitudinal, max_val)
584 self._control.steer = lateral / max_val
585 if longitudinal < 0.0:
586 self._control.throttle = -longitudinal / max_val
587 self._control.brake = 0.0
588 elif longitudinal > 0.0:
589 self._control.throttle = 0.0
590 self._control.brake = longitudinal / max_val
591
592
593# Stops RSS and allows hard kills if the script is stuck
594signal.signal(signal.SIGINT, RSSKeyboardControl._signal_handler)