Source code for agents.tools.lunatic_agent_tools

  1
  2"""
  3Helper functions and methods for the :py:class:`.LunaticAgent`, some methods are variants
  4from the original CARLA agents that have been simplified and outsourced to this
  5module.
  6"""
  7# pyright: strict
  8# pyright: reportUnnecessaryIsInstance=information
  9# pyright: reportPrivateUsage=false
 10# pyright: reportTypeCommentUsage=none
 11
 12from __future__ import annotations
 13
 14import sys
 15from functools import partial, wraps
 16from inspect import isclass
 17from operator import attrgetter
 18from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Sequence, Tuple, Union
 19from typing import cast as assure_type
 20
 21import carla
 22from omegaconf import DictConfig
 23from shapely.geometry import Polygon
 24from typing_extensions import Concatenate, Literal, ParamSpec, TypeVar, assert_never
 25
 26from agents.tools.config_creation import AgentConfig
 27from agents.tools.hints import ObstacleDetectionResult
 28from agents.tools.logs import logger
 29from agents.tools.misc import is_within_distance
 30from classes.constants import Phase, RoadOption
 31from classes.exceptions import EmergencyStopException, LunaticAgentException
 32from launch_tools import CarlaDataProvider
 33
 34if TYPE_CHECKING:
 35    from classes.type_protocols import (
 36        AgentConfigT,
 37        CallableT,
 38        CanDetectNearbyObstacles,
 39        CanDetectObstacles,
 40        HasBaseSettings,
 41        HasConfig,
 42    )
 43    from agents.lunatic_agent import LunaticAgent
 44    from agents.tools.config_creation import BehaviorAgentSettings, LunaticAgentSettings
 45    from classes.worldmodel import WorldModel
 46
# Generic placeholder for the return type of a decorated agent function.
_T = TypeVar("_T")
# Captures the parameters of a decorated agent function that follow the agent itself.
_P = ParamSpec('_P')
# Alias for a callable whose first positional parameter is a ``LunaticAgent``,
# followed by the parameters ``_P``, and which returns ``_T``.
if sys.version_info >= (3, 8):
    _AgentFunction = Callable[Concatenate["LunaticAgent", _P], _T]
else:
    # Alternative spelling of the same alias for interpreters older than 3.8.
    _AgentFunction = Callable[[Concatenate["LunaticAgent", _P]], _T]
 53
 54# ------------------------------
 55# Decorators
 56# ------------------------------
 57    
 58    
def result_to_context(key: str) -> Callable[[CallableT], CallableT]:
    """
    Build a decorator that mirrors a function's return value onto the agent's context.

    Once the decorated function has returned, its result is stored as the
    attribute **key** of ``self.ctx`` (the agent's :py:class:`.Context`) and is
    then handed back to the caller unchanged. Nothing is stored if the
    decorated function raises.

    Args:
        key: Name of the attribute on ``self.ctx`` that receives the result.

    Returns:
        A decorator for functions whose first argument is the agent.
    """
    def attach(func: CallableT) -> CallableT:
        @wraps(func)
        def store_and_return(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs):
            outcome = func(self, *args, **kwargs)
            # Publish the fresh result on the context before returning it.
            setattr(self.ctx, key, outcome)
            return outcome
        return store_and_return  # type: ignore[return-value]

    return attach
73 74
def must_clear_hazard(func: CallableT) -> CallableT:
    """
    Decorator that requires ``self.detected_hazards`` to be empty once **func** has run.

    The decorated function is executed first. If the agent's
    ``detected_hazards`` is falsy afterwards, the function's result is
    returned; otherwise the hazards are wrapped in an exception.

    Raises:
        EmergencyStopException: If ``self.detected_hazards`` is not empty after
            the function call. The hazards are passed as the exception argument.
    """
    @wraps(func)
    def guarded(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs):
        outcome = func(self, *args, **kwargs)
        if not self.detected_hazards:
            return outcome
        # The function left hazards behind: abort with an emergency stop.
        raise EmergencyStopException(self.detected_hazards)
    return guarded  # type: ignore[return-value]
