Source code for agents.rules.obstacles.traffic_light_rules

  1from dataclasses import dataclass
  2from typing import NoReturn, TYPE_CHECKING
  3
  4import carla
  5from omegaconf import II
  6
  7from agents.tools.config_creation import RuleConfig
  8from agents.tools.logs import logger
  9from agents.tools.misc import get_closest_tl_trigger_wp
 10from classes.constants import Hazard, Phase, RulePriority
 11from classes.evaluation_function import ConditionFunction
 12from classes.exceptions import SkipInnerLoopException
 13from classes.rule import BlockingRule, Context, Rule
 14
 15if TYPE_CHECKING:
 16    from agents.tools.hints import TrafficLightDetectionResult
 17
 18__all__ = ["DriveSlowTowardsTrafficLight", "PassYellowTrafficLightRule"]
 19
 20
class PassYellowTrafficLightRule(Rule):
    """
    Rule that reacts when the agent's current traffic light is yellow.

    Depending on ``self_config.try_to_pass`` the rule either raises the target speed
    and drops the yellow-light hazard, or registers the yellow-light hazard.
    """

    priority = RulePriority.HIGH

    phase = Phase.DETECT_TRAFFIC_LIGHTS | Phase.END

    @dataclass
    class self_config(RuleConfig):
        try_to_pass: bool = False
        """If the agent should try to pass the yellow light."""

        passing_speed: float = II("max:${mul:${live_info.current_speed_limit},1.33},${speed.target_speed}")
        """The speed the agent should try to pass the yellow light."""

    @ConditionFunction
    @staticmethod
    def condition(ctx: Context) -> "carla.TrafficLightState | None":
        """Return the state of the agent's current traffic light, or ``None`` if it has none."""
        traffic_light = ctx.agent.current_traffic_light
        if traffic_light:
            return traffic_light.state
        return None

    @condition.register_action(carla.TrafficLightState.Yellow)
    def yellow_action(self, ctx: Context):
        """
        Action registered for the ``Yellow`` result of :py:meth:`condition`.

        When ``try_to_pass`` is disabled the yellow-light hazard is added and nothing
        else is changed. Otherwise speed-limit following is turned off, the target
        speed is set to ``passing_speed`` and the yellow-light hazard is discarded.
        """
        logger.info("Entering IsAtYellowTrafficLight rule.")

        if not self.self_config.try_to_pass:
            ctx.add_hazard(Hazard.TRAFFIC_LIGHT_YELLOW)  # -> Emergency Rules
            return

        # Speed up to clear the light; the hazard must not trigger a stop anymore.
        speed_settings = ctx.config.speed
        speed_settings.follow_speed_limits = False
        speed_settings.target_speed = self.self_config.passing_speed
        ctx.discard_hazard(Hazard.TRAFFIC_LIGHT_YELLOW, match="exact")


