1"""
2Helper functions and methods for the :py:class:`.LunaticAgent`, some methods are variants
3from the original CARLA agents that have been simplified and outsourced to this
4module.
5"""
6# pyright: strict
7# pyright: reportUnnecessaryIsInstance=information
8# pyright: reportPrivateUsage=false
9# pyright: reportTypeCommentUsage=none
10
11from __future__ import annotations
12
13import sys
14from functools import partial, wraps
15from inspect import isclass
16from operator import attrgetter
17from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Sequence, Tuple, Union
18import typing
19
20import carla
21from omegaconf import DictConfig
22from shapely.geometry import Polygon
23from typing_extensions import Concatenate, Literal, ParamSpec, TypeVar, assert_never
24
25from agents.tools.config_creation import AgentConfig
26from agents.tools.hints import ObstacleDetectionResult
27from agents.tools.logs import logger
28from agents.tools.misc import is_within_distance
29from classes.constants import Phase, RoadOption
30from classes.exceptions import EmergencyStopException, LunaticAgentException
31from launch_tools import CarlaDataProvider
32
33if TYPE_CHECKING:
34 from classes.type_protocols import (
35 AgentConfigT,
36 CallableT,
37 CanDetectNearbyObstacles,
38 CanDetectObstacles,
39 HasBaseSettings,
40 HasConfig,
41 )
42 from agents.lunatic_agent import LunaticAgent
43 from agents.tools.config_creation import BehaviorAgentSettings, LunaticAgentSettings
44 from classes.worldmodel import WorldModel
45
# Generic return type and parameter specification used to annotate the decorators below.
_T = TypeVar("_T")
_P = ParamSpec("_P")
# Alias for a callable that takes the agent as first positional argument, followed by
# the parameters captured by ``_P``, and returns ``_T``.
if sys.version_info >= (3, 8):
    _AgentFunction = Callable[Concatenate["LunaticAgent", _P], _T]
else:
    # On older interpreters the ``Concatenate`` expression is wrapped in an argument list.
    _AgentFunction = Callable[[Concatenate["LunaticAgent", _P]], _T]
52
53# ------------------------------
54# Decorators
55# ------------------------------
56
57
[docs]
def result_to_context(key: str) -> Callable[[CallableT], CallableT]:
    """
    Build a decorator for agent methods that mirrors their return value
    into the agent's :py:class:`.Context`.

    After the decorated function returns, its result is stored as attribute
    **key** on ``self.ctx`` and then handed back to the caller unchanged.

    Args:
        key: Name of the attribute to set on ``self.ctx``.
    """

    def _attach(method: CallableT) -> CallableT:
        def _store_and_return(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs):
            outcome = method(self, *args, **kwargs)
            # Publish the result on the context before passing it on.
            setattr(self.ctx, key, outcome)
            return outcome

        # Carry over name, docstring, etc. of the decorated method.
        return wraps(method)(_store_and_return)  # type: ignore[return-value]

    return _attach
74
75
[docs]
def must_clear_hazard(func: CallableT) -> CallableT:
    """
    Decorator that enforces an empty ``self.detected_hazards`` once the
    decorated function has finished.

    The decorated function is called first; its result is returned only if
    ``self.detected_hazards`` is falsy afterwards.

    Raises:
        EmergencyStopException: If ``self.detected_hazards`` is not empty after the
            function call. The hazards are passed to the exception.
    """

    def _guarded(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs):
        outcome = func(self, *args, **kwargs)
        if not self.detected_hazards:
            return outcome
        # Hazards are still present -> abort with an emergency stop.
        raise EmergencyStopException(self.detected_hazards)

    return wraps(func)(_guarded)  # type: ignore[return-value]