90 91
def phase_callback(*, on_enter: Union[Phase, Callable[['LunaticAgent'], Any], None] = None,
                   on_exit: Union[Phase, Callable[['LunaticAgent'], Any], None] = None,
                   on_exit_exceptions: Union[Sequence["type[BaseException]"], bool, None] = (),
                   prior_result_getter: Optional[Union[Callable[['LunaticAgent'], Any], str]] = None):
    """
    Decorator factory that runs phases or callbacks around an agent function.

    Args:
        on_enter:
            Executed before the decorated function. A :py:class:`.Phase` is run via
            :py:meth:`.LunaticAgent.execute_phase`; anything else is called with the
            agent as its only argument. Skipped when falsy.
            Defaults to None.
        on_exit:
            Executed after the decorated function has returned. A callable is
            called with the agent; otherwise the value is passed to
            :py:meth:`.LunaticAgent.execute_phase` with the function's result as
            **prior_results**.
            Defaults to None.
        on_exit_exceptions:
            Exceptions for which **on_exit** is additionally executed when the
            decorated function raises.

            - :python:`True` selects :py:exc:`LunaticAgentException`.
            - A single exception class is accepted as well.
            - A non-empty sequence is converted to a tuple.
            - Any other falsy value disables the exception handling.

            Attention:
                - When one of these exceptions is raised and **on_exit** is not
                  :python:`None`, **on_exit** is executed; a :py:class:`.Phase`
                  receives the exception as **prior_results**.
                - The **exception is re-raised** afterwards.
                - **on_exit** is also executed when the function returns normally.

        prior_result_getter: Only used together with **on_enter**. Either a callable
            that receives the agent, or the name of an agent attribute. If the
            obtained value is itself callable, it is called without arguments.
            The result is used as the **prior_results** argument of
            :py:meth:`.LunaticAgent.execute_phase` for the **on_enter** phase.

    Warns:
        If neither **on_enter** nor **on_exit** is set, a warning is printed and
        the function is returned undecorated.
    """
    # Normalise ``on_exit_exceptions`` into a tuple usable by an ``except`` clause.
    caught: Tuple["type[BaseException]", ...]
    if on_exit_exceptions is True:
        caught = (LunaticAgentException,)
    elif isclass(on_exit_exceptions) and issubclass(on_exit_exceptions, BaseException):
        # A bare exception class is outside the annotated types but tolerated.
        caught = assure_type(Tuple["type[BaseException]", ...], (on_exit_exceptions,))
    elif on_exit_exceptions:
        caught = tuple(on_exit_exceptions)
    else:
        caught = ()

    # Normalise ``prior_result_getter`` into a callable (or keep the falsy value).
    if prior_result_getter and not callable(prior_result_getter):
        fetch_prior = attrgetter(prior_result_getter)  # raises TypeError if not a string
    else:
        fetch_prior = prior_result_getter

    def decorator(func: _AgentFunction[_P, _T]):
        if on_enter is None and on_exit is None:
            print("WARNING: No `on_enter`, `on_exit` phase set for `phase_callback` "
                  f"decorator for function {func.__name__}. Ignoring decorator.")
            if TYPE_CHECKING:
                assert_never(func)  # we ignore this # pyright: ignore
            return func

        @wraps(func)
        def wrapper(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs):
            if on_enter:
                prior = fetch_prior(self) if fetch_prior else None
                # The attribute may be a method, e.g. get_control(); call it then.
                if callable(prior):
                    prior = prior()
                if isinstance(on_enter, Phase):
                    self.execute_phase(on_enter, prior_results=prior)
                else:
                    on_enter(self)

            # An empty ``caught`` tuple matches no exception, so this ``try``
            # is a plain call when no exit exceptions are configured.
            try:
                result = func(self, *args, **kwargs)
            except caught as error:
                if on_exit is not None:
                    if isinstance(on_exit, Phase):
                        self.execute_phase(on_exit, prior_results=error)
                    else:
                        on_exit(self)
                raise

            if on_exit:
                if callable(on_exit):
                    on_exit(self)
                else:
                    self.execute_phase(on_exit, prior_results=result)

            return result

        return wrapper

    return decorator
