Source code for classes.rule

   1# pyright: strict
   2
   3# pyright: reportUnnecessaryIsInstance=false, reportUnnecessaryComparison=false
   4
   5
   6from __future__ import annotations
   7
   8import inspect
   9
  10import random
  11from collections.abc import Mapping
  12from dataclasses import is_dataclass
  13from functools import partial, update_wrapper, wraps
  14from inspect import isclass
  15from itertools import accumulate
  16from typing import (
  17    TYPE_CHECKING,
  18    Any,
  19    Callable,
  20    ClassVar,
  21    Container,
  22    Dict,
  23    Hashable,
  24    Iterable,
  25    List,
  26    NoReturn,
  27    Optional,
  28    Set,
  29    TypeVar,
  30    Union,
  31    cast,
  32)
  33from weakref import CallableProxyType, WeakSet, proxy
  34
  35import pygame
  36from omegaconf import DictConfig, OmegaConf
  37from typing_extensions import (
  38    Annotated,
  39    Concatenate,
  40    Literal,
  41    NotRequired,
  42    ParamSpec,
  43    Required,
  44    Self,
  45    TypedDict,
  46    Unpack,
  47    overload,
  48)
  49
  50from agents.tools.logs import logger
  51from classes.constants import READTHEDOCS, Hazard, HazardSeverity, Phase, RulePriority, RuleResult
  52from classes.evaluation_function import ConditionFunction
  53from classes.exceptions import DoNotEvaluateChildRules, UnblockRuleException
  54from classes.worldmodel import GameFramework
  55from classes.information_manager import InformationManager
  56from launch_tools import CarlaDataProvider, singledispatchmethod
  57
  58if TYPE_CHECKING:
  59    import carla
  60    from classes.type_protocols import CallableAction, CallableActionT, ConditionFunctionLike, ConditionFunctionLikeT
  61    from classes.detection_matrix import DetectionMatrix
  62    from agents.lunatic_agent import LunaticAgent
  63    from agents.tools.config_creation import ContextSettings, LiveInfo, RuleConfig
  64    # NOTE: gameframework.py adds GameFramework to this module's variables
  65    # at this position it would be a circular import
  66
  67_T = TypeVar("_T")
  68_P = ParamSpec("_P")
  69_Rule = TypeVar("_Rule", bound="Rule")
  70
  71