93
94
[docs]
def phase_callback(
    *,
    on_enter: Union[Phase, Callable[["LunaticAgent"], Any], None] = None,
    on_exit: Union[Phase, Callable[["LunaticAgent"], Any], None] = None,
    on_exit_exceptions: Union[Sequence["type[BaseException]"], bool, None] = (),
    prior_result_getter: Optional[Union[Callable[["LunaticAgent"], Any], str]] = None,
):
    """
    Decorator function for defining phase callbacks that are executed at the start and end of a function.

    Args:
        on_enter (Phase | Callable, optional):
            The phase to execute before the decorated function, or a callable that
            is called with the agent.
            Defaults to None.
        on_exit:
            Either the phase to execute after the decorated function or a callable
            that is called with the agent.
            Defaults to None.
        on_exit_exceptions (Sequence[type[BaseException]] | type[BaseException] | bool | None):
            If a non-empty sequence of exceptions (or a single exception class) is
            provided, the **on_exit** phase will
            **only be executed if one of the exceptions is raised.**

            If :python:`True`, the **on_exit** phase will be executed if any
            :py:exc:`LunaticAgentException` are raised.
            Any falsy value disables the exception handling.
            Defaults to an empty tuple, i.e. disabled.

            Attention:
                - The **on_exit** phase will *only* be executed if and only if one of the exceptions
                  is raised; it is *not* executed when the function returns normally.
                - If **on_exit** is a :py:class:`Phase`, the raised exception is passed as
                  **prior_results**.
                - The **exception will be re-raised** after executing **on_exit**.

        prior_result_getter: Can be the name of an attribute of the agent. If the
            attribute is a callable, it will be called without arguments. Alternatively
            a callable can be passed. The result will be used as the **prior_results**
            argument for the :py:meth:`.LunaticAgent.execute_phase` method of the
            **on_enter** phase.

    Returns:
        A decorator. The decorated function returns the result of the original function.

    Warns:
        If **on_enter** and **on_exit** are not set, the decorator will print a
        warning and ignore the decorator.
    """
    # Normalize ``on_exit_exceptions`` to a tuple that can be used in an ``except`` clause.
    _on_exit_exceptions_: Tuple["type[BaseException]", ...]
    if on_exit_exceptions is True:
        _on_exit_exceptions_ = (LunaticAgentException,)
    elif isclass(on_exit_exceptions) and issubclass(on_exit_exceptions, BaseException):
        # This allows to pass a single exception class; for the type checker this branch is Never.
        exceptions_as_tuple = (on_exit_exceptions,)
        _on_exit_exceptions_ = typing.cast("Tuple[type[BaseException], ...]", exceptions_as_tuple)
    elif not on_exit_exceptions:
        _on_exit_exceptions_ = ()
    else:
        _on_exit_exceptions_ = tuple(on_exit_exceptions)

    # Normalize ``prior_result_getter`` to a callable taking the agent.
    if prior_result_getter and not callable(prior_result_getter):
        prior_result_getter = attrgetter(prior_result_getter)  # raises TypeError if not a string

    def decorator(func: _AgentFunction[_P, _T]):
        if on_enter is None and on_exit is None:
            print(
                "WARNING: No `on_enter`, `on_exit` phase set for `phase_callback` "
                f"decorator for function {func.__name__}. Ignoring decorator."
            )
            if TYPE_CHECKING:
                assert_never(func)  # we ignore this # pyright: ignore
            return func

        @wraps(func)
        def wrapper(self: "LunaticAgent", *args: _P.args, **kwargs: _P.kwargs):
            if on_enter:
                # NOTE: ``prior_result_getter`` must only be read here; assigning to it
                # would turn it into a local variable of ``wrapper``.
                prior_result = prior_result_getter(self) if prior_result_getter else None
                # If the attribute is a callable, e.g. get_control(), call it.
                if callable(prior_result):
                    prior_result = prior_result()
                if isinstance(on_enter, Phase):
                    self.execute_phase(on_enter, prior_results=prior_result)
                else:
                    on_enter(self)

            if _on_exit_exceptions_:
                # Exception mode: ``on_exit`` is reserved for the failure path, so a
                # normal return must not trigger it.
                try:
                    return func(self, *args, **kwargs)
                except _on_exit_exceptions_ as e:
                    if on_exit is not None:
                        if isinstance(on_exit, Phase):
                            self.execute_phase(on_exit, prior_results=e)
                        else:
                            on_exit(self)
                    raise

            # Regular mode: ``on_exit`` runs after every successful call.
            result = func(self, *args, **kwargs)
            if on_exit:
                if callable(on_exit):
                    on_exit(self)
                else:
                    self.execute_phase(on_exit, prior_results=result)

            return result

        return wrapper

    return decorator