192 193# ------------------------------ 194# Obstacle Detection 195# ------------------------------ 196 197
def max_detection_distance(self: HasConfig["BehaviorAgentSettings | LunaticAgentSettings"],
                           lane: Literal["same_lane", "other_lane", "overtaking", "tailgating"]) -> float:
    """
    Return the largest distance at which an obstacle is still considered for **lane**.

    Convenience function to be used with :py:func:`lunatic_agent_tools.detect_vehicles`
    and :any:`LunaticAgent.detect_obstacles_in_path`.

    The value is computed as:

    .. code-block:: python

        max(obstacles.min_proximity_threshold,
            live_info.current_speed_limit / obstacles.speed_detection_downscale[lane])

    Args:
        self : An object whose ``config`` provides the ``obstacles`` and ``live_info`` sections.
        lane : The lane to consider; used as key into
            ``config.obstacles.speed_detection_downscale``.

    Returns:
        The larger of the minimum proximity threshold and the downscaled speed limit.
    """
    obstacle_settings = self.config.obstacles
    # Lower bound that applies regardless of the current speed limit.
    floor = obstacle_settings.min_proximity_threshold
    speed_based = (self.config.live_info.current_speed_limit
                   / obstacle_settings.speed_detection_downscale[lane])
    return max(floor, speed_based)
221 222
def detect_obstacles_in_path(self: "CanDetectNearbyObstacles",
                             obstacle_list: Optional[Union[Sequence[carla.Actor], carla.ActorList,
                                                           Literal['all']]]) -> ObstacleDetectionResult:
    """
    Check the agent's upcoming path for obstacles, taking planned lane changes into account.

    Depending on ``config.live_info.incoming_direction`` the check targets the
    lane to the left, the lane to the right, or the current lane.

    Args:
        self : The agent
        obstacle_list : The obstacles that should be checked. :python:`None` or
            :python:`'all'` selects ``self.all_obstacles_nearby``.

    Returns:
        The result of :py:func:`detect_obstacles`, i.e. a triple
        (<is there an obstacle>, <the actor>, <distance to the actor>).

    Note:
        - Distances to detect vehicles that hinder a lane change are calculated with
          ``self.max_detection_distance``.
        - Former :code:`BehaviorAgent.collision_and_car_avoid_manager`, which evaded cars via the
          tailgating function; this is now rule based.

    Tip:
        As the first argument is the agent, this function can be used as a method, i.e
        it can be added / imported directly into the agent class' body.
    """
    if obstacle_list in (None, 'all'):
        obstacle_list = self.all_obstacles_nearby

    heading = self.config.live_info.incoming_direction
    if heading == RoadOption.CHANGELANELEFT:
        side = -1
    elif heading == RoadOption.CHANGELANERIGHT:
        side = 1
    else:
        # No lane change ahead: only the current lane matters.
        return detect_obstacles(self, obstacle_list,
                                self.max_detection_distance("same_lane"),
                                up_angle_th=self.config.obstacles.detection_angles.cars_same_lane[1])

    # Lane change ahead: inspect the neighbouring lane on the respective side.
    return detect_obstacles(self, obstacle_list,
                            self.max_detection_distance("other_lane"),
                            up_angle_th=self.config.obstacles.detection_angles.cars_lane_change[1],
                            lane_offset=side)
