1from dataclasses import dataclass
2from typing import NoReturn, TYPE_CHECKING
3
4import carla
5from omegaconf import II
6
7from agents.tools.config_creation import RuleConfig
8from agents.tools.logs import logger
9from agents.tools.misc import get_closest_tl_trigger_wp
10from classes.constants import Hazard, Phase, RulePriority
11from classes.evaluation_function import ConditionFunction
12from classes.exceptions import SkipInnerLoopException
13from classes.rule import BlockingRule, Context, Rule
14
15if TYPE_CHECKING:
16 from agents.tools.hints import TrafficLightDetectionResult
17
18__all__ = ["DriveSlowTowardsTrafficLight", "PassYellowTrafficLightRule"]
19
20
[docs]
21class PassYellowTrafficLightRule(Rule):
22 priority = RulePriority.HIGH
23
24 phase = Phase.DETECT_TRAFFIC_LIGHTS | Phase.END
25
[docs]
26 @dataclass
27 class self_config(RuleConfig):
28 try_to_pass: bool = False
29 """If the agent should try to pass the yellow light."""
30
31 passing_speed: float = II("max:${mul:${live_info.current_speed_limit},1.33},${speed.target_speed}")
32 """The speed the agent should try to pass the yellow light."""
33
[docs]
34 @ConditionFunction
35 @staticmethod
36 def condition(ctx: Context) -> "carla.TrafficLightState | None":
37 """Executes if a traffic light is the only hazard"""
38 if not ctx.agent.current_traffic_light:
39 return None
40 return ctx.agent.current_traffic_light.state
41
42 @condition.register_action(carla.TrafficLightState.Yellow)
43 def yellow_action(self, ctx: Context):
44 logger.info("Entering IsAtYellowTrafficLight rule.")
45 if self.self_config.try_to_pass:
46 ctx.config.speed.follow_speed_limits = False
47 ctx.config.speed.target_speed = self.self_config.passing_speed
48 ctx.discard_hazard(Hazard.TRAFFIC_LIGHT_YELLOW, match="exact")
49 else:
50 ctx.add_hazard(Hazard.TRAFFIC_LIGHT_YELLOW) # -> Emergency Rules
51
52
[docs]
53class DriveSlowTowardsTrafficLight(BlockingRule):
54 """
55 When the agent is at a red traffic light and it is red drive forward slowly.
56 """
57
58 priority = RulePriority.LOW
59
60 phase = Phase.EMERGENCY | Phase.BEGIN
61
62 MAX_TICKS = 2000 # 2000 * 0.05 = 100 seconds
63
64 DEFAULT_COOLDOWN_RESET = 500
65
[docs]
66 @ConditionFunction
67 def condition(self, ctx: Context):
68 """Executes if a traffic light is the only hazard"""
69 # Prevent recursive calls
70 if self in ctx.active_blocking_rules:
71 return
72 # Checks for yellow and red lights
73 return ctx.has_hazard(Hazard.TRAFFIC_LIGHT, "intersection") and not ctx.has_hazard(Hazard.OBSTACLE)
74
75 # Important need to turn this of to have custom speed limits.
76 overwrite_settings = {
77 "speed": {"follow_speed_limits": False},
78 }
79
[docs]
80 @dataclass
81 class self_config(RuleConfig):
82 max_brake: float = II("divide:${controls.max_brake},8")
83 """Max break that should be applied when above the target speed."""
84
85 max_throttle: float = II("divide:${controls.max_throttle},4")
86 """Max throttle that should be applied when below the target speed."""
87
88 # @phase_callback(on_exit=Phase.CUSTOM_CYCLE | Phase.END, on_exit_exceptions=LunaticAgentException)
[docs]
89 def action(self, ctx: Context):
90 # Remove triggering hazard
91 logger.info("Entering DriveSlowTowardsTrafficLight rule.")
92
93 # Remove the hazard as we handle it below; removes yellow and red light hazards
94 ctx.discard_hazard(Hazard.TRAFFIC_LIGHT, "intersection")
95
96 last_traffic_light = ctx.agent.current_traffic_light
97 if not last_traffic_light:
98 return # should not happen
99
100 # We do not accidentally want to drive away from the traffic light
101 # Problems:
102 # Trigger Waypoint is before the traffic light, need an alternative
103 clostest_wp, distance = get_closest_tl_trigger_wp(ctx.live_info.current_location, last_traffic_light)
104 traffic_light_group = last_traffic_light.get_group_traffic_lights()
105 last_distance = float("inf")
106
107 # Smaller list of lights to check, however calls the simulator!
108
109 affected: TrafficLightDetectionResult | bool = True
110 while affected and distance <= last_distance + 1 / 1000:
111 # You should always use ctx.agent.calculate_control() before self.loop_agent
112 # This will move the planned waypoint queue forward.
113 # However, the rule might be after the step was already calculated.
114
115 # -------------------------------------------
116 # Get the current control object of this step
117 # -------------------------------------------
118 # if you use loop_agent(ctx, execute_planner=True)
119 # this will be the same as the next_control object acquired below.
120
121 if distance < ctx.config.obstacles.base_tlight_threshold + 0.5:
122 break # End loop -> Other Emergency Rule
123
124 if not ctx.control: # Not yet set; this is the expected case
125 # Change settings before calculating the control object
126 ctx.config.speed.target_speed = min(ctx.config.speed.target_speed, distance * 2)
127 ctx.config.controls.max_brake = self.self_config.max_brake
128 ctx.config.controls.max_throttle = self.self_config.max_throttle
129 # print("Target Speed: ", ctx.config.speed.target_speed,
130 # "current speed: ", ctx.live_info.current_speed)
131 # # calculate it now
132 control = ctx.get_or_calculate_control()
133 else:
134 logger.debug("Control is already set in DriveSlowTowardsTrafficLight rule. Skipping calculation.")
135
136 # ------------ Loop Agent -------------------
137 # Logic:
138 # - ctx.get_or_calculate_control()
139 #
140 # - ctx.agent.parse_keyboard_input
141 # - ctx.agent.apply_control
142 # - self.update_world
143 #
144 # Are nearly equivalent to `BlockingRule.loop_agent`
145 # which encapsulates the above functions, the only difference is
146 # that BlockingRule.loop_agent will calculate the next control object at the end
147 # for the end of the tick when this rule is done.
148 # ------------------------------------------
149
150 print("Control: ", control)
151
152 # It is up to the user wether or not to apply controls inside a blocking rule
153 ctx.agent.parse_keyboard_input(control=control) # NOTE: if skipped the user has no option to stop the agent
154 ctx.agent.apply_control(control)
155
156 # NOTE: This ticks the world forward by one step
157 # The ctx.control is reset to None
158 # > Phase.UPDATE_INFORMATION | Phase.BEGIN
159 self.update_world(ctx)
160 # > Phase.UPDATE_INFORMATION | Phase.END
161
162 # ------------------ Check if we should continue -----------------
163
164 # Remove the traffic light hazard and check again
165 ctx.discard_hazard(Hazard.TRAFFIC_LIGHT, "intersection")
166
167 # Other obstacles we do not want to hit
168 obstacle = ctx.agent.detect_obstacles_in_path("all")
169
170 # TODO: should be a function
171 if obstacle.obstacle_was_found:
172 if isinstance(obstacle.obstacle, carla.Vehicle):
173 ctx.agent.add_hazard(Hazard.CAR)
174 elif isinstance(obstacle.obstacle, carla.Walker):
175 ctx.agent.add_hazard(Hazard.PEDESTRIAN)
176 else:
177 ctx.agent.add_hazard(Hazard.OBSTACLE)
178 break # End loop -> Other Emergency Rule
179
180 affected = ctx.agent.detect_traffic_light(traffic_light_group)
181 if affected.traffic_light_was_found:
182 ctx.add_hazard(Hazard.TRAFFIC_LIGHT)
183 affected = True
184 else:
185 affected = False
186 last_distance = distance
187 distance = ctx.live_info.current_location.distance(clostest_wp.transform.location)
188
189 logger.info("Exiting DriveSlowTowardsTrafficLight rule after %s ticks.", self.ticks_passed)
190 if ctx.control: # NOTE: This is unset with self.update_world
191 raise SkipInnerLoopException(ctx.control)
192
[docs]
193 def max_tick_callback(self, ctx: Context) -> NoReturn:
194 if ctx.control:
195 ctx.control.brake = 1.0
196 ctx.control.throttle = 0.0
197 else:
198 ctx.config.speed.target_speed = 0.0
199 ctx.get_or_calculate_control()
200 raise SkipInnerLoopException(ctx.control) # type: ignore[arg-type]