Source code for classes.rule

   1# pyright: strict
   2# pyright: reportIndexIssue=information, reportCallIssue=information
   3# pyright: reportGeneralTypeIssues=warning
   4# pyright: reportUnusedFunction=information, reportUnusedImport=information
   5# pyright: reportUnnecessaryIsInstance=false, reportUnnecessaryComparison=false
   6# pyright: reportPrivateUsage=none
   7
   8from __future__ import annotations
   9
  10import inspect
  11
  12import random
  13from collections.abc import Mapping
  14from dataclasses import is_dataclass
  15from functools import partial, update_wrapper, wraps
  16from inspect import isclass
  17from itertools import accumulate
  18from typing import (
  19    TYPE_CHECKING,
  20    Any,
  21    Callable,
  22    ClassVar,
  23    Container,
  24    Dict,
  25    Hashable,
  26    Iterable,
  27    List,
  28    NoReturn,
  29    Optional,
  30    Set,
  31    TypeVar,
  32    Union,
  33    cast,
  34)
  35from weakref import CallableProxyType, WeakSet, proxy
  36
  37import pygame
  38from omegaconf import DictConfig, OmegaConf
  39from typing_extensions import (
  40    Annotated,
  41    Concatenate,
  42    Literal,
  43    NotRequired,
  44    ParamSpec,
  45    Required,
  46    Self,
  47    TypedDict,
  48    Unpack,
  49    overload,
  50)
  51
  52from agents.tools.logs import logger
  53from classes.constants import READTHEDOCS, Hazard, HazardSeverity, Phase, RulePriority, RuleResult
  54from classes.evaluation_function import ConditionFunction
  55from classes.exceptions import DoNotEvaluateChildRules, UnblockRuleException
  56from classes.worldmodel import GameFramework
  57from classes.information_manager import InformationManager
  58from launch_tools import CarlaDataProvider, singledispatchmethod
  59
  60if TYPE_CHECKING:
  61    import carla
  62    from classes.type_protocols import CallableAction, CallableActionT, ConditionFunctionLike, ConditionFunctionLikeT
  63    from agents.lunatic_agent import LunaticAgent
  64    from agents.tools.config_creation import ContextSettings, LiveInfo, RuleConfig
  65    # NOTE: gameframework.py adds GameFramework to this module's variables
  66    # at this position it would be a circular import
  67
  68_T = TypeVar("_T")
  69_P = ParamSpec("_P")
  70_Rule = TypeVar("_Rule", bound="Rule")
  71
  72
