Source code for agents.tools.lunatic_agent_tools

  1"""
  2Helper functions and methods for the :py:class:`.LunaticAgent`, some methods are variants
  3from the original CARLA agents that have been simplified and outsourced to this
  4module.
  5"""
  6# pyright: strict
  7# pyright: reportUnnecessaryIsInstance=information
  8# pyright: reportPrivateUsage=false
  9# pyright: reportTypeCommentUsage=none
 10
 11from __future__ import annotations
 12
 13import sys
 14from functools import partial, wraps
 15from inspect import isclass
 16from operator import attrgetter
 17from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Sequence, Tuple, Union
 18import typing
 19
 20import carla
 21from omegaconf import DictConfig
 22from shapely.geometry import Polygon
 23from typing_extensions import Concatenate, Literal, ParamSpec, TypeVar, assert_never
 24
 25from agents.tools.config_creation import AgentConfig
 26from agents.tools.hints import ObstacleDetectionResult
 27from agents.tools.logs import logger
 28from agents.tools.misc import is_within_distance
 29from classes.constants import Phase, RoadOption
 30from classes.exceptions import EmergencyStopException, LunaticAgentException
 31from launch_tools import CarlaDataProvider
 32
 33if TYPE_CHECKING:
 34    from classes.type_protocols import (
 35        AgentConfigT,
 36        CallableT,
 37        CanDetectNearbyObstacles,
 38        CanDetectObstacles,
 39        HasBaseSettings,
 40        HasConfig,
 41    )
 42    from agents.lunatic_agent import LunaticAgent
 43    from agents.tools.config_creation import BehaviorAgentSettings, LunaticAgentSettings
 44    from classes.worldmodel import WorldModel
 45
 46_T = TypeVar("_T")
 47_P = ParamSpec("_P")
 48if sys.version_info >= (3, 8):
 49    _AgentFunction = Callable[Concatenate["LunaticAgent", _P], _T]
 50else:
 51    _AgentFunction = Callable[[Concatenate["LunaticAgent", _P]], _T]
 52
 53# ------------------------------
 54# Decorators
 55# ------------------------------
 56
 57