201
202
203# ------------------------------
204# Obstacle Detection
205# ------------------------------
206
207
[docs]
def max_detection_distance(
    self: HasConfig["BehaviorAgentSettings | LunaticAgentSettings"],
    lane: Literal["same_lane", "other_lane", "overtaking", "tailgating"],
) -> float:
    """
    Return the maximum distance up to which obstacles are considered for **lane**.

    Helper for :py:func:`lunatic_agent_tools.detect_vehicles` and
    :any:`LunaticAgent.detect_obstacles_in_path`. The distance is the larger of a
    fixed lower bound and a speed-limit dependent value:

    .. code-block:: python

        max(obstacles.min_proximity_threshold,
            live_info.current_speed_limit / obstacles.speed_detection_downscale[lane])

    Args:
        self : An object whose ``config`` provides the ``obstacles`` and ``live_info`` entries.
        lane : The lane to consider; used as key into
            ``config.obstacles.speed_detection_downscale``.

    Note:
        **lane** must be a key in :code:`BehaviorAgentObstacleSettings.SpeedLimitDetectionDownscale`.
    """
    settings = self.config
    obstacle_settings = settings.obstacles

    lower_bound = obstacle_settings.min_proximity_threshold
    speed_scaled = settings.live_info.current_speed_limit / obstacle_settings.speed_detection_downscale[lane]
    return max(lower_bound, speed_scaled)
235
236
[docs]
def detect_obstacles_in_path(
    self: "CanDetectNearbyObstacles",
    obstacle_list: Optional[Union[Sequence[carla.Actor], carla.ActorList, Literal["all"]]],
) -> ObstacleDetectionResult:
    """
    Check the given obstacles against the lane the agent is about to use.

    Depending on ``config.live_info.incoming_direction`` the check is done for the
    left neighbor lane, the right neighbor lane or the current lane.

    Args:
        self : The agent
        obstacle_list : The list of obstacles that should be checked.
            :code:`None` or :code:`"all"` selects ``self.all_obstacles_nearby``.

    Returns:
        The result of :py:func:`detect_obstacles` for the selected lane.

    Note:
        - Distance to detect vehicles that hinder a lane change are calculated with the
          :py:func:`max_detection_distance` function.
        - Former :code:`BehaviorAgent.collision_and_car_avoid_manager`, which evaded cars via the
          tailgating function; this is now rule based.

    Tip:
        As the first argument is the agent, this function can be used as a method, i.e
        it can be added / imported directly into the agent class' body.
    """
    if obstacle_list in (None, "all"):
        obstacle_list = self.all_obstacles_nearby

    # Map the upcoming maneuver to the neighbor lane that has to be checked.
    incoming = self.config.live_info.incoming_direction
    if incoming == RoadOption.CHANGELANELEFT:
        side_offset = -1
    elif incoming == RoadOption.CHANGELANERIGHT:
        side_offset = 1
    else:
        side_offset = None

    if side_offset is None:
        # No lane change ahead: look for obstacles in the current lane.
        return detect_obstacles(
            self,
            obstacle_list,
            self.max_detection_distance("same_lane"),
            up_angle_th=self.config.obstacles.detection_angles.cars_same_lane[1],
        )

    # Lane change ahead: look for obstacles in the target lane.
    return detect_obstacles(
        self,
        obstacle_list,
        self.max_detection_distance("other_lane"),
        up_angle_th=self.config.obstacles.detection_angles.cars_lane_change[1],
        lane_offset=side_offset,
    )