[docs] 72class Context(CarlaDataProvider): 73 """ 74 Object to be passed as the first argument (instead of self) to rules, actions and evaluation functions. 75 76 The :py:class:`Context` class derives from the scenario runner's :py:class:`CarlaDataProvider` to allow access to the world, map, etc. 77 78 Tip: 79 There is normally no need to initialize the context object manually. 80 Its recommended to initialize the context object with :py:meth:`.LunaticAgent._make_context`. 81 """ 82 83 agent: "LunaticAgent" 84 """Backreference to the agent.""" 85 86 config: "ContextSettings" 87 """A copy of the agents config. Overwritten by the condition's settings.""" 88 89 evaluation_results: dict[Phase, Hashable] # ambiguous wording, which result? here evaluation result 90 """ 91 Stores the result from the :py:meth:`Rule.condition` of the last rule that was evaluated in a phase. 92 93 .. deprecated:: in consideration 94 """ 95 96 action_results: dict[Phase, Any] 97 """ 98 Stores the result from the :py:meth:`action` of the last rule that was applicable in a phase. 99 100 .. deprecated:: in consideration 101 """ 102 103 _control: Optional[carla.VehicleControl] 104 """ 105 Current control the agent should use. 106 107 Accessible from the :py:attr:`control` property. 108 """ 109 110 prior_result: Optional[Any] # TODO: maybe rename 111 """Result of the current phase.""" 112 113 phase_results: dict[Phase, Any] 114 """ 115 Stores the results of the phases the agent has been in. 116 By default the keys are set to :py:attr:`Context.PHASE_NOT_EXECUTED`. 117 """ 118 119 last_context: Optional["Context"] 120 """The context object of the last tick. Used to access the last phase's results.""" 121 122 second_pass: Optional[bool] = None 123 """ 124 Whether or not the run_step function performs a second pass, i.e. 125 after the route has been replanned. 126 127 Warning: 128 The correctness should *not* be assumed. 129 The user is responsible for setting this value to True if a second pass is required. 130 """ 131 132 _detected_hazards: Set[Hazard] 133 """ 134 Detected hazards in the current phase. 135 136 If not empty at the end of the inner step an EmergencyStopException is raised. 137 """ 138 139 detected_hazards_info: dict[Hazard, Union[HazardSeverity, Any]] 140 """ 141 Information about the detected hazards. :py:meth:`add_hazard` inserts the given :py:class:`.HazardSeverity` as value, 142 however, note that the values are can be arbitrary if used otherwise. 143 """ 144 145 PHASE_NOT_EXECUTED: object = object() 146 """ 147 Value in :py:attr:`phase_results` to indicate that :python:`agent.execute_phase(phase)` was called for the respective phase 148 149 :meta hide-value: 150 """ 151 152 def __init__(self, agent: "LunaticAgent", **kwargs: Any): 153 """ 154 Its recommended to initialize the context object with :py:meth:`.LunaticAgent._make_context`. 155 """ 156 self.agent = agent 157 self._control = kwargs.pop("control", None) 158 self._init_arguments = kwargs 159 self.phase_results = dict.fromkeys(Phase.get_phases(), Context.PHASE_NOT_EXECUTED) 160 161 self.config: "ContextSettings" = agent.config.copy() # type: ignore 162 self.config._content["live_info"] = agent.live_info # not a copy! # pyright: ignore 163 164 self.detected_hazards = set() 165 self.detected_hazards_info = dict.fromkeys(Hazard, HazardSeverity.NONE) 166 self.__dict__.update(kwargs) 167 168 # Less used attributes 169 self.evaluation_results = {} 170 self.action_results = {} 171 172 @property 173 def current_phase(self) -> Phase: 174 """Current :py:class:`Phase` the agent is in.""" 175 return self.agent.current_phase 176 177 @property 178 def control(self) -> Union[carla.VehicleControl, None]: 179 """ 180 Current control the agent should use. Set by :py:meth:`execute_phase(update_controls=...) <.LunaticAgent.execute_phase>`. 181 182 Note: 183 Safeguarded to be not set to None. Setting it to :code:`None` is discouraged. 184 Use :py:meth:`set_control` if setting it to :code:`None` is really needed. 185 """ 186 return self._control 187 188 @control.setter 189 def control(self, control: carla.VehicleControl): 190 if control is None: # type: ignore[comparison] 191 raise ValueError("Context.control must not be None. To set it to None explicitly use set_control.") 192 self._control = control 193
[docs] 194 def set_control(self, control: Optional[carla.VehicleControl]): 195 """Set the control, allows to set it to None.""" 196 self._control = control
197
[docs] 198 def get_or_calculate_control(self) -> carla.VehicleControl: 199 """ 200 Get the control if it is set, otherwise calculate it by 201 executing the local planner. 202 203 Returns: 204 The control the agent should use. 205 206 Note: 207 Use this function inside rules to acquire a control object- 208 209 Warning: 210 This is equivalent to ending the inner step of the agent. 211 212 See Also: 213 :py:meth:`LunaticAgent._calculate_control` 214 """ 215 if self.control: 216 return self.control 217 self.control = self.agent._calculate_control() # pyright: ignore[reportPrivateUsage] 218 return self.control # pyright: ignore[reportReturnType]
219 220 @property 221 def detected_hazards(self) -> Set[Hazard]: 222 """ 223 Detected hazards in the current phase. 224 225 If not empty at the end of the inner step an EmergencyStopException is raised. 226 """ 227 return self._detected_hazards 228 229 @detected_hazards.setter 230 def detected_hazards(self, hazards: Set[Hazard]): 231 if not isinstance(hazards, set): # type: ignore 232 raise TypeError("detected_hazards must be a set of Hazards.") 233 self._detected_hazards = hazards 234
[docs] 235 def add_hazard(self, hazard: Hazard, hazard_level: HazardSeverity = HazardSeverity.EMERGENCY): 236 """ 237 Add the specified hazard to the detected hazards, in parallel the `hazard_level` can be set which 238 which is stored in :any:`detected_hazards_info`. 239 """ 240 if hazard not in Hazard: 241 logger.warning(f"Adding {hazard} to the detected hazards which is not a member of the Hazard enum.") 242 self.detected_hazards.add(hazard) 243 self.detected_hazards_info[hazard] = hazard_level
244
[docs] 245 def discard_hazard(self, hazard: Hazard, match: Literal["exact", "subset", "intersection"] = "subset"): 246 """ 247 Discards a hazard from the detected hazards. 248 249 Parameters: 250 hazard: Hazard to remove from :py:attr:`detected_hazards`. 251 match: How to match the hazard to remove. 252 253 - "exact" removes if the exact :py:class:`~classes.constants.Hazard` flag is present. 254 - "subset" removes if the hazard is a subset of the detected hazard, e.g.: 255 256 - :python:`discard_hazard(Hazard.VEHICLE, match="subset")` would remove 257 :any:`Hazard.OBSTACLE` = ``Hazard.VEHICLE | PEDESTRIAN | STATIC_OBSTACLE``. 258 259 - :python:`discard_hazard(Hazard.TRAFFIC_LIGHT, match="subset")` would *not* remove 260 :any:`Hazard.TRAFFIC_LIGHT_RED`. 261 262 """ 263 if match == "subset": 264 self.detected_hazards = {h for h in self.detected_hazards if hazard not in h} # supports flags 265 elif match == "exact": 266 self.detected_hazards.discard(hazard) 267 elif match == "intersection": 268 self.detected_hazards = {h for h in self.detected_hazards if not hazard & h} 269 else: 270 msg = f"match must be 'exact', 'subset' or 'intersection', not {match}." 271 raise ValueError(msg)
272
[docs] 273 def has_hazard(self, hazard: Hazard, match: Literal["exact", "subset", "intersection"] = "intersection") -> bool: 274 """ 275 Checks if the hazard intersects with any of the detected hazards. 276 277 See :py:meth:`discard_hazard` for the different matching options. 278 """ 279 if match == "exact": 280 return hazard in self.detected_hazards 281 if match == "subset": 282 return any(hazard in h for h in self.detected_hazards) 283 if match == "intersection": 284 return any(hazard & h for h in self.detected_hazards) 285 msg = f"match must be 'exact', 'subset' or 'intersection', not {match}." 286 raise ValueError(msg)
287 288 # Convenience function when using detect_vehicles 289 from agents.tools.lunatic_agent_tools import max_detection_distance # noqa 290 291 @property 292 def live_info(self) -> "LiveInfo": 293 return self.config.live_info 294 295 @property 296 def active_blocking_rules(self) -> Set["BlockingRule"]: 297 return self.agent._active_blocking_rules # pyright: ignore[reportPrivateUsage]
298 299 300# @ConditionFunction["Rule", [], Literal[True]] # python 3.9+ syntax
[docs] 301@ConditionFunction 302def always_execute(ctx: Context) -> Literal[True]: # pylint: disable=unused-argument, # noqa: ARG001 303 """ 304 This is an :py:class:`.ConditionFunction` that always returns :python:`True`. It can be used to always execute an action. 305 """ 306 return True
307 308 309def _use_temporary_config( 310 func: Callable[Concatenate[_Rule, "Context", Optional[Dict[str, Any]], _P], _T], 311) -> Callable[Concatenate[_Rule, "Context", Optional[Dict[str, Any]], _P], _T]: 312 """ 313 During the condition evaluation the ctx.config should have the overwrite settings applied 314 but not in a permanent way. 315 """ 316 # TODO: To avoid unused argument error, consume the dict; however this might be harder to understand 317 318 @wraps(func) 319 def wrapper( 320 self: _Rule, ctx: Context, overwrite: Optional[Dict[str, Any]] = None, *args: _P.args, **kwargs: _P.kwargs 321 ) -> _T: 322 settings = self.overwrite_settings.copy() # Dict with "self" : SelfConfig 323 if overwrite: 324 settings.update(overwrite) 325 326 original_ctx_config = ctx.config 327 ctx.config["self"] = "???" # do not merge old self_config 328 329 temp: "ContextSettings" = OmegaConf.merge(ctx.config, settings) # type: ignore 330 331 OmegaConf.set_readonly(temp, True) 332 ctx.config = temp 333 self.self_config = self.overwrite_settings["self"] = ctx.config["self"] 334 OmegaConf.set_readonly( 335 self.self_config, False 336 ) # The Rule's settings should still be dynamic; expected in overwrite 337 try: 338 return func(self, ctx, overwrite, *args, **kwargs) 339 finally: 340 ctx.config = original_ctx_config 341 342 return wrapper 343 344 345class _CountdownRule: 346 # TODO: low prio: make cooldown dependant of tickrate or add a conversion from seconds to ticks OR make time-based 347 tickrate: ClassVar[int] = NotImplemented 348 """ 349 :meta private: 350 """ 351 352 DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0 353 """ 354 Value the cooldown is reset to when :py:meth:`reset_cooldown` is called without a value. 355 356 Used *only* when :py:attr:`cooldown_reset_value` is not set. 357 """ 358 359 start_cooldown: ClassVar[int] = 0 360 """Initial :py:attr:`cooldown` when initialized. if >0 the rule will not be ready for the first **start_cooldown** ticks.""" 361 362 _instances: ClassVar["WeakSet[_CountdownRule]"] = WeakSet() 363 """Keep track of all Rule instances for the cooldowns""" 364 365 _cooldown: int 366 """If 0 the rule is ready to be executed.""" 367 368 blocked: bool = False # NOTE: not a property 369 """Indicates if the rule is blocked for this tick only. Is reset to False after the tick.""" 370 371 if TYPE_CHECKING: 372 373 class _InitParameters(TypedDict): 374 """Dict describing the parameters of the __init__ of the class method.""" 375 376 cooldown_reset_value: NotRequired[Optional[int]] 377 enabled: NotRequired[bool] 378 379 def __init__(self, cooldown_reset_value: Optional[int] = None, enabled: bool = True): 380 self._instances.add(self) 381 self._cooldown = self.start_cooldown 382 self.max_cooldown = cooldown_reset_value if cooldown_reset_value is not None else self.DEFAULT_COOLDOWN_RESET 383 self._enabled = enabled 384 385 def is_ready(self) -> bool: 386 """Group aware check if a rule is ready.""" 387 return ( 388 self.cooldown == 0 and self.enabled and not self.blocked 389 ) # Note: uses property getters. Group aware for GroupRules 390 391 def reset_cooldown(self, value: Optional[int] = None): 392 if value is None: 393 self._cooldown = self.max_cooldown 394 elif value >= 0: 395 self._cooldown = int(value) 396 else: 397 raise ValueError("Cooldown value must be a None or a non-negative integer.") 398 399 @property 400 def cooldown(self) -> int: 401 """ 402 Cooldown of the rule in ticks until it can be executed again. Only if 0 the rule can be executed. 403 """ 404 return self._cooldown 405 406 @cooldown.setter 407 def cooldown(self, value: int): 408 self._cooldown = value 409 410 def update_cooldown(self): 411 """ 412 Update the :py:attr:`cooldown` of *this* rule. 413 414 :meta private: 415 """ 416 if self._cooldown > 0: 417 self._cooldown -= 1 418 419 @classmethod 420 def update_all_cooldowns(cls): 421 """Updates the cooldowns of **all** rules.""" 422 for instance in cls._instances: 423 instance.update_cooldown() 424 425 @classmethod 426 def unblock_all_rules(cls): 427 """Unblocks all rules""" 428 for instance in cls._instances: 429 instance.blocked = False 430 431 @property 432 def enabled(self) -> bool: 433 """If :code:`False` the rule will not be evaluated. Contrary to :py:attr:`blocked` this permanently disables the rule.""" 434 return self._enabled 435 436 @enabled.setter 437 def enabled(self, value: bool): 438 self._enabled = value 439 440 def set_active(self, value: bool): 441 """Enables or disables the rule. Contrary to :py:attr:`blocked` it will not be reset after the tick.""" 442 self._enabled = value 443 444 class CooldownFramework: 445 """ 446 Context manager that can reduce all cooldowns at the end of a `with` statement. 447 """ 448 449 def __enter__(self): 450 return self 451 452 def __exit__(self, exc_type, exc_value, traceback): # type: ignore 453 self.tick() 454 455 @staticmethod 456 def tick(): 457 """ 458 Update all cooldowns and unblock all rules. 459 460 Calls: 461 - :py:meth:`Rule.update_all_cooldowns` 462 - :py:meth:`Rule.unblock_all_rules` 463 """ 464 Rule.update_all_cooldowns() 465 Rule.unblock_all_rules() 466 467 468_GroupInstanceValues = TypedDict( 469 "_GroupInstanceValues", {"cooldown": int, "max_cooldown": int, "instances": "WeakSet[_GroupRule]"} 470) 471"""TypeHint that describes the values of `_GroupRule._group_instances`.""" 472 473 474class _GroupRule(_CountdownRule): 475 group: Optional[str] = None # group of rules that share a cooldown 476 """ 477 Group name of rules that should share their cooldown. 478 479 None for a rule to not share its cooldown. 480 """ 481 482 # first two values in the list are current and max cooldown, the third is a set of all instances 483 _group_instances: ClassVar[Dict[str, _GroupInstanceValues]] = {} 484 """ 485 Dictionary of all group instances. 486 Keys: 487 The group name. 488 Values: 489 Is a list of the *current cooldown*, the *max cooldown for reset* and a *WeakSet of all instances*. 490 """ 491 492 if TYPE_CHECKING: 493 494 class _InitParameters(_CountdownRule._InitParameters): # pyright: ignore[reportPrivateUsage] 495 """Dict describing the parameters of the __init__ of the class method.""" 496 497 group: NotRequired[Optional[str]] 498 499 def __init__(self, group: Optional[str] = None, cooldown_reset_value: Optional[int] = None, enabled: bool = True): 500 super().__init__(cooldown_reset_value, enabled) 501 self.group = group 502 if group is None: 503 return 504 if group not in self._group_instances: 505 self._group_instances[group] = {"cooldown": 0, "max_cooldown": 0, "instances": WeakSet()} 506 self._group_instances[group]["instances"].add(self) # add to weak set 507 508 @property 509 def cooldown(self) -> int: 510 """ 511 Cooldown of the rule in ticks until it can be executed again after its action was executed. 512 If 0 the rule is ready to be executed. 513 """ 514 if self.group: 515 return _GroupRule._group_instances[self.group]["cooldown"] 516 return super().cooldown 517 518 @property 519 def has_group(self) -> bool: 520 """Indicates if the rule belongs to a group, i.e. :python:`self.group is not None`.""" 521 return self.group is not None 522 523 @cooldown.setter 524 def cooldown(self, value: int): 525 if self.group: 526 _GroupRule._group_instances[self.group]["cooldown"] = value 527 return 528 super().cooldown = value 529 530 def set_my_group_cooldown(self, value: Optional[int] = None): 531 """Update the cooldown of the group this rule belong to""" 532 if self.group is None: 533 logger.warning("Rule does not belong to a group, but set_my_group_cooldown was called. Ignoring.") 534 return 535 if value is None: 536 self._group_instances[self.group]["cooldown"] = self._group_instances[self.group][ 537 "max_cooldown" 538 ] # set to max 539 else: 540 self._group_instances[self.group]["cooldown"] = value 541 542 def reset_cooldown(self, value: Optional[int] = None): 543 """Reset or set the cooldown; for a group rule it resets the group cooldown.""" 544 if self.group: 545 self.set_my_group_cooldown(value) 546 else: 547 super().reset_cooldown() 548 549 @classmethod 550 def set_cooldown_of_group(cls, group: str, value: int): 551 """Updates the cooldown of the specified group to a specific value""" 552 if group in cls._group_instances: 553 cls._group_instances[group]["cooldown"] = value 554 else: 555 msg = f"Group {group} does not exist." 556 raise ValueError(msg) 557 558 __filter_not_ready_instances: Callable[["_CountdownRule"], bool] = ( 559 lambda instance: instance.group is None and instance._cooldown > 0 560 ) # pylint: disable=line-too-long # type: ignore[attr-defined] 561 """ 562 Filter function to get all instances that are not ready. see `update_all_cooldowns` 563 Group rules should not be affected by this filter. 564 """ 565 566 @classmethod 567 def update_all_cooldowns(cls): 568 """Globally updates the cooldown of *all* rules.""" 569 # Update Groups 570 for instance_data in cls._group_instances.values(): 571 if instance_data["cooldown"] > 0: 572 instance_data["cooldown"] -= 1 573 for instance in filter(cls.__filter_not_ready_instances, cls._instances): 574 instance._cooldown -= 1 575 576
[docs] 577class Rule(_GroupRule): 578 _auto_init_: ClassVar[bool] = True 579 """ 580 If set to False the automatic :code:`__init__` creation is disabled when subclassing. 581 This automatic :code:`__init__` will fix parameters like :any:`phases` and :any:`condition` to the class. 582 583 Declaring an :code:`__init__` method in the class has the same effect as setting :code:`_auto_init_` to False. 584 585 Note: 586 Using :python:`class NewRuleType(metarule=Rule)` is nearly equivalent to :python:`_auto_init_=False`, but is not inherited. 587 """ 588 589 NOT_APPLICABLE: ClassVar[Literal[RuleResult.NOT_APPLICABLE]] = RuleResult.NOT_APPLICABLE 590 """ 591 Unique object :py:attr:`.RuleResult.NOT_APPLICABLE` that indicates that no action was executed. 592 593 :meta hide-value: 594 595 .. deprecated:: Use :py:attr:`.RuleResult.NOT_APPLICABLE` directly 596 """ 597 598 NO_RESULT: ClassVar[Literal[RuleResult.NO_RESULT]] = RuleResult.NO_RESULT 599 """ 600 Unique object :py:attr:`.RuleResult.NO_RESULT` that indicates that the rules :py:meth:`action` did not return a result, 601 e.g. because an exception was raised. 602 603 :meta hide-value: 604 605 .. deprecated:: Use :py:attr:`.RuleResult.NOT_APPLICABLE` directly 606 """ 607 608 _PROPERTY_MEMBERS: ClassVar[Set[str]] = {"cooldown", "has_group", "enabled"} 609 """ 610 A subclass can only overwrite these attributes with properties. This prevents a user accidentally overwriting the property, 611 e.g. `cooldown = 20` with a method or variable. 612 613 TODO: 614 Consider making enabled a variable and not a property. 615 """ 616 617 description: str 618 """Description of what this rule should do""" 619 620 phases: frozenset[Phase] 621 """ 622 The phase or phases in which the rule should be evaluated. 623 For instantiation the phases attribute can be any :term:`Iterable` [:py:class:`.Phase`]. 624 """ 625 626 phase: Phase 627 """For the Class API the phase attribute be set to a single Phase object.""" 628 629 if TYPE_CHECKING: 630 condition: ConditionFunctionLike[Self, [], Hashable] 631 """ 632 The condition that determines if the rule's actions should be executed. 633 634 Simple Variant: 635 return True if the action should be executed, False otherwise. 636 if :py:attr:`.Rule.false_action` is defined, False will execute :py:attr:`.Rule.false_action`. 637 638 Advanced Variant: 639 return a :term:`Hashable` value that is used as key in the :py:attr:`actions` dict. 640 641 NOTE: 642 The readthedocs description is set in the __init__ function. 643 """ 644 645 actions: dict[Any, CallableAction[Self, [], Any]] 646 """Dictionary that maps rule results to the action that should be executed.""" 647 648 action: Annotated[CallableAction[Self, [], Any], "attribute not available on instance -> merged into `actions`"] 649 """ 650 Action that should be executed if the rule is True. If `actions` is set, this is ignored. 651 """ 652 653 false_action: Annotated[ 654 CallableAction[Self, [], Any], "attribute not available on instance -> merged into `actions`" 655 ] 656 """Action that should be executed if the rule is False. May not be set if `actions` is set.""" 657 658 if READTHEDOCS and not TYPE_CHECKING: 659 false_action: Annotated[CallableActionT, "attribute not available on instance -> merged into `actions`"] 660 action: Annotated[CallableActionT, "attribute not available on instance -> merged into `actions`"] 661 actions: dict[Any, CallableActionT] 662 663 # group : Optional[str] 664 # """Group name for rules that should share their cooldown.""" 665 666 overwrite_settings: dict[str, Any] 667 """ 668 Settings that should overwrite the agent's settings for this rule. 669 670 Note: 671 The overwrite settings are primitive :py:class:`dict` objects, 672 :py:class:`omegaconf.DictConfig` objects are converted. 673 """ 674 675 self_config: "RuleConfig" 676 """ 677 A custom sub-config for the rule that is not included in the agents settings. 678 Automatically gets a `instance` key added with the rule instance. 679 680 Can be accessed via `ctx.config.current_rule` or `self.config.self`. 681 682 Note: 683 Internally `self.config` and `ctx.config` is the same object, which makes 684 interpolations to the agent's settings possible. 685 686 Attention: 687 The **self_config object is *not* constant** it is recreated each time the 688 rule is evaluated to have the current context available. 689 """ 690 691 priority: Union[float, int, RulePriority] = RulePriority.NORMAL 692 """Rules are executed in order of their priority, from high to low.""" 693 694 # Initialization functions 695 696 _ctx: Optional[Context] = None 697 """No hard attachment, to not keep the context objects alive, use with care. Check where it is set in a rule.""" 698
[docs] 699 def clone(self): 700 """ 701 Create a new instance of the rule with the same settings. 702 703 Note: 704 - The current cooldown **is not** taken into account. 705 - The current enabled state **is** taken into account. 706 """ 707 return self.__class__(self) # Make use over overloaded __init__
708 709 @overload 710 def __new__(cls, phases: Union[Phase, Iterable[Phase], None] = None) -> "Self": 711 ... 712 # No argument call; should be redundant 713 714 @overload 715 def __new__(cls, phases: Union[Phase, Iterable[Phase], None] = None, **kwargs: Unpack[_InitParameters]) -> "Self": 716 ... 717 # argument call 718 719 @overload 720 def __new__(cls, **kwargs: Unpack[_InitParametersComplete]) -> "Self": 721 ... 722 # Normal init 723 724 @overload 725 def __new__(cls, phases: "type[Any] | Self") -> "Self": 726 ... 727 # Rule.copy(other_rule) 728 729 def __new__( 730 cls, 731 phases: Optional[Union[Phase, Iterable[Phase], "type[Any]", Self, str, None]] = None, 732 bases: Optional[tuple[type[Any], ...]] = None, 733 clsdict: Optional[dict[str, Any]] = None, 734 **kwargs: Any, 735 ): 736 """ 737 The @Rule decorator allows to instantiate a Rule class directly for easier out-of-the-box usage. 738 Further check for metaclass initialization, else this is a normal instance creation. 739 740 Parameters: 741 kwargs: These will be passed to :python:`__init_subclass__`. 742 """ 743 # @Rule 744 # class NewRuleInstance: 745 # or Rule(other_rule) -> copy 746 if isclass(phases): 747 if issubclass(phases, _CountdownRule): 748 raise ValueError( 749 "When using @Rule the class may not be a subclass of Rule. Do not inherit from rule or subclass from Rule instead with the decorator." 750 "Do not do:\n\t@Rule\n\tclass MyRule(>>Rule<<): ...\n" 751 ) 752 # cls is the Rule used to decorate the decorated_class 753 decorated_class = phases 754 755 # Create the new class, # NOTE: goes to __init_subclass_ 756 new_decorated_class = type( 757 decorated_class.__name__, (cls,), decorated_class.__dict__.copy(), _init_by_decorator=True, **kwargs 758 ) # > calls init_subclass; copy() for correct type! 759 if TYPE_CHECKING: 760 assert issubclass(new_decorated_class, cls) and issubclass(new_decorated_class, decorated_class) # noqa: PT018 761 762 return super().__new__(new_decorated_class) 763 764 # class NewRuleType(metaclass=Rule) # deprecated 765 if isinstance(phases, str): 766 try: 767 logger.warning("Using NewRule(metaclass=Rule) is deprecated. Use NewRule(Rule, metarule=True) instead.") 768 assert clsdict 769 assert isinstance(bases, tuple) 770 class_name = phases 771 new_rule_class = type(class_name, (cls, *bases), clsdict, metarule=True, **kwargs) 772 except Exception: 773 print( 774 "ERROR: If you want to initialize a rule be sure that you pass a Phase object as the first argument." 775 "A string assumes that you've used class NewSubclass(metaclass=Rule) " 776 "This is DEPRECATED use class NewSubclass(Rule, metarule=True) instead." 777 ) 778 raise 779 else: 780 assert issubclass(new_rule_class, cls) # for type-hints 781 return new_rule_class 782 # Normal instance 783 return super().__new__(cls) 784 785 if TYPE_CHECKING: 786 787 class _InitParameters(_GroupRule._InitParameters, total=False, closed=False): # pyright: ignore[reportPrivateUsage] 788 """Dict describing the parameters of the __init__ of the class method.""" 789 790 # phases: Union[Phase, Iterable[Phase]] # leave this and use explicitly later on 791 # condition: Optional[ConditionFunctionLike[Rule, ..., Hashable]] 792 action: Optional[Union[CallableAction[Rule, [], Any], Dict[Any, CallableAction[Rule, [], Any]]]] 793 false_action: Optional[CallableAction[Rule, [], Any]] 794 actions: Optional[Dict[Any, CallableAction[Rule, [], Any]]] 795 description: str 796 overwrite_settings: Optional[Dict[str, Any]] 797 self_config: Optional[Dict[str, Any]] 798 priority: RulePriority 799 800 class _InitParametersComplete(_InitParameters): 801 """Dict describing the parameters of the __init__ of the class method.""" 802 803 phases: Required[Union[Phase, Iterable[Phase]]] 804 condition: NotRequired[Optional[ConditionFunctionLike[Rule, ..., Hashable]]] 805 806 class _CallKwargs(TypedDict, closed=True): 807 ignore_phase: bool 808 """Default False""" 809 ignore_cooldown: bool 810 """Default False""" 811 812 @singledispatchmethod 813 def __init__( 814 self, 815 phases: Union[Phase, Iterable[Phase]], # iterable of Phases 816 # /, # phases must be positional; python3.8+ only 817 condition: Optional[ConditionFunctionLikeT] = None, 818 action: Optional[Union[CallableAction[Self, []], Dict[Any, CallableAction[Self, []]]]] = None, 819 false_action: Optional[CallableAction[Self, []]] = None, 820 *, 821 actions: Optional[Dict[Any, CallableAction[Self, []]]] = None, 822 description: str = "What does this rule do?", 823 overwrite_settings: Optional[Dict[str, Any]] = None, 824 self_config: Optional[Dict[str, Any]] = None, 825 priority: RulePriority = RulePriority.NORMAL, 826 cooldown_reset_value: Optional[int] = None, 827 group: Optional[str] = None, 828 enabled: bool = True, 829 ): 830 """ 831 Initializes a Rule object. 832 833 Parameters: 834 phases: The phase(s) when the rule should be evaluated. 835 An iterable of Phase objects or a single Phase object. 836 condition: A function that takes a Context object as input and returns a Hashable value. 837 If not provided, the class must implement a `condition` function. 838 action: A function or a dictionary of functions that take a Context object as input. 839 If `action` behaves like `actions`. 840 Only one of `action` and `actions` can be set. 841 false_action: A function that takes a Context object as input and returns any value. 842 It is used when `action` is a single function and represents the action to be taken when the condition is False. 843 actions: A dictionary of `action` functions 844 It should map the return values of `condition` to the corresponding action function. 845 If `action` is None, `actions` must be provided. 846 description: A string that describes what this rule does. 847 overwrite_settings: A dictionary of settings that will overwrite the 848 agent's setting for this Rule. 849 priority: The priority of the rule. It can be a float, an integer, or a RulePriority enum value. 850 cooldown_reset_value: An optional integer value that represents the cooldown reset value for the rule. 851 If not provided falls back to the class attribute :py:attr:`.Rule.DEFAULT_COOLDOWN_RESET`. 852 group: An optional string that specifies the group to which this rule belongs. 853 enabled: A boolean value indicating whether the rule is enabled or not. 854 855 Raises: 856 ValueError: If ``phases`` is empty or None, or if ``phases`` contains an object that is not of type Phase. 857 TypeError: If ``condition`` is None and the class does not implement a :py:meth:`condition` function, 858 or if both ``action`` and ``actions`` are None and the class does not have an :py:attr:`actions` attribute or an :py:meth:`action` function. 859 TypeError: if ``actions`` is not a Mapping object. 860 ValueError: If both ``action`` and ``actions`` are provided. 861 ValueError: If ``action`` is a Mapping and either ``false_action`` or ``actions`` is not None. 862 ValueError: If an ``action`` function is not callable. 863 ValueError: If ``description`` is not a string. 864 """ 865 866 # Check phases 867 if not phases: 868 raise ValueError("phases must not be empty or None") 869 if not isinstance(phases, frozenset): 870 phases = frozenset(phases) if isinstance(phases, Iterable) else frozenset([phases]) 871 for p in phases: 872 if not isinstance(p, Phase): 873 msg = f"phase must be of type Phases, not {type(p)}" 874 raise TypeError(msg) 875 self.phases = phases 876 877 # Check Rule 878 if condition is None and not hasattr(self, "condition"): 879 msg = ( 880 f"{self.__class__.__name__}.__init__() missing 1 required positional argument: 'condition'. " 881 "Alternatively the class must implement a `condition` function." 882 ) 883 raise TypeError(msg) 884 885 if False and TYPE_CHECKING: 886 # Idea have type hint here 887 self.condition: Any = ... 888 """ 889 The condition that determines if the rule's actions should be executed. 890 891 Simple Variant: 892 return True if the action should be executed, False otherwise. 893 if :py:attr:`.Rule.false_action` is defined, False will execute :py:attr:`.Rule.false_action`. 894 895 Advanced Variant: 896 return a :term:`Hashable` value that is used as key in the :py:attr:`actions` dict. 897 """ 898 if condition is not None: 899 self.condition = condition 900 else: 901 self.condition = self.__class__.condition 902 903 if condition is not None: 904 if not hasattr(self, "condition"): 905 self.condition = condition 906 else: 907 # Warn if cls.condition and passes condition are different 908 self_func = getattr(self.condition, "__func__", getattr(self.condition, "func", self.condition)) 909 if self_func != condition: # Compare method with function 910 logger.warning( 911 f"Warning 'condition' argument passed but class {self.__class__.__name__} " 912 "already implements a different function 'self.condition'. " 913 f"Overwriting {self.condition} with passed condition {getattr(condition, '__name__', str(condition))}. " 914 "This might lead to undesired results." 915 ) 916 917 # NOTE: IMPORTANT: self.condition = condition overwrites methods with functions 918 # To keep methods as methods the condition parameter is removed with 919 # `do_not_overwrite.append("condition")` used during __init_subclass__ 920 # Could move this check also to here, however, it will then not be checked during class creation 921 self.condition = condition 922 # else: self.__class__.condition must be true already, which was checked in the subclass initialization 923 924 # Check Actions 925 if action is not None and actions is not None: 926 try: 927 if len(actions) == 1 and action is actions[True]: 928 logger.info( 929 "`action` and `actions` have been both been used when initializing %s. " 930 "Did you use `condition.register_action`? Then you can omit the action parameter.", 931 self, 932 ) 933 action = None 934 else: 935 raise ValueError("Only one of 'action' and 'actions' can be set.") 936 except (KeyError, ValueError) as e: 937 raise ValueError( 938 "Either only one of 'action' and 'actions' can be set, or actions[True] must be the same as action " 939 "- other actions are currently not supported." 940 ) from e 941 if action is None and actions is None and not hasattr(self, "actions"): 942 # NOTE: the k in params check below is essential for this to work correctly. 943 msg = ( 944 f"{self.__class__.__name__}.__init__() arguments `action` and `actions` are both None. " 945 "Provide at least one argument alternatively the class must have an `actions` " 946 "attribute or an `action` function." 947 ) 948 raise TypeError(msg) 949 950 if action is None: 951 if not isinstance(actions, Mapping): 952 msg = f"actions must be a Mapping, not {type(actions)}" 953 raise TypeError(msg) 954 955 self.actions = actions 956 elif isinstance(action, Mapping): 957 self.actions = action 958 if false_action is not None or actions is not None: 959 raise ValueError("When passing a dict to action, false_action and actions must be None") 960 else: 961 # NOTE: Might overwrite actions attribute 962 self.actions = {} # type: ignore 963 if action is not None: 964 self.actions[True] = action 965 if false_action is not None: 966 self.actions[False] = false_action 967 # actions can be the dict of the class, we do not want the same dict instance 968 self.actions = dict(self.actions) 969 970 # Assure that method(self, ctx) like functions are accessible like them 971 for key, func in self.actions.items(): 972 if not callable(func): 973 msg = f"Action for key {key} must be callable, not {type(func)}" 974 raise TypeError(msg) 975 multiple_parameters = len(inspect.signature(func).parameters) >= 2 976 if multiple_parameters and getattr(func, "use_self", True): 977 # NOTE: could use types.MethodType 978 self.actions[key] = partial(func, self) # bind to self 979 980 # Check Description 981 if not isinstance(description, str): 982 msg = f"description must be of type str, not {type(description)}" 983 raise TypeError(msg) 984 self.description = description 985 super().__init__(group or self.group, cooldown_reset_value, enabled) # or self.group for subclassing 986 self.priority: float | int | RulePriority = priority # used by agent.add_rule 987 988 self.overwrite_settings = overwrite_settings or {} 989 if not isinstance(self.overwrite_settings, dict): 990 # NOTE: If DictConfig only the outermost will be a dict, 991 # i.e. this could be dict[str, DictConfig] 992 self.overwrite_settings = dict(self.overwrite_settings) 993 if self_config and "self" in self.overwrite_settings and self.overwrite_settings["self"] != self_config: 994 logger.debug("Warning: self_config and self.overwrite_settings['self'] must be the same object.") 995 996 default_self_config: "RuleConfig" = cast( 997 "RuleConfig", getattr(self, "self_config", getattr(self, "SelfConfig", {})) 998 ) 999 if isclass(default_self_config): 1000 if not is_dataclass(default_self_config): 1001 logger.warning( 1002 f"Class {self.__class__.__name__} has a self_config class that is not a dataclass. " 1003 "This might lead to undesired results, i.e. missing keys in the config." 1004 ) 1005 default_self_config = default_self_config() 1006 if not isinstance(default_self_config, DictConfig): 1007 default_self_config = cast( 1008 "RuleConfig", OmegaConf.create(default_self_config, flags={"allow_objects": True}) 1009 ) # type: ignore 1010 if self_config: 1011 self.self_config = cast("RuleConfig", OmegaConf.merge(default_self_config, self_config)) 1012 else: 1013 self.self_config = default_self_config 1014 assert self.self_config._get_flag("allow_objects"), "self_config must allow objects to be used as values." # pyright: ignore[reportPrivateUsage] 1015 1016 self.overwrite_settings["self"] = self.self_config 1017 self.overwrite_settings["self"]["instance"] = self 1018 1019 # Called on subclass creation. Can be used for class API 1020 def __init_subclass__(cls, _init_by_decorator: bool = False, metarule: bool = False): 1021 """ 1022 Automatically creates a :python:`__init__` function to allow for a simple to use 1023 class-interface to create rule classes. 1024 1025 By setting :python:`_auto_init_ = False` in the class definition, the automatic __init__ 1026 creation is disabled. Similarly, this is also the case if :python:`metarule=Rule` is used 1027 for the class creation. 1028 """ 1029 if hasattr(cls, "phases") and hasattr(cls, "phase") and cls.phases and cls.phase: 1030 raise ValueError( 1031 f"Both 'phases' and 'phase' are set in class {cls.__name__}. Use only one. %s, %s" 1032 % (cls.phases, cls.phase) 1033 ) 1034 1035 for attr in cls._PROPERTY_MEMBERS: 1036 if hasattr(cls, attr) and not hasattr(getattr(cls, attr), "__get__"): 1037 msg = ( 1038 f"Class {cls.__name__} has overwritten property {attr} with {getattr(cls, attr)}." 1039 " You may only overwrite the following attributes with properties: {cls._PROPERTY_MEMBERS}." 1040 "Did you mean `start_cooldown` or `cooldown_reset_value` instead of `cooldown`?" 1041 ) 1042 raise ValueError(msg) 1043 if not cls._auto_init_ or metarule: 1044 return 1045 1046 custom_init = cls.__dict__.get("__init__", False) 1047 1048 # Members that are not overwritten and passed as default arguments (None) into the __init__ 1049 do_not_overwrite = ["phases"] 1050 1051 if hasattr(cls, "condition"): 1052 # Check if the condition should be treated as a method or function 1053 rule_func: ConditionFunctionLike[Self, ..., Hashable] 1054 if isinstance(cls.condition, ConditionFunction): 1055 rule_func = cls.condition.evaluation_function # pyright: ignore[reportUnknownMemberType, reportAssignmentType] 1056 else: 1057 rule_func = cls.condition 1058 if isinstance(rule_func, staticmethod): 1059 rule_func = rule_func.__func__ 1060 1061 # Decide method(self, ctx) vs. function(ctx) 1062 if hasattr(cls.condition, "use_self") and cls.condition.use_self is not None: # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportFunctionMemberAccess, reportAttributeAccessIssue] 1063 condition_as_method: bool = cls.condition.use_self # type: ignore[attr-defined] 1064 else: 1065 params = len(inspect.signature(rule_func).parameters) 1066 if params >= 2: 1067 if params > 2: 1068 logger.warning( 1069 f"Rule {getattr(cls.condition, '__name__', cls.condition)}" # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] 1070 "has more than 2 parameters. " 1071 "Treating it as a method(self, ctx, **kwargs) with self argument!" 1072 "To avoid this message or to use it as a function use" 1073 "ConditionFunction(use_self=True|False) explicitly." 1074 ) 1075 condition_as_method = True 1076 else: 1077 condition_as_method = False 1078 if condition_as_method: 1079 logger.debug( 1080 "Implementing %s as method(self, ctx) - " 1081 "If you need it as a function(ctx, *args) decorate it with " 1082 "@ConditionFunction(use_self=False).", 1083 getattr(cls.condition, "__name__", cls.condition), 1084 ) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] 1085 do_not_overwrite.append("condition") 1086 else: 1087 # If the signature has only one parameter its a function; else its user decision. 1088 logger.debug( 1089 "Implementing %s as function(ctx) without a self argument " 1090 "- If you need it as a method(self, ctx, *args) decorate it with " 1091 "@ConditionFunction(use_self=True).", 1092 getattr(cls.condition, "__name__", cls.condition), 1093 ) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] 1094 1095 # Actions provided by condition.actions, e.g. ConditionFunction.register_action 1096 if hasattr(cls.condition, "actions") and cls.condition.actions: # type: ignore[attr-defined] 1097 if hasattr(cls, "actions") and cls.actions: 1098 msg = ( 1099 f"Class {cls.__name__} already has an 'actions' attribute. " 1100 "It will be overwritten by the 'actions' attribute of the condition." 1101 " This is the case if ConditionFunction.register_action has been used." 1102 ) 1103 raise ValueError(msg) 1104 cls.actions = cls.condition.actions # type: ignore[attr-defined] 1105 1106 if not hasattr(cls, "description"): 1107 cls.description = cls.__doc__ or "No description provided." 1108 1109 if hasattr(cls, "self_config"): 1110 do_not_overwrite.append("self_config") 1111 1112 # Create a __init__ function that sets some of the parameters. 1113 # find overlapping parameters 1114 params = inspect.signature(cls.__init__).parameters 1115 1116 # TODO: to be lazy Rule arguments need to be added to params so that "k in params" works 1117 1118 def partial_init(self: Self, phases: Optional[Iterable[Phase]] = None, *args, **kwargs: Rule._InitParameters): # pyright: ignore[reportMissingParameterType, reportUnknownParameterType] 1119 # Need phases as first argument 1120 cls_phases = getattr(cls, "phases", None) # allow for both wordings 1121 cls_phase = getattr(cls, "phase", None) 1122 if _init_by_decorator: 1123 # Using @Rule phases as first argument is the class 1124 phases = cls_phases or cls_phase 1125 else: 1126 if phases is None: # NOTE: Could be Phase.NONE 1127 phases = cls_phases or cls_phase 1128 if phases is None: 1129 msg = f"`phases` or `phase` must be provided for class {cls.__name__}" 1130 raise ValueError(msg) 1131 # Removing condition to not overwrite it 1132 kwargs.update({k: v for k, v in cls.__dict__.items() if k in params and k not in do_not_overwrite}) # type: ignore 1133 try: 1134 if custom_init: 1135 custom_init(self, phases, *args, **kwargs) 1136 else: 1137 # note: could be single dispatch function 1138 # super will not be _GroupRule but at least Rule 1139 super(cls, self).__init__(phases, *args, **kwargs) # type: ignore[arg-type] 1140 except IndexError: # functools <= python3.10 1141 logger.error( 1142 "\nError in __init__ of %s. Possible reason: Check if the __init__ method has the correct signature. `phases` must be a positional argument.\n", 1143 cls.__name__, 1144 ) 1145 raise 1146 except TypeError as e: 1147 # e.g. forgot a action, or rules attribute (MultiRule) 1148 if "missing" in str(e): 1149 logger.error( 1150 "Class %s has likely missing attributes that cannot be passed to init. Check if all required attributes are set in the class definition.", 1151 cls.__name__, 1152 ) 1153 raise 1154 1155 update_wrapper(partial_init, cls.__init__) # pyright: ignore[reportUnknownArgumentType] 1156 cls.__init__ = partial_init # pyright: ignore[reportIncompatibleMethodOverride, reportAttributeAccessIssue] 1157 1158 @__init__.register(_CountdownRule) 1159 @__init__.register(type) 1160 def __init_by_decorating_class(self, cls: "Rule"): # pyright: ignore[reportUnusedFunction] 1161 """ 1162 Initialize by passing a Rule or class object to the __init__ method. 1163 1164 This allows the usage of the @Rule decorator and easy copying. 1165 """ 1166 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow for spelling mistake 1167 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None)) 1168 1169 # TODO: Automate this via inspect.signature or TypedDict of the __init__ signature 1170 self.__init__( 1171 phases, 1172 cls.condition, 1173 getattr(cls, "action", None), 1174 getattr(cls, "false_action", None), 1175 actions=getattr(cls, "actions", None), 1176 description=cls.description, 1177 overwrite_settings=getattr(cls, "overwrite_settings", None), 1178 self_config=getattr(cls, "self_config", None), 1179 priority=getattr(cls, "priority", RulePriority.NORMAL), 1180 cooldown_reset_value=cooldown_reset_value, 1181 group=getattr(cls, "group", None), 1182 enabled=getattr(cls, "enabled", True), 1183 ) 1184 1185 @__init__.register(Mapping) 1186 def __init_from_mapping(self, source: "_InitParametersComplete"): # pyright: ignore[reportUnusedFunction] 1187 # NOTE: This is weakly tested and not much supported. 1188 condition = source.get("condition", None) 1189 if condition is not None: 1190 condition = getattr(self, "condition", None) 1191 self.__init__( 1192 source.get("phases", source.get("phase")), 1193 condition, 1194 source.get("action"), 1195 source.get("false_action"), 1196 actions=source.get("actions"), 1197 description=source.get("description", self.description), 1198 overwrite_settings=source.get("overwrite_settings"), 1199 self_config=source.get("self_config"), 1200 priority=source.get("priority", RulePriority.NORMAL), 1201 cooldown_reset_value=source.get("cooldown_reset_value"), 1202 group=source.get("group"), 1203 enabled=source.get("enabled", True), 1204 ) 1205 1206 # ----------------------- 1207 1208 @classmethod 1209 def _get_init_signature(cls): 1210 """ 1211 Get the signature of the __init__ function. 1212 1213 Returns: 1214 The signature of the __init__ function. 1215 1216 :meta private: 1217 """ 1218 return inspect.signature(cls.__init__).parameters.keys() 1219
[docs] 1220 def execute_phase( 1221 self, phase: Phase, *, prior_results: Any = None, update_controls: Optional[carla.VehicleControl] = None 1222 ): 1223 """ 1224 Attention: 1225 - **Use with care to avoid loops or recursions.** 1226 1227 - If a :py:class:`.Context` is available prefer using 1228 :py:meth:`ctx.agent.execute_phase <.LunaticAgent.execute_phase>` instead. 1229 1230 Helper function to execute a phase from within a rule, 1231 wrapper of :py:meth:`agents.lunatic_agent.LunaticAgent.execute_phase`. 1232 1233 Warns: 1234 ReferenceError: If the weak proxy pointing to the :py:class:`Context` object has been 1235 deleted. The phase will not be executed. 1236 1237 See Also: 1238 - :py:meth:`.LunaticAgent.execute_phase` 1239 """ 1240 try: 1241 self._ctx.agent.execute_phase( 1242 phase=phase, # pyright: ignore[reportOptionalMemberAccess] 1243 prior_results=prior_results, 1244 update_controls=update_controls, 1245 ) 1246 except ReferenceError: 1247 logger.error("ReferenceError in Rule.execute_phase. Weakproxy deleted.")
1248 1249 # ----------------------- 1250 # Evaluation functions 1251 # ----------------------- 1252 1253 @_use_temporary_config 1254 def evaluate( 1255 self, 1256 ctx: Context, 1257 overwrite: Optional[Dict[str, Any]] = None, # pylint: ignore=unused-argument # noqa: ARG002 1258 ) -> Union[bool, Hashable, "Literal[RuleResult.NO_RESULT]"]: 1259 """ 1260 Note: 1261 This is an **interface** function of rules that is executed during :py:meth:`__call__`. 1262 This method does not check if a rule is applicable. 1263 i.e. if the rule is in the correct phase of if it py:meth:`is_ready` 1264 this is done in :py:meth:`__call__`. 1265 1266 Meta rules with children should overwrite this method and call 1267 :py:meth:`evaluate_children` from here. 1268 1269 Executes the :py:attr:`condition` function of the rule. 1270 The decorator automatically takes care of temporarily setting the :py:attr:`overwrite_settings`. 1271 1272 Parameters: 1273 ctx: The context object that is passed to the condition function. 1274 overwrite: A dictionary of settings that will overwrite the agent's setting for this Rule. 1275 Used by the :code:`@_use_temporary_config` decorator. 1276 1277 :meta private: 1278 """ 1279 self._ctx = proxy(ctx) # use with care and access over function 1280 return self.condition(ctx) # pyright: ignore[reportCallIssue] 1281
[docs] 1282 def evaluate_children(self, ctx: Context) -> "NoReturn": # pylint: disable=unused-argument 1283 """ 1284 Not implemented for this rule class. 1285 1286 Note: 1287 This is an **interface** function of meta rules 1288 that is executed during :py:meth:`__call__` to call further rules. 1289 """ 1290 raise NotImplementedError("This method should be implemented in a subclass")
1291
[docs] 1292 def __call__( 1293 self, 1294 ctx: Context, 1295 overwrite: Optional[Dict[str, Any]] = None, 1296 *, 1297 ignore_phase: bool = False, 1298 ignore_cooldown: bool = False, 1299 ) -> Union[Any, Literal[RuleResult.NOT_APPLICABLE]]: 1300 """ 1301 1. First checks if the rule is *applicable*, i.e. is its :py:attr:`cooldown == 0 <cooldown>`, 1302 if not returns :py:attr:`NOT_APPLICABLE`. 1303 2. Afterwards evaluates the rules :py:meth:`condition` function. 1304 - if the result is not in :py:attr:`actions` returns :py:attr:`NOT_APPLICABLE`. 1305 - otherwise merges the :py:attr:`overwrite_settings` with the py:attr:`.Context.config` and executes the action. 1306 1307 Parameters: 1308 ctx: The context object that is passed to the condition function. 1309 overwrite: Extends :py:attr:`overwrite_settings` for this call only. Defaults to :code:`None`. 1310 ignore_phase: If :python:`True` the phase check is skipped. Defaults to :code:`False`. 1311 ignore_cooldown: If True the cooldown check is skipped. Defaults to :code:`False`. 1312 """ 1313 # Check phase 1314 assert ignore_phase or ctx.agent.current_phase in self.phases 1315 1316 if not self.is_ready() and not ignore_cooldown: 1317 return RuleResult.NOT_APPLICABLE 1318 if ( 1319 not ignore_phase and ctx.agent.current_phase not in self.phases 1320 ): # NOTE: This is currently never False as checked in execute_phase and the agents dictionary. 1321 return RuleResult.NOT_APPLICABLE # not applicable for this phase 1322 1323 exception = None 1324 result = Rule.NO_RESULT 1325 try: 1326 result = self.evaluate(ctx, overwrite) 1327 except BaseException as e: 1328 exception = e 1329 else: 1330 ctx.evaluation_results[ctx.agent.current_phase] = result 1331 if result in self.actions: 1332 self.reset_cooldown() 1333 # Apply overwrite settings permanently 1334 ctx.config.merge_with(self.overwrite_settings) 1335 if overwrite: 1336 ctx.config.merge_with(overwrite) 1337 1338 action_result = self.actions[result]( 1339 ctx 1340 ) # todo allow priority, random chance # pyright: ignore[reportCallIssue] 1341 ctx.action_results[ctx.agent.current_phase] = action_result 1342 return action_result 1343 return RuleResult.NOT_APPLICABLE # No action was executed 1344 finally: 1345 self._ctx = None 1346 if exception: 1347 self.reset_cooldown() # 1348 raise exception
1349 1350 def __str__(self) -> str: 1351 try: 1352 if isinstance(self.condition, partial): 1353 return ( 1354 self.__class__.__name__ + f"(description='{self.description}', " 1355 f"phases={self.phases}, group={self.group}, " 1356 f"priority={self.priority}, " 1357 f"actions={self.actions}, " 1358 f"condition={self.condition.func}, " # pyright: ignore[reportUnknownMemberType] 1359 f"cooldown={self.cooldown})" 1360 ) 1361 return ( 1362 self.__class__.__name__ + f"(description='{self.description}', " 1363 f"phases={self.phases}, group={self.group}, " 1364 f"priority={self.priority}, " 1365 f"actions={self.actions}, " 1366 f"condition={self.condition.__name__}, " 1367 f"cooldown={self.cooldown})" 1368 ) 1369 except AttributeError as e: 1370 logger.info("Error during string creation: " + str(e)) 1371 return ( 1372 self.__class__.__name__ + "(Error in condition.__str__: Rule has not been initialized correctly. " 1373 "Missing attributes: " + str(e) + ")" 1374 ) 1375 1376 def __repr__(self) -> str: 1377 return str(self)
1378 1379
[docs] 1380class MultiRule(Rule, metarule=True): 1381 """ 1382 This metarule allows to execute one or multiple rules if it is applicable. 1383 Depending on :py:attr:`execute_all_rules` it will either execute all rules or only the first 1384 applicable rule from its :py:attr:`rules` list. 1385 """ 1386 1387 rules: List[Rule] 1388 """The list of child rules to be called if this rule's condition is true.""" 1389 1390 def _wrap_action(self, action: Callable[[Context], Any]): 1391 """ 1392 Wrap the passed action. 1393 First the action is executed afterwards the child rules are evaluated. 1394 1395 Note: 1396 There is no extra condition that is checked between the two actions. 1397 """ 1398 1399 @wraps(action) 1400 def wrapper(ctx: Context, *args: Any, **kwargs: Any) -> Any: 1401 try: 1402 result = action(ctx, *args, **kwargs) 1403 except DoNotEvaluateChildRules as e: 1404 return e, None 1405 else: 1406 results = self.evaluate_children(ctx) # execute given rules as well 1407 return result, results 1408 1409 return wrapper 1410 1411 if TYPE_CHECKING: 1412 1413 class _InitParameters(Rule._InitParameters): # pyright: ignore[reportPrivateUsage,reportGeneralTypeIssues] 1414 rules: Required[List[Rule]] 1415 sort_rules: NotRequired[bool] 1416 execute_all_rules: NotRequired[bool] 1417 1418 @singledispatchmethod 1419 def __init__( 1420 self, 1421 phases: Union[Phase, Iterable[Phase]], 1422 # /, # phases must be positional; python3.8+ only 1423 rules: List[Rule], 1424 condition: Optional[ConditionFunctionLikeT] = None, 1425 *, 1426 description: str = "If its own condition is true calls the passed rules.", 1427 priority: RulePriority = RulePriority.NORMAL, 1428 sort_rules: bool = True, 1429 execute_all_rules: bool = False, 1430 action: Optional[Callable[[Context], Any]] = None, 1431 ignore_phase: bool = True, 1432 overwrite_settings: Optional[Dict[str, Any]] = None, 1433 self_config: Optional[Dict[str, Any]] = None, 1434 cooldown_reset_value: Optional[int] = None, 1435 group: Optional[str] = None, 1436 enabled: bool = True, 1437 ): 1438 """ 1439 Initializes a Rule object that can have further rules as children. 1440 1441 Args: 1442 phases (Union[Phase, Iterable]): The phase or phases in which the rule should be active. 1443 rules (List[Rule]): The list of child rules to be called if the rule's condition is true. 1444 condition (Callable[[Context]], optional): The condition that determines if the rules should be evaluated. Defaults to :py:func:`always_execute`. 1445 execute_all_rules (bool, optional): 1446 If False will only execute the first rule with a applicable condition, i.e. this MultiRule is like a node in a decision tree. 1447 If True all rules are evaluated, unless one raises a `DoNotEvaluateChildRules` exception. 1448 Defaults to :python:`False`. 1449 sort_rules (bool, optional): Flag indicating whether to sort the rules by priority. Defaults to :python:`True`. 1450 action (Callable[[Context]], optional): The action to be executed before the passed rules are evaluated. Defaults to :python:`None`. 1451 ignore_phase (bool, optional): Flag indicating whether to ignore the Phase of the passed child rules. Defaults to :python:`True`. 1452 overwrite_settings (Dict[str, Any], optional): Additional settings to overwrite the agent's settings. Defaults to :python:`None`. 1453 priority (RulePriority, optional): The priority of the rule. :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`. 1454 description (str, optional): The description of the rule. Defaults to :python:`"If its own rule is true calls the passed rules."`. 1455 group (str | None, optional): The group name of the rule. Defaults to :python:`None`. 1456 enabled (bool, optional): Flag indicating whether the rule is enabled after creation. Defaults to :python:`True`. 1457 """ 1458 self.ignore_phase = ignore_phase 1459 if rules is None: # type: ignore 1460 logger.warning( 1461 "Warning: No rules passed to %s: %s. You can still add rules to the rules attribute later.", 1462 self.__class__.mro()[1].__name__, 1463 self.__class__.__name__, 1464 ) 1465 rules = [] 1466 if len(rules) == 0: 1467 # NOTE: This will be logged twice, once more by 1468 logger.warning("Rules list is empty. %s will always return NOT_APPLICABLE.", self.__class__.__name__) 1469 self.rules = rules 1470 self.execute_all_rules = execute_all_rules 1471 if sort_rules: 1472 self.rules.sort(key=lambda r: r.priority, reverse=True) 1473 # if an action is passed to be executed before the passed rules it is wrapped to execute both 1474 if action is not None: 1475 action = self._wrap_action(action) 1476 else: 1477 action = self.evaluate_children # will be called elsewhere 1478 if condition is None and not hasattr(self, "condition"): 1479 condition_arg = always_execute 1480 else: 1481 condition_arg = condition 1482 super().__init__( 1483 phases, 1484 condition=condition_arg, 1485 action=action, 1486 description=description, 1487 overwrite_settings=overwrite_settings, 1488 self_config=self_config, 1489 priority=priority, 1490 cooldown_reset_value=cooldown_reset_value, 1491 enabled=enabled, 1492 group=group, 1493 ) 1494 1495 @__init__.register(_CountdownRule) # For similar_rule = Rule(some_rule), easier cloning 1496 @__init__.register(type) # For @Rule class MyRule: ... 1497 def __init_by_decorating_class(self, cls: "MultiRule"): # pyright: ignore[reportUnusedFunction] 1498 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow both 1499 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None)) 1500 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided." 1501 self.__init__( 1502 phases, 1503 cls.rules, 1504 cls.condition, # type: ignore 1505 description=cls.description, 1506 overwrite_settings=getattr(cls, "overwrite_settings", None), 1507 self_config=getattr(cls, "self_config", None), 1508 priority=getattr(cls, "priority", RulePriority.NORMAL), 1509 cooldown_reset_value=cooldown_reset_value, 1510 group=getattr(cls, "group", None), 1511 enabled=getattr(cls, "enabled", True), 1512 sort_rules_by_priority=getattr(cls, "sort_rules_by_priority", True), 1513 execute_all_rules=getattr(cls, "execute_all_rules", False), 1514 prior_action=getattr(cls, "prior_action", None), 1515 ignore_phase=getattr(cls, "ignore_phase", True), 1516 ) 1517 1518 @__init__.register(Mapping) 1519 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction] 1520 self.__init__( 1521 cls.get("phases", cls.get("phase")), 1522 cls["rules"], 1523 cls.get("condition"), 1524 description=cls.get("description", self.description), 1525 overwrite_settings=cls.get("overwrite_settings"), 1526 self_config=cls.get("self_config"), 1527 priority=cls.get("priority", RulePriority.NORMAL), 1528 cooldown_reset_value=cls.get("cooldown_reset_value"), 1529 group=cls.get("group"), 1530 enabled=cls.get("enabled", True), 1531 sort_rules_by_priority=cls.get("sort_rules_by_priority", True), 1532 execute_all_rules=cls.get("execute_all_rules", False), 1533 prior_action=cls.get("prior_action", None), 1534 ignore_phase=cls.get("ignore_phase", True), 1535 ) 1536
[docs] 1537 def evaluate_children(self, ctx: Context) -> Union[List[Any], Any]: # pyright: ignore[reportIncompatibleMethodOverride] 1538 """ 1539 Evaluates the children rules of the current rule in the given context. 1540 1541 Args: 1542 ctx : The context in which the child rules are evaluated. 1543 1544 Returns: 1545 The results of evaluating the children rules. 1546 Returns a list of results if execute_all_rules is True, 1547 otherwise the result of the first rule that was applied. 1548 """ 1549 results: List[Any] = [] 1550 for rule in self.rules: 1551 try: 1552 result = rule(ctx, ignore_phase=self.ignore_phase) 1553 except DoNotEvaluateChildRules: 1554 return results 1555 if not self.execute_all_rules and result is not Rule.NOT_APPLICABLE: # one rule was applied end. 1556 return result 1557 results.append(result) 1558 if not self.execute_all_rules: 1559 return Rule.NOT_APPLICABLE # does not expect a list 1560 return results
1561 1562
[docs] 1563class RandomRule(MultiRule, metarule=True): 1564 """ 1565 A rule that selects and evaluates one or more random child rules from a set of rules. 1566 1567 Args: 1568 phases : The phase or phases in which the rule is applicable. 1569 rules : The set of rules from which to select random child rules. 1570 repeat_if_not_applicable : If False, only one rule will be evaluated even if it is not applicable. Defaults to :python:`True`. 1571 condition : 1572 A callable that determines if the rule is applicable in a given context. 1573 If None and the rule does not implement a :py:attr:`condition` attribute the rule always executes. 1574 Defaults to :python:`None`. 1575 action : A callable that defines the action to be performed when the rule is applicable. Defaults to :python:`None`. 1576 ignore_phase : If True, the rule will be evaluated even if it is not in the specified phase. Defaults to :python:`True`. 1577 priority : The priority of the rule. :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`. 1578 description : A description of the rule. Defaults to :python:`"If its own condition is true calls one or more random child rules from the passed rules."`. 1579 overwrite_settings : A dictionary of settings to overwrite the default settings of the rule. Defaults to :python:`None`. 1580 cooldown_reset_value : The value to reset the cooldown of the rule. Defaults to :python:`None`. 1581 group : The group to which the rule belongs. Defaults to :python:`None`. 1582 enabled : If False, the rule will not be evaluated. Defaults to :python:`True`. 1583 weights : The weights associated with each rule when selecting random child rules. Defaults to :python:`None`. 1584 1585 Raises: 1586 ValueError: When passing **rules** as a dict with weights, the **weights** argument must be None. 1587 """ 1588 1589 # TODO: add a dummy attribute for one additional weight, that skips the evaluation. Should only considered once. 1590 1591 if TYPE_CHECKING: 1592 1593 class _InitParameters(MultiRule._InitParameters): # pyright: ignore[reportPrivateUsage] 1594 repeat_if_not_applicable: NotRequired[bool] 1595 weights: NotRequired[Optional[List[float]]] 1596 1597 @singledispatchmethod 1598 def __init__( 1599 self, 1600 phases: Union[Phase, Iterable[Phase]], # /, # phases must be positional; python3.8+ only 1601 rules: Union[Dict[Rule, float], List[Rule]], 1602 repeat_if_not_applicable: bool = True, 1603 condition: Optional[Callable[[Context], Any]] = None, 1604 *, 1605 action: Optional[Callable[[Context], Any]] = None, 1606 ignore_phase: bool = True, 1607 priority: RulePriority = RulePriority.NORMAL, 1608 description: str = "If its own condition is true calls one or more random child rules from the passed rules.", 1609 overwrite_settings: Optional[Dict[str, Any]] = None, 1610 self_config: Optional[Dict[str, Any]] = None, 1611 cooldown_reset_value: Optional[int] = None, 1612 group: Optional[str] = None, 1613 enabled: bool = True, 1614 weights: Optional[List[float]] = None, 1615 ): 1616 """ 1617 Initializes a Rule object that can trigger one or more random child rules. 1618 1619 Args: 1620 phases : The phase or phases in which the rule is applicable. 1621 rules : The set of rules from which to select random child rules. 1622 repeat_if_not_applicable : If False, only one rule will be evaluated even if it is not applicable. Defaults to :python:`True`. 1623 condition : 1624 A callable that determines if the rule is applicable in a given context. 1625 If None and the rule does not implement a `condition` attribute the rule always executes. 1626 Defaults to :python:`None`. 1627 action : A callable that defines the action to be performed when the rule is applicable. Defaults to :python:`None`. 1628 ignore_phase : If True, the rule will be evaluated even if it is not in the specified phase. Defaults to :python:`True`. 1629 priority : The priority of the rule. Defaults to :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`. 1630 description : A description of the rule. Defaults to :python:`"If its own condition is true calls one or more random child rules from the passed rules."`. 1631 overwrite_settings : A dictionary of settings to overwrite the default settings of the rule. Defaults to :python:`None`. 1632 cooldown_reset_value : The value to reset the cooldown of the rule. Defaults to :python:`None`. 1633 group : The group to which the rule belongs. Defaults to :python:`None`. 1634 enabled : If False, the rule will not be evaluated. Defaults to :python:`True`. 1635 weights : The weights associated with each rule when selecting random child rules. Defaults to :python:`None`. 1636 """ 1637 if isinstance(rules, dict): 1638 if weights is not None: 1639 raise ValueError("When passing rules as a dict with weights, the weights argument must be None") 1640 self.weights = list(accumulate(rules.values())) # cumulative weights for random.choices are more efficient 1641 self.rules = list(rules.keys()) 1642 else: 1643 self.weights = weights or list(accumulate(r.priority for r in rules)) 1644 self.rules = rules 1645 self.repeat_if_not_applicable = repeat_if_not_applicable 1646 super().__init__( 1647 phases, 1648 rules, 1649 condition=condition, 1650 action=action, 1651 description=description, 1652 priority=priority, 1653 ignore_phase=ignore_phase, 1654 overwrite_settings=overwrite_settings, 1655 self_config=self_config, 1656 cooldown_reset_value=cooldown_reset_value, 1657 enabled=enabled, 1658 group=group, 1659 ) 1660 1661 @__init__.register(_CountdownRule) 1662 @__init__.register(type) 1663 def __init_by_decorating_class(self, cls: "RandomRule"): # pyright: ignore[reportUnusedFunction] 1664 phases = getattr(cls, "phases", getattr(cls, "phase", None)) 1665 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None)) 1666 1667 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided." 1668 self.__init__( 1669 phases, 1670 cls.rules, 1671 repeat_if_not_applicable=cls.repeat_if_not_applicable, 1672 condition=cls.condition, 1673 description=cls.description, 1674 overwrite_settings=getattr(cls, "overwrite_settings", None), 1675 self_config=getattr(cls, "self_config", None), 1676 priority=getattr(cls, "priority", RulePriority.NORMAL), 1677 cooldown_reset_value=cooldown_reset_value, 1678 group=getattr(cls, "group", None), 1679 enabled=getattr(cls, "enabled", True), 1680 prior_action=getattr(cls, "prior_action", None), 1681 ignore_phase=getattr(cls, "ignore_phase", True), 1682 ) 1683 1684 @__init__.register(Mapping) 1685 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction] 1686 self.__init__( 1687 cls.get("phases", cls.get("phase")), 1688 cls["rules"], 1689 repeat_if_not_applicable=cls.get("repeat_if_not_applicable", True), 1690 condition=cls.get("condition"), 1691 description=cls.get("description", self.description), 1692 overwrite_settings=cls.get("overwrite_settings"), 1693 self_config=cls.get("self_config"), 1694 priority=cls.get("priority", RulePriority.NORMAL), 1695 cooldown_reset_value=cls.get("cooldown_reset_value"), 1696 group=cls.get("group"), 1697 enabled=cls.get("enabled", True), 1698 sort_rules_by_priority=cls.get("sort_rules_by_priority", True), 1699 execute_all_rules=cls.get("execute_all_rules", False), 1700 prior_action=cls.get("prior_action", None), 1701 ignore_phase=cls.get("ignore_phase", True), 1702 ) 1703
[docs] 1704 def evaluate_children(self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None) -> Any: 1705 """ 1706 Evaluate a random child rule. 1707 If `self.repeat_if_not_applicable=False` and the randomly chosen rule is not applicable, 1708 then no further rules are evaluated 1709 For `self.repeat_if_not_applicable=False` possible rules are evaluated in a random fashion 1710 until one rule was applicable. 1711 """ 1712 if len(self.rules) == 0: 1713 logger.warning("No rules to evaluate in %s. Returning NOT_APPLICABLE.", self.__class__.__name__) 1714 return RuleResult.NOT_APPLICABLE 1715 if self.repeat_if_not_applicable: 1716 rules = self.rules.copy() 1717 weights = self.weights.copy() 1718 else: 1719 rules = self.rules 1720 weights = self.weights 1721 1722 result = RuleResult.NOT_APPLICABLE 1723 while rules: 1724 rule = random.choices(rules, cum_weights=weights, k=1)[0] 1725 try: 1726 result = rule( 1727 ctx, overwrite, ignore_phase=self.ignore_phase 1728 ) # NOTE: Context action/evaluation results only store result of LAST rule 1729 except DoNotEvaluateChildRules: 1730 return None 1731 if not self.repeat_if_not_applicable or result is not Rule.NOT_APPLICABLE: 1732 # break after the first rule of `self.repeat_if_not_applicable=False` 1733 # else break if it was applicable. 1734 break 1735 rules.remove(rule) 1736 weights = list(accumulate(r.priority for r in rules)) 1737 return result
1738 1739
[docs] 1740class BlockingRule(Rule, metarule=True): 1741 """ 1742 This meta rule allows to define rules that are able to takeover the agent's workflow and 1743 apply the :py:class:`.VehicleControl` directly from withhin the rule. 1744 1745 """ 1746 1747 _gameframework: ClassVar[Union["GameFramework", "CallableProxyType[GameFramework]", None]] = None 1748 """ 1749 Set when a :py:class:`GameFramework` is initialized. 1750 Alternatively can be set when any BlockingRule is created. 1751 """ 1752 1753 ticks_passed: int 1754 """Count how many ticks have been performed by this rule and blocked the agent.""" 1755 1756 MAX_TICKS = 5000 # 5000 * 1/20 = 250 seconds 1757 """ 1758 The amount of ticks that can be performed by this rule before it is automatically disabled. 1759 If the rule has looped for this amount of ticks if will then call :py:attr:`max_tick_callback` 1760 and raise an :py:exc:`.UnblockRuleException` afterwards. 1761 1762 As a hack :py:attr:`max_tick_callback` can change :py:attr:`ticks_passed` to prevent the 1763 exception and continue the rule. 1764 """ 1765 1766 max_tick_callback: Optional[Callable[[Self, Context], Any]] = None 1767 """ 1768 An optional callback that is executed when :py:attr:`ticks_passed` reaches :py:attr:`MAX_TICKS`. 1769 """ 1770 1771 if TYPE_CHECKING: 1772 1773 class _InitParameters(Rule._InitParameters): # pyright: ignore[reportPrivateUsage] 1774 gameframework: Required[Optional[GameFramework]] 1775 1776 @singledispatchmethod 1777 def __init__( 1778 self, 1779 phases: Union[Phase, Iterable[Phase]], # iterable of Phases 1780 # /, # phases must be positional; python3.8+ only 1781 condition: Optional[ConditionFunctionLikeT] = None, 1782 action: Optional[Union[CallableAction[Self, []], Dict[Any, CallableAction[Self, []]]]] = None, 1783 false_action: Optional[CallableAction[Self, []]] = None, 1784 *, 1785 gameframework: Optional[GameFramework], 1786 actions: Optional[Dict[Any, CallableAction[Rule, []]]] = None, 1787 description: str = "What does this rule do?", 1788 overwrite_settings: Optional[Dict[str, Any]] = None, 1789 self_config: Optional[Dict[str, Any]] = None, 1790 priority: RulePriority = RulePriority.NORMAL, 1791 cooldown_reset_value: Optional[int] = None, 1792 group: Optional[str] = None, 1793 enabled: bool = True, 1794 ): 1795 super().__init__( 1796 phases, 1797 condition, 1798 action, 1799 false_action, 1800 actions=actions, 1801 description=description, 1802 overwrite_settings=overwrite_settings, 1803 self_config=self_config, 1804 priority=priority, 1805 cooldown_reset_value=cooldown_reset_value, 1806 group=group, 1807 enabled=enabled, 1808 ) 1809 if gameframework: 1810 BlockingRule._gameframework = gameframework 1811 if not GameFramework.clock or GameFramework.display is None: 1812 # Not much we can do about it 1813 # logger.info("%s : GameFramework should be initialized before using this rule.", self.__class__.__name__) 1814 # NOTE: This now does not GameFramework.display 1815 GameFramework.init_pygame() 1816 self.ticks_passed = 0 1817 1818 @__init__.register(_CountdownRule) 1819 @__init__.register(type) 1820 def __init_by_decorating_class(self, cls: "BlockingRule"): # pyright: ignore[reportUnusedFunction] 1821 """ 1822 Initialize by passing a Rule or class object to the __init__ method. 1823 1824 This allows the usage of the @Rule decorator and easy copying. 1825 """ 1826 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow for both 1827 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None)) 1828 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided." 1829 self.__init__( 1830 phases, 1831 cls.condition, # type: ignore 1832 action=getattr(cls, "action", None), 1833 false_action=getattr(cls, "false_action", None), 1834 gameframework=getattr(cls, "gameframework", BlockingRule._gameframework), 1835 actions=getattr(cls, "actions", None), 1836 description=cls.description, 1837 overwrite_settings=getattr(cls, "overwrite_settings", None), 1838 self_config=getattr(cls, "self_config", None), 1839 priority=getattr(cls, "priority", RulePriority.NORMAL), 1840 cooldown_reset_value=cooldown_reset_value, 1841 group=getattr(cls, "group", None), 1842 enabled=getattr(cls, "enabled", True), 1843 ) 1844 1845 @__init__.register(Mapping) 1846 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction] 1847 super().__init__(cls.get("phases", cls.get("phase")), **cls) 1848 1849 def _render_everything(self, ctx: Context): 1850 if self._gameframework: 1851 self._gameframework.render_everything() 1852 else: 1853 world_model = ctx.agent._world_model # pyright: ignore[reportPrivateUsage] 1854 display = GameFramework.display 1855 world_model.tick(GameFramework.clock) # does not tick the world! # pyright: ignore[reportArgumentType] 1856 if display: 1857 world_model.render(display, finalize=False) # pyright: ignore[reportArgumentType] 1858 try: 1859 world_model.controller.render(display) # type: ignore[attr-defined] # noqa: SIM105 1860 except AttributeError: # in case not available for controller class 1861 pass 1862 1863 dm_render_conf: "DetectionMatrix.RenderOptions" = world_model._args.camera.hud.detection_matrix # pyright: ignore[reportPrivateUsage, reportAssignmentType] 1864 if dm_render_conf and ctx.agent: 1865 ctx.agent.render_detection_matrix(display, **dm_render_conf) 1866 world_model.finalize_render(display) # pyright: ignore[reportArgumentType] 1867 if GameFramework.display: 1868 pygame.display.flip() 1869 1870 @overload 1871 def loop_agent( 1872 self, 1873 ctx: Context, 1874 control: Optional[carla.VehicleControl] = None, 1875 *, 1876 execute_planner: Literal[True], 1877 execute_phases: Any, 1878 ) -> carla.VehicleControl: ... 1879 1880 @overload 1881 def loop_agent( 1882 self, 1883 ctx: Context, 1884 control: Optional[carla.VehicleControl] = None, 1885 *, 1886 execute_planner: Literal[False], 1887 execute_phases: Any, 1888 ) -> None: ... 1889
[docs] 1890 def loop_agent( 1891 self, 1892 ctx: Context, 1893 control: Optional[carla.VehicleControl] = None, 1894 *, 1895 execute_planner: bool, 1896 execute_phases: Any = True, 1897 ) -> "carla.VehicleControl | None": 1898 """ 1899 A combination of `LunaticAgent.parse_keyboard_input`, `LunaticAgent.apply_control`, `BlockingRule.update_world`, 1900 and `Context.get_or_calculate_control` to advance agent and world. 1901 1902 Args: 1903 ctx (Context): The current context object 1904 control (Optional[carla.VehicleControl], optional): The control to apply; will overwrite the context's control. 1905 If None takes the context's control. 1906 Defaults to :python:`None`. 1907 1908 See Also: 1909 Executes the following methods: 1910 1911 1. :py:meth:`.LunaticAgent.parse_keyboard_input` 1912 2. :py:meth:`.LunaticAgent.apply_control` 1913 3. :py:meth:`.BlockingRule.update_world` ticks the :py:class:`carla.World` and renders everything. 1914 4. :py:meth:`.Context.get_or_calculate_control` to acquire the :py:class:`carla.VehicleControl` object. 1915 """ 1916 ctx.agent.parse_keyboard_input(control=control) # NOTE: if skipped the user has no option to stop the agent 1917 ctx.agent.apply_control(control) 1918 1919 # NOTE: This ticks the world forward by one step 1920 # The ctx.control is reset to None; execute_plan 1921 # > Phase.UPDATE_INFORMATION | Phase.BEGIN 1922 self.update_world(ctx, execute_phases=execute_phases) 1923 if execute_planner: 1924 return ctx.get_or_calculate_control() 1925 return None
1926
[docs] 1927 @staticmethod 1928 def get_world() -> carla.World: 1929 """Method to access the world object""" 1930 return CarlaDataProvider.get_world()
1931 1932 def _begin_tick(self, ctx: Context): 1933 self._gameframework.clock.tick() # self.args.fps) # type: ignore[attr-defined,union-type] 1934 frame = None 1935 if ( 1936 self._gameframework and self._gameframework.launch_config.handle_ticks 1937 ): # i.e. no scenario runner doing it for us 1938 if CarlaDataProvider.is_sync_mode(): 1939 frame = self.get_world().tick() 1940 else: 1941 frame = self.get_world().wait_for_tick().frame 1942 CarlaDataProvider.on_carla_tick() 1943 else: 1944 # CRITICAL: The framework expects a return value before it ticks the world, 1945 # however the blocking rules should take over the ticks; so it should still do this! 1946 # TODO: Should implement some return <-> pass trough mechanism; maybe implement a 1947 # generator-like variant of the rule. 1948 if CarlaDataProvider.is_sync_mode(): 1949 frame = self.get_world().tick() 1950 else: 1951 frame = self.get_world().wait_for_tick().frame 1952 CarlaDataProvider.on_carla_tick() 1953 1954 if CarlaDataProvider.is_sync_mode(): 1955 # We do this only in sync mode as frames could pass between gathering this information 1956 # and an agent calling InformationManager.tick(), which in turn calls global_tick 1957 # with possibly a DIFFERENT frame wasting computation. 1958 if frame is None: 1959 frame = self.get_world().get_snapshot().frame 1960 InformationManager.global_tick(frame) 1961 1962 # Tick world and render everything 1963 # control should be reset, we are at the "start of the tick again" 1964 ctx.set_control(None) 1965 self.ticks_passed += 1 1966
[docs] 1967 def update_world( 1968 self, ctx: Context, *, execute_phases: Union[bool, Container[Phase]] = True 1969 ) -> "carla.VehicleControl | None": 1970 """ 1971 Ticks the world and takes care of the rendering. 1972 1973 Will call 1974 - :python:`ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=<this Rule instance>)` 1975 - :py:meth:`.LunaticAgent.update_information`, with or without executing the phases, depending on **execute_phases**. 1976 1977 Args: 1978 ctx: The context to use 1979 execute_update_information: 1980 Whether to execute the :python:`Phase.UPDATE_INFORMATION | Phase.[BEGIN|END]` with this function. 1981 Can also be a container containing one of both of these phases. 1982 Defaults to :python:`True`. 1983 1984 Raises: 1985 UnblockRuleException: If the ticks passed are over :py:attr:`MAX_TICKS` 1986 1987 Attention: 1988 The usage with Leaderboard_ is working but experimental. 1989 The scenario expects the agent to return a control object in every step, however as this rule 1990 takes over the ticks completely an outside ScenarioManager might not work as expected. 1991 """ 1992 self._begin_tick(ctx) 1993 ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=self) 1994 1995 # Update the agent's information 1996 if execute_phases and (execute_phases is True or Phase.UPDATE_INFORMATION | Phase.BEGIN in execute_phases): 1997 ctx.agent.execute_phase(Phase.UPDATE_INFORMATION | Phase.BEGIN, prior_results=self) 1998 ctx.agent._update_information() # pyright: ignore[reportPrivateUsage] 1999 if execute_phases and Phase.UPDATE_INFORMATION | Phase.END not in self.phases: 2000 ctx.agent.execute_phase(Phase.UPDATE_INFORMATION | Phase.END, prior_results=self) 2001 if self.ticks_passed > self.MAX_TICKS: 2002 logger.info( 2003 "Rule %s has passed its max_ticks %s, calling max_tick_callback and unblocking it", self, self.MAX_TICKS 2004 ) 2005 if self.max_tick_callback: 2006 self.max_tick_callback(ctx) # pyright: ignore[reportCallIssue] 2007 if self.ticks_passed > self.MAX_TICKS: 2008 raise UnblockRuleException 2009 # max_tick_callback can override the ticks passed 2010 else: 2011 raise UnblockRuleException 2012 2013 self._render_everything(ctx)
2014
[docs] 2015 def __call__( 2016 self, 2017 ctx: Context, 2018 overwrite: Optional[Dict[str, Any]] = None, 2019 in_loop: bool = False, 2020 *, # Kwargs from Rule.__call__ 2021 ignore_phase: bool = False, 2022 ignore_cooldown: bool = False, 2023 ) -> Union[Any, Literal[RuleResult.NOT_APPLICABLE]]: 2024 if not in_loop: 2025 self.ticks_passed = 0 2026 else: 2027 ignore_phase = True 2028 ignore_cooldown = True 2029 if self in ctx.agent._active_blocking_rules: # pyright: ignore[reportPrivateUsage] 2030 logger.warning( 2031 "Rule %s is already blocking the agent, this is a recursive call. Not executing the rule.", self 2032 ) 2033 return Rule.NOT_APPLICABLE 2034 try: 2035 return super().__call__(ctx, overwrite, ignore_phase=ignore_phase, ignore_cooldown=ignore_cooldown) 2036 except UnblockRuleException as e: 2037 return e.result 2038 finally: 2039 ctx.agent._active_blocking_rules.discard(self) # pyright: ignore[reportPrivateUsage]
2040 2041 def evaluate( 2042 self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None 2043 ) -> Union[bool, Hashable, Literal[RuleResult.NO_RESULT]]: 2044 result = super().evaluate(ctx, overwrite) 2045 if result in self.actions: 2046 ctx.agent._active_blocking_rules.add(self) # pyright: ignore[reportPrivateUsage] 2047 return result
2048 2049 2050# Provide necessary imports for the evaluation_function module and prevents circular imports 2051 2052import classes.evaluation_function as __evaluation_function # noqa 2053 2054__evaluation_function.Rule = Rule 2055__evaluation_function.Context = Context 2056del __evaluation_function