[docs] 73class Context(CarlaDataProvider): 74 """ 75 Object to be passed as the first argument (instead of self) to rules, actions and evaluation functions. 76 77 The :py:class:`Context` class derives from the scenario runner's :py:class:`CarlaDataProvider` to allow access to the world, map, etc. 78 79 Tip: 80 There is normally no need to initialize the context object manually. 81 Its recommended to initialize the context object with :py:meth:`.LunaticAgent._make_context`. 82 """ 83 84 agent: "LunaticAgent" 85 """Backreference to the agent.""" 86 87 config: "ContextSettings" 88 """A copy of the agents config. Overwritten by the condition's settings.""" 89 90 evaluation_results: dict[Phase, Hashable] # ambiguous wording, which result? here evaluation result 91 """ 92 Stores the result from the :py:meth:`Rule.condition` of the last rule that was evaluated in a phase. 93 94 .. deprecated:: in consideration 95 """ 96 97 action_results: dict[Phase, Any] 98 """ 99 Stores the result from the :py:meth:`action` of the last rule that was applicable in a phase. 100 101 .. deprecated:: in consideration 102 """ 103 104 _control: Optional[carla.VehicleControl] 105 """ 106 Current control the agent should use. 107 108 Accessible from the :py:attr:`control` property. 109 """ 110 111 prior_result: Optional[Any] # TODO: maybe rename 112 """Result of the current phase.""" 113 114 phase_results: dict[Phase, Any] 115 """ 116 Stores the results of the phases the agent has been in. 117 By default the keys are set to :py:attr:`Context.PHASE_NOT_EXECUTED`. 118 """ 119 120 last_context: Optional["Context"] 121 """The context object of the last tick. Used to access the last phase's results.""" 122 123 second_pass: Optional[bool] = None 124 """ 125 Whether or not the run_step function performs a second pass, i.e. 126 after the route has been replanned. 127 128 Warning: 129 The correctness should *not* be assumed. 130 The user is responsible for setting this value to True if a second pass is required. 131 """ 132 133 _detected_hazards: Set[Hazard] 134 """ 135 Detected hazards in the current phase. 136 137 If not empty at the end of the inner step an EmergencyStopException is raised. 138 """ 139 140 detected_hazards_info: dict[Hazard, Union[HazardSeverity, Any]] 141 """ 142 Information about the detected hazards. :py:meth:`add_hazard` inserts the given :py:class:`.HazardSeverity` as value, 143 however, note that the values are can be arbitrary if used otherwise. 144 """ 145 146 PHASE_NOT_EXECUTED: object = object() 147 """ 148 Value in :py:attr:`phase_results` to indicate that :python:`agent.execute_phase(phase)` was called for the respective phase 149 150 :meta hide-value: 151 """ 152 153 def __init__(self, agent: "LunaticAgent", **kwargs: Any): 154 """ 155 Its recommended to initialize the context object with :py:meth:`.LunaticAgent._make_context`. 156 """ 157 self.agent = agent 158 self._control = kwargs.pop("control", None) 159 self._init_arguments = kwargs 160 self.phase_results = dict.fromkeys(Phase.get_phases(), Context.PHASE_NOT_EXECUTED) 161 162 self.config: "ContextSettings" = agent.config.copy() # type: ignore 163 self.config._content["live_info"] = agent.live_info # not a copy! # pyright: ignore 164 165 self.detected_hazards = set() 166 self.detected_hazards_info = dict.fromkeys(Hazard, HazardSeverity.NONE) 167 self.__dict__.update(kwargs) 168 169 # Less used attributes 170 self.evaluation_results = {} 171 self.action_results = {} 172 173 @property 174 def current_phase(self) -> Phase: 175 """Current :py:class:`Phase` the agent is in.""" 176 return self.agent.current_phase 177 178 @property 179 def control(self) -> Union[carla.VehicleControl, None]: 180 """ 181 Current control the agent should use. Set by :py:meth:`execute_phase(update_controls=...) <.LunaticAgent.execute_phase>`. 182 183 Note: 184 Safeguarded to be not set to None. Setting it to :code:`None` is discouraged. 185 Use :py:meth:`set_control` if setting it to :code:`None` is really needed. 186 """ 187 return self._control 188 189 @control.setter 190 def control(self, control: carla.VehicleControl): 191 if control is None: # type: ignore[comparison] 192 raise ValueError("Context.control must not be None. " 193 "To set it to None explicitly use set_control.") 194 self._control = control 195
[docs] 196 def set_control(self, control: Optional[carla.VehicleControl]): 197 """Set the control, allows to set it to None.""" 198 self._control = control
199
[docs] 200 def get_or_calculate_control(self) -> carla.VehicleControl: 201 """ 202 Get the control if it is set, otherwise calculate it by 203 executing the local planner. 204 205 Returns: 206 The control the agent should use. 207 208 Note: 209 Use this function inside rules to acquire a control object- 210 211 Warning: 212 This is equivalent to ending the inner step of the agent. 213 214 See Also: 215 :py:meth:`LunaticAgent._calculate_control` 216 """ 217 if self.control: 218 return self.control 219 self.control = self.agent._calculate_control() # pyright: ignore[reportPrivateUsage] 220 return self.control # pyright: ignore[reportReturnType]
221 222 @property 223 def detected_hazards(self) -> Set[Hazard]: 224 """ 225 Detected hazards in the current phase. 226 227 If not empty at the end of the inner step an EmergencyStopException is raised. 228 """ 229 return self._detected_hazards 230 231 @detected_hazards.setter 232 def detected_hazards(self, hazards: Set[Hazard]): 233 if not isinstance(hazards, set): # type: ignore 234 raise TypeError("detected_hazards must be a set of Hazards.") 235 self._detected_hazards = hazards 236
[docs] 237 def add_hazard(self, hazard: Hazard, hazard_level: HazardSeverity = HazardSeverity.EMERGENCY): 238 """ 239 Add the specified hazard to the detected hazards, in parallel the `hazard_level` can be set which 240 which is stored in :any:`detected_hazards_info`. 241 """ 242 if hazard not in Hazard: 243 logger.warning(f"Adding {hazard} to the detected hazards which is not a member of the Hazard enum.") 244 self.detected_hazards.add(hazard) 245 self.detected_hazards_info[hazard] = hazard_level
246
[docs] 247 def discard_hazard(self, hazard: Hazard, match: Literal["exact", "subset", "intersection"] = "subset"): 248 """ 249 Discards a hazard from the detected hazards. 250 251 Parameters: 252 hazard: Hazard to remove from :py:attr:`detected_hazards`. 253 match: How to match the hazard to remove. 254 255 - "exact" removes if the exact :py:class:`~classes.constants.Hazard` flag is present. 256 - "subset" removes if the hazard is a subset of the detected hazard, e.g.: 257 258 - :python:`discard_hazard(Hazard.VEHICLE, match="subset")` would remove 259 :any:`Hazard.OBSTACLE` = ``Hazard.VEHICLE | PEDESTRIAN | STATIC_OBSTACLE``. 260 261 - :python:`discard_hazard(Hazard.TRAFFIC_LIGHT, match="subset")` would *not* remove 262 :any:`Hazard.TRAFFIC_LIGHT_RED`. 263 264 """ 265 if match == "subset": 266 self.detected_hazards = {h for h in self.detected_hazards if hazard not in h} # supports flags 267 elif match == "exact": 268 self.detected_hazards.discard(hazard) 269 elif match == "intersection": 270 self.detected_hazards = {h for h in self.detected_hazards if not hazard & h} 271 else: 272 raise ValueError(f"match must be 'exact', 'subset' or 'intersection', not {match}.")
273
[docs] 274 def has_hazard(self, hazard: Hazard, match: Literal["exact", "subset", "intersection"] = "intersection") -> bool: 275 """ 276 Checks if the hazard intersects with any of the detected hazards. 277 278 See :py:meth:`discard_hazard` for the different matching options. 279 """ 280 if match == "exact": 281 return hazard in self.detected_hazards 282 if match == "subset": 283 return any(hazard in h for h in self.detected_hazards) 284 if match == "intersection": 285 return any(hazard & h for h in self.detected_hazards) 286 raise ValueError(f"match must be 'exact', 'subset' or 'intersection', not {match}.")
287 288 # Convenience function when using detect_vehicles 289 from agents.tools.lunatic_agent_tools import max_detection_distance # noqa 290 291 @property 292 def live_info(self) -> "LiveInfo": 293 return self.config.live_info 294 295 @property 296 def active_blocking_rules(self) -> Set["BlockingRule"]: 297 return self.agent._active_blocking_rules # pyright: ignore[reportPrivateUsage]
298 299 300#@ConditionFunction["Rule", [], Literal[True]] # python 3.9+ syntax
[docs] 301@ConditionFunction 302def always_execute(ctx: Context) -> Literal[True]: # pylint: disable=unused-argument, # noqa: ARG001 303 """ 304 This is an :py:class:`.ConditionFunction` that always returns :python:`True`. It can be used to always execute an action. 305 """ 306 return True
307 308 309def _use_temporary_config(func: Callable[Concatenate[_Rule, 310 "Context", 311 Optional[Dict[str, Any]], _P], _T] 312 ) -> Callable[Concatenate[_Rule, "Context", Optional[Dict[str, Any]], _P], _T]: 313 """ 314 During the condition evaluation the ctx.config should have the overwrite settings applied 315 but not in a permanent way. 316 """ 317 # TODO: To avoid unused argument error, consume the dict; however this might be harder to understand 318 319 @wraps(func) 320 def wrapper(self: _Rule, ctx: Context, overwrite: Optional[Dict[str, Any]] = None, *args: _P.args, **kwargs: _P.kwargs) -> _T: 321 settings = self.overwrite_settings.copy() # Dict with "self" : SelfConfig 322 if overwrite: 323 settings.update(overwrite) 324 325 original_ctx_config = ctx.config 326 ctx.config["self"] = "???" # do not merge old self_config 327 328 temp: "ContextSettings" = OmegaConf.merge(ctx.config, settings) # type: ignore 329 330 OmegaConf.set_readonly(temp, True) 331 ctx.config = temp 332 self.self_config = self.overwrite_settings["self"] = ctx.config["self"] 333 OmegaConf.set_readonly(self.self_config, False) # The Rule's settings should still be dynamic; expected in overwrite 334 try: 335 return func(self, ctx, overwrite, *args, **kwargs) 336 finally: 337 ctx.config = original_ctx_config 338 return wrapper 339 340 341class _CountdownRule: 342 343 # TODO: low prio: make cooldown dependant of tickrate or add a conversion from seconds to ticks OR make time-based 344 tickrate: ClassVar[int] = NotImplemented 345 """ 346 :meta private: 347 """ 348 349 DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0 350 """ 351 Value the cooldown is reset to when :py:meth:`reset_cooldown` is called without a value. 352 353 Used *only* when :py:attr:`cooldown_reset_value` is not set. 354 """ 355 356 start_cooldown: ClassVar[int] = 0 357 """Initial :py:attr:`cooldown` when initialized. if >0 the rule will not be ready for the first **start_cooldown** ticks.""" 358 359 _instances: ClassVar["WeakSet[_CountdownRule]"] = WeakSet() 360 """Keep track of all Rule instances for the cooldowns""" 361 362 _cooldown: int 363 """If 0 the rule is ready to be executed.""" 364 365 blocked: bool = False # NOTE: not a property 366 """Indicates if the rule is blocked for this tick only. Is reset to False after the tick.""" 367 368 if TYPE_CHECKING: 369 class _InitParameters(TypedDict): 370 """Dict describing the parameters of the __init__ of the class method.""" 371 cooldown_reset_value: NotRequired[Optional[int]] 372 enabled: NotRequired[bool] 373 374 def __init__(self, cooldown_reset_value: Optional[int] = None, enabled: bool = True): 375 self._instances.add(self) 376 self._cooldown = self.start_cooldown 377 self.max_cooldown = cooldown_reset_value if cooldown_reset_value is not None else self.DEFAULT_COOLDOWN_RESET 378 self._enabled = enabled 379 380 def is_ready(self) -> bool: 381 """Group aware check if a rule is ready.""" 382 return self.cooldown == 0 and self.enabled and not self.blocked # Note: uses property getters. Group aware for GroupRules 383 384 def reset_cooldown(self, value: Optional[int] = None): 385 if value is None: 386 self._cooldown = self.max_cooldown 387 elif value >= 0: 388 self._cooldown = int(value) 389 else: 390 raise ValueError("Cooldown value must be a None or a non-negative integer.") 391 392 @property 393 def cooldown(self) -> int: 394 """ 395 Cooldown of the rule in ticks until it can be executed again. Only if 0 the rule can be executed. 396 """ 397 return self._cooldown 398 399 @cooldown.setter 400 def cooldown(self, value: int): 401 self._cooldown = value 402 403 def update_cooldown(self): 404 """ 405 Update the :py:attr:`cooldown` of *this* rule. 406 407 :meta private: 408 """ 409 if self._cooldown > 0: 410 self._cooldown -= 1 411 412 @classmethod 413 def update_all_cooldowns(cls): 414 """Updates the cooldowns of **all** rules.""" 415 for instance in cls._instances: 416 instance.update_cooldown() 417 418 @classmethod 419 def unblock_all_rules(cls): 420 """Unblocks all rules""" 421 for instance in cls._instances: 422 instance.blocked = False 423 424 @property 425 def enabled(self) -> bool: 426 """If :code:`False` the rule will not be evaluated. Contrary to :py:attr:`blocked` this permanently disables the rule.""" 427 return self._enabled 428 429 @enabled.setter 430 def enabled(self, value: bool): 431 self._enabled = value 432 433 def set_active(self, value: bool): 434 """Enables or disables the rule. Contrary to :py:attr:`blocked` it will not be reset after the tick.""" 435 self._enabled = value 436 437 class CooldownFramework: 438 """ 439 Context manager that can reduce all cooldowns at the end of a `with` statement. 440 """ 441 442 def __enter__(self): 443 return self 444 445 def __exit__(self, exc_type, exc_value, traceback): # type: ignore 446 self.tick() 447 448 @staticmethod 449 def tick(): 450 """ 451 Update all cooldowns and unblock all rules. 452 453 Calls: 454 - :py:meth:`Rule.update_all_cooldowns` 455 - :py:meth:`Rule.unblock_all_rules` 456 """ 457 Rule.update_all_cooldowns() 458 Rule.unblock_all_rules() 459 460 461_GroupInstanceValues = TypedDict('_GroupInstanceValues', {"cooldown": int, "max_cooldown": int, "instances": "WeakSet[_GroupRule]"}) 462"""TypeHint that describes the values of `_GroupRule._group_instances`.""" 463 464 465class _GroupRule(_CountdownRule): 466 group: Optional[str] = None # group of rules that share a cooldown 467 """ 468 Group name of rules that should share their cooldown. 469 470 None for a rule to not share its cooldown. 471 """ 472 473 # first two values in the list are current and max cooldown, the third is a set of all instances 474 _group_instances: ClassVar[Dict[str, _GroupInstanceValues]] = {} 475 """ 476 Dictionary of all group instances. 477 Keys: 478 The group name. 479 Values: 480 Is a list of the *current cooldown*, the *max cooldown for reset* and a *WeakSet of all instances*. 481 """ 482 483 if TYPE_CHECKING: 484 class _InitParameters(_CountdownRule._InitParameters): # pyright: ignore[reportPrivateUsage] 485 """Dict describing the parameters of the __init__ of the class method.""" 486 group: NotRequired[Optional[str]] 487 488 def __init__(self, group: Optional[str] = None, cooldown_reset_value: Optional[int] = None, enabled: bool = True): 489 super().__init__(cooldown_reset_value, enabled) 490 self.group = group 491 if group is None: 492 return 493 if group not in self._group_instances: 494 self._group_instances[group] = {'cooldown': 0, 'max_cooldown': 0, 'instances': WeakSet()} 495 self._group_instances[group]["instances"].add(self) # add to weak set 496 497 @property 498 def cooldown(self) -> int: 499 """ 500 Cooldown of the rule in ticks until it can be executed again after its action was executed. 501 If 0 the rule is ready to be executed. 502 """ 503 if self.group: 504 return _GroupRule._group_instances[self.group]["cooldown"] 505 return super().cooldown 506 507 @property 508 def has_group(self) -> bool: 509 """Indicates if the rule belongs to a group, i.e. :python:`self.group is not None`.""" 510 return self.group is not None 511 512 @cooldown.setter 513 def cooldown(self, value: int): 514 if self.group: 515 _GroupRule._group_instances[self.group]["cooldown"] = value 516 return 517 super().cooldown = value 518 519 def set_my_group_cooldown(self, value: Optional[int] = None): 520 """Update the cooldown of the group this rule belong to""" 521 if self.group is None: 522 logger.warning("Rule does not belong to a group, but set_my_group_cooldown was called. Ignoring.") 523 return 524 if value is None: 525 self._group_instances[self.group]["cooldown"] = self._group_instances[self.group]["max_cooldown"] # set to max 526 else: 527 self._group_instances[self.group]["cooldown"] = value 528 529 def reset_cooldown(self, value: Optional[int] = None): 530 """Reset or set the cooldown; for a group rule it resets the group cooldown.""" 531 if self.group: 532 self.set_my_group_cooldown(value) 533 else: 534 super().reset_cooldown() 535 536 @classmethod 537 def set_cooldown_of_group(cls, group: str, value: int): 538 """Updates the cooldown of the specified group to a specific value""" 539 if group in cls._group_instances: 540 cls._group_instances[group]["cooldown"] = value 541 else: 542 raise ValueError(f"Group {group} does not exist.") 543 544 __filter_not_ready_instances: Callable[["_CountdownRule"], bool] = lambda instance: instance.group is None and instance._cooldown > 0 # pylint: disable=line-too-long # type: ignore[attr-defined] 545 """ 546 Filter function to get all instances that are not ready. see `update_all_cooldowns` 547 Group rules should not be affected by this filter. 548 """ 549 550 @classmethod 551 def update_all_cooldowns(cls): 552 """Globally updates the cooldown of *all* rules.""" 553 # Update Groups 554 for instance_data in cls._group_instances.values(): 555 if instance_data["cooldown"] > 0: 556 instance_data["cooldown"] -= 1 557 for instance in filter(cls.__filter_not_ready_instances, cls._instances): 558 instance._cooldown -= 1 559 560
[docs] 561class Rule(_GroupRule): 562 _auto_init_: ClassVar[bool] = True 563 """ 564 If set to False the automatic :code:`__init__` creation is disabled when subclassing. 565 This automatic :code:`__init__` will fix parameters like :any:`phases` and :any:`condition` to the class. 566 567 Declaring an :code:`__init__` method in the class has the same effect as setting :code:`_auto_init_` to False. 568 569 Note: 570 Using :python:`class NewRuleType(metarule=Rule)` is nearly equivalent to :python:`_auto_init_=False`, but is not inherited. 571 """ 572 573 NOT_APPLICABLE: ClassVar[Literal[RuleResult.NOT_APPLICABLE]] = RuleResult.NOT_APPLICABLE 574 """ 575 Unique object :py:attr:`.RuleResult.NOT_APPLICABLE` that indicates that no action was executed. 576 577 :meta hide-value: 578 579 .. deprecated:: Use :py:attr:`.RuleResult.NOT_APPLICABLE` directly 580 """ 581 582 NO_RESULT: ClassVar[Literal[RuleResult.NO_RESULT]] = RuleResult.NO_RESULT 583 """ 584 Unique object :py:attr:`.RuleResult.NO_RESULT` that indicates that the rules :py:meth:`action` did not return a result, 585 e.g. because an exception was raised. 586 587 :meta hide-value: 588 589 .. deprecated:: Use :py:attr:`.RuleResult.NOT_APPLICABLE` directly 590 """ 591 592 _PROPERTY_MEMBERS: ClassVar[Set[str]] = {"cooldown", "has_group", "enabled"} 593 """ 594 A subclass can only overwrite these attributes with properties. This prevents a user accidentally overwriting the property, 595 e.g. `cooldown = 20` with a method or variable. 596 597 TODO: 598 Consider making enabled a variable and not a property. 599 """ 600 601 description: str 602 """Description of what this rule should do""" 603 604 phases: frozenset[Phase] 605 """ 606 The phase or phases in which the rule should be evaluated. 607 For instantiation the phases attribute can be any :term:`Iterable` [:py:class:`.Phase`]. 608 """ 609 610 phase: Phase 611 """For the Class API the phase attribute be set to a single Phase object.""" 612 613 if TYPE_CHECKING: 614 condition: ConditionFunctionLike[Self, [], Hashable] 615 """ 616 The condition that determines if the rule's actions should be executed. 617 618 Simple Variant: 619 return True if the action should be executed, False otherwise. 620 if :py:attr:`.Rule.false_action` is defined, False will execute :py:attr:`.Rule.false_action`. 621 622 Advanced Variant: 623 return a :term:`Hashable` value that is used as key in the :py:attr:`actions` dict. 624 625 NOTE: 626 The readthedocs description is set in the __init__ function. 627 """ 628 629 actions: dict[Any, CallableAction[Self, [], Any]] 630 """Dictionary that maps rule results to the action that should be executed.""" 631 632 action: Annotated[CallableAction[Self, [], Any], "attribute not available on instance -> merged into `actions`"] 633 """ 634 Action that should be executed if the rule is True. If `actions` is set, this is ignored. 635 """ 636 637 false_action: Annotated[CallableAction[Self, [], Any], "attribute not available on instance -> merged into `actions`"] 638 """Action that should be executed if the rule is False. May not be set if `actions` is set.""" 639 640 if READTHEDOCS and not TYPE_CHECKING: 641 false_action: Annotated[CallableActionT, "attribute not available on instance -> merged into `actions`"] 642 action: Annotated[CallableActionT, "attribute not available on instance -> merged into `actions`"] 643 actions: dict[Any, CallableActionT] 644 645 #group : Optional[str] 646 #"""Group name for rules that should share their cooldown.""" 647 648 overwrite_settings: dict[str, Any] 649 """ 650 Settings that should overwrite the agent's settings for this rule. 651 652 Note: 653 The overwrite settings are primitive :py:class:`dict` objects, 654 :py:class:`omegaconf.DictConfig` objects are converted. 655 """ 656 657 self_config: "RuleConfig" 658 """ 659 A custom sub-config for the rule that is not included in the agents settings. 660 Automatically gets a `instance` key added with the rule instance. 661 662 Can be accessed via `ctx.config.current_rule` or `self.config.self`. 663 664 Note: 665 Internally `self.config` and `ctx.config` is the same object, which makes 666 interpolations to the agent's settings possible. 667 668 Attention: 669 The **self_config object is *not* constant** it is recreated each time the 670 rule is evaluated to have the current context available. 671 """ 672 673 priority: Union[float, int, RulePriority] = RulePriority.NORMAL 674 """Rules are executed in order of their priority, from high to low.""" 675 676 # Initialization functions 677 678 _ctx: Optional[Context] = None 679 """No hard attachment, to not keep the context objects alive, use with care. Check where it is set in a rule.""" 680
[docs] 681 def clone(self): 682 """ 683 Create a new instance of the rule with the same settings. 684 685 Note: 686 - The current cooldown **is not** taken into account. 687 - The current enabled state **is** taken into account. 688 """ 689 return self.__class__(self) # Make use over overloaded __init__
690 691 @overload 692 def __new__(cls, phases: Union[Phase, Iterable[Phase], None] = None) -> "Self": ... 693 # No argument call; should be redundant 694 695 @overload 696 def __new__(cls, phases: Union[Phase, Iterable[Phase], None] = None, **kwargs: Unpack[_InitParameters]) -> "Self": ... 697 # argument call 698 699 @overload 700 def __new__(cls, **kwargs: Unpack[_InitParametersComplete]) -> "Self": ... 701 # Normal init 702 703 @overload 704 def __new__(cls, phases: "type[Any] | Self") -> "Self": ... 705 # Rule.copy(other_rule) 706 707 def __new__(cls, 708 phases: Optional[Union[Phase, Iterable[Phase], None, "type[Any]", Self, str]] = None, 709 bases: Optional[tuple[type[Any], ...]] = None, 710 clsdict: Optional[dict[str, Any]] = None, 711 **kwargs: Any): 712 """ 713 The @Rule decorator allows to instantiate a Rule class directly for easier out-of-the-box usage. 714 Further check for metaclass initialization, else this is a normal instance creation. 715 716 Parameters: 717 kwargs: These will be passed to :python:`__init_subclass__`. 718 """ 719 # @Rule 720 # class NewRuleInstance: 721 # or Rule(other_rule) -> copy 722 if isclass(phases): 723 if issubclass(phases, _CountdownRule): 724 raise ValueError("When using @Rule the class may not be a subclass of Rule. Do not inherit from rule or subclass from Rule instead with the decorator." 725 "Do not do:\n\t@Rule\n\tclass MyRule(>>Rule<<): ...\n") 726 # cls is the Rule used to decorate the decorated_class 727 decorated_class = phases 728 729 # Create the new class, # NOTE: goes to __init_subclass_ 730 new_decorated_class = type(decorated_class.__name__, (cls, ), decorated_class.__dict__.copy(), 731 _init_by_decorator=True, **kwargs) # > calls init_subclass; copy() for correct type! 732 if TYPE_CHECKING: 733 assert issubclass(new_decorated_class, cls) and issubclass(new_decorated_class, decorated_class) 734 735 return super().__new__(new_decorated_class) 736 737 # class NewRuleType(metaclass=Rule) # deprecated 738 if isinstance(phases, str): 739 try: 740 logger.warning("Using NewRule(metaclass=Rule) is deprecated. Use NewRule(Rule, metarule=True) instead.") 741 assert clsdict and isinstance(bases, tuple) 742 class_name = phases 743 new_rule_class = type(class_name, (cls, *bases), clsdict, metarule=True, **kwargs) 744 except Exception: 745 print("ERROR: If you want to initialize a rule be sure that you pass a Phase object as the first argument." 746 "A string assumes that you've used class NewSubclass(metaclass=Rule) " 747 "This is DEPRECATED use class NewSubclass(Rule, metarule=True) instead.") 748 raise 749 else: 750 assert issubclass(new_rule_class, cls) # for type-hints 751 return new_rule_class 752 # Normal instance 753 return super().__new__(cls) 754 755 if TYPE_CHECKING: 756 class _InitParameters(_GroupRule._InitParameters, total=False, closed=False): # pyright: ignore[reportPrivateUsage] 757 """Dict describing the parameters of the __init__ of the class method.""" 758 #phases: Union[Phase, Iterable[Phase]] # leave this and use explicitly later on 759 #condition: Optional[ConditionFunctionLike[Rule, ..., Hashable]] 760 action: Optional[Union[CallableAction[Rule, [], Any], Dict[Any, CallableAction[Rule, [], Any]]]] 761 false_action: Optional[CallableAction[Rule, [], Any]] 762 actions: Optional[Dict[Any, CallableAction[Rule, [], Any]]] 763 description: str 764 overwrite_settings: Optional[Dict[str, Any]] 765 self_config: Optional[Dict[str, Any]] 766 priority: RulePriority 767 768 class _InitParametersComplete(_InitParameters): 769 """Dict describing the parameters of the __init__ of the class method.""" 770 phases: Required[Union[Phase, Iterable[Phase]]] 771 condition: NotRequired[Optional[ConditionFunctionLike[Rule, ..., Hashable]]] 772 773 class _CallKwargs(TypedDict, closed=True): 774 ignore_phase: bool 775 """Default False""" 776 ignore_cooldown: bool 777 """Default False""" 778 779 @singledispatchmethod 780 def __init__(self, 781 phases: Union[Phase, Iterable[Phase]], # iterable of Phases 782 #/, # phases must be positional; python3.8+ only 783 condition: Optional[ConditionFunctionLikeT] = None, 784 action: Optional[Union[CallableAction[Self, []], Dict[Any, CallableAction[Self, []]]]] = None, 785 false_action: Optional[CallableAction[Self, []]] = None, 786 *, 787 actions: Optional[Dict[Any, CallableAction[Self, []]]] = None, 788 description: str = "What does this rule do?", 789 overwrite_settings: Optional[Dict[str, Any]] = None, 790 self_config: Optional[Dict[str, Any]] = None, 791 priority: RulePriority = RulePriority.NORMAL, 792 cooldown_reset_value: Optional[int] = None, 793 group: Optional[str] = None, 794 enabled: bool = True, 795 ): 796 """ 797 Initializes a Rule object. 798 799 Parameters: 800 phases: The phase(s) when the rule should be evaluated. 801 An iterable of Phase objects or a single Phase object. 802 condition: A function that takes a Context object as input and returns a Hashable value. 803 If not provided, the class must implement a `condition` function. 804 action: A function or a dictionary of functions that take a Context object as input. 805 If `action` behaves like `actions`. 806 Only one of `action` and `actions` can be set. 807 false_action: A function that takes a Context object as input and returns any value. 808 It is used when `action` is a single function and represents the action to be taken when the condition is False. 809 actions: A dictionary of `action` functions 810 It should map the return values of `condition` to the corresponding action function. 811 If `action` is None, `actions` must be provided. 812 description: A string that describes what this rule does. 813 overwrite_settings: A dictionary of settings that will overwrite the 814 agent's setting for this Rule. 815 priority: The priority of the rule. It can be a float, an integer, or a RulePriority enum value. 816 cooldown_reset_value: An optional integer value that represents the cooldown reset value for the rule. 817 If not provided falls back to the class attribute :py:attr:`.Rule.DEFAULT_COOLDOWN_RESET`. 818 group: An optional string that specifies the group to which this rule belongs. 819 enabled: A boolean value indicating whether the rule is enabled or not. 820 821 Raises: 822 ValueError: If ``phases`` is empty or None, or if ``phases`` contains an object that is not of type Phase. 823 TypeError: If ``condition`` is None and the class does not implement a :py:meth:`condition` function, 824 or if both ``action`` and ``actions`` are None and the class does not have an :py:attr:`actions` attribute or an :py:meth:`action` function. 825 TypeError: if ``actions`` is not a Mapping object. 826 ValueError: If both ``action`` and ``actions`` are provided. 827 ValueError: If ``action`` is a Mapping and either ``false_action`` or ``actions`` is not None. 828 ValueError: If an ``action`` function is not callable. 829 ValueError: If ``description`` is not a string. 830 """ 831 832 # Check phases 833 if not phases: 834 raise ValueError("phases must not be empty or None") 835 if not isinstance(phases, frozenset): 836 phases = frozenset(phases) if isinstance(phases, Iterable) else frozenset([phases]) 837 for p in phases: 838 if not isinstance(p, Phase): 839 raise TypeError(f"phase must be of type Phases, not {type(p)}") 840 self.phases = phases 841 842 # Check Rule 843 if condition is None and not hasattr(self, "condition"): 844 raise TypeError( 845 f"{self.__class__.__name__}.__init__() missing 1 required positional argument: 'condition'. " 846 "Alternatively the class must implement a `condition` function.") 847 848 if False and TYPE_CHECKING: 849 # Idea have type hint here 850 self.condition: Any = ... 851 """ 852 The condition that determines if the rule's actions should be executed. 853 854 Simple Variant: 855 return True if the action should be executed, False otherwise. 856 if :py:attr:`.Rule.false_action` is defined, False will execute :py:attr:`.Rule.false_action`. 857 858 Advanced Variant: 859 return a :term:`Hashable` value that is used as key in the :py:attr:`actions` dict. 860 """ 861 if condition is not None: 862 self.condition = condition 863 else: 864 self.condition = self.__class__.condition 865 866 if condition is not None: 867 if not hasattr(self, "condition"): 868 self.condition = condition 869 else: 870 # Warn if cls.condition and passes condition are different 871 self_func = getattr(self.condition, 872 "__func__", 873 getattr(self.condition, "func", self.condition)) 874 if self_func != condition: # Compare method with function 875 logger.warning( 876 f"Warning 'condition' argument passed but class {self.__class__.__name__} " 877 "already implements a different function 'self.condition'. " 878 f"Overwriting {self.condition} with passed condition {getattr(condition, '__name__', str(condition))}. " 879 "This might lead to undesired results.") 880 881 # NOTE: IMPORTANT: self.condition = condition overwrites methods with functions 882 # To keep methods as methods the condition parameter is removed with 883 # `do_not_overwrite.append("condition")` used during __init_subclass__ 884 # Could move this check also to here, however, it will then not be checked during class creation 885 self.condition = condition 886 # else: self.__class__.condition must be true already, which was checked in the subclass initialization 887 888 # Check Actions 889 if action is not None and actions is not None: 890 try: 891 if len(actions) == 1 and action is actions[True]: 892 logger.info( 893 "`action` and `actions` have been both been used when initializing %s. " 894 "Did you use `condition.register_action`? Then you can omit the action parameter.", 895 self) 896 action = None 897 else: 898 raise ValueError("Only one of 'action' and 'actions' can be set.") 899 except (KeyError, ValueError) as e: 900 raise ValueError( 901 "Either only one of 'action' and 'actions' can be set, or actions[True] must be the same as action " 902 "- other actions are currently not supported.") from e 903 if action is None and actions is None and not hasattr(self, "actions"): 904 # NOTE: the k in params check below is essential for this to work correctly. 905 raise TypeError( 906 f"{self.__class__.__name__}.__init__() arguments `action` and `actions` are both None. " 907 "Provide at least one argument alternatively the class must have an `actions` " 908 "attribute or an `action` function.") 909 910 if action is None: 911 if not isinstance(actions, Mapping): 912 raise TypeError(f"actions must be a Mapping, not {type(actions)}") 913 914 self.actions = actions 915 elif isinstance(action, Mapping): 916 self.actions = action 917 if false_action is not None or actions is not None: 918 raise ValueError("When passing a dict to action, false_action and actions must be None") 919 else: 920 # NOTE: Might overwrite actions attribute 921 self.actions = {} # type: ignore 922 if action is not None: 923 self.actions[True] = action 924 if false_action is not None: 925 self.actions[False] = false_action 926 # actions can be the dict of the class, we do not want the same dict instance 927 self.actions = dict(self.actions) 928 929 # Assure that method(self, ctx) like functions are accessible like them 930 for key, func in self.actions.items(): 931 if not callable(func): 932 raise TypeError(f"Action for key {key} must be callable, not {type(func)}") 933 multiple_parameters = len(inspect.signature(func).parameters) >= 2 934 if multiple_parameters and getattr(func, "use_self", True): 935 # NOTE: could use types.MethodType 936 self.actions[key] = partial(func, self) # bind to self 937 938 # Check Description 939 if not isinstance(description, str): 940 raise TypeError(f"description must be of type str, not {type(description)}") 941 self.description = description 942 super().__init__(group or self.group, cooldown_reset_value, enabled) # or self.group for subclassing 943 self.priority: float | int | RulePriority = priority # used by agent.add_rule 944 945 self.overwrite_settings = overwrite_settings or {} 946 if not isinstance(self.overwrite_settings, dict): 947 # NOTE: If DictConfig only the outermost will be a dict, 948 # i.e. this could be dict[str, DictConfig] 949 self.overwrite_settings = dict(self.overwrite_settings) 950 if self_config and "self" in self.overwrite_settings and self.overwrite_settings["self"] != self_config: 951 logger.debug("Warning: self_config and self.overwrite_settings['self'] must be the same object.") 952 953 default_self_config = cast("RuleConfig", getattr(self, "self_config", getattr(self, "SelfConfig", {}))) 954 if isclass(default_self_config): 955 if not is_dataclass(default_self_config): 956 logger.warning( 957 f"Class {self.__class__.__name__} has a self_config class that is not a dataclass. " 958 "This might lead to undesired results, i.e. missing keys in the config.") 959 default_self_config = default_self_config() 960 if not isinstance(default_self_config, DictConfig): 961 default_self_config = cast("RuleConfig", OmegaConf.create(default_self_config, flags={"allow_objects": True})) # type: ignore 962 if self_config: 963 self.self_config = cast("RuleConfig", OmegaConf.merge(default_self_config, self_config)) 964 else: 965 self.self_config = default_self_config 966 assert self.self_config._get_flag("allow_objects"), "self_config must allow objects to be used as values." # pyright: ignore[reportPrivateUsage] 967 968 self.overwrite_settings["self"] = self.self_config 969 self.overwrite_settings["self"]["instance"] = self 970 971 # Called on subclass creation. Can be used for class API 972 def __init_subclass__(cls, _init_by_decorator: bool = False, metarule: bool = False): 973 """ 974 Automatically creates a :python:`__init__` function to allow for a simple to use 975 class-interface to create rule classes. 976 977 By setting :python:`_auto_init_ = False` in the class definition, the automatic __init__ 978 creation is disabled. Similarly, this is also the case if :python:`metarule=Rule` is used 979 for the class creation. 980 """ 981 if hasattr(cls, "phases") and hasattr(cls, "phase") and cls.phases and cls.phase: 982 raise ValueError(f"Both 'phases' and 'phase' are set in class {cls.__name__}. Use only one. %s, %s" % (cls.phases, cls.phase)) 983 984 for attr in cls._PROPERTY_MEMBERS: 985 if hasattr(cls, attr) and not hasattr(getattr(cls, attr), "__get__"): 986 raise ValueError( 987 f"Class {cls.__name__} has overwritten property {attr} with {getattr(cls, attr)}." 988 " You may only overwrite the following attributes with properties: {cls._PROPERTY_MEMBERS}." 989 "Did you mean `start_cooldown` or `cooldown_reset_value` instead of `cooldown`?") 990 if not cls._auto_init_ or metarule: 991 return 992 993 custom_init = cls.__dict__.get("__init__", False) 994 995 # Members that are not overwritten and passed as default arguments (None) into the __init__ 996 do_not_overwrite = ["phases"] 997 998 if hasattr(cls, "condition"): 999 # Check if the condition should be treated as a method or function 1000 rule_func: ConditionFunctionLike[Self, ..., Hashable] 1001 if isinstance(cls.condition, ConditionFunction): 1002 rule_func = cls.condition.evaluation_function # pyright: ignore[reportUnknownMemberType, reportAssignmentType] 1003 else: 1004 rule_func = cls.condition 1005 if isinstance(rule_func, staticmethod): 1006 rule_func = rule_func.__func__ 1007 1008 # Decide method(self, ctx) vs. function(ctx) 1009 if hasattr(cls.condition, "use_self") and cls.condition.use_self is not None: # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportFunctionMemberAccess, reportAttributeAccessIssue] 1010 condition_as_method: bool = cls.condition.use_self # type: ignore[attr-defined] 1011 else: 1012 params = len(inspect.signature(rule_func).parameters) 1013 if params >= 2: 1014 if params > 2: 1015 logger.warning(f"Rule {getattr(cls.condition, '__name__', cls.condition)}" # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] 1016 "has more than 2 parameters. " 1017 "Treating it as a method(self, ctx, **kwargs) with self argument!" 1018 "To avoid this message or to use it as a function use" 1019 "ConditionFunction(use_self=True|False) explicitly.") 1020 condition_as_method = True 1021 else: 1022 condition_as_method = False 1023 if condition_as_method: 1024 logger.debug("Implementing %s as method(self, ctx) - " 1025 "If you need it as a function(ctx, *args) decorate it with " 1026 "@ConditionFunction(use_self=False).", 1027 getattr(cls.condition, '__name__', cls.condition)) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] 1028 do_not_overwrite.append("condition") 1029 else: 1030 # If the signature has only one parameter its a function; else its user decision. 1031 logger.debug("Implementing %s as function(ctx) without a self argument " 1032 "- If you need it as a method(self, ctx, *args) decorate it with " 1033 "@ConditionFunction(use_self=True).", 1034 getattr(cls.condition, '__name__', cls.condition)) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] 1035 1036 # Actions provided by condition.actions, e.g. ConditionFunction.register_action 1037 if hasattr(cls.condition, "actions") and cls.condition.actions: # type: ignore[attr-defined] 1038 if hasattr(cls, "actions") and cls.actions: 1039 raise ValueError(f"Class {cls.__name__} already has an 'actions' attribute. " 1040 "It will be overwritten by the 'actions' attribute of the condition." 1041 " This is the case if ConditionFunction.register_action has been used.") 1042 cls.actions = cls.condition.actions # type: ignore[attr-defined] 1043 1044 if not hasattr(cls, "description"): 1045 cls.description = cls.__doc__ or "No description provided." 1046 1047 if hasattr(cls, "self_config"): 1048 do_not_overwrite.append("self_config") 1049 1050 # Create a __init__ function that sets some of the parameters. 1051 # find overlapping parameters 1052 params = inspect.signature(cls.__init__).parameters 1053 1054 # TODO: to be lazy Rule arguments need to be added to params so that "k in params" works 1055 1056 def partial_init(self: Self, phases: Optional[Iterable[Phase]] = None, *args, **kwargs: Rule._InitParameters): # pyright: ignore[reportMissingParameterType, reportUnknownParameterType] 1057 # Need phases as first argument 1058 cls_phases = getattr(cls, "phases", None) # allow for both wordings 1059 cls_phase = getattr(cls, "phase", None) 1060 if _init_by_decorator: 1061 # Using @Rule phases as first argument is the class 1062 phases = cls_phases or cls_phase 1063 else: 1064 if phases is None: # NOTE: Could be Phase.NONE 1065 phases = cls_phases or cls_phase 1066 if phases is None: 1067 raise ValueError(f"`phases` or `phase` must be provided for class {cls.__name__}") 1068 # Removing condition to not overwrite it 1069 kwargs.update({k: v for k, v in cls.__dict__.items() if k in params and k not in do_not_overwrite}) # type: ignore 1070 try: 1071 if custom_init: 1072 custom_init(self, phases, *args, **kwargs) 1073 else: 1074 # note: could be single dispatch function 1075 # super will not be _GroupRule but at least Rule 1076 super(cls, self).__init__(phases, *args, **kwargs) # type: ignore[arg-type] 1077 except IndexError: # functools <= python3.10 1078 logger.error("\nError in __init__ of %s. Possible reason: Check if the __init__ method has the correct signature. `phases` must be a positional argument.\n", cls.__name__) 1079 raise 1080 except TypeError as e: 1081 # e.g. forgot a action, or rules attribute (MultiRule) 1082 if "missing" in str(e): 1083 logger.error("Class %s has likely missing attributes that cannot be passed to init. Check if all required attributes are set in the class definition.", cls.__name__) 1084 raise 1085 update_wrapper(partial_init, cls.__init__) # pyright: ignore[reportUnknownArgumentType] 1086 cls.__init__ = partial_init # pyright: ignore[reportIncompatibleMethodOverride, reportAttributeAccessIssue] 1087 1088 @__init__.register(_CountdownRule) 1089 @__init__.register(type) 1090 def __init_by_decorating_class(self, cls: "Rule"): # pyright: ignore[reportUnusedFunction] 1091 """ 1092 Initialize by passing a Rule or class object to the __init__ method. 1093 1094 This allows the usage of the @Rule decorator and easy copying. 1095 """ 1096 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow for spelling mistake 1097 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None)) 1098 1099 # TODO: Automate this via inspect.signature or TypedDict of the __init__ signature 1100 self.__init__(phases, 1101 cls.condition, 1102 getattr(cls, "action", None), 1103 getattr(cls, "false_action", None), 1104 actions=getattr(cls, "actions", None), 1105 description=cls.description, 1106 overwrite_settings=getattr(cls, "overwrite_settings", None), 1107 self_config=getattr(cls, "self_config", None), 1108 priority=getattr(cls, "priority", RulePriority.NORMAL), 1109 cooldown_reset_value=cooldown_reset_value, 1110 group=getattr(cls, "group", None), 1111 enabled=getattr(cls, "enabled", True)) 1112 1113 @__init__.register(Mapping) 1114 def __init_from_mapping(self, source: "_InitParametersComplete"): # pyright: ignore[reportUnusedFunction] 1115 # NOTE: This is weakly tested and not much supported. 1116 condition = source.get("condition", None) 1117 if condition is not None: 1118 condition = getattr(self, "condition", None) 1119 self.__init__(source.get("phases", source.get("phase")), 1120 condition, 1121 source.get("action"), 1122 source.get("false_action"), 1123 actions=source.get("actions"), 1124 description=source.get("description", self.description), 1125 overwrite_settings=source.get("overwrite_settings"), 1126 self_config=source.get("self_config"), 1127 priority=source.get("priority", RulePriority.NORMAL), 1128 cooldown_reset_value=source.get("cooldown_reset_value"), 1129 group=source.get("group"), 1130 enabled=source.get("enabled", True)) 1131 1132 # ----------------------- 1133 1134 @classmethod 1135 def _get_init_signature(cls): 1136 """ 1137 Get the signature of the __init__ function. 1138 1139 Returns: 1140 The signature of the __init__ function. 1141 1142 :meta private: 1143 """ 1144 return inspect.signature(cls.__init__).parameters.keys() 1145
[docs] 1146 def execute_phase(self, 1147 phase: Phase, *, 1148 prior_results: Any = None, 1149 update_controls: Optional[carla.VehicleControl] = None): 1150 """ 1151 Attention: 1152 - **Use with care to avoid loops or recursions.** 1153 1154 - If a :py:class:`.Context` is available prefer using 1155 :py:meth:`ctx.agent.execute_phase <.LunaticAgent.execute_phase>` instead. 1156 1157 Helper function to execute a phase from within a rule, 1158 wrapper of :py:meth:`agents.lunatic_agent.LunaticAgent.execute_phase`. 1159 1160 Warns: 1161 ReferenceError: If the weak proxy pointing to the :py:class:`Context` object has been 1162 deleted. The phase will not be executed. 1163 1164 See Also: 1165 - :py:meth:`.LunaticAgent.execute_phase` 1166 """ 1167 try: 1168 self._ctx.agent.execute_phase(phase=phase, # pyright: ignore[reportOptionalMemberAccess] 1169 prior_results=prior_results, 1170 update_controls=update_controls) 1171 except ReferenceError: 1172 logger.error("ReferenceError in Rule.execute_phase. Weakproxy deleted.")
1173 1174 # ----------------------- 1175 # Evaluation functions 1176 # ----------------------- 1177 1178 @_use_temporary_config 1179 def evaluate(self, ctx: Context, 1180 overwrite: Optional[Dict[str, Any]] = None # pylint: ignore=unused-argument # noqa: ARG002 1181 ) -> Union[bool, Hashable, "Literal[RuleResult.NO_RESULT]"]: 1182 """ 1183 Note: 1184 This is an **interface** function of rules that is executed during :py:meth:`__call__`. 1185 This method does not check if a rule is applicable. 1186 i.e. if the rule is in the correct phase of if it py:meth:`is_ready` 1187 this is done in :py:meth:`__call__`. 1188 1189 Meta rules with children should overwrite this method and call 1190 :py:meth:`evaluate_children` from here. 1191 1192 Executes the :py:attr:`condition` function of the rule. 1193 The decorator automatically takes care of temporarily setting the :py:attr:`overwrite_settings`. 1194 1195 Parameters: 1196 ctx: The context object that is passed to the condition function. 1197 overwrite: A dictionary of settings that will overwrite the agent's setting for this Rule. 1198 Used by the :code:`@_use_temporary_config` decorator. 1199 1200 :meta private: 1201 """ 1202 self._ctx = proxy(ctx) # use with care and access over function 1203 return self.condition(ctx) # pyright: ignore[reportCallIssue] 1204
[docs] 1205 def evaluate_children(self, ctx: Context) -> "NoReturn": # pylint: disable=unused-argument 1206 """ 1207 Not implemented for this rule class. 1208 1209 Note: 1210 This is an **interface** function of meta rules 1211 that is executed during :py:meth:`__call__` to call further rules. 1212 """ 1213 raise NotImplementedError("This method should be implemented in a subclass")
1214
[docs] 1215 def __call__(self, 1216 ctx: Context, 1217 overwrite: Optional[Dict[str, Any]] = None, 1218 *, 1219 ignore_phase: bool = False, 1220 ignore_cooldown: bool = False) -> Union[Any, Literal[RuleResult.NOT_APPLICABLE]]: 1221 """ 1222 1. First checks if the rule is *applicable*, i.e. is its :py:attr:`cooldown == 0 <cooldown>`, 1223 if not returns :py:attr:`NOT_APPLICABLE`. 1224 2. Afterwards evaluates the rules :py:meth:`condition` function. 1225 - if the result is not in :py:attr:`actions` returns :py:attr:`NOT_APPLICABLE`. 1226 - otherwise merges the :py:attr:`overwrite_settings` with the py:attr:`.Context.config` and executes the action. 1227 1228 Parameters: 1229 ctx: The context object that is passed to the condition function. 1230 overwrite: Extends :py:attr:`overwrite_settings` for this call only. Defaults to :code:`None`. 1231 ignore_phase: If :python:`True` the phase check is skipped. Defaults to :code:`False`. 1232 ignore_cooldown: If True the cooldown check is skipped. Defaults to :code:`False`. 1233 """ 1234 # Check phase 1235 assert ignore_phase or ctx.agent.current_phase in self.phases 1236 1237 if not self.is_ready() and not ignore_cooldown: 1238 return RuleResult.NOT_APPLICABLE 1239 if not ignore_phase and ctx.agent.current_phase not in self.phases: # NOTE: This is currently never False as checked in execute_phase and the agents dictionary. 1240 return RuleResult.NOT_APPLICABLE # not applicable for this phase 1241 1242 exception = None 1243 result = Rule.NO_RESULT 1244 try: 1245 result = self.evaluate(ctx, overwrite) 1246 except BaseException as e: 1247 exception = e 1248 else: 1249 ctx.evaluation_results[ctx.agent.current_phase] = result 1250 if result in self.actions: 1251 self.reset_cooldown() 1252 # Apply overwrite settings permanently 1253 ctx.config.merge_with(self.overwrite_settings) 1254 if overwrite: 1255 ctx.config.merge_with(overwrite) 1256 1257 action_result = self.actions[result](ctx) # todo allow priority, random chance # pyright: ignore[reportCallIssue] 1258 ctx.action_results[ctx.agent.current_phase] = action_result 1259 return action_result 1260 return RuleResult.NOT_APPLICABLE # No action was executed 1261 finally: 1262 self._ctx = None 1263 if exception: 1264 self.reset_cooldown() # 1265 raise exception
1266 1267 def __str__(self) -> str: 1268 try: 1269 if isinstance(self.condition, partial): 1270 return (self.__class__.__name__ 1271 + f"(description='{self.description}', " 1272 f"phases={self.phases}, group={self.group}, " 1273 f"priority={self.priority}, " 1274 f"actions={self.actions}, " 1275 f"condition={self.condition.func}, " # pyright: ignore[reportUnknownMemberType] 1276 f"cooldown={self.cooldown})" 1277 ) 1278 return (self.__class__.__name__ 1279 + f"(description='{self.description}', " 1280 f"phases={self.phases}, group={self.group}, " 1281 f"priority={self.priority}, " 1282 f"actions={self.actions}, " 1283 f"condition={self.condition.__name__}, " 1284 f"cooldown={self.cooldown})" 1285 ) 1286 except AttributeError as e: 1287 logger.info("Error during string creation: " + str(e)) 1288 return (self.__class__.__name__ 1289 + "(Error in condition.__str__: Rule has not been initialized correctly. " 1290 "Missing attributes: " + str(e) + ")") 1291 1292 def __repr__(self) -> str: 1293 return str(self)
1294 1295
[docs] 1296class MultiRule(Rule, metarule=True): 1297 """ 1298 This metarule allows to execute one or multiple rules if it is applicable. 1299 Depending on :py:attr:`execute_all_rules` it will either execute all rules or only the first 1300 applicable rule from its :py:attr:`rules` list. 1301 """ 1302 1303 rules: List[Rule] 1304 """The list of child rules to be called if this rule's condition is true.""" 1305 1306 def _wrap_action(self, action: Callable[[Context], Any]): 1307 """ 1308 Wrap the passed action. 1309 First the action is executed afterwards the child rules are evaluated. 1310 1311 Note: 1312 There is no extra condition that is checked between the two actions. 1313 """ 1314 @wraps(action) 1315 def wrapper(ctx: Context, *args: Any, **kwargs: Any) -> Any: 1316 try: 1317 result = action(ctx, *args, **kwargs) 1318 except DoNotEvaluateChildRules as e: 1319 return e, None 1320 else: 1321 results = self.evaluate_children(ctx) # execute given rules as well 1322 return result, results 1323 return wrapper 1324 1325 if TYPE_CHECKING: 1326 class _InitParameters(Rule._InitParameters): # pyright: ignore[reportPrivateUsage,reportGeneralTypeIssues] 1327 rules: Required[List[Rule]] 1328 sort_rules: NotRequired[bool] 1329 execute_all_rules: NotRequired[bool] 1330 1331 @singledispatchmethod 1332 def __init__(self, 1333 phases: Union[Phase, Iterable[Phase]], 1334 #/, # phases must be positional; python3.8+ only 1335 rules: List[Rule], 1336 condition: Optional[ConditionFunctionLikeT] = None, 1337 *, 1338 description: str = "If its own condition is true calls the passed rules.", 1339 priority: RulePriority = RulePriority.NORMAL, 1340 sort_rules: bool = True, 1341 execute_all_rules: bool = False, 1342 action: Optional[Callable[[Context], Any]] = None, 1343 ignore_phase: bool = True, 1344 overwrite_settings: Optional[Dict[str, Any]] = None, 1345 self_config: Optional[Dict[str, Any]] = None, 1346 cooldown_reset_value: Optional[int] = None, 1347 group: Optional[str] = None, 1348 enabled: bool = True, 1349 ): 1350 """ 1351 Initializes a Rule object that can have further rules as children. 1352 1353 Args: 1354 phases (Union[Phase, Iterable]): The phase or phases in which the rule should be active. 1355 rules (List[Rule]): The list of child rules to be called if the rule's condition is true. 1356 condition (Callable[[Context]], optional): The condition that determines if the rules should be evaluated. Defaults to :py:func:`always_execute`. 1357 execute_all_rules (bool, optional): 1358 If False will only execute the first rule with a applicable condition, i.e. this MultiRule is like a node in a decision tree. 1359 If True all rules are evaluated, unless one raises a `DoNotEvaluateChildRules` exception. 1360 Defaults to :python:`False`. 1361 sort_rules (bool, optional): Flag indicating whether to sort the rules by priority. Defaults to :python:`True`. 1362 action (Callable[[Context]], optional): The action to be executed before the passed rules are evaluated. Defaults to :python:`None`. 1363 ignore_phase (bool, optional): Flag indicating whether to ignore the Phase of the passed child rules. Defaults to :python:`True`. 1364 overwrite_settings (Dict[str, Any], optional): Additional settings to overwrite the agent's settings. Defaults to :python:`None`. 1365 priority (RulePriority, optional): The priority of the rule. :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`. 1366 description (str, optional): The description of the rule. Defaults to :python:`"If its own rule is true calls the passed rules."`. 1367 group (str | None, optional): The group name of the rule. Defaults to :python:`None`. 1368 enabled (bool, optional): Flag indicating whether the rule is enabled after creation. Defaults to :python:`True`. 1369 """ 1370 self.ignore_phase = ignore_phase 1371 if rules is None: # type: ignore 1372 logger.warning("Warning: No rules passed to %s: %s. You can still add rules to the rules attribute later.", self.__class__.mro()[1].__name__, self.__class__.__name__) 1373 rules = [] 1374 if len(rules) == 0: 1375 # NOTE: This will be logged twice, once more by 1376 logger.warning("Rules list is empty. %s will always return NOT_APPLICABLE.", self.__class__.__name__) 1377 self.rules = rules 1378 self.execute_all_rules = execute_all_rules 1379 if sort_rules: 1380 self.rules.sort(key=lambda r: r.priority, reverse=True) 1381 # if an action is passed to be executed before the passed rules it is wrapped to execute both 1382 if action is not None: 1383 action = self._wrap_action(action) 1384 else: 1385 action = self.evaluate_children # will be called elsewhere 1386 if condition is None and not hasattr(self, "condition"): 1387 condition_arg = always_execute 1388 else: 1389 condition_arg = condition 1390 super().__init__(phases, 1391 condition=condition_arg, 1392 action=action, 1393 description=description, 1394 overwrite_settings=overwrite_settings, 1395 self_config=self_config, 1396 priority=priority, 1397 cooldown_reset_value=cooldown_reset_value, 1398 enabled=enabled, 1399 group=group) 1400 1401 @__init__.register(_CountdownRule) # For similar_rule = Rule(some_rule), easier cloning 1402 @__init__.register(type) # For @Rule class MyRule: ... 1403 def __init_by_decorating_class(self, cls: "MultiRule"): # pyright: ignore[reportUnusedFunction] 1404 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow both 1405 cooldown_reset_value = getattr(cls, "cooldown_reset_value", 1406 getattr(cls, "max_cooldown", None)) 1407 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided." 1408 self.__init__(phases, cls.rules, 1409 cls.condition, # type: ignore 1410 description=cls.description, 1411 overwrite_settings=getattr(cls, "overwrite_settings", None), 1412 self_config=getattr(cls, "self_config", None), 1413 priority=getattr(cls, "priority", RulePriority.NORMAL), 1414 cooldown_reset_value=cooldown_reset_value, group=getattr(cls, "group", None), 1415 enabled=getattr(cls, "enabled", True), 1416 sort_rules_by_priority=getattr(cls, "sort_rules_by_priority", True), 1417 execute_all_rules=getattr(cls, "execute_all_rules", False), 1418 prior_action=getattr(cls, "prior_action", None), 1419 ignore_phase=getattr(cls, "ignore_phase", True)) 1420 1421 @__init__.register(Mapping) 1422 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction] 1423 self.__init__(cls.get("phases", cls.get("phase")), 1424 cls["rules"], 1425 cls.get("condition"), 1426 description=cls.get("description", self.description), 1427 overwrite_settings=cls.get("overwrite_settings"), 1428 self_config=cls.get("self_config"), 1429 priority=cls.get("priority", RulePriority.NORMAL), 1430 cooldown_reset_value=cls.get("cooldown_reset_value"), 1431 group=cls.get("group"), 1432 enabled=cls.get("enabled", True), 1433 sort_rules_by_priority=cls.get("sort_rules_by_priority", True), 1434 execute_all_rules=cls.get("execute_all_rules", False), 1435 prior_action=cls.get("prior_action", None), 1436 ignore_phase=cls.get("ignore_phase", True)) 1437
[docs] 1438 def evaluate_children(self, ctx: Context) -> Union[List[Any], Any]: # pyright: ignore[reportIncompatibleMethodOverride] 1439 """ 1440 Evaluates the children rules of the current rule in the given context. 1441 1442 Args: 1443 ctx : The context in which the child rules are evaluated. 1444 1445 Returns: 1446 The results of evaluating the children rules. 1447 Returns a list of results if execute_all_rules is True, 1448 otherwise the result of the first rule that was applied. 1449 """ 1450 results: List[Any] = [] 1451 for rule in self.rules: 1452 try: 1453 result = rule(ctx, ignore_phase=self.ignore_phase) 1454 except DoNotEvaluateChildRules: 1455 return results 1456 if not self.execute_all_rules and result is not Rule.NOT_APPLICABLE: # one rule was applied end. 1457 return result 1458 results.append(result) 1459 if not self.execute_all_rules: 1460 return Rule.NOT_APPLICABLE # does not expect a list 1461 return results
1462 1463
[docs] 1464class RandomRule(MultiRule, metarule=True): 1465 """ 1466 A rule that selects and evaluates one or more random child rules from a set of rules. 1467 1468 Args: 1469 phases : The phase or phases in which the rule is applicable. 1470 rules : The set of rules from which to select random child rules. 1471 repeat_if_not_applicable : If False, only one rule will be evaluated even if it is not applicable. Defaults to :python:`True`. 1472 condition : 1473 A callable that determines if the rule is applicable in a given context. 1474 If None and the rule does not implement a :py:attr:`condition` attribute the rule always executes. 1475 Defaults to :python:`None`. 1476 action : A callable that defines the action to be performed when the rule is applicable. Defaults to :python:`None`. 1477 ignore_phase : If True, the rule will be evaluated even if it is not in the specified phase. Defaults to :python:`True`. 1478 priority : The priority of the rule. :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`. 1479 description : A description of the rule. Defaults to :python:`"If its own condition is true calls one or more random child rules from the passed rules."`. 1480 overwrite_settings : A dictionary of settings to overwrite the default settings of the rule. Defaults to :python:`None`. 1481 cooldown_reset_value : The value to reset the cooldown of the rule. Defaults to :python:`None`. 1482 group : The group to which the rule belongs. Defaults to :python:`None`. 1483 enabled : If False, the rule will not be evaluated. Defaults to :python:`True`. 1484 weights : The weights associated with each rule when selecting random child rules. Defaults to :python:`None`. 1485 1486 Raises: 1487 ValueError: When passing **rules** as a dict with weights, the **weights** argument must be None. 1488 """ 1489 1490 # TODO: add a dummy attribute for one additional weight, that skips the evaluation. Should only considered once. 1491 1492 if TYPE_CHECKING: 1493 class _InitParameters(MultiRule._InitParameters): # pyright: ignore[reportPrivateUsage] 1494 repeat_if_not_applicable: NotRequired[bool] 1495 weights: NotRequired[Optional[List[float]]] 1496 1497 @singledispatchmethod 1498 def __init__(self, 1499 phases: Union[Phase, Iterable[Phase]], # /, # phases must be positional; python3.8+ only 1500 rules: Union[Dict[Rule, float], List[Rule]], 1501 repeat_if_not_applicable: bool = True, 1502 condition: Optional[Callable[[Context], Any]] = None, 1503 *, 1504 action: Optional[Callable[[Context], Any]] = None, 1505 ignore_phase: bool = True, 1506 priority: RulePriority = RulePriority.NORMAL, 1507 description: str = "If its own condition is true calls one or more random child rules from the passed rules.", 1508 overwrite_settings: Optional[Dict[str, Any]] = None, 1509 self_config: Optional[Dict[str, Any]] = None, 1510 cooldown_reset_value: Optional[int] = None, 1511 group: Optional[str] = None, 1512 enabled: bool = True, 1513 weights: Optional[List[float]] = None 1514 ): 1515 """ 1516 Initializes a Rule object that can trigger one or more random child rules. 1517 1518 Args: 1519 phases : The phase or phases in which the rule is applicable. 1520 rules : The set of rules from which to select random child rules. 1521 repeat_if_not_applicable : If False, only one rule will be evaluated even if it is not applicable. Defaults to :python:`True`. 1522 condition : 1523 A callable that determines if the rule is applicable in a given context. 1524 If None and the rule does not implement a `condition` attribute the rule always executes. 1525 Defaults to :python:`None`. 1526 action : A callable that defines the action to be performed when the rule is applicable. Defaults to :python:`None`. 1527 ignore_phase : If True, the rule will be evaluated even if it is not in the specified phase. Defaults to :python:`True`. 1528 priority : The priority of the rule. Defaults to :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`. 1529 description : A description of the rule. Defaults to :python:`"If its own condition is true calls one or more random child rules from the passed rules."`. 1530 overwrite_settings : A dictionary of settings to overwrite the default settings of the rule. Defaults to :python:`None`. 1531 cooldown_reset_value : The value to reset the cooldown of the rule. Defaults to :python:`None`. 1532 group : The group to which the rule belongs. Defaults to :python:`None`. 1533 enabled : If False, the rule will not be evaluated. Defaults to :python:`True`. 1534 weights : The weights associated with each rule when selecting random child rules. Defaults to :python:`None`. 1535 """ 1536 if isinstance(rules, dict): 1537 if weights is not None: 1538 raise ValueError("When passing rules as a dict with weights, the weights argument must be None") 1539 self.weights = list(accumulate(rules.values())) # cumulative weights for random.choices are more efficient 1540 self.rules = list(rules.keys()) 1541 else: 1542 self.weights = weights or list(accumulate(r.priority for r in rules)) 1543 self.rules = rules 1544 self.repeat_if_not_applicable = repeat_if_not_applicable 1545 super().__init__(phases, rules, condition=condition, action=action, description=description, priority=priority, ignore_phase=ignore_phase, overwrite_settings=overwrite_settings, self_config=self_config, cooldown_reset_value=cooldown_reset_value, enabled=enabled, group=group) 1546 1547 @__init__.register(_CountdownRule) 1548 @__init__.register(type) 1549 def __init_by_decorating_class(self, cls: "RandomRule"): # pyright: ignore[reportUnusedFunction] 1550 phases = getattr(cls, "phases", getattr(cls, "phase", None)) 1551 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None)) 1552 1553 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided." 1554 self.__init__(phases, cls.rules, repeat_if_not_applicable=cls.repeat_if_not_applicable, 1555 condition=cls.condition, 1556 description=cls.description, 1557 overwrite_settings=getattr(cls, "overwrite_settings", None), 1558 self_config=getattr(cls, "self_config", None), 1559 priority=getattr(cls, "priority", RulePriority.NORMAL), 1560 cooldown_reset_value=cooldown_reset_value, group=getattr(cls, "group", None), 1561 enabled=getattr(cls, "enabled", True), 1562 prior_action=getattr(cls, "prior_action", None), 1563 ignore_phase=getattr(cls, "ignore_phase", True)) 1564 1565 @__init__.register(Mapping) 1566 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction] 1567 self.__init__(cls.get("phases", cls.get("phase")), 1568 cls["rules"], 1569 repeat_if_not_applicable=cls.get("repeat_if_not_applicable", True), 1570 condition=cls.get("condition"), 1571 description=cls.get("description", self.description), 1572 overwrite_settings=cls.get("overwrite_settings"), 1573 self_config=cls.get("self_config"), 1574 priority=cls.get("priority", RulePriority.NORMAL), 1575 cooldown_reset_value=cls.get("cooldown_reset_value"), 1576 group=cls.get("group"), 1577 enabled=cls.get("enabled", True), 1578 sort_rules_by_priority=cls.get("sort_rules_by_priority", True), 1579 execute_all_rules=cls.get("execute_all_rules", False), 1580 prior_action=cls.get("prior_action", None), 1581 ignore_phase=cls.get("ignore_phase", True)) 1582
[docs] 1583 def evaluate_children(self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None) -> Any: 1584 """ 1585 Evaluate a random child rule. 1586 If `self.repeat_if_not_applicable=False` and the randomly chosen rule is not applicable, 1587 then no further rules are evaluated 1588 For `self.repeat_if_not_applicable=False` possible rules are evaluated in a random fashion 1589 until one rule was applicable. 1590 """ 1591 if len(self.rules) == 0: 1592 logger.warning("No rules to evaluate in %s. Returning NOT_APPLICABLE.", self.__class__.__name__) 1593 return RuleResult.NOT_APPLICABLE 1594 if self.repeat_if_not_applicable: 1595 rules = self.rules.copy() 1596 weights = self.weights.copy() 1597 else: 1598 rules = self.rules 1599 weights = self.weights 1600 1601 result = RuleResult.NOT_APPLICABLE 1602 while rules: 1603 rule = random.choices(rules, cum_weights=weights, k=1)[0] 1604 try: 1605 result = rule(ctx, overwrite, ignore_phase=self.ignore_phase) # NOTE: Context action/evaluation results only store result of LAST rule 1606 except DoNotEvaluateChildRules: 1607 return None 1608 if not self.repeat_if_not_applicable or result is not Rule.NOT_APPLICABLE: 1609 # break after the first rule of `self.repeat_if_not_applicable=False` 1610 # else break if it was applicable. 1611 break 1612 rules.remove(rule) 1613 weights = list(accumulate(r.priority for r in rules)) 1614 return result
1615 1616
[docs] 1617class BlockingRule(Rule, metarule=True): 1618 """ 1619 This meta rule allows to define rules that are able to takeover the agent's workflow and 1620 apply the :py:class:`.VehicleControl` directly from withhin the rule. 1621 1622 """ 1623 1624 _gameframework: ClassVar[Union["GameFramework", "CallableProxyType[GameFramework]", None]] = None 1625 """ 1626 Set when a :py:class:`GameFramework` is initialized. 1627 Alternatively can be set when any BlockingRule is created. 1628 """ 1629 1630 ticks_passed: int 1631 """Count how many ticks have been performed by this rule and blocked the agent.""" 1632 1633 MAX_TICKS = 5000 # 5000 * 1/20 = 250 seconds 1634 """ 1635 The amount of ticks that can be performed by this rule before it is automatically disabled. 1636 If the rule has looped for this amount of ticks if will then call :py:attr:`max_tick_callback` 1637 and raise an :py:exc:`.UnblockRuleException` afterwards. 1638 1639 As a hack :py:attr:`max_tick_callback` can change :py:attr:`ticks_passed` to prevent the 1640 exception and continue the rule. 1641 """ 1642 1643 max_tick_callback: Optional[Callable[[Self, Context], Any]] = None 1644 """ 1645 An optional callback that is executed when :py:attr:`ticks_passed` reaches :py:attr:`MAX_TICKS`. 1646 """ 1647 1648 if TYPE_CHECKING: 1649 class _InitParameters(Rule._InitParameters): # pyright: ignore[reportPrivateUsage] 1650 gameframework: Required[Optional[GameFramework]] 1651 1652 @singledispatchmethod 1653 def __init__(self, 1654 phases: Union[Phase, Iterable[Phase]], # iterable of Phases 1655 #/, # phases must be positional; python3.8+ only 1656 condition: Optional[ConditionFunctionLikeT] = None, 1657 action: Optional[Union[CallableAction[Self, []], 1658 Dict[Any, CallableAction[Self, []]]]] = None, 1659 false_action: Optional[CallableAction[Self, []]] = None, 1660 *, 1661 gameframework: Optional[GameFramework], 1662 actions: Optional[Dict[Any, CallableAction[Rule, []]]] = None, 1663 description: str = "What does this rule do?", 1664 overwrite_settings: Optional[Dict[str, Any]] = None, 1665 self_config: Optional[Dict[str, Any]] = None, 1666 priority: RulePriority = RulePriority.NORMAL, 1667 cooldown_reset_value: Optional[int] = None, 1668 group: Optional[str] = None, 1669 enabled: bool = True, 1670 ): 1671 super().__init__(phases, condition, action, false_action, 1672 actions=actions, 1673 description=description, overwrite_settings=overwrite_settings, 1674 self_config=self_config, 1675 priority=priority, 1676 cooldown_reset_value=cooldown_reset_value, 1677 group=group, 1678 enabled=enabled) 1679 if gameframework: 1680 BlockingRule._gameframework = gameframework 1681 if not GameFramework.clock or not GameFramework.display: 1682 # Not much we can do about it 1683 #logger.info("%s : GameFramework should be initialized before using this rule.", self.__class__.__name__) 1684 GameFramework.init_pygame() 1685 self.ticks_passed = 0 1686 1687 @__init__.register(_CountdownRule) 1688 @__init__.register(type) 1689 def __init_by_decorating_class(self, cls: "BlockingRule"): # pyright: ignore[reportUnusedFunction] 1690 """ 1691 Initialize by passing a Rule or class object to the __init__ method. 1692 1693 This allows the usage of the @Rule decorator and easy copying. 1694 """ 1695 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow for both 1696 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None)) 1697 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided." 1698 self.__init__(phases, 1699 cls.condition, # type: ignore 1700 action=getattr(cls, "action", None), false_action=getattr(cls, "false_action", None), 1701 gameframework=getattr(cls, "gameframework", BlockingRule._gameframework), 1702 actions=getattr(cls, "actions", None), description=cls.description, 1703 overwrite_settings=getattr(cls, "overwrite_settings", None), 1704 self_config=getattr(cls, "self_config", None), 1705 priority=getattr(cls, "priority", RulePriority.NORMAL), 1706 cooldown_reset_value=cooldown_reset_value, 1707 group=getattr(cls, "group", None), enabled=getattr(cls, "enabled", True)) 1708 1709 @__init__.register(Mapping) 1710 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction] 1711 super().__init__(cls.get("phases", cls.get("phase")), **cls) 1712 1713 def _render_everything(self, ctx: Context): 1714 if self._gameframework: 1715 self._gameframework.render_everything() 1716 else: 1717 world_model = ctx.agent._world_model # pyright: ignore[reportPrivateUsage] 1718 display = GameFramework.display 1719 world_model.tick(GameFramework.clock) # does not tick the world! 1720 world_model.render(display, finalize=False) 1721 try: 1722 world_model.controller.render(display) # type: ignore[attr-defined] # noqa: SIM105 1723 except AttributeError: 1724 pass 1725 1726 dm_render_conf = world_model._args.camera.hud.detection_matrix # pyright: ignore[reportPrivateUsage] 1727 if dm_render_conf and ctx.agent: 1728 ctx.agent.render_detection_matrix(display, **dm_render_conf) # pyright: ignore[reportPrivateUsage] 1729 world_model.finalize_render(display) 1730 pygame.display.flip() 1731 1732 @overload 1733 def loop_agent(self, ctx: Context, control: Optional[carla.VehicleControl] = None, 1734 *, execute_planner: Literal[True], execute_phases: Any) -> carla.VehicleControl: 1735 ... 1736 1737 @overload 1738 def loop_agent(self, ctx: Context, control: Optional[carla.VehicleControl] = None, 1739 *, execute_planner: Literal[False], execute_phases: Any) -> None: 1740 ... 1741
[docs] 1742 def loop_agent(self, ctx: Context, control: Optional[carla.VehicleControl] = None, 1743 *, execute_planner: bool, execute_phases: Any = True) -> "carla.VehicleControl | None": 1744 """ 1745 A combination of `LunaticAgent.parse_keyboard_input`, `LunaticAgent.apply_control`, `BlockingRule.update_world`, 1746 and `Context.get_or_calculate_control` to advance agent and world. 1747 1748 Args: 1749 ctx (Context): The current context object 1750 control (Optional[carla.VehicleControl], optional): The control to apply; will overwrite the context's control. 1751 If None takes the context's control. 1752 Defaults to :python:`None`. 1753 1754 See Also: 1755 Executes the following methods: 1756 1757 1. :py:meth:`.LunaticAgent.parse_keyboard_input` 1758 2. :py:meth:`.LunaticAgent.apply_control` 1759 3. :py:meth:`.BlockingRule.update_world` ticks the :py:class:`carla.World` and renders everything. 1760 4. :py:meth:`.Context.get_or_calculate_control` to acquire the :py:class:`carla.VehicleControl` object. 1761 """ 1762 ctx.agent.parse_keyboard_input(control=control) # NOTE: if skipped the user has no option to stop the agent 1763 ctx.agent.apply_control(control) 1764 1765 # NOTE: This ticks the world forward by one step 1766 # The ctx.control is reset to None; execute_plan 1767 # > Phase.UPDATE_INFORMATION | Phase.BEGIN 1768 self.update_world(ctx, execute_phases=execute_phases) 1769 if execute_planner: 1770 return ctx.get_or_calculate_control() 1771 return None
1772
[docs] 1773 @staticmethod 1774 def get_world() -> carla.World: 1775 """Method to access the world object""" 1776 return CarlaDataProvider.get_world()
1777 1778 def _begin_tick(self, ctx: Context): 1779 self._gameframework.clock.tick() # self.args.fps) # type: ignore[attr-defined] 1780 frame = None 1781 if self._gameframework and self._gameframework._args.handle_ticks: # i.e. no scenario runner doing it for us 1782 if CarlaDataProvider.is_sync_mode(): 1783 frame = self.get_world().tick() 1784 else: 1785 frame = self.get_world().wait_for_tick().frame 1786 CarlaDataProvider.on_carla_tick() 1787 else: 1788 # CRITICAL: The framework expects a return value before it ticks the world, 1789 # however the blocking rules should take over the ticks; so it should still do this! 1790 # TODO: Should implement some return <-> pass trough mechanism; maybe implement a 1791 # generator-like variant of the rule. 1792 if CarlaDataProvider.is_sync_mode(): 1793 frame = self.get_world().tick() 1794 else: 1795 frame = self.get_world().wait_for_tick().frame 1796 CarlaDataProvider.on_carla_tick() 1797 1798 if CarlaDataProvider.is_sync_mode(): 1799 # We do this only in sync mode as frames could pass between gathering this information 1800 # and an agent calling InformationManager.tick(), which in turn calls global_tick 1801 # with possibly a DIFFERENT frame wasting computation. 1802 if frame is None: 1803 frame = self.get_world().get_snapshot().frame 1804 InformationManager.global_tick(frame) 1805 1806 # Tick world and render everything 1807 # control should be reset, we are at the "start of the tick again" 1808 ctx.set_control(None) 1809 self.ticks_passed += 1 1810
[docs] 1811 def update_world(self, ctx: Context, *, execute_phases: Union[bool, Container[Phase]] = True) -> "carla.VehicleControl | None": 1812 """ 1813 Ticks the world and takes care of the rendering. 1814 1815 Will call 1816 - :python:`ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=<this Rule instance>)` 1817 - :py:meth:`.LunaticAgent.update_information`, with or without executing the phases, depending on **execute_phases**. 1818 1819 Args: 1820 ctx: The context to use 1821 execute_update_information: 1822 Whether to execute the :python:`Phase.UPDATE_INFORMATION | Phase.[BEGIN|END]` with this function. 1823 Can also be a container containing one of both of these phases. 1824 Defaults to :python:`True`. 1825 1826 Raises: 1827 UnblockRuleException: If the ticks passed are over :py:attr:`MAX_TICKS` 1828 1829 Attention: 1830 The usage with Leaderboard_ is working but experimental. 1831 The scenario expects the agent to return a control object in every step, however as this rule 1832 takes over the ticks completely an outside ScenarioManager might not work as expected. 1833 """ 1834 self._begin_tick(ctx) 1835 ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=self) 1836 1837 # Update the agent's information 1838 if execute_phases and (execute_phases is True or Phase.UPDATE_INFORMATION | Phase.BEGIN in execute_phases): 1839 ctx.agent.execute_phase(Phase.UPDATE_INFORMATION | Phase.BEGIN, prior_results=self) 1840 ctx.agent._update_information() 1841 if execute_phases and Phase.UPDATE_INFORMATION | Phase.END not in self.phases: 1842 ctx.agent.execute_phase(Phase.UPDATE_INFORMATION | Phase.END, prior_results=self) 1843 if self.ticks_passed > self.MAX_TICKS: 1844 logger.info("Rule %s has passed its max_ticks %s, calling max_tick_callback and unblocking it", self, self.MAX_TICKS) 1845 if self.max_tick_callback: 1846 self.max_tick_callback(ctx) 1847 if self.ticks_passed > self.MAX_TICKS: 1848 raise UnblockRuleException 1849 # max_tick_callback can override the ticks passed 1850 else: 1851 raise UnblockRuleException 1852 1853 self._render_everything(ctx)
1854
[docs] 1855 def __call__(self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None, in_loop: bool = False, 1856 *, # Kwargs from Rule.__call__ 1857 ignore_phase: bool = False, 1858 ignore_cooldown: bool = False) -> Union[Any, Literal[RuleResult.NOT_APPLICABLE]]: 1859 if not in_loop: 1860 self.ticks_passed = 0 1861 else: 1862 ignore_phase = True 1863 ignore_cooldown = True 1864 if self in ctx.agent._active_blocking_rules: 1865 logger.warning("Rule %s is already blocking the agent, this is a recursive call. Not executing the rule.", self) 1866 return Rule.NOT_APPLICABLE 1867 try: 1868 return super().__call__(ctx, overwrite, ignore_phase=ignore_phase, ignore_cooldown=ignore_cooldown) 1869 except UnblockRuleException as e: 1870 return e.result 1871 finally: 1872 ctx.agent._active_blocking_rules.discard(self) # pyright: ignore[reportPrivateUsage]
1873 1874 def evaluate(self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None) -> Union[bool, Hashable, Literal[RuleResult.NO_RESULT]]: 1875 result = super().evaluate(ctx, overwrite) 1876 if result in self.actions: 1877 ctx.agent._active_blocking_rules.add(self) # pyright: ignore[reportPrivateUsage] 1878 return result
1879 1880# Provide necessary imports for the evaluation_function module and prevents circular imports 1881 1882import classes.evaluation_function as __evaluation_function # noqa 1883__evaluation_function.Rule = Rule 1884__evaluation_function.Context = Context 1885del __evaluation_function