264 265
def detect_obstacles(self: "CanDetectObstacles",
                     actor_list: Optional[Sequence[carla.Actor] | carla.ActorList] = None,
                     max_distance: Optional[float] = None,
                     up_angle_th: float = 90,
                     low_angle_th: float = 0,
                     *,
                     lane_offset: int = 0) -> ObstacleDetectionResult:
    """
    Method to check if there is an actor in front or around the agent blocking its path.

    Parameters:

        self: The agent
        actor_list: list containing relevant actors to check.
            If :code:`None`, all vehicles in the scene are used. An empty list
            yields a negative result immediately.
        max_distance: max free-space to check for obstacles.
            If :code:`None` (or otherwise falsy), the
            :py:attr:`.LunaticAgentSettings.obstacles.base_vehicle_threshold` value
            is used.
        up_angle_th: upper angle threshold, see below.
        low_angle_th: lower angle threshold, see below.
        lane_offset: check a different lane than the one the agent is currently in.
            The sign is flipped when the ego waypoint has a negative ``lane_id``.

    The angle between the location and reference transform will also be taken into account.
    Being 0 a location in front and 180, one behind, i.e, the vector between has to satisfy:
    **low_angle_th** < angle < **up_angle_th**.

    Returns:
        A triple (<is there an obstacle>, <the actor>, <distance to the actor>).
        If nothing was detected, or ``config.obstacles.ignore_vehicles`` is set,
        the result is :python:`(False, None, -1)`.

    Tip:
        As the first argument is the agent, this function can be used as a method, i.e
        it can be added / imported directly into the agent class' body.
    """

    # See also scenario_runner scenario_helper.detect_lane_obstacle

    if self.config.obstacles.ignore_vehicles:
        return ObstacleDetectionResult(False, None, -1)

    if actor_list is None:
        # NOTE: Only ``None`` pulls all vehicles; an empty list (e.g. no walkers)
        #       is handled by the branch below instead.
        # TODO: Propose update to original carla
        actor_list = self._vehicle.get_world().get_actors().filter("*vehicle*")
    elif len(actor_list) == 0:  # Case for no pedestrians
        return ObstacleDetectionResult(False, None, -1)

    if not max_distance:
        max_distance = self.config.obstacles.base_vehicle_threshold  # TODO: This is not modified with the dynamic threshold

    def get_route_polygon() -> None | Polygon:
        """Build a polygon along the planned route, as wide as the ego vehicle plus the planner offset."""
        # Note nested functions can access variables from the outer scope
        route_bb = []  # type: list[list[float]]
        extent_y = self._vehicle.bounding_box.extent.y
        r_ext = extent_y + self.config.planner.offset
        l_ext = -extent_y + self.config.planner.offset
        r_vec = ego_transform.get_right_vector()
        p1 = ego_location + carla.Location(r_ext * r_vec.x, r_ext * r_vec.y)
        p2 = ego_location + carla.Location(l_ext * r_vec.x, l_ext * r_vec.y)
        route_bb.extend([[p1.x, p1.y, p1.z], [p2.x, p2.y, p2.z]])

        for wp, _ in self._local_planner.get_plan():
            if ego_location.distance(wp.transform.location) > max_distance:
                break

            r_vec = wp.transform.get_right_vector()
            p1 = wp.transform.location + carla.Location(r_ext * r_vec.x, r_ext * r_vec.y)
            p2 = wp.transform.location + carla.Location(l_ext * r_vec.x, l_ext * r_vec.y)
            route_bb.extend([[p1.x, p1.y, p1.z], [p2.x, p2.y, p2.z]])

        # Two points don't create a polygon, nothing to check
        if len(route_bb) < 3:
            return None

        return Polygon(route_bb)

    # The map does not change while iterating; look it up once instead of per actor.
    carla_map = CarlaDataProvider.get_map()

    # TODO: can get this from CDP
    ego_transform = self._vehicle.get_transform()
    # NOTE: property access creates a new location object, i.e. ego_location
    #       is not affected by the in-place shift of ego_front_transform below.
    ego_location = ego_transform.location
    ego_wpt = carla_map.get_waypoint(ego_location)

    # Get the right offset
    if ego_wpt.lane_id < 0 and lane_offset != 0:
        lane_offset *= -1

    # Get the transform of the front of the ego.
    # This is the same object as ego_transform, its location is shifted in place.
    ego_front_transform = ego_transform
    ego_front_transform.location += carla.Location(
        ego_transform.get_forward_vector() * self._vehicle.bounding_box.extent.x)

    opposite_invasion = abs(self.config.planner.offset) + self._vehicle.bounding_box.extent.y > ego_wpt.lane_width / 2
    use_bbs = self.config.obstacles.use_bbs_detection or opposite_invasion or ego_wpt.is_junction

    # Get the route bounding box
    route_polygon = get_route_polygon()

    for target_vehicle in actor_list:
        if target_vehicle.id == self._vehicle.id:
            continue

        target_transform = target_vehicle.get_transform()
        if target_transform.location.distance(ego_location) > max_distance:
            continue

        target_wpt = carla_map.get_waypoint(target_transform.location, lane_type=carla.LaneType.Any)
        if not target_wpt:
            logger.warning("No waypoint found for the checked obstacle. "
                           "This might be a bug in the map but ok for static obstacles.")
            continue

        # General approach for junctions and vehicles invading other lanes due to the offset
        if (use_bbs or target_wpt.is_junction) and route_polygon:

            target_bb = target_vehicle.bounding_box
            target_vertices = target_bb.get_world_vertices(target_vehicle.get_transform())
            target_list = [[v.x, v.y, v.z] for v in target_vertices]
            target_polygon = Polygon(target_list)

            if route_polygon.intersects(target_polygon):
                return ObstacleDetectionResult(True,
                                               target_vehicle,
                                               target_vehicle.get_location().distance(ego_location))

        # Simplified approach, using only the plan waypoints (similar to TM)
        else:

            if target_wpt.road_id != ego_wpt.road_id or target_wpt.lane_id != ego_wpt.lane_id + lane_offset:
                # Not on the checked lane right now; it may still be on the lane the plan enters next.
                next_wpt = self._local_planner.get_incoming_waypoint_and_direction(steps=3)[0]
                if not next_wpt:
                    continue
                if target_wpt.road_id != next_wpt.road_id or target_wpt.lane_id != next_wpt.lane_id + lane_offset:
                    continue

            target_forward_vector = target_transform.get_forward_vector()
            target_extent = target_vehicle.bounding_box.extent.x
            # Same object as target_transform; shifted in place to the rear of the target.
            target_rear_transform = target_transform

            target_rear_transform.location -= carla.Location(
                x=target_extent * target_forward_vector.x,
                y=target_extent * target_forward_vector.y,
            )

            if is_within_distance(target_rear_transform, ego_front_transform, max_distance,
                                  [low_angle_th, up_angle_th]):
                return ObstacleDetectionResult(True,
                                               target_vehicle,
                                               target_rear_transform.location.distance(
                                                   ego_front_transform.location))

    return ObstacleDetectionResult(False, None, -1)