[docs] 58def result_to_context(key: str) -> Callable[[CallableT], CallableT]: 59 """ 60 Decorator to use for the agent. Sets the **key** attribute of the 61 :py:class:`.Context`. 62 """ 63 64 def decorator(func: CallableT) -> CallableT: 65 @wraps(func) 66 def wrapper(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs): 67 result = func(self, *args, **kwargs) 68 setattr(self.ctx, key, result) 69 return result 70 71 return wrapper # type: ignore[return-value] 72 73 return decorator
74 75
[docs] 76def must_clear_hazard(func: CallableT) -> CallableT: 77 """ 78 Decorator which raises an EmergencyStopException if self.detected_hazards 79 is not empty after the function call. 80 81 Raises: 82 EmergencyStopException: If self.detected_hazards is not empty after the function call. 83 """ 84 85 @wraps(func) 86 def wrapper(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs): 87 result = func(self, *args, **kwargs) 88 if self.detected_hazards: 89 raise EmergencyStopException(self.detected_hazards) 90 return result 91 92 return wrapper # type: ignore[return-value]
93 94
[docs] 95def phase_callback( 96 *, 97 on_enter: Union[Phase, Callable[["LunaticAgent"], Any], None] = None, 98 on_exit: Union[Phase, Callable[["LunaticAgent"], Any], None] = None, 99 on_exit_exceptions: Union[Sequence["type[BaseException]"], bool, None] = (), 100 prior_result_getter: Optional[Union[Callable[["LunaticAgent"], Any], str]] = None, 101): 102 """ 103 Decorator function for defining phase callbacks that are executed at the start and end of a function. 104 105 Args: 106 on_enter (Phase, optional): 107 The phase to execute before the decorated function. 108 Defaults to None. 109 on_exit: 110 Either the phase to execute after the decorated function or a callable. 111 Defaults to None. 112 on_exit_exceptions (Tuple[BaseException] | bool)): 113 If a non-empty sequence of exceptions is provided, the **on_exit** phase will 114 **only be executed if one of the exceptions is raised.** 115 116 If :python:`True`, the **on_exit** phase will be executed if any 117 :py:exc:`LunaticAgentException` are raised. 118 Defaults to :code:`False`. 119 120 Attention: 121 - The **on_exit** phase will *only* be executed if and only if one of the exceptions 122 is raised. 123 - The **exception will be re-raised** after executing **on_exit**. 124 125 prior_result_getter: Can be the name of an attribute of the agent. If the 126 attribute is a callable, it will be called without arguments. Alternatively 127 a callable can be passed. The result will be used as the **prior_results** 128 argument for the :py:meth:`.LunaticAgent.execute_phase` method. 129 130 Warns: 131 If **on_enter** and **on_exit** are not set, the decorator will print a 132 warning and ignore the decorator. 133 """ 134 # Validate exception -> Tuple 135 _on_exit_exceptions_: Tuple["type[BaseException]", ...] 136 if on_exit_exceptions is True: 137 _on_exit_exceptions_ = (LunaticAgentException,) 138 elif isclass(on_exit_exceptions) and issubclass(on_exit_exceptions, BaseException): 139 # This allows to pass a single exception, and is actually Never 140 exceptions_as_tuple = (on_exit_exceptions,) 141 _on_exit_exceptions_ = typing.cast("Tuple[type[BaseException], ...]", exceptions_as_tuple) 142 elif not on_exit_exceptions: 143 _on_exit_exceptions_ = () 144 else: 145 _on_exit_exceptions_ = tuple(on_exit_exceptions) 146 147 # Validate prior_result -> Callable 148 if prior_result_getter and not callable(prior_result_getter): 149 prior_result_getter = attrgetter(prior_result_getter) # raises Type Error if not string 150 151 # Pay attention to prior_result which should not be prior_result 152 def decorator(func: _AgentFunction[_P, _T]): 153 if on_enter is None and on_exit is None: 154 print( 155 "WARNING: No `on_enter`, `on_exit` phase set for `phase_callback` " 156 f"decorator for function {func.__name__}. Ignoring decorator." 157 ) 158 if TYPE_CHECKING: 159 assert_never(func) # we ignore this # pyright: ignore 160 return func 161 162 @wraps(func) 163 def wrapper(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs): 164 if on_enter: 165 # Careful do not set prior_result else it is not nonlocal 166 prior_result = prior_result_getter(self) if prior_result_getter else None 167 # if the attribute is a callable, e.g. get_control(), call it 168 if callable(prior_result): 169 # Call attribute of the agent 170 prior_result = prior_result() 171 if isinstance(on_enter, Phase): 172 self.execute_phase(on_enter, prior_results=prior_result) 173 else: 174 on_enter(self) 175 176 # Call with exception handling 177 if _on_exit_exceptions_: 178 try: 179 result = func(self, *args, **kwargs) 180 except _on_exit_exceptions_ as e: 181 if on_exit is not None: 182 if isinstance(on_exit, Phase): 183 self.execute_phase(on_exit, prior_results=e) 184 else: 185 on_exit(self) 186 raise 187 else: 188 result = func(self, *args, **kwargs) 189 190 if on_exit: 191 if callable(on_exit): 192 on_exit(self) 193 else: 194 self.execute_phase(on_exit, prior_results=result) 195 196 return result 197 198 return wrapper 199 200 return decorator
201 202 203# ------------------------------ 204# Obstacle Detection 205# ------------------------------ 206 207
[docs] 208def max_detection_distance( 209 self: HasConfig["BehaviorAgentSettings | LunaticAgentSettings"], 210 lane: Literal["same_lane", "other_lane", "overtaking", "tailgating"], 211) -> float: 212 """ 213 Convenience function to be used with :py:func:`lunatic_agent_tools.detect_vehicles` and :any:`LunaticAgent.detect_obstacles_in_path`. 214 215 The max distance to consider an obstacle is calculated as: 216 217 .. code-block:: python 218 219 max(obstacles.min_proximity_threshold, 220 live_info.current_speed_limit / obstacles.speed_detection_downscale.[same|other]_lane) 221 222 Args: 223 self : An object that implements the `config` and `live_info` attributes 224 lane : The lane to consider. 225 226 Note: 227 **lane** must be a key in :code:`BehaviorAgentObstacleSettings.SpeedLimitDetectionDownscale`. 228 229 """ 230 231 return max( 232 self.config.obstacles.min_proximity_threshold, 233 self.config.live_info.current_speed_limit / self.config.obstacles.speed_detection_downscale[lane], 234 )
235 236
[docs] 237def detect_obstacles_in_path( 238 self: "CanDetectNearbyObstacles", 239 obstacle_list: Optional[Union[Sequence[carla.Actor], carla.ActorList, Literal["all"]]], 240) -> ObstacleDetectionResult: 241 """ 242 This module is in charge of warning in case of a collision 243 and managing possible tailgating chances. 244 245 Args: 246 self : The agent 247 obstacle_list : The list of obstacles that should be checked 248 249 Note: 250 - Distance to detect vehicles that hinder a lance change are calculated with the 251 :py:func:`max_detection_distance` function. 252 - Former :code:`BehaviorAgent.collision_and_car_avoid_manager`, which evaded cars via the 253 tailgating function; this is now rule based. 254 255 Tip: 256 As the first argument is the agent, this function can be used as a method, i.e 257 it can be added / imported directly into the agent class' body. 258 """ 259 260 if obstacle_list in (None, "all"): 261 obstacle_list = self.all_obstacles_nearby 262 263 # Triple (<is there an obstacle> , <the actor> , <distance to the actor>) 264 if self.config.live_info.incoming_direction == RoadOption.CHANGELANELEFT: 265 detection_result: ObstacleDetectionResult = detect_obstacles( 266 self, 267 obstacle_list, 268 self.max_detection_distance("other_lane"), 269 up_angle_th=self.config.obstacles.detection_angles.cars_lane_change[1], 270 lane_offset=-1, 271 ) 272 elif self.config.live_info.incoming_direction == RoadOption.CHANGELANERIGHT: 273 detection_result: ObstacleDetectionResult = detect_obstacles( 274 self, 275 obstacle_list, 276 self.max_detection_distance("other_lane"), 277 up_angle_th=self.config.obstacles.detection_angles.cars_lane_change[1], 278 lane_offset=1, 279 ) 280 else: 281 detection_result: ObstacleDetectionResult = detect_obstacles( 282 self, 283 obstacle_list, 284 self.max_detection_distance("same_lane"), 285 up_angle_th=self.config.obstacles.detection_angles.cars_same_lane[1], 286 ) 287 return detection_result
288 289
[docs] 290def detect_obstacles( 291 self: "CanDetectObstacles", 292 actor_list: Optional[Sequence[carla.Actor] | carla.ActorList] = None, 293 max_distance: Optional[float] = None, 294 up_angle_th: float = 90, 295 low_angle_th: float = 0, 296 *, 297 lane_offset: int = 0, 298) -> ObstacleDetectionResult: 299 """ 300 Method to check if there is a vehicle in front or around the agent blocking its path. 301 302 Parameters: 303 304 self: The agent 305 actor_list: list containing relevant actors to check. 306 If :code:`None`, all vehicle in the scene are used. 307 max_distance: max free-space to check for obstacles. 308 If :code:`None`, the :py:attr:`.LunaticAgentSettings.obstacles.base_vehicle_threshold` value 309 is used. 310 lane_offset: check a different lane than the one the agent is currently in. 311 312 The angle between the location and reference transform will also be taken into account. 313 Being 0 a location in front and 180, one behind, i.e, the vector between has to satisfy: 314 **low_angle_th** < angle < **up_angle_th**. 315 316 Tip: 317 As the first argument is the agent, this function can be used as a method, i.e 318 it can be added / imported directly into the agent class' body. 319 """ 320 321 # See also scenario_runner scenario_helper.detect_lane_obstacle 322 323 if self.config.obstacles.ignore_vehicles: 324 return ObstacleDetectionResult(False, None, -1) 325 326 if actor_list is None: 327 # NOTE: If empty list is passed e.g. for walkers this pulls all vehicles 328 # TODO: Propose update to original carla 329 actor_list = self._vehicle.get_world().get_actors().filter("*vehicle*") 330 elif len(actor_list) == 0: # Case for no pedestrians 331 return ObstacleDetectionResult(False, None, -1) 332 333 if not max_distance: 334 max_distance = ( 335 self.config.obstacles.base_vehicle_threshold 336 ) # TODO: This is not modified with the dynamic threshold 337 338 def get_route_polygon() -> Polygon | None: 339 # Note nested functions can access variables from the outer scope 340 route_bb = [] # type: list[list[float]] 341 extent_y = self._vehicle.bounding_box.extent.y 342 r_ext = extent_y + self.config.planner.offset 343 l_ext = -extent_y + self.config.planner.offset 344 r_vec = ego_transform.get_right_vector() 345 p1 = ego_location + carla.Location(r_ext * r_vec.x, r_ext * r_vec.y) 346 p2 = ego_location + carla.Location(l_ext * r_vec.x, l_ext * r_vec.y) 347 route_bb.extend([[p1.x, p1.y, p1.z], [p2.x, p2.y, p2.z]]) 348 349 for wp, _ in self._local_planner.get_plan(): 350 if ego_location.distance(wp.transform.location) > max_distance: 351 break 352 353 r_vec = wp.transform.get_right_vector() 354 p1 = wp.transform.location + carla.Location(r_ext * r_vec.x, r_ext * r_vec.y) 355 p2 = wp.transform.location + carla.Location(l_ext * r_vec.x, l_ext * r_vec.y) 356 route_bb.extend([[p1.x, p1.y, p1.z], [p2.x, p2.y, p2.z]]) 357 358 # Two points don't create a polygon, nothing to check 359 if len(route_bb) < 3: 360 return None 361 362 return Polygon(route_bb) 363 364 # TODO: can get this from CDP 365 ego_transform = self._vehicle.get_transform() 366 ego_location = ( 367 ego_transform.location 368 ) # NOTE: property access creates a new location object, i.e. ego_location != ego_front_transform 369 ego_wpt = CarlaDataProvider.get_map().get_waypoint(ego_location) 370 371 # Get the right offset 372 if ego_wpt.lane_id < 0 and lane_offset != 0: 373 lane_offset *= -1 374 375 # Get the transform of the front of the ego 376 ego_front_transform = ego_transform 377 ego_front_transform.location += carla.Location( 378 ego_transform.get_forward_vector() * self._vehicle.bounding_box.extent.x 379 ) 380 381 opposite_invasion = abs(self.config.planner.offset) + self._vehicle.bounding_box.extent.y > ego_wpt.lane_width / 2 382 use_bbs = self.config.obstacles.use_bbs_detection or opposite_invasion or ego_wpt.is_junction 383 384 # Get the route bounding box 385 route_polygon = get_route_polygon() 386 387 for target_vehicle in actor_list: 388 if target_vehicle.id == self._vehicle.id: 389 continue 390 391 target_transform = target_vehicle.get_transform() 392 if target_transform.location.distance(ego_location) > max_distance: 393 continue 394 395 target_wpt = CarlaDataProvider.get_map().get_waypoint(target_transform.location, lane_type=carla.LaneType.Any) 396 if not target_wpt: 397 logger.warning( 398 "No waypoint found for the checked obstacle.This might be a bug in the map but ok for static obstacles." 399 ) 400 continue 401 402 # General approach for junctions and vehicles invading other lanes due to the offset 403 if (use_bbs or target_wpt.is_junction) and route_polygon: 404 target_bb = target_vehicle.bounding_box 405 target_vertices = target_bb.get_world_vertices(target_vehicle.get_transform()) 406 target_list = [[v.x, v.y, v.z] for v in target_vertices] 407 target_polygon = Polygon(target_list) 408 409 if route_polygon.intersects(target_polygon): 410 return ObstacleDetectionResult( 411 True, target_vehicle, target_vehicle.get_location().distance(ego_location) 412 ) 413 414 # Simplified approach, using only the plan waypoints (similar to TM) 415 else: 416 if target_wpt.road_id != ego_wpt.road_id or target_wpt.lane_id != ego_wpt.lane_id + lane_offset: 417 next_wpt = self._local_planner.get_incoming_waypoint_and_direction(steps=3)[0] 418 if not next_wpt: 419 continue 420 if target_wpt.road_id != next_wpt.road_id or target_wpt.lane_id != next_wpt.lane_id + lane_offset: 421 continue 422 423 target_forward_vector = target_transform.get_forward_vector() 424 target_extent = target_vehicle.bounding_box.extent.x 425 target_rear_transform = target_transform 426 427 target_rear_transform.location -= carla.Location( 428 x=target_extent * target_forward_vector.x, 429 y=target_extent * target_forward_vector.y, 430 ) 431 432 if is_within_distance( 433 target_rear_transform, ego_front_transform, max_distance, [low_angle_th, up_angle_th] 434 ): 435 return ObstacleDetectionResult( 436 True, target_vehicle, target_rear_transform.location.distance(ego_front_transform.location) 437 ) 438 439 return ObstacleDetectionResult(False, None, -1)
440 441
[docs] 442def detect_vehicles( 443 self: "CanDetectObstacles", 444 vehicle_list: Optional[Sequence[carla.Actor] | carla.ActorList] = None, 445 max_distance: Optional[float] = None, 446 up_angle_th: float = 90, 447 low_angle_th: float = 0, 448 lane_offset: int = 0, 449) -> ObstacleDetectionResult: 450 """ 451 Method to check if there is a vehicle in front or around the agent blocking its path. 452 453 Parameters: 454 455 self: The agent 456 vehicle_list: list containing vehicle objects. 457 If :code:`None`, all vehicle in the scene are used. 458 max_distance: max free-space to check for obstacles. 459 If :code:`None`, the :py:attr:`.LunaticAgentSettings.obstacles.base_vehicle_threshold` value 460 is used. 461 lane_offset: check a different lane than the one the agent is currently in. 462 463 The angle between the location and reference transform will also be taken into account. 464 Being 0 a location in front and 180, one behind, i.e, the vector between has to satisfy: 465 **low_angle_th** < angle < **up_angle_th**. 466 467 Tip: 468 As the first argument is the agent, this function can be used as a method, i.e 469 it can be added / imported directly into the agent class' body. 470 471 .. deprecated:: 472 Use :py:func:`.detect_obstacles` instead. 473 """ 474 return detect_obstacles(self, vehicle_list, max_distance, up_angle_th, low_angle_th, lane_offset=lane_offset)
475 476 477# Untested 478detect_obstacles_in_front = partial(detect_vehicles, up_angle_th=90, low_angle_th=0) 479""" 480:py:func:`.detect_vehicles` with the default parameters for detecting vehicles in front of the agent. 481""" 482 483detect_obstacles_behind = partial(detect_vehicles, up_angle_th=180, low_angle_th=160) 484""" 485:py:func:`.detect_vehicles` with the default parameters for detecting vehicles behind the agent. 486""" 487 488 489# ------------------------------ 490# Path Planning 491# ------------------------------ 492 493
[docs] 494def generate_lane_change_path( 495 waypoint: carla.Waypoint, 496 direction: Literal["left", "right"] = "left", 497 distance_same_lane: float = 10, 498 distance_other_lane: float = 25, 499 lane_change_distance: float = 25, 500 check: bool = True, 501 lane_changes: int = 1, 502 step_distance: float = 2, 503) -> "list[tuple[carla.Waypoint, RoadOption]]": 504 """ 505 This method generates a path that results in a lane change. 506 Use the different distances to fine-tune the maneuver. 507 If the lane change is impossible, the returned path will be empty. 508 509 Distance traveled: 510 1. **distance_same_lane** in the same lane. 511 2. **lane_change_distance** while reaching the other lane. 512 3. **distance_other_lane** in the other lane. 513 514 Parameters: 515 waypoint: The starting waypoint. 516 direction: The direction of the lane change, either 'left' or 'right'. 517 Defaults to 'left'. 518 distance_same_lane: The distance to follow the same lane before the lane change. 519 distance_other_lane: The distance to follow the other lane after the lane change. 520 lane_change_distance: The distance to reach the center of the last lane. 521 A low value will make a fast lane change, while a high value will make slow lane change. 522 check: If :python:`True`, the method will check if the lane change is possible, i.e. that 523 there is a valid lane that the vehicle can change to. 524 This ignores :py:attr:`carla.Waypoint.lane_change`. 525 """ 526 distance_same_lane = max(distance_same_lane, 0.1) 527 distance_other_lane = max(distance_other_lane, 0.1) 528 lane_change_distance = max(lane_change_distance, 0.1) 529 530 plan: "list[tuple[carla.Waypoint, RoadOption]]" = [(waypoint, RoadOption.LANEFOLLOW)] 531 option = RoadOption.LANEFOLLOW 532 533 # Same lane 534 distance = 0 535 while distance < distance_same_lane: 536 next_wps = plan[-1][0].next(step_distance) # follow a path of waypoints 537 if not next_wps: 538 return [] 539 next_wp = next_wps[0] 540 distance += next_wp.transform.location.distance(plan[-1][0].transform.location) 541 plan.append((next_wp, RoadOption.LANEFOLLOW)) # next waypoint to the path 542 543 # TEMP 544 assert direction in ("left", "right") # TODO: # END: remove at end of project 545 546 if direction == "left": 547 option = RoadOption.CHANGELANELEFT 548 elif direction == "right": 549 option = RoadOption.CHANGELANERIGHT 550 else: 551 # ERROR, input value for change must be 'left' or 'right' 552 return [] 553 554 lane_changes_done = 0 555 lane_change_distance = lane_change_distance / lane_changes 556 557 # Lane change 558 while lane_changes_done < lane_changes: 559 # Move forward 560 next_wps = plan[-1][0].next(lane_change_distance) 561 if not next_wps: 562 return [] 563 next_wp = next_wps[0] 564 565 # Get the side lane 566 if direction == "left": 567 if check and str(next_wp.lane_change) not in ["Left", "Both"]: 568 return [] 569 side_wp = next_wp.get_left_lane() # get waypoint on other lane 570 else: 571 if check and str(next_wp.lane_change) not in ["Right", "Both"]: 572 return [] 573 side_wp = next_wp.get_right_lane() 574 575 if not side_wp or (check and side_wp.lane_type != carla.LaneType.Driving): 576 return [] 577 578 # Update the plan 579 plan.append((side_wp, option)) 580 lane_changes_done += 1 581 582 # Other lane 583 # NOTE: Might force it to follow the other lane for some time 584 distance = 0 585 while distance < distance_other_lane: 586 next_wps = plan[-1][0].next(step_distance) 587 if not next_wps: 588 return [] 589 next_wp = next_wps[0] 590 distance += next_wp.transform.location.distance(plan[-1][0].transform.location) 591 plan.append((next_wp, RoadOption.LANEFOLLOW)) 592 593 return plan
594 595
[docs] 596def create_agent_config( 597 self: HasBaseSettings[AgentConfigT], 598 source: Union["type[AgentConfigT]", AgentConfigT, DictConfig, str, None] = None, 599 world_model: Optional["WorldModel"] = None, 600 overwrite_options: Optional[Dict[str, Any]] = None, 601) -> AgentConfigT: 602 """ 603 Method to create the :py:class:`.AgentConfig` from different input types. 604 605 Parameters: 606 self (LunaticAgent): The agent 607 source: 608 - :code:`None` takes the config from the **world model** if available. 609 - :py:class:`.AgentConfig` (class or instance) to be used. 610 - :py:class:`omegaconf.DictConfig`, a dictionary with the configuration, 611 i.e. duck-typed as :py:class:`.AgentConfig`. 612 613 Returns: 614 :py:attr:`self.BASE_SETTINGS <.LunaticAgent.BASE_SETTINGS>` (duck-typed): 615 The configuration object. The actual type depends on **source**. 616 If it is a :python:`str`, :py:class:`.AgentConfig` or :py:class:`.DictConfig`, the actual 617 return type will be a :py:class:`omegaconf.DictConfig`. 618 """ 619 if overwrite_options is None: 620 overwrite_options = {} 621 if source is None and world_model and world_model._config is not None: # pyright: ignore[reportUnnecessaryComparison] 622 logger.debug("Using world model config") 623 opt_dict = world_model._config 624 elif source is None: 625 raise ValueError("Must pass a valid config as behavior or a world model with a set config.") 626 elif isinstance(source, str): # Assuming Path 627 logger.debug("Creating config from yaml file") 628 opt_dict = self.BASE_SETTINGS.from_yaml(source) 629 elif isinstance(source, AgentConfig) or (isclass(source) and issubclass(source, AgentConfig)): # pyright: ignore[reportUnnecessaryIsInstance] 630 logger.debug("Config is a dataclass / AgentConfig") 631 cfg = source.to_dict_config() 632 cfg.merge_with(overwrite_options) # Note uses DictConfig.update 633 opt_dict = cfg # has type source.__class__ 634 elif isinstance(source, DictConfig): # pyright: ignore[reportUnnecessaryIsInstance] 635 logger.debug("Config is a DictConfig") 636 source.merge_with(overwrite_options) 637 opt_dict = self.BASE_SETTINGS.cast(source) 638 elif isclass(source): 639 logger.warning("Config is a class of type %s but not an AgentConfig, this is unexpected.", type(source)) 640 opt_dict = typing.cast("source", source(**overwrite_options)) 641 elif not overwrite_options: 642 logger.warning("Settings of type %s are not a supported Config class", type(source)) 643 opt_dict = source # assume the user passed something appropriate 644 else: 645 logger.warning( 646 "Warning: Settings of type %s are not an instance of a supported class. Trying to apply overwrite options.", 647 type(source), 648 ) 649 source.update(overwrite_options) 650 opt_dict = source # assume the user passed something appropriate 651 if isinstance(opt_dict, DictConfig): 652 opt_dict._set_flag("allow_objects", True) # pyright: ignore[reportPrivateUsage] 653 opt_dict.__dict__["_parent"] = None # Remove parent from the config, i.e. make it a top-level config. 654 cfg = opt_dict # pyright: ignore[reportUnknownVariableType] 655 return self.BASE_SETTINGS.cast(cfg) # duck-type it