Source code for classes.keyboard_controls
1import logging
2import sys
3import inspect
4import signal
5import weakref
6from typing import TYPE_CHECKING, Any, Optional, Union
7
8import carla
9import pygame
10from pygame.locals import (
11 K_BACKSPACE,
12 K_DOWN,
13 K_EQUALS,
14 K_ESCAPE,
15 K_F1,
16 K_F2,
17 K_F3,
18 K_F4,
19 K_F5,
20 K_F6,
21 K_LEFT,
22 K_MINUS,
23 K_RIGHT,
24 K_SPACE,
25 K_TAB,
26 K_UP,
27 KMOD_CTRL,
28 KMOD_SHIFT,
29 MOUSEBUTTONDOWN,
30 MOUSEBUTTONUP,
31 K_a,
32 K_b,
33 K_d,
34 K_g,
35 K_i,
36 K_l,
37 K_n,
38 K_o,
39 K_p,
40 K_q,
41 K_r,
42 K_s,
43 K_t,
44 K_w,
45 K_x,
46 K_z,
47)
48from typing_extensions import Literal
49
50if TYPE_CHECKING:
51 from classes.worldmodel import WorldModel
52
53
54# ==============================================================================
55# -- KeyboardControl -----------------------------------------------------------
56# ==============================================================================
57
58
[docs]
59class KeyboardControl:
60 """
61 Primitive base for keyboard control classes.
62
63 H/? : toggle help
64 """
65
66 _world_model: "WorldModel"
67 """Reference to the world model that controls the interface."""
68
69 # COMMENT I think this only allows to end the script
[docs]
70 def __init__(self, world: "WorldModel", help_notice=True):
71 """
72 Parameters:
73 world : WorldModel
74 help_notice : bool
75 Show a notice about the help keys.
76 """
77 self._world_model = world
78 if world.hud.help.surface is None:
79 world.hud.help.create_surface(self.get_docstring())
80 if help_notice:
81 world.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
82
[docs]
83 @classmethod
84 def get_docstring(cls):
85 """Return the docstring of the class"""
86
87 doc = cls.__doc__
88 # Get doc from parent
89 if doc is None:
90 doc = inspect.getdoc(cls)
91 if doc is not None and doc != object.__doc__:
92 cls.__doc__ = doc
93 else:
94 cls.__doc__ = doc = "No docstring available"
95 if doc == "No docstring available":
96 return "No docstring available"
97
98 return "======== Controls ===========\n" + doc + "\n============================\n"
99
[docs]
100 def parse_events(self, events=None) -> Union[Literal[True], Any]:
101 """
102 Parse the input events and return True if the loop should end.
103 """
104 events = events if events is not None else pygame.event.get()
105 for event in events: # pylint: disable=unused-variable
106 self._check_help_event(event)
107
108 @staticmethod
109 def _is_quit_shortcut(key: int):
110 """Shortcut for quitting"""
111 return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL)
112
113 def _check_help_event(self, event: pygame.event.Event):
114 """Check if the event is a help event"""
115 if not hasattr(event, 'unicode'): # No KEYUP/DOWN event
116 return None
117 if event.unicode.lower() in ('h', '?'): # type: ignore[attr-defined]
118 self._world_model.hud.help.toggle()
119 return True
120 return False
121
122
[docs]
123class PassiveKeyboardControl(KeyboardControl):
124 """
125 Does not allow to control the vehicle. Only allows to
126 quit the simulation.
127
128 | ESC : quit
129 | H/? : toggle help
130 """
131
[docs]
132 def parse_events(self) -> "None | Literal[True]":
133 for event in pygame.event.get():
134 if event.type == pygame.QUIT:
135 return True
136 if event.type == pygame.KEYUP:
137 if self._is_quit_shortcut(event.key):
138 return True
139 if self._check_help_event(event):
140 pass
141 return None
142
143
[docs]
144class RSSKeyboardControl(KeyboardControl):
145 """
146 Use ARROWS, WASD keys or mouse click and drag for control.
147
148 | W : throttle
149 | S : brake
150 | AD : steer
151 | Q : toggle reverse
152 | Space : hand-brake
153 | P : toggle autopilot (depends on setup)
154
155 | TAB : change view
156 | Backspace : change vehicle (will unset externalActor; experimental)
157
158 | R : toggle recording images to disk
159
160 | F2 : toggle RSS visualization mode
161 | F3 : increase log level
162 | F4 : decrease log level
163 | F5 : increase map log level
164 | F6 : decrease map log level
165 | B : toggle RSS Road Boundaries Mode
166 | G : RSS check drop current route (experimental)
167 | S : toggle RSS (NotImplemented)
168 | T : toggle vehicle's telemetry visualization
169 | O : open/close vehicle's doors
170 | N : pause simulation (not in sync mode)
171
172 | -- Experimental, recording --
173 |
174 | CTRL + R : toggle recording of simulation (replacing any previous)
175 | CTRL + P : start replaying last recorded simulation (untested)
176 | CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds)
177 | CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds)
178
179 | F1 : toggle HUD
180 | H/? : toggle help
181 | ESC : quit
182 """
183
184 MOUSE_STEERING_RANGE = 150
185 """
186 Controls the size of steering area when using the mouse.
187 """
188
189 signal_received: "bool | int" = False
190 """
191 Got a signal to stop the simulation. No more events will be parsed if True.
192
193 :meta private:
194 """
195
196 # TODO: should be a toggle between None, Autopilot, Agent
197
[docs]
198 def __init__(self, world_model: "WorldModel", start_in_autopilot: bool, agent_controlled: bool = True, clock: pygame.time.Clock = None, config=None):
199 if start_in_autopilot and agent_controlled:
200 raise ValueError("Agent controlled and autopilot cannot be active at the same time.")
201 super().__init__(world_model)
202
203 self._world_model = world_model
204 self._config = config # Note: currently unused
205 self._autopilot_enabled = start_in_autopilot
206 self._agent_controlled = agent_controlled
207 world_model.controller = weakref.proxy(self)
208 self._control: carla.VehicleControl = None
209 #self._control = carla.VehicleControl()
210 self._lights = carla.VehicleLightState.NONE
211 #self._restrictor = carla.RssRestrictor() # Moved to worldmodel
212 self._restrictor: carla.RssRestrictor = None
213 self._vehicle_physics = world_model.player.get_physics_control()
214 world_model.player.set_light_state(self._lights)
215 self._steer_cache = 0.0
216 self._mouse_steering_center = None
217
218 self._surface = pygame.Surface((self.MOUSE_STEERING_RANGE * 2, self.MOUSE_STEERING_RANGE * 2))
219 self._surface.set_colorkey(pygame.Color('black'))
220 self._surface.set_alpha(60)
221 assert clock is not None, "Clock must be provided."
222 self._clock = clock
223
224 line_width = 2
225 pygame.draw.polygon(self._surface,
226 (0, 0, 255),
227 [
228 (0, 0),
229 (0, self.MOUSE_STEERING_RANGE * 2 - line_width),
230 (self.MOUSE_STEERING_RANGE * 2 - line_width,
231 self.MOUSE_STEERING_RANGE * 2 - line_width),
232 (self.MOUSE_STEERING_RANGE * 2 - line_width, 0),
233 (0, 0)
234 ], line_width)
235 pygame.draw.polygon(self._surface,
236 (0, 0, 255),
237 [
238 (0, self.MOUSE_STEERING_RANGE),
239 (self.MOUSE_STEERING_RANGE * 2, self.MOUSE_STEERING_RANGE)
240 ], line_width)
241 pygame.draw.polygon(self._surface,
242 (0, 0, 255),
243 [
244 (self.MOUSE_STEERING_RANGE, 0),
245 (self.MOUSE_STEERING_RANGE, self.MOUSE_STEERING_RANGE * 2)
246 ], line_width)
247
248 world_model.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
249
250 @property
251 def controlled_externally(self):
252 return self._autopilot_enabled or self._agent_controlled
253
[docs]
254 def render(self, display: pygame.Surface) -> None:
255 if self._mouse_steering_center:
256 display.blit(
257 self._surface, (self._mouse_steering_center[0] - self.MOUSE_STEERING_RANGE, self._mouse_steering_center[1] - self.MOUSE_STEERING_RANGE))
258
259 @staticmethod
260 def _signal_handler(signum, _):
261 """
262 Signal handler for stopping the simulation, e.g. when pressing Ctrl+C
263 in the terminal.
264
265 Note:
266 If DetectionMatrix is used this signal handler will be overwritten,
267 but still executed.
268
269 :meta private:
270 """
271 if not RSSKeyboardControl.signal_received:
272 print(f'\nReceived signal {signum}. Trigger stopping... In case the program freezes trigger twice more.')
273 RSSKeyboardControl.signal_received = True
274 return
275 # Did not yet terminate
276 if RSSKeyboardControl.signal_received is True:
277 print(f'\nReceived signal {signum}. Abort a 3rd time to terminate the program immediately')
278 RSSKeyboardControl.signal_received = 2
279 return
280 sys.exit(1)
281
[docs]
282 def parse_events(self, control: "Optional[carla.VehicleControl]" = None) -> "None | Literal[True]":
283 if control:
284 self._control = control # Note this might be the rss updated controls
285 if RSSKeyboardControl.signal_received:
286 print('\nAccepted signal. Stopping loop...')
287 return True
288 if isinstance(self._control, carla.VehicleControl):
289 current_lights = self._lights
290 for event in pygame.event.get():
291 if event.type == pygame.QUIT:
292 return True
293 if event.type == pygame.KEYUP:
294 if self._is_quit_shortcut(event.key):
295 return True
296
297 if event.key == K_BACKSPACE:
298 self._world_model.external_actor = None
299 if self._autopilot_enabled:
300 self._world_model.player.set_autopilot(False)
301 self._world_model.restart()
302 self._world_model.player.set_autopilot(True)
303 else:
304 self._world_model.restart()
305 elif event.key == K_F1:
306 self._world_model.hud.toggle_info()
307 elif self._check_help_event(event):
308 pass
309 elif event.key == K_TAB:
310 self._world_model.rss_unstructured_scene_visualizer.toggle_camera()
311 elif event.key == K_n:
312 self._world_model.toggle_pause()
313 elif event.key == K_r:
314 self._world_model.toggle_recording()
315 elif event.key == K_F2:
316 if self._world_model and self._world_model.rss_sensor:
317 self._world_model.rss_sensor.toggle_debug_visualization_mode()
318 _newmode = self._world_model.rss_sensor.debug_visualizer._visualization_mode
319 self._world_model.hud.notification(f"RSS Debug Visualization Mode: {_newmode}")
320 elif event.key == K_F3:
321 if self._world_model and self._world_model.rss_sensor:
322 if not self._world_model._restrictor:
323 print("WARNING: No restrictor available")
324 else:
325 self._world_model.rss_sensor.decrease_log_level()
326 assert isinstance(self._world_model.rss_sensor.log_level, (int, carla.RssLogLevel))
327 self._world_model._restrictor.set_log_level(self._world_model.rss_sensor.log_level) # pyright: ignore[reportArgumentType]
328 elif event.key == K_F4:
329 if self._world_model and self._world_model.rss_sensor:
330 if not self._world_model._restrictor:
331 print("WARNING: No restrictor available")
332 else:
333 self._world_model.rss_sensor.increase_log_level()
334 assert isinstance(self._world_model.rss_sensor.log_level, (int, carla.RssLogLevel))
335 self._world_model._restrictor.set_log_level(self._world_model.rss_sensor.log_level) # pyright: ignore[reportArgumentType]
336 elif event.key == K_F5:
337 if self._world_model and self._world_model.rss_sensor:
338 self._world_model.rss_sensor.decrease_map_log_level()
339 elif event.key == K_F6:
340 if self._world_model and self._world_model.rss_sensor:
341 self._world_model.rss_sensor.increase_map_log_level()
342 elif event.key == K_b:
343 if self._world_model and self._world_model.rss_sensor:
344 if self._world_model.rss_sensor.sensor.road_boundaries_mode == carla.RssRoadBoundariesMode.Off:
345 self._world_model.rss_set_road_boundaries_mode(carla.RssRoadBoundariesMode.On)
346 print("carla.RssRoadBoundariesMode.On")
347 self._world_model.hud.notification("RSS Road Boundaries Mode: On")
348 else:
349 self._world_model.rss_set_road_boundaries_mode(carla.RssRoadBoundariesMode.Off)
350 print("carla.RssRoadBoundariesMode.Off")
351 self._world_model.hud.notification("RSS Road Boundaries Mode: Off")
352 elif event.key == K_g:
353 if self._world_model and self._world_model.rss_sensor:
354 self._world_model.rss_sensor.drop_route()
355 # --- Experimental, copied from other locations ---
356 elif event.key == K_t:
357 if self._world_model.show_vehicle_telemetry:
358 self._world_model.player.show_debug_telemetry(False)
359 self._world_model.show_vehicle_telemetry = False
360 self._world_model.hud.notification("Disabled Vehicle Telemetry")
361 else:
362 try:
363 self._world_model.player.show_debug_telemetry(True)
364 self._world_model.show_vehicle_telemetry = True
365 self._world_model.hud.notification("Enabled Vehicle Telemetry")
366 except Exception:
367 logging.debug("Could not enable vehicle telemetry")
368 elif event.key == K_o:
369 try:
370 # TODO: Should be set on the agent
371 if self._world_model.doors_are_open:
372 self._world_model.hud.notification("Closing Doors")
373 self._world_model.doors_are_open = False
374 self._world_model.player.close_door(carla.VehicleDoor.All)
375 else:
376 self._world_model.hud.notification("Opening doors")
377 self._world_model.doors_are_open = True
378 self._world_model.player.open_door(carla.VehicleDoor.All)
379 except Exception:
380 logging.warning("Could not open/close doors")
381 # --- Experimental, recording ----
382 elif event.key == K_r and (pygame.key.get_mods() & KMOD_CTRL):
383 if (self._world_model.recording_enabled):
384 self._world_model.get_client().stop_recorder()
385 self._world_model.recording_enabled = False
386 self._world_model.hud.notification("Recorder is OFF")
387 else:
388 self._world_model.get_client().start_recorder("manual_recording.rec")
389 self._world_model.recording_enabled = True
390 self._world_model.hud.notification("Recorder is ON")
391 elif event.key == K_p and (pygame.key.get_mods() & KMOD_CTRL):
392 # stop recorder
393 self._world_model.get_client().stop_recorder()
394 self._world_model.recording_enabled = False
395 # work around to fix camera at start of replaying
396 current_index = self._world_model.camera_manager.index
397 self._world_model.destroy_sensors()
398 # disable autopilot
399 self._autopilot_enabled = False
400 self._world_model.player.set_autopilot(self._autopilot_enabled)
401 self._world_model.hud.notification("Replaying file 'manual_recording.rec'")
402 # replayer
403 # TODO: This likely needs more cleanup!
404 replay_logs = self._world_model.get_client().replay_file("manual_recording.rec", self._world_model.recording_start, 0, 0)
405 print("------ Replay logs -------\n", replay_logs)
406 self._world_model.camera_manager.set_sensor(current_index)
407 elif event.key == K_MINUS and (pygame.key.get_mods() & KMOD_CTRL):
408 if pygame.key.get_mods() & KMOD_SHIFT:
409 self._world_model.recording_start -= 10
410 else:
411 self._world_model.recording_start -= 1
412 self._world_model.hud.notification(f"Recording start time is {self._world_model.recording_start}")
413 elif event.key == K_EQUALS and (pygame.key.get_mods() & KMOD_CTRL):
414 if pygame.key.get_mods() & KMOD_SHIFT:
415 self._world_model.recording_start += 10
416 else:
417 self._world_model.recording_start += 1
418 self._world_model.hud.notification(f"Recording start time is {self._world_model.recording_start}")
419
420 # Modify controls
421 if isinstance(self._control, carla.VehicleControl):
422 if event.key == K_q:
423 self._control.gear = 1 if self._control.reverse else -1
424 elif event.key == K_p and not pygame.key.get_mods() & KMOD_CTRL:
425 # TODO: should be a toggle between None, Autopilot, Agent
426 self._autopilot_enabled = not self._autopilot_enabled
427 self._world_model.player.set_autopilot(self._autopilot_enabled)
428 self._world_model.hud.notification(
429 'Autopilot %s' % ('On' if self._autopilot_enabled else 'Off'))
430 elif event.key == K_l and pygame.key.get_mods() & KMOD_CTRL:
431 current_lights ^= carla.VehicleLightState.Special1
432 elif event.key == K_l and pygame.key.get_mods() & KMOD_SHIFT:
433 current_lights ^= carla.VehicleLightState.HighBeam
434 elif event.key == K_l:
435 # Use 'L' key to switch between lights:
436 # closed -> position -> low beam -> fog
437 if not self._lights & carla.VehicleLightState.Position:
438 self._world_model.hud.notification("Position lights")
439 current_lights |= carla.VehicleLightState.Position
440 else:
441 self._world_model.hud.notification("Low beam lights")
442 current_lights |= carla.VehicleLightState.LowBeam
443 if self._lights & carla.VehicleLightState.LowBeam:
444 self._world_model.hud.notification("Fog lights")
445 current_lights |= carla.VehicleLightState.Fog
446 if self._lights & carla.VehicleLightState.Fog:
447 self._world_model.hud.notification("Lights off")
448 current_lights ^= carla.VehicleLightState.Position
449 current_lights ^= carla.VehicleLightState.LowBeam
450 current_lights ^= carla.VehicleLightState.Fog
451 elif event.key == K_i:
452 current_lights ^= carla.VehicleLightState.Interior
453 elif event.key == K_z:
454 current_lights ^= carla.VehicleLightState.LeftBlinker
455 elif event.key == K_x:
456 current_lights ^= carla.VehicleLightState.RightBlinker
457 elif event.type == MOUSEBUTTONDOWN:
458 # store current mouse position for mouse-steering
459 if event.button == 1:
460 self._mouse_steering_center = event.pos
461 elif event.type == MOUSEBUTTONUP:
462 if event.button == 1:
463 self._mouse_steering_center = None
464
465 if not self._autopilot_enabled:
466 #prev_steer_cache = self._steer_cache # NOTE: Not used anymore
467 self._parse_vehicle_keys(pygame.key.get_pressed(), self._clock.get_time())
468 if pygame.mouse.get_pressed()[0]:
469 self._parse_mouse(pygame.mouse.get_pos())
470 self._control.reverse = self._control.gear < 0
471 return None
472 # Moved Code from Carla example to WorldModel
473 return None
474
475 def _parse_vehicle_keys(self, keys, milliseconds):
476 """Handles manual vehicle controls via keyboard."""
477 if keys[K_UP] or keys[K_w]:
478 self._control.throttle = min(self._control.throttle + 0.2, 1)
479 self._control.brake = 0
480 #else:
481 # self._control.throttle = max(self._control.throttle - 0.2, 0)
482
483 if keys[K_DOWN] or keys[K_s]:
484 self._control.brake = min(self._control.brake + 0.2, 1)
485 self._control.throttle = 0
486 #else:
487 # self._control.brake = max(self._control.brake - 0.2, 0)
488
489 self._steer_cache = self._control.steer
490 steer_increment = 5e-4 * milliseconds
491 if keys[K_LEFT] or keys[K_a]:
492 if self._steer_cache > 0:
493 self._steer_cache = 0
494 else:
495 self._steer_cache -= steer_increment
496 elif keys[K_RIGHT] or keys[K_d]:
497 if self._steer_cache < 0:
498 self._steer_cache = 0
499 else:
500 self._steer_cache += steer_increment
501 elif self._steer_cache > 0:
502 self._steer_cache = max(self._steer_cache - steer_increment, 0.0)
503 elif self._steer_cache < 0:
504 self._steer_cache = min(self._steer_cache + steer_increment, 0.0)
505 else:
506 self._steer_cache = 0
507
508 self._steer_cache = min(1.0, max(-1.0, self._steer_cache))
509 self._control.steer = round(self._steer_cache, 1)
510 self._control.hand_brake = keys[K_SPACE]
511
512 def _parse_mouse(self, pos):
513 """Handles steering and throttle/break control via mouse."""
514 if not self._mouse_steering_center:
515 return
516
517 lateral = float(pos[0] - self._mouse_steering_center[0])
518 longitudinal = float(pos[1] - self._mouse_steering_center[1])
519 max_val = self.MOUSE_STEERING_RANGE
520 lateral = -max_val if lateral < -max_val else min(lateral, max_val)
521 longitudinal = -max_val if longitudinal < -max_val else min(longitudinal, max_val)
522 self._control.steer = lateral / max_val
523 if longitudinal < 0.0:
524 self._control.throttle = -longitudinal / max_val
525 self._control.brake = 0.0
526 elif longitudinal > 0.0:
527 self._control.throttle = 0.0
528 self._control.brake = longitudinal / max_val
529
530
531# Stops RSS and allows hard kills if the script is stuck
532signal.signal(signal.SIGINT, RSSKeyboardControl._signal_handler)