410 411
def detect_vehicles(self: "CanDetectObstacles",
                    vehicle_list: Optional[Sequence[carla.Actor] | carla.ActorList] = None,
                    max_distance: Optional[float] = None,
                    up_angle_th: float = 90,
                    low_angle_th: float = 0,
                    lane_offset: int = 0) -> ObstacleDetectionResult:
    """
    Check whether a vehicle in front of or around the agent blocks its path.

    Thin wrapper that forwards all arguments to :py:func:`.detect_obstacles`.

    Parameters:

        self: The agent
        vehicle_list: list containing vehicle objects.
            If :code:`None`, all vehicle in the scene are used.
        max_distance: max free-space to check for obstacles.
            If :code:`None`, the :py:attr:`.LunaticAgentSettings.obstacles.base_vehicle_threshold` value
            is used.
        up_angle_th: upper angle threshold, see below.
        low_angle_th: lower angle threshold, see below.
        lane_offset: check a different lane than the one the agent is currently in.

    The angle between the location and reference transform will also be taken into account.
    Being 0 a location in front and 180, one behind, i.e, the vector between has to satisfy:
    **low_angle_th** < angle < **up_angle_th**.

    Tip:
        As the first argument is the agent, this function can be used as a method, i.e
        it can be added / imported directly into the agent class' body.

    .. deprecated::
        Use :py:func:`.detect_obstacles` instead.
    """
    return detect_obstacles(self,
                            actor_list=vehicle_list,
                            max_distance=max_distance,
                            up_angle_th=up_angle_th,
                            low_angle_th=low_angle_th,
                            lane_offset=lane_offset)
# Untested
# ``detect_vehicles`` with the angle window pre-set to 0..90 degrees.
detect_obstacles_in_front = partial(detect_vehicles, up_angle_th=90, low_angle_th=0)
"""
:py:func:`.detect_vehicles` with the default parameters for detecting vehicles in front of the agent.
"""

# ``detect_vehicles`` with the angle window pre-set to 160..180 degrees.
detect_obstacles_behind = partial(detect_vehicles, up_angle_th=180, low_angle_th=160)
"""
:py:func:`.detect_vehicles` with the default parameters for detecting vehicles behind the agent.
"""


# ------------------------------
# Path Planning
# ------------------------------

def generate_lane_change_path(waypoint: carla.Waypoint,
                              direction: Literal['left', 'right'] = 'left',
                              distance_same_lane: float = 10,
                              distance_other_lane: float = 25,
                              lane_change_distance: float = 25,
                              check: bool = True,
                              lane_changes: int = 1,
                              step_distance: float = 2) -> "list[tuple[carla.Waypoint, RoadOption]]":
    """
    This method generates a path that results in a lane change.
    Use the different distances to fine-tune the maneuver.
    If the lane change is impossible, the returned path will be empty.

    Distance traveled:
        1. **distance_same_lane** in the same lane.
        2. **lane_change_distance** while reaching the other lane.
        3. **distance_other_lane** in the other lane.

    Parameters:
        waypoint: The starting waypoint.
        direction: The direction of the lane change, either 'left' or 'right'.
            Defaults to 'left'.
        distance_same_lane: The distance to follow the same lane before the lane change.
            Values below 0.1 are raised to 0.1.
        distance_other_lane: The distance to follow the other lane after the lane change.
            Values below 0.1 are raised to 0.1.
        lane_change_distance: The distance to reach the center of the last lane.
            A low value will make a fast lane change, while a high value will make slow lane change.
            Values below 0.1 are raised to 0.1; the distance is split evenly
            between the individual lane changes.
        check: If :python:`True`, a lane change is only planned if
            :py:attr:`carla.Waypoint.lane_change` permits a change in **direction**
            and the side lane is a driving lane. If :python:`False`, the side
            lane merely has to exist.
        lane_changes: Number of consecutive lane changes to perform. If it is
            not positive, no lane change is added to the path.
        step_distance: Distance between consecutive waypoints while following a lane.

    Returns:
        The path as list of (waypoint, road option) pairs, starting with
        **waypoint**, or an empty list if the maneuver is not possible.

    Raises:
        ValueError: If **direction** is neither 'left' nor 'right'.
    """
    # Validate before doing any work; an ``assert`` would be stripped under ``-O``.
    if direction == 'left':
        option = RoadOption.CHANGELANELEFT
        permitted_markings = ('Left', 'Both')
    elif direction == 'right':
        option = RoadOption.CHANGELANERIGHT
        permitted_markings = ('Right', 'Both')
    else:
        raise ValueError(f"direction must be 'left' or 'right', got {direction!r}")

    distance_same_lane = max(distance_same_lane, 0.1)
    distance_other_lane = max(distance_other_lane, 0.1)
    lane_change_distance = max(lane_change_distance, 0.1)

    plan: "list[tuple[carla.Waypoint, RoadOption]]" = [(waypoint, RoadOption.LANEFOLLOW)]

    def follow_lane(target_distance: float) -> bool:
        """Extend ``plan`` along the current lane; return False if the lane ends early."""
        travelled = 0.0
        while travelled < target_distance:
            next_wps = plan[-1][0].next(step_distance)  # follow a path of waypoints
            if not next_wps:
                return False
            next_wp = next_wps[0]
            travelled += next_wp.transform.location.distance(plan[-1][0].transform.location)
            plan.append((next_wp, RoadOption.LANEFOLLOW))
        return True

    # Same lane
    if not follow_lane(distance_same_lane):
        return []

    # Split the distance over the individual changes; without any lane change
    # the loop below does not run and there is nothing to divide.
    if lane_changes > 0:
        lane_change_distance = lane_change_distance / lane_changes

    # Lane change
    lane_changes_done = 0
    while lane_changes_done < lane_changes:

        # Move forward
        next_wps = plan[-1][0].next(lane_change_distance)
        if not next_wps:
            return []
        next_wp = next_wps[0]

        # Get the side lane
        if check and str(next_wp.lane_change) not in permitted_markings:
            return []
        if direction == 'left':
            side_wp = next_wp.get_left_lane()
        else:
            side_wp = next_wp.get_right_lane()

        if not side_wp or (check and side_wp.lane_type != carla.LaneType.Driving):
            return []

        # Update the plan
        plan.append((side_wp, option))
        lane_changes_done += 1

    # Other lane
    # NOTE: Might force it to follow the other lane for some time
    if not follow_lane(distance_other_lane):
        return []

    return plan
562 563
def create_agent_config(self: HasBaseSettings[AgentConfigT],
                        source: Union["type[AgentConfigT]", AgentConfigT, DictConfig, str, None] = None,
                        world_model: Optional["WorldModel"] = None,
                        overwrite_options: Optional[Dict[str, Any]] = None) -> AgentConfigT:
    """
    Create the :py:class:`.AgentConfig` for the agent from different input types.

    Parameters:
        self (LunaticAgent): The agent
        source:
            - :code:`None` takes the config from the **world model** if available.
            - :python:`str`, passed to ``self.BASE_SETTINGS.from_yaml``.
            - :py:class:`.AgentConfig` (class or instance) to be used.
            - :py:class:`omegaconf.DictConfig`, a dictionary with the configuration,
              i.e. duck-typed as :py:class:`.AgentConfig`. It is merged with
              **overwrite_options** in place.
            - Any other class is instantiated with **overwrite_options** as keyword
              arguments; any other object is used as is. Both cases log a warning.
        world_model: Only consulted when **source** is :code:`None`.
        overwrite_options: Options applied on top of **source**. They are not
            applied when the config is taken from the world model or from a yaml file.

    Returns:
        :py:attr:`self.BASE_SETTINGS <.LunaticAgent.BASE_SETTINGS>` (duck-typed):
            The configuration object. The actual type depends on **source**.
            If it is a :python:`str`, :py:class:`.AgentConfig` or :py:class:`.DictConfig`, the actual
            return type will be a :py:class:`omegaconf.DictConfig`.

    Raises:
        ValueError: If **source** is :code:`None` and no world model with a config is given.
    """
    overrides = {} if overwrite_options is None else overwrite_options

    if source is None:
        if not world_model or world_model._config is None:  # pyright: ignore[reportUnnecessaryComparison]
            raise ValueError("Must pass a valid config as behavior or a world model with a set config.")
        logger.debug("Using world model config")
        config = world_model._config
    elif isinstance(source, str):  # Assuming Path
        logger.debug("Creating config from yaml file")
        config = self.BASE_SETTINGS.from_yaml(source)
    elif isinstance(source, AgentConfig) or (isclass(source) and issubclass(source, AgentConfig)):  # pyright: ignore[reportUnnecessaryIsInstance]
        logger.debug("Config is a dataclass / AgentConfig")
        merged = source.to_dict_config()
        merged.merge_with(overrides)  # Note uses DictConfig.update
        config = assure_type(source.__class__, merged)
    elif isinstance(source, DictConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
        logger.debug("Config is a DictConfig")
        source.merge_with(overrides)
        config = self.BASE_SETTINGS.cast(source)
    elif isclass(source):
        logger.warning("Config is a class of type %s but not an AgentConfig, this is unexpected.",
                       type(source))
        config = assure_type(source, source(**overrides))
    else:
        # Unsupported instance: assume the user passed something appropriate.
        if overrides:
            logger.warning("Warning: Settings of type %s are not an instance of a supported class. "
                           "Trying to apply overwrite options.", type(source))
            source.update(overrides)
        else:
            logger.warning("Settings of type %s are not a supported Config class", type(source))
        config = source

    if isinstance(config, DictConfig):
        config._set_flag("allow_objects", True)  # pyright: ignore[reportPrivateUsage]
        # Remove parent from the config, i.e. make it a top-level config.
        config.__dict__["_parent"] = None
    return self.BASE_SETTINGS.cast(config)  # duck-type it  # pyright: ignore[reportUnknownVariableType]