class DriveSlowTowardsTrafficLight(BlockingRule):
    """
    Drive forward slowly while the agent is held back by a traffic light hazard.

    The rule takes over the agent loop (it applies controls and ticks the world
    itself) and approaches the trigger waypoint of the traffic light with reduced
    throttle and brake limits until one of its exit conditions is met.
    """

    priority = RulePriority.LOW

    phase = Phase.EMERGENCY | Phase.BEGIN

    MAX_TICKS = 2000  # 2000 * 0.05 = 100 seconds

    DEFAULT_COOLDOWN_RESET = 500

    @ConditionFunction
    def condition(self, ctx: Context):
        """
        Trigger when a traffic light hazard is present and no ``Hazard.OBSTACLE`` is.

        Returns ``None`` while this rule is already one of the active blocking rules.
        """
        # Prevent recursive calls
        if self in ctx.active_blocking_rules:
            return
        # Checks for yellow and red lights
        return ctx.has_hazard(Hazard.TRAFFIC_LIGHT, "intersection") and not ctx.has_hazard(Hazard.OBSTACLE)

    # Important: need to turn this off to have custom speed limits.
    overwrite_settings = {
        "speed": {"follow_speed_limits": False},
    }

    @dataclass
    class self_config(RuleConfig):
        max_brake: float = II("divide:${controls.max_brake},8")
        """Max brake that should be applied when above the target speed."""

        max_throttle: float = II("divide:${controls.max_throttle},4")
        """Max throttle that should be applied when below the target speed."""

    # @phase_callback(on_exit=Phase.CUSTOM_CYCLE | Phase.END, on_exit_exceptions=LunaticAgentException)
    def action(self, ctx: Context):
        """
        Approach the traffic light slowly, ticking the world from inside this rule.

        Each iteration calculates (or reuses) a control, applies it, advances the
        world by one step and then re-checks obstacles and the traffic light group.

        The loop ends when

        - the agent is closer than ``base_tlight_threshold + 0.5`` to the trigger waypoint,
        - an obstacle is detected in the path (a matching hazard is added),
        - the traffic light group no longer affects the agent, or
        - the distance to the trigger waypoint starts to grow again.

        Raises:
            SkipInnerLoopException: If ``ctx.control`` is set once the loop has ended.
        """
        logger.info("Entering DriveSlowTowardsTrafficLight rule.")

        # Remove the hazard as we handle it below; removes yellow and red light hazards
        ctx.discard_hazard(Hazard.TRAFFIC_LIGHT, "intersection")

        last_traffic_light = ctx.agent.current_traffic_light
        if not last_traffic_light:
            return  # should not happen

        # We do not accidentally want to drive away from the traffic light
        # Problems:
        #   Trigger Waypoint is before the traffic light, need an alternative
        closest_wp, distance = get_closest_tl_trigger_wp(ctx.live_info.current_location, last_traffic_light)
        # Smaller list of lights to check, however calls the simulator!
        traffic_light_group = last_traffic_light.get_group_traffic_lights()
        last_distance = float("inf")

        affected = True
        # The 1 / 1000 tolerance keeps the loop alive when the distance is (almost) unchanged.
        while affected and distance <= last_distance + 1 / 1000:
            # You should always use ctx.agent.calculate_control() before self.loop_agent
            # This will move the planned waypoint queue forward.
            # However, the rule might be after the step was already calculated.

            if distance < ctx.config.obstacles.base_tlight_threshold + 0.5:
                break  # End loop -> Other Emergency Rule

            # -------------------------------------------
            # Get the current control object of this step
            # -------------------------------------------
            # if you use loop_agent(ctx, execute_planner=True)
            # this will be the same as the next_control object acquired below.
            if not ctx.control:  # Not yet set; this is the expected case
                # Change settings before calculating the control object
                ctx.config.speed.target_speed = min(ctx.config.speed.target_speed, distance * 2)
                ctx.config.controls.max_brake = self.self_config.max_brake
                ctx.config.controls.max_throttle = self.self_config.max_throttle
                # calculate it now
                control = ctx.get_or_calculate_control()
            else:
                logger.debug("Control is already set in DriveSlowTowardsTrafficLight rule. Skipping calculation.")
                # Reuse the existing control; otherwise `control` would be unbound
                # (first iteration) or stale (later iterations) below.
                control = ctx.control

            # ------------ Loop Agent -------------------
            # Logic:
            # - ctx.get_or_calculate_control()
            #
            # - ctx.agent.parse_keyboard_input
            # - ctx.agent.apply_control
            # - self.update_world
            #
            # Are nearly equivalent to `BlockingRule.loop_agent`
            # which encapsulates the above functions, the only difference is
            # that BlockingRule.loop_agent will calculate the next control object at the end
            # for the end of the tick when this rule is done.
            # ------------------------------------------

            logger.debug("Control: %s", control)

            # It is up to the user whether or not to apply controls inside a blocking rule
            ctx.agent.parse_keyboard_input(control=control)  # NOTE: if skipped the user has no option to stop the agent
            ctx.agent.apply_control(control)

            # NOTE: This ticks the world forward by one step
            # The ctx.control is reset to None
            # > Phase.UPDATE_INFORMATION | Phase.BEGIN
            self.update_world(ctx)
            # > Phase.UPDATE_INFORMATION | Phase.END

            # ------------------ Check if we should continue -----------------

            # Remove the traffic light hazard and check again
            ctx.discard_hazard(Hazard.TRAFFIC_LIGHT, "intersection")

            # Other obstacles we do not want to hit
            obstacle = ctx.agent.detect_obstacles_in_path("all")

            # TODO: should be a function
            if obstacle.obstacle_was_found:
                if isinstance(obstacle.obstacle, carla.Vehicle):
                    ctx.agent.add_hazard(Hazard.CAR)
                elif isinstance(obstacle.obstacle, carla.Walker):
                    ctx.agent.add_hazard(Hazard.PEDESTRIAN)
                else:
                    ctx.agent.add_hazard(Hazard.OBSTACLE)
                break  # End loop -> Other Emergency Rule

            detection: "TrafficLightDetectionResult" = ctx.agent.detect_traffic_light(traffic_light_group)
            affected = bool(detection.traffic_light_was_found)
            if affected:
                ctx.add_hazard(Hazard.TRAFFIC_LIGHT)

            last_distance = distance
            distance = ctx.live_info.current_location.distance(closest_wp.transform.location)

        logger.info("Exiting DriveSlowTowardsTrafficLight rule after %s ticks.", self.ticks_passed)
        if ctx.control:  # NOTE: This is unset with self.update_world
            raise SkipInnerLoopException(ctx.control)

    def max_tick_callback(self, ctx: Context) -> NoReturn:
        """
        Bring the agent to a stop and always raise ``SkipInnerLoopException``.

        An existing ``ctx.control`` is changed to full brake and no throttle; otherwise
        the target speed is set to zero and a control is calculated from that.

        Raises:
            SkipInnerLoopException: Always, carrying ``ctx.control``.
        """
        if ctx.control:
            ctx.control.brake = 1.0
            ctx.control.throttle = 0.0
        else:
            ctx.config.speed.target_speed = 0.0
            ctx.get_or_calculate_control()
        raise SkipInnerLoopException(ctx.control)  # type: ignore[arg-type]