288
289
[docs]
def detect_obstacles(
    self: "CanDetectObstacles",
    actor_list: Optional[Sequence[carla.Actor] | carla.ActorList] = None,
    max_distance: Optional[float] = None,
    up_angle_th: float = 90,
    low_angle_th: float = 0,
    *,
    lane_offset: int = 0,
) -> ObstacleDetectionResult:
    """
    Method to check if there is a vehicle in front or around the agent blocking its path.

    Parameters:

        self: The agent
        actor_list: list containing relevant actors to check.
            If :code:`None`, all vehicle in the scene are used.
            An empty list yields a negative result immediately.
        max_distance: max free-space to check for obstacles.
            If :code:`None` (or any other falsy value), the
            :py:attr:`.LunaticAgentSettings.obstacles.base_vehicle_threshold` value
            is used.
        up_angle_th: upper angle threshold passed to ``is_within_distance``.
        low_angle_th: lower angle threshold passed to ``is_within_distance``.
        lane_offset: check a different lane than the one the agent is currently in.

    The angle between the location and reference transform will also be taken into account.
    Being 0 a location in front and 180, one behind, i.e, the vector between has to satisfy:
    **low_angle_th** < angle < **up_angle_th**.

    Returns:
        :code:`ObstacleDetectionResult(True, actor, distance)` for the first actor found
        to block the path, otherwise :code:`ObstacleDetectionResult(False, None, -1)`.
        The latter is also returned if ``config.obstacles.ignore_vehicles`` is set.

    Tip:
        As the first argument is the agent, this function can be used as a method, i.e
        it can be added / imported directly into the agent class' body.
    """

    # See also scenario_runner scenario_helper.detect_lane_obstacle

    if self.config.obstacles.ignore_vehicles:
        return ObstacleDetectionResult(False, None, -1)

    if actor_list is None:
        # NOTE: If empty list is passed e.g. for walkers this pulls all vehicles
        # TODO: Propose update to original carla
        actor_list = self._vehicle.get_world().get_actors().filter("*vehicle*")
    elif len(actor_list) == 0:  # Case for no pedestrians
        return ObstacleDetectionResult(False, None, -1)

    if not max_distance:
        max_distance = (
            self.config.obstacles.base_vehicle_threshold
        )  # TODO: This is not modified with the dynamic threshold

    def get_route_polygon() -> Polygon | None:
        """Build a polygon along the planned route, widened by the vehicle's lateral extent."""
        # Note nested functions can access variables from the outer scope
        # (ego_transform, ego_location and max_distance are bound before this is called).
        route_bb = []  # type: list[list[float]]
        extent_y = self._vehicle.bounding_box.extent.y
        # Right / left border offsets, both shifted by the planner offset.
        r_ext = extent_y + self.config.planner.offset
        l_ext = -extent_y + self.config.planner.offset
        r_vec = ego_transform.get_right_vector()
        p1 = ego_location + carla.Location(r_ext * r_vec.x, r_ext * r_vec.y)
        p2 = ego_location + carla.Location(l_ext * r_vec.x, l_ext * r_vec.y)
        route_bb.extend([[p1.x, p1.y, p1.z], [p2.x, p2.y, p2.z]])

        # Add a right/left border point for each planned waypoint within max_distance.
        for wp, _ in self._local_planner.get_plan():
            if ego_location.distance(wp.transform.location) > max_distance:
                break

            r_vec = wp.transform.get_right_vector()
            p1 = wp.transform.location + carla.Location(r_ext * r_vec.x, r_ext * r_vec.y)
            p2 = wp.transform.location + carla.Location(l_ext * r_vec.x, l_ext * r_vec.y)
            route_bb.extend([[p1.x, p1.y, p1.z], [p2.x, p2.y, p2.z]])

        # Two points don't create a polygon, nothing to check
        if len(route_bb) < 3:
            return None

        return Polygon(route_bb)

    # TODO: can get this from CDP
    ego_transform = self._vehicle.get_transform()
    ego_location = (
        ego_transform.location
    )  # NOTE: property access creates a new location object, i.e. ego_location != ego_front_transform
    ego_wpt = CarlaDataProvider.get_map().get_waypoint(ego_location)

    # Get the right offset
    # (the sign of the offset is flipped when the ego lane has a negative lane_id)
    if ego_wpt.lane_id < 0 and lane_offset != 0:
        lane_offset *= -1

    # Get the transform of the front of the ego
    # NOTE(review): plain assignment, not a copy -- ego_front_transform and ego_transform
    # are the same object, so the shift below is visible through both names.
    ego_front_transform = ego_transform
    ego_front_transform.location += carla.Location(
        ego_transform.get_forward_vector() * self._vehicle.bounding_box.extent.x
    )

    # True if the vehicle, including the planner offset, reaches beyond half the lane width.
    opposite_invasion = abs(self.config.planner.offset) + self._vehicle.bounding_box.extent.y > ego_wpt.lane_width / 2
    use_bbs = self.config.obstacles.use_bbs_detection or opposite_invasion or ego_wpt.is_junction

    # Get the route bounding box
    route_polygon = get_route_polygon()

    for target_vehicle in actor_list:
        # Never report the ego vehicle itself.
        if target_vehicle.id == self._vehicle.id:
            continue

        target_transform = target_vehicle.get_transform()
        if target_transform.location.distance(ego_location) > max_distance:
            continue

        target_wpt = CarlaDataProvider.get_map().get_waypoint(target_transform.location, lane_type=carla.LaneType.Any)
        if not target_wpt:
            logger.warning(
                "No waypoint found for the checked obstacle.This might be a bug in the map but ok for static obstacles."
            )
            continue

        # General approach for junctions and vehicles invading other lanes due to the offset
        if (use_bbs or target_wpt.is_junction) and route_polygon:
            target_bb = target_vehicle.bounding_box
            target_vertices = target_bb.get_world_vertices(target_vehicle.get_transform())
            target_list = [[v.x, v.y, v.z] for v in target_vertices]
            target_polygon = Polygon(target_list)

            if route_polygon.intersects(target_polygon):
                # Distance is measured between the actor location and the ego location.
                return ObstacleDetectionResult(
                    True, target_vehicle, target_vehicle.get_location().distance(ego_location)
                )

        # Simplified approach, using only the plan waypoints (similar to TM)
        else:
            # Target is not on the checked lane of the ego road: retry with an upcoming plan waypoint.
            if target_wpt.road_id != ego_wpt.road_id or target_wpt.lane_id != ego_wpt.lane_id + lane_offset:
                next_wpt = self._local_planner.get_incoming_waypoint_and_direction(steps=3)[0]
                if not next_wpt:
                    continue
                if target_wpt.road_id != next_wpt.road_id or target_wpt.lane_id != next_wpt.lane_id + lane_offset:
                    continue

            target_forward_vector = target_transform.get_forward_vector()
            target_extent = target_vehicle.bounding_box.extent.x
            # Same object as target_transform (no copy); it is shifted to the rear of the target.
            target_rear_transform = target_transform

            target_rear_transform.location -= carla.Location(
                x=target_extent * target_forward_vector.x,
                y=target_extent * target_forward_vector.y,
            )

            if is_within_distance(
                target_rear_transform, ego_front_transform, max_distance, [low_angle_th, up_angle_th]
            ):
                # Distance is measured from the target's rear to the ego's front.
                return ObstacleDetectionResult(
                    True, target_vehicle, target_rear_transform.location.distance(ego_front_transform.location)
                )

    return ObstacleDetectionResult(False, None, -1)
440
441
[docs]
def detect_vehicles(
    self: "CanDetectObstacles",
    vehicle_list: Optional[Sequence[carla.Actor] | carla.ActorList] = None,
    max_distance: Optional[float] = None,
    up_angle_th: float = 90,
    low_angle_th: float = 0,
    lane_offset: int = 0,
) -> ObstacleDetectionResult:
    """
    Check if there is a vehicle in front or around the agent blocking its path.

    Thin compatibility wrapper: all arguments are forwarded unchanged to
    :py:func:`.detect_obstacles`, with **vehicle_list** passed as its ``actor_list``.

    Parameters:

        self: The agent
        vehicle_list: list containing vehicle objects.
            If :code:`None`, all vehicle in the scene are used.
        max_distance: max free-space to check for obstacles.
            If :code:`None`, the :py:attr:`.LunaticAgentSettings.obstacles.base_vehicle_threshold` value
            is used.
        up_angle_th: upper angle threshold.
        low_angle_th: lower angle threshold.
        lane_offset: check a different lane than the one the agent is currently in.

    The angle between the location and reference transform will also be taken into account.
    Being 0 a location in front and 180, one behind, i.e, the vector between has to satisfy:
    **low_angle_th** < angle < **up_angle_th**.

    Tip:
        As the first argument is the agent, this function can be used as a method, i.e
        it can be added / imported directly into the agent class' body.

    .. deprecated::
        Use :py:func:`.detect_obstacles` instead.
    """
    return detect_obstacles(
        self,
        actor_list=vehicle_list,
        max_distance=max_distance,
        up_angle_th=up_angle_th,
        low_angle_th=low_angle_th,
        lane_offset=lane_offset,
    )
475
476
# Untested
# Pre-configured variants of detect_vehicles with fixed angle thresholds.
# Angle window 0 to 90.
detect_obstacles_in_front = partial(detect_vehicles, up_angle_th=90, low_angle_th=0)
"""
:py:func:`.detect_vehicles` with the default parameters for detecting vehicles in front of the agent.
"""

# Angle window 160 to 180.
detect_obstacles_behind = partial(detect_vehicles, up_angle_th=180, low_angle_th=160)
"""
:py:func:`.detect_vehicles` with the default parameters for detecting vehicles behind the agent.
"""
487
488
489# ------------------------------
490# Path Planning
491# ------------------------------
492
493
[docs]
def generate_lane_change_path(
    waypoint: carla.Waypoint,
    direction: Literal["left", "right"] = "left",
    distance_same_lane: float = 10,
    distance_other_lane: float = 25,
    lane_change_distance: float = 25,
    check: bool = True,
    lane_changes: int = 1,
    step_distance: float = 2,
) -> "list[tuple[carla.Waypoint, RoadOption]]":
    """
    This method generates a path that results in a lane change.
    Use the different distances to fine-tune the maneuver.
    If the lane change is impossible, the returned path will be empty.

    Distance traveled:
        1. **distance_same_lane** in the same lane.
        2. **lane_change_distance** while reaching the other lane.
        3. **distance_other_lane** in the other lane.

    Parameters:
        waypoint: The starting waypoint.
        direction: The direction of the lane change, either 'left' or 'right'.
            Defaults to 'left'.
        distance_same_lane: The distance to follow the same lane before the lane change.
            Values below 0.1 are raised to 0.1.
        distance_other_lane: The distance to follow the other lane after the lane change.
            Values below 0.1 are raised to 0.1.
        lane_change_distance: The distance to reach the center of the last lane.
            A low value will make a fast lane change, while a high value will make slow lane change.
            It is split evenly between the individual lane changes.
            Values below 0.1 are raised to 0.1.
        check: If :python:`True`, the method will check if the lane change is possible, i.e. that
            :py:attr:`carla.Waypoint.lane_change` permits it and that there is a lane of type
            ``Driving`` that the vehicle can change to.
            If :python:`False`, only the existence of a neighboring lane is required.
        lane_changes: Number of consecutive lane changes to perform.
            With a value below 1 no lane change is added to the path.
        step_distance: Distance between consecutive waypoints of the lane-following parts.

    Returns:
        The path as list of ``(waypoint, RoadOption)`` tuples, starting with **waypoint**,
        or an empty list if the path cannot be built.
    """
    distance_same_lane = max(distance_same_lane, 0.1)
    distance_other_lane = max(distance_other_lane, 0.1)
    lane_change_distance = max(lane_change_distance, 0.1)

    plan: "list[tuple[carla.Waypoint, RoadOption]]" = [(waypoint, RoadOption.LANEFOLLOW)]

    # Same lane
    if not _follow_lane(plan, distance_same_lane, step_distance):
        return []

    # TEMP
    assert direction in ("left", "right")  # TODO: # END: remove at end of project

    if direction == "left":
        option = RoadOption.CHANGELANELEFT
    elif direction == "right":
        option = RoadOption.CHANGELANERIGHT
    else:
        # ERROR, input value for change must be 'left' or 'right'
        # (only reachable when assertions are disabled)
        return []

    lane_changes_done = 0
    # Split the distance over the lane changes. Guarded, because with
    # lane_changes == 0 the loop below does not run and nothing must be divided.
    if lane_changes > 0:
        lane_change_distance = lane_change_distance / lane_changes

    # Lane change
    while lane_changes_done < lane_changes:
        # Move forward
        next_wps = plan[-1][0].next(lane_change_distance)
        if not next_wps:
            return []
        next_wp = next_wps[0]

        # Get the side lane
        if direction == "left":
            if check and str(next_wp.lane_change) not in ["Left", "Both"]:
                return []
            side_wp = next_wp.get_left_lane()  # get waypoint on other lane
        else:
            if check and str(next_wp.lane_change) not in ["Right", "Both"]:
                return []
            side_wp = next_wp.get_right_lane()

        if not side_wp or (check and side_wp.lane_type != carla.LaneType.Driving):
            return []

        # Update the plan
        plan.append((side_wp, option))
        lane_changes_done += 1

    # Other lane
    # NOTE: Might force it to follow the other lane for some time
    if not _follow_lane(plan, distance_other_lane, step_distance):
        return []

    return plan


def _follow_lane(
    plan: "list[tuple[carla.Waypoint, RoadOption]]",
    distance_to_cover: float,
    step_distance: float,
) -> bool:
    """
    Extend **plan** in place with lane-following waypoints until **distance_to_cover** is reached.

    Returns:
        :python:`False` if a waypoint has no successor, otherwise :python:`True`.
    """
    covered = 0.0
    while covered < distance_to_cover:
        last_wp = plan[-1][0]
        successors = last_wp.next(step_distance)  # follow a path of waypoints
        if not successors:
            return False
        next_wp = successors[0]
        covered += next_wp.transform.location.distance(last_wp.transform.location)
        plan.append((next_wp, RoadOption.LANEFOLLOW))  # next waypoint to the path
    return True
594
595
[docs]
def create_agent_config(
    self: HasBaseSettings[AgentConfigT],
    source: Union["type[AgentConfigT]", AgentConfigT, DictConfig, str, None] = None,
    world_model: Optional["WorldModel"] = None,
    overwrite_options: Optional[Dict[str, Any]] = None,
) -> AgentConfigT:
    """
    Create the :py:class:`.AgentConfig` of the agent from different kinds of input.

    Parameters:
        self (LunaticAgent): The agent; its ``BASE_SETTINGS`` is used to load and cast the config.
        source:
            - :code:`None` takes the config from the **world model** if available.
            - :python:`str`, passed to ``BASE_SETTINGS.from_yaml``.
            - :py:class:`.AgentConfig` (class or instance) to be used.
            - :py:class:`omegaconf.DictConfig`, a dictionary with the configuration,
              i.e. duck-typed as :py:class:`.AgentConfig`.
            - Any other class is instantiated with **overwrite_options** as keyword
              arguments; any other object is used as is. Both cases log a warning.
        world_model: Fallback provider of the config if **source** is :code:`None`.
        overwrite_options: Options merged into the config for :py:class:`.AgentConfig`,
            :py:class:`omegaconf.DictConfig` and unsupported **source** objects.

    Returns:
        :py:attr:`self.BASE_SETTINGS <.LunaticAgent.BASE_SETTINGS>` (duck-typed):
            The configuration object. The actual type depends on **source**.
            If it is a :python:`str`, :py:class:`.AgentConfig` or :py:class:`.DictConfig`, the actual
            return type will be a :py:class:`omegaconf.DictConfig`.

    Raises:
        ValueError: If **source** is :code:`None` and no world model with a config is given.
    """
    overrides = {} if overwrite_options is None else overwrite_options

    if source is None:
        # Without an explicit source only the world model can provide the config.
        if not (world_model and world_model._config is not None):  # pyright: ignore[reportUnnecessaryComparison]
            raise ValueError("Must pass a valid config as behavior or a world model with a set config.")
        logger.debug("Using world model config")
        resolved = world_model._config
    elif isinstance(source, str):  # Assuming Path
        logger.debug("Creating config from yaml file")
        resolved = self.BASE_SETTINGS.from_yaml(source)
    elif isinstance(source, AgentConfig) or (isclass(source) and issubclass(source, AgentConfig)):  # pyright: ignore[reportUnnecessaryIsInstance]
        logger.debug("Config is a dataclass / AgentConfig")
        resolved = source.to_dict_config()
        resolved.merge_with(overrides)  # Note uses DictConfig.update
    elif isinstance(source, DictConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
        logger.debug("Config is a DictConfig")
        source.merge_with(overrides)
        resolved = self.BASE_SETTINGS.cast(source)
    elif isclass(source):
        logger.warning("Config is a class of type %s but not an AgentConfig, this is unexpected.", type(source))
        resolved = typing.cast("source", source(**overrides))
    else:
        # Unsupported object: assume the user passed something appropriate.
        if overrides:
            logger.warning(
                "Warning: Settings of type %s are not an instance of a supported class. Trying to apply overwrite options.",
                type(source),
            )
            source.update(overrides)
        else:
            logger.warning("Settings of type %s are not a supported Config class", type(source))
        resolved = source

    if isinstance(resolved, DictConfig):
        resolved._set_flag("allow_objects", True)  # pyright: ignore[reportPrivateUsage]
        resolved.__dict__["_parent"] = None  # Remove parent from the config, i.e. make it a top-level config.
    return self.BASE_SETTINGS.cast(resolved)  # duck-type it  # pyright: ignore[reportUnknownVariableType]