1from dataclasses import dataclass
2from typing import NoReturn, TYPE_CHECKING
3
4import carla
5from omegaconf import II
6
7from agents.tools.config_creation import RuleConfig
8from agents.tools.logs import logger
9from agents.tools.misc import get_closest_tl_trigger_wp
10from classes.constants import Hazard, Phase, RulePriority
11from classes.evaluation_function import ConditionFunction
12from classes.exceptions import SkipInnerLoopException
13from classes.rule import BlockingRule, Context, Rule
14
15if TYPE_CHECKING:
16 from agents.tools.hints import TrafficLightDetectionResult
17
18__all__ = ["DriveSlowTowardsTrafficLight", "PassYellowTrafficLightRule"]
19
20
[docs]
21class PassYellowTrafficLightRule(Rule):
22 priority = RulePriority.HIGH
23
24 phase = Phase.DETECT_TRAFFIC_LIGHTS | Phase.END
25
[docs]
26 @dataclass
27 class self_config(RuleConfig):
28 try_to_pass: bool = False
29 """If the agent should try to pass the yellow light."""
30
31 passing_speed: float = II("max:${mul:${live_info.current_speed_limit},1.33},${speed.target_speed}")
32 """The speed the agent should try to pass the yellow light."""
33
[docs]
34 @ConditionFunction
35 @staticmethod
36 def condition(ctx: Context) -> "None | carla.TrafficLightState":
37 """Executes if a traffic light is the only hazard"""
38 if not ctx.agent.current_traffic_light:
39 return None
40 return ctx.agent.current_traffic_light.state
41
42 @condition.register_action(carla.TrafficLightState.Yellow)
43 def yellow_action(self, ctx: Context):
44 logger.info("Entering IsAtYellowTrafficLight rule.")
45 if self.self_config.try_to_pass:
46 ctx.config.speed.follow_speed_limits = False
47 ctx.config.speed.target_speed = self.self_config.passing_speed
48 ctx.discard_hazard(Hazard.TRAFFIC_LIGHT_YELLOW, match="exact")
49 else:
50 ctx.add_hazard(Hazard.TRAFFIC_LIGHT_YELLOW) # -> Emergency Rules
51
52
[docs]
53class DriveSlowTowardsTrafficLight(BlockingRule):
54 """
55 When the agent is at a red traffic light and it is red drive forward slowly.
56 """
57
58 priority = RulePriority.LOW
59
60 phase = Phase.EMERGENCY | Phase.BEGIN
61
62 MAX_TICKS = 2000 # 2000 * 0.05 = 100 seconds
63
64 DEFAULT_COOLDOWN_RESET = 500
65
[docs]
66 @ConditionFunction
67 def condition(self, ctx: Context):
68 """Executes if a traffic light is the only hazard"""
69 # Prevent recursive calls
70 if self in ctx.active_blocking_rules:
71 return
72 # Checks for yellow and red lights
73 return ctx.has_hazard(Hazard.TRAFFIC_LIGHT, "intersection") and not ctx.has_hazard(Hazard.OBSTACLE)
74
75 # Important need to turn this of to have custom speed limits.
76 overwrite_settings = {"speed": {"follow_speed_limits": False}, }
77
[docs]
78 @dataclass
79 class self_config(RuleConfig):
80 max_brake: float = II("divide:${controls.max_brake},8")
81 """Max break that should be applied when above the target speed."""
82
83 max_throttle: float = II("divide:${controls.max_throttle},4")
84 """Max throttle that should be applied when below the target speed."""
85
86 #@phase_callback(on_exit=Phase.CUSTOM_CYCLE | Phase.END, on_exit_exceptions=LunaticAgentException)
[docs]
87 def action(self, ctx: Context):
88 # Remove triggering hazard
89 logger.info("Entering DriveSlowTowardsTrafficLight rule.")
90
91 # Remove the hazard as we handle it below; removes yellow and red light hazards
92 ctx.discard_hazard(Hazard.TRAFFIC_LIGHT, "intersection")
93
94 last_traffic_light = ctx.agent.current_traffic_light
95 if not last_traffic_light:
96 return # should not happen
97
98 # We do not accidentally want to drive away from the traffic light
99 # Problems:
100 # Trigger Waypoint is before the traffic light, need an alternative
101 clostest_wp, distance = get_closest_tl_trigger_wp(ctx.live_info.current_location, last_traffic_light)
102 traffic_light_group = last_traffic_light.get_group_traffic_lights()
103 last_distance = float("inf")
104
105 # Smaller list of lights to check, however calls the simulator!
106
107 affected: TrafficLightDetectionResult | bool = True
108 while affected and distance <= last_distance + 1 / 1000:
109 # You should always use ctx.agent.calculate_control() before self.loop_agent
110 # This will move the planned waypoint queue forward.
111 # However, the rule might be after the step was already calculated.
112
113 # -------------------------------------------
114 # Get the current control object of this step
115 # -------------------------------------------
116 # if you use loop_agent(ctx, execute_planner=True)
117 # this will be the same as the next_control object acquired below.
118
119 if distance < ctx.config.obstacles.base_tlight_threshold + 0.5:
120 break # End loop -> Other Emergency Rule
121
122 if not ctx.control: # Not yet set; this is the expected case
123 # Change settings before calculating the control object
124 ctx.config.speed.target_speed = min(ctx.config.speed.target_speed, distance * 2)
125 ctx.config.controls.max_brake = self.self_config.max_brake
126 ctx.config.controls.max_throttle = self.self_config.max_throttle
127 #print("Target Speed: ", ctx.config.speed.target_speed,
128 # "current speed: ", ctx.live_info.current_speed)
129 # # calculate it now
130 control = ctx.get_or_calculate_control()
131 else:
132 logger.debug("Control is already set in DriveSlowTowardsTrafficLight rule. Skipping calculation.")
133
134 # ------------ Loop Agent -------------------
135 # Logic:
136 # - ctx.get_or_calculate_control()
137 #
138 # - ctx.agent.parse_keyboard_input
139 # - ctx.agent.apply_control
140 # - self.update_world
141 #
142 # Are nearly equivalent to `BlockingRule.loop_agent`
143 # which encapsulates the above functions, the only difference is
144 # that BlockingRule.loop_agent will calculate the next control object at the end
145 # for the end of the tick when this rule is done.
146 # ------------------------------------------
147
148 print("Control: ", control)
149
150 # It is up to the user wether or not to apply controls inside a blocking rule
151 ctx.agent.parse_keyboard_input(control=control) # NOTE: if skipped the user has no option to stop the agent
152 ctx.agent.apply_control(control)
153
154 # NOTE: This ticks the world forward by one step
155 # The ctx.control is reset to None
156 # > Phase.UPDATE_INFORMATION | Phase.BEGIN
157 self.update_world(ctx)
158 # > Phase.UPDATE_INFORMATION | Phase.END
159
160 # ------------------ Check if we should continue -----------------
161
162 # Remove the traffic light hazard and check again
163 ctx.discard_hazard(Hazard.TRAFFIC_LIGHT, "intersection")
164
165 # Other obstacles we do not want to hit
166 obstacle = ctx.agent.detect_obstacles_in_path("all")
167
168 # TODO: should be a function
169 if obstacle.obstacle_was_found:
170 if isinstance(obstacle.obstacle, carla.Vehicle):
171 ctx.agent.add_hazard(Hazard.CAR)
172 elif isinstance(obstacle.obstacle, carla.Walker):
173 ctx.agent.add_hazard(Hazard.PEDESTRIAN)
174 else:
175 ctx.agent.add_hazard(Hazard.OBSTACLE)
176 break # End loop -> Other Emergency Rule
177
178 affected = ctx.agent.detect_traffic_light(traffic_light_group)
179 if affected.traffic_light_was_found:
180 ctx.add_hazard(Hazard.TRAFFIC_LIGHT)
181 affected = True
182 else:
183 affected = False
184 last_distance = distance
185 distance = ctx.live_info.current_location.distance(clostest_wp.transform.location)
186
187 logger.info("Exiting DriveSlowTowardsTrafficLight rule after %s ticks.", self.ticks_passed)
188 if ctx.control: # NOTE: This is unset with self.update_world
189 raise SkipInnerLoopException(ctx.control)
190
[docs]
191 def max_tick_callback(self, ctx: Context) -> NoReturn:
192 if ctx.control:
193 ctx.control.brake = 1.0
194 ctx.control.throttle = 0.0
195 else:
196 ctx.config.speed.target_speed = 0.0
197 ctx.get_or_calculate_control()
198 raise SkipInnerLoopException(ctx.control) # type: ignore[arg-type]