Source code for classes.evaluation_function

  1# pyright: strict
  2# pyright: reportInconsistentConstructor=information
  3# pyright: reportGeneralTypeIssues=warning
  4# pyright: reportAttributeAccessIssue=warning
  5
  6from __future__ import annotations
  7
  8import collections.abc
  9import inspect
 10import typing
 11from functools import partial, update_wrapper, wraps
 12from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Generic, Hashable, Optional, Union, cast
 13
 14from typing_extensions import Annotated, Concatenate, Literal, Never, ParamSpec, Self, TypeGuard, TypeVar, overload
 15
 16from classes.constants import READTHEDOCS
 17from classes.type_protocols import (
 18    AnyCallableAction,
 19    AnyCallableCondition,
 20    CallableAction,
 21    CallableCondition,
 22    CallableConditionT,
 23    CallableT,
 24    RuleT,
 25)
 26from launch_tools import singledispatchmethod
 27
 28if TYPE_CHECKING:
 29    from functools import _Wrapped  # type: ignore
 30    from typing import NoReturn
 31
 32    # NOTE: to prevent this circular import when classes.rule are imported Rule and Context are set accordingly for this module
 33    from classes.rule import Context, Rule  # circular import  # noqa: TC004,RUF100
 34
 35
# Type variables used throughout this module.
# "Free" variables are bound per function/method signature; the ``_C*`` variants
# are the generic parameters of ``ConditionFunction`` itself.
_T = TypeVar("_T")  # Free, e.g. the return type of an action function.
_H = TypeVar("_H", bound=Hashable)  # Free
_CH = TypeVar("_CH", bound=Hashable)  # Generic of ConditionFunction

_P = ParamSpec("_P")  # Free, e.g. for action function.
_CP = ParamSpec("_CP")  # Generic of ConditionFunction
 42
 43
# Value type of the ``actions`` dictionaries.
# ``_Wrapped`` is only imported under TYPE_CHECKING, therefore the union that
# mentions it is restricted to the type checker; at runtime the alias falls
# back to the plain action callable type.
if TYPE_CHECKING:
    # Condition.functions might be wrapped, this is reminder.
    _W = Union[
        _Wrapped[Concatenate["Rule", "Context", ...], Any, Concatenate["Rule", "Context", ...], Any],
        _Wrapped[Concatenate["Context", ...], Any, Concatenate["Context", ...], Any],
    ]
    _ActionsDictValues = Union[AnyCallableAction, _W]
else:
    _ActionsDictValues = AnyCallableAction
 53
 54
class ConditionFunction(Generic[_CP, _CH]):
    """
    Implements a decorator to wrap function to be used with :any:`Rule` classes.
    The function must return a hashable type, which is used to access the action to be taken
    by the :any:`Rule.condition` function of the rule.

    Evaluation functions can be combined using the AND, OR and NOT operators to build up more complex rules
    from simpler ones.
    The operators :code:`+, &`, :code:`|` and :code:`~` are aliases for :code:`AND, OR` and :code:`NOT` respectively.
    e.g. with these two functions:
        :python:`func1 = ConditionFunction(lambda ctx: ctx.speed > 10)`
        :python:`func2 = ConditionFunction(lambda ctx: ctx.speed < 20)`

    These statements are all equivalent:
        * :python:`ConditionFunction(lambda ctx: 10 < ctx.speed < 20)`
        * :python:`func1 + func2`
        * :python:`func1 & func2`
        * :python:`func1.AND(func2)`
        * :python:`ConditionFunction.AND(func1, func2)`

    Hint:
        ConditionFunctions also allow for more specific returns types

    Example:

        .. code-block:: python

            @ConditionFunction
            def is_speeding(ctx: Context) -> Hashable:
                config = ctx.agent.config
                if config.speed > config.speed_limit+20:
                    return "very fast"
                elif config.speed > config.speed_limit+5:
                    return "fast"
                elif config.speed < config.speed_limit-20:
                    return "very slow"
                else:
                    return "normal"

            Rule(is_speeding,
                 action={
                     "very fast": lambda ctx: ctx.agent.config.follow_speed_limits(),
                     "fast" : lambda ctx: ctx.agent.config.set_target_speed(ctx.speed_limit+5)
                })

    Parameters:
        first_argument :
            If :code:`None` or a string, the class will create a :term:`decorator`
            that expects a callable as the first argument.
            If a string is passed it substitutes as the **name** argument.
            Otherwise a callable is expected like in the snippet used above.
        name : The name to represent the function. Defaults to :python:`"ConditionFunction"`.
        truthy : If True, the function will always cast the return value to a boolean value. Defaults to :code:`False`.
            NOTE(review): the cast inside :py:meth:`__call__` is currently commented out,
            so the value is only stored on the instance - confirm the intended behaviour.
        use_self : If :python:`True`, the function will be treated as a method and the first argument will be the instance of the :py:class:`.Rule` that uses this function.
            If :code:`None`, the decision depends on the signature of the function, if it has only one argument only the :py:class:`.Context` object is passed,
            if it has two or more arguments the first argument that is passed is the instance of the :py:class:`.Rule`.
            Use :code:`False` to not use the instance of the :py:class:`.Rule` as the first argument.
            Defaults to :code:`None`.

    Returns:
        ConditionFunction | partial[ConditionFunction] :
            A :py:class:`ConditionFunction` or a partially initialized version to be used as a decorator
            when the **first_argument** is not a callable.

    Generics:
        - RuleT :
            Generic :py:class:`.Rule` type that could appear in the
            :py:attr:`evaluation_function's <evaluation_function>` signature
            (used in the constructor signature, not a parameter of this class).
        - _CP : :py:class:`typing.ParamSpec` of the passed :py:attr:`evaluation_function`.
        - _CH : The :term:`Hashable` return type of the :py:attr:`evaluation_function`.
    """

    _default_actions: ClassVar[Dict[Hashable, _ActionsDictValues]] = {}
    """
    Default values for the :py:attr:`actions` dictionary. Could be used for subclassing.
    By default :py:attr:`actions` is a shallow copy of this one.
    """

    actions: Dict[Hashable, _ActionsDictValues]
    """
    Mapping of return values to actions to be executed.
    If this dictionary is not empty it will be used as the :py:attr:`.Rule.actions` dictionary.
    """

    if READTHEDOCS and not TYPE_CHECKING:
        actions: dict[Hashable, CallableActionT] = {}  # noqa: F821

    _INVALID_NAMES: ClassVar[set[str]] = {"action", "actions", "false_action"}
    """Forbidden names for action functions."""

    @overload
    def __new__(
        cls,
        first_argument: Optional[Annotated[str, "name"]] = None,
        name: str = "ConditionFunction",
        *,
        truthy: Literal[False] = False,
        use_self: Optional[bool] = None,
    ) -> Callable[[CallableCondition[RuleT, _P, _H]], ConditionFunction[_P, _H]]: ...

    @overload
    def __new__(
        cls,
        first_argument: Optional[Annotated[str, "name"]] = None,
        name: str = "ConditionFunction",
        *,
        truthy: Literal[True],
        use_self: Optional[bool] = None,
    ) -> Callable[[CallableCondition[RuleT, _P, Any]], ConditionFunction[_P, bool]]: ...

    @overload
    def __new__(
        cls,
        first_argument: CallableCondition[RuleT, _CP, _CH],
        name: str = "ConditionFunction",
        *,
        truthy: bool = False,
        use_self: Optional[bool] = None,
    ) -> Self: ...

    def __new__(
        cls,
        first_argument: Optional[Annotated[str, "name"] | CallableCondition[RuleT, _CP, _CH]] = None,
        name: str = "ConditionFunction",
        *,
        truthy: bool = False,
        use_self: Optional[bool] = None,
    ) -> (
        Callable[[CallableCondition[RuleT, _P, _H]], ConditionFunction[_P, _H]]  # default decorator
        | Callable[[CallableCondition[RuleT, _P, Any]], ConditionFunction[_P, bool]]  # truthy=True
        | Self  # function way
    ):
        """
        Create a new instance, or a :py:class:`functools.partial` of the class
        when used as a decorator with arguments.

        A string or :code:`None` as **first_argument** returns the partial (which
        is not an instance of the class, hence ``__init__`` is not executed for it);
        a callable or :py:class:`staticmethod` returns a new bare instance.
        """
        # example usage: @ConditionFunction("name")
        if isinstance(first_argument, str):
            # Calling decorator with a string @ConditionFunction("name")
            assert name == "ConditionFunction", "The `name` argument must be the default."
            return partial(cls, name=first_argument, truthy=truthy, use_self=use_self)  # type: ignore[return-value]

        # example_usage: @ConditionFunction(name="name") or @ConditionFunction(truthy=True)
        if first_argument is None:
            return partial(cls, name=name, truthy=truthy, use_self=use_self)  # type: ignore[return-value]
        assert isinstance(first_argument, (Callable, staticmethod)), (
            f"First argument must be a callable, not {type(first_argument)}"
        )
        # @ConditionFunction or ConditionFunction(function)
        return super().__new__(cls)  # new instance

    if READTHEDOCS and not TYPE_CHECKING:
        __new__.__annotations__["first_argument"] = Optional["str(name)" | AnyCallableCondition]  # noqa: F821, TC010

    def __init__(
        self,
        evaluation_function: CallableCondition[RuleT, _CP, _CH],
        name: str = "ConditionFunction",
        *,
        truthy: bool = False,
        use_self: Optional[bool] = None,
    ):
        """
        Wrap **evaluation_function**.

        Copies ``__qualname__``, ``__module__``, ``__annotations__`` and ``__doc__``
        of the function onto the instance, unwraps a :py:class:`staticmethod`,
        derives :py:attr:`name` and initializes :py:attr:`actions` as a shallow copy
        of :py:attr:`_default_actions`.
        """
        update_wrapper(
            self,
            evaluation_function,  # pyright: ignore[reportArgumentType]
            assigned=("__qualname__", "__module__", "__annotations__", "__doc__"),
        )
        if isinstance(evaluation_function, staticmethod):
            evaluation_function = evaluation_function.__func__  # type: ignore

        self.evaluation_function = evaluation_function
        """
        The function that is wrapped by the :py:class:`ConditionFunction`.
        Uses the generic type hints :py:obj:`_CP`, :py:obj:`_CH` of the class.
        """
        if READTHEDOCS and not TYPE_CHECKING:
            self.evaluation_function: CallableConditionT

        self.truthy: bool = truthy
        # An explicitly passed name wins, otherwise fall back to the function's name.
        if name != "ConditionFunction":
            self.name = name
        elif hasattr(evaluation_function, "__name__"):
            self.name = evaluation_function.__name__
        else:
            self.name = str(evaluation_function)
        self.use_self: bool | None = use_self
        self.actions = self._default_actions.copy()

    # Possible Pyright <1.1.377 : bug when this is active
    @overload
    def __call__(self, ctx: "Rule", _: "Context", *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...

    @overload
    def __call__(self, ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...

    @overload
    def __call__(self, ctx: Union["Rule", "Context"], *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...

    def __call__(self, ctx: Union["Rule", "Context"], *args: _CP.args, **kwargs: _CP.kwargs) -> _CH:  # pyright: ignore[reportInconsistentOverload]
        """
        Evaluate the wrapped function and store the result in ``Context.rule_result``.

        Note:
            To handle the method vs. function difference depending on __get__
            `ctx` can be either a Context (condition as function) or a Rule (condition as method),
            In the method case the real Context object is args[0]!

        Raises:
            Exception: Any exception of the wrapped function is re-raised after
                an error line has been printed.
        """
        try:
            rule_result = self.evaluation_function(ctx, *args, **kwargs)  # type: ignore[arg-type]
        except Exception:
            print(f"ERROR: in Rule {self.name} with function {self.evaluation_function}")
            raise
        # Handle function vs. method
        if not isinstance(ctx, Context):
            ctx = args[0]  # type: ignore
            assert isinstance(ctx, Context), (
                f"This should not happen: In the method case the argument must be a Context object, not {type(ctx)}"
            )
        ctx.rule_result = rule_result
        # NOTE(review): `truthy` is not applied here, the cast below is disabled.
        # if self.truthy:
        #    return bool(rule_result)  # type: ignore
        # assert isinstance(rule_result, Hashable), f"evaluation_function must return a hashable type, not {type(rule_result)}"  # type: ignore
        return rule_result

    # The overloads mirror the implementation below:
    # class access (instance is None) -> the ConditionFunction itself,
    # instance access -> a partial with the Rule instance bound as first argument.
    @overload
    def __get__(self, instance: None, objtype: Optional["type[Rule]"] = None) -> "Self": ...

    @overload
    def __get__(self, instance: "Rule", objtype: Any = None) -> "partial[_CH]": ...

    def __get__(self, instance: "Optional[Rule]", objtype: Optional["type[Rule]"] = None) -> "Self | partial[_CH]":
        """
        :term:`Descriptor Protocol <descriptor>`, for in class usage like Rule.condition

        Returns:
            This object when accessed on the class, otherwise a
            :py:class:`functools.partial` of it with **instance** bound as first argument.
        """
        # NOTE: instance.condition is not an ConditionFunction, it is a partial of one.
        if instance is None:
            return self  # called on class Rule.condition
        return partial(
            self, instance
        )  # NOTE: This fixes "ctx" to instance in __call__, the real "ctx" in __call__ is provided through *args
[docs] 293 def copy(self, copy_actions: bool = False): 294 """ 295 Copies the class by creating a new instance. 296 297 Parameters: 298 copy_actions: If :python:`True`, the :py:attr:`.ConditionFunction.actions` 299 dictionary is copied as well. 300 Defaults to :code:`False`. 301 302 Returns: 303 ConditionFunction: A new instance, with the same :python:`__init__` arguments. 304 305 Warning: 306 Be aware that when using **copy_actions** the actions themselves are not copied; 307 they are identical and shared. 308 """ 309 instance = super().__new__(self.__class__) 310 self.__class__.__init__( 311 instance, self.evaluation_function, self.name, truthy=self.truthy, use_self=self.use_self 312 ) 313 if copy_actions: 314 instance.actions = self.actions.copy() 315 return instance
316 317 # Helpers to extract a useful string representation of the function 318 @staticmethod 319 def _complete_func_to_string(func: Callable[..., Any]) -> str: 320 func_lines = inspect.getsourcelines(func)[0] 321 return "".join(func_lines).strip() # as string 322 323 @staticmethod 324 def _func_to_string(func: Callable[..., Any]) -> str: 325 if not hasattr(func, "__name__"): 326 return str(func) 327 if func.__name__ == "<lambda>": 328 return ConditionFunction._complete_func_to_string(func) 329 return func.__name__ 330 331 @staticmethod 332 def _is_partial_action(action_function: Callable[_P, _T]) -> TypeGuard["partial[_T] | Callable[_P, _T]"]: 333 return isinstance(action_function, partial) 334 335 @overload 336 def _check_action( 337 self, action_function: CallableT, key: Hashable, *, use_self: Optional[bool] = None 338 ) -> CallableT: ... 339 340 @overload 341 def _check_action( 342 self, 343 action_function: Callable[_P, _T], 344 key: Hashable, 345 *, 346 use_self: Optional[bool] = None, 347 **preset_kwargs: _P.kwargs, 348 ) -> "_Wrapped[_P, _T, _P, _T]": ... 349 350 def _check_action( 351 self, 352 action_function: Callable[_P, _T], 353 key: Hashable, 354 *, 355 use_self: Optional[bool] = None, 356 **preset_kwargs: _P.kwargs, 357 ) -> "Callable[_P, _T] | _Wrapped[_P, _T, _P, _T]": 358 """ 359 Checks if an action has an invalid name, a key is already registered. 360 If kwargs are passed these will be fixed for the returned action. 361 362 Parameters: 363 action_function: The action function to be checked. 364 key: The key to be checked if it is already registered. 365 use_self: If not :code:`None`, a `use_self` attribute is set on the function to indicate 366 if the function is a method or not. 367 preset_kwargs: The kwargs to be preset. 
368 """ 369 if action_function.__name__ in ConditionFunction._INVALID_NAMES: 370 msg = ( 371 "When using ConditionFunction.add_action, the action function's name " 372 f"may not be in {ConditionFunction._INVALID_NAMES}, got '{action_function.__name__}'." 373 ) 374 raise ValueError(msg) 375 if key in self.actions: 376 print( 377 "Warning: Overwriting already registered action", 378 self.actions[key], 379 "with key", 380 f"'{key}'", 381 "in", 382 self.name, 383 ) 384 if hasattr(action_function, "use_self") and action_function.use_self is not use_self: # pyright: ignore[reportFunctionMemberAccess] 385 print( 386 "Warning: Registering action:", 387 action_function.__name__, 388 "with a different use_self value than before. old:", 389 action_function.use_self, # pyright: ignore[reportFunctionMemberAccess] 390 "new:", 391 use_self, 392 ) 393 if not preset_kwargs: 394 # NOTE: This does not set in on a wrapper, if the function is used multiple times 395 # the value might be overwritten and above warning is shown. 396 if use_self is not None: 397 action_function.use_self = use_self # type: ignore[attr-defined] 398 return action_function # return original function 399 400 # Preset kwargs. 401 @wraps(action_function) 402 def action_function_with_kwargs(*args: _P.args, **kwargs: _P.kwargs) -> _T: 403 # inserts args when called and kwargs from check action 404 # allows overrides 405 return action_function(*args, **preset_kwargs, **kwargs) # type: ignore 406 407 if use_self is not None: 408 action_function_with_kwargs.use_self = use_self # type: ignore[attr-defined] 409 return action_function_with_kwargs 410 411 if TYPE_CHECKING: 412 413 class _RegisterActionDecorator: 414 """ 415 Helper that describes the return type of :py:meth:`.ConditionFunction.register_action`. 416 """ 417 418 @overload 419 def __call__( 420 self, func: Callable[Concatenate[RuleT, Context, ...], _T] 421 ) -> Callable[[RuleT, Context], _T]: ... 
422 423 @overload 424 def __call__(self, func: Callable[Concatenate[Context, ...], _T]) -> Callable[[Context], _T]: ... 425 426 def __call__(self, func: CallableAction[RuleT, _P, _T]) -> CallableAction[RuleT, [], _T]: ... 427
[docs] 428 @singledispatchmethod 429 def register_action( 430 self, key: typing.Hashable = True, *, use_self: Optional[bool] = None, **kwargs: _P.kwargs 431 ) -> "_RegisterActionDecorator": 432 """ 433 Add an action to be executed when the condition function returns a specific value. 434 435 This function can be used as a decorator or as a method: 436 437 .. code-block:: python 438 439 # As decorator 440 @ConditionFunction 441 def is_speeding(ctx: Context) -> Literal["very fast", "fast", "normal", "slow", True]: 442 ... 443 444 @is_speeding.register_action(key="very fast") 445 # or 446 @is_speeding.register_action # default key is True 447 def very_fast_action(ctx: Context): 448 ctx.agent.set_target_speed(ctx.config.target_speed-5) 449 450 # Or as function 451 def fast_action(ctx: Context, speed: int): 452 ctx.agent.set_target_speed(speed) 453 454 is_speeding.register_action("fast", fast_action) 455 456 # Custom keywords without Rule instance 457 is_speeding.register_action("normal", 458 use_self=False, # no Rule in the signature 459 speed=ctx.config.target_speed) # keyword argument 460 def custom_speed(ctx: Context, speed: int): 461 ctx.agent.set_target_speed(speed) 462 463 Parameters: 464 key: If the condition function returns this value, this action will be executed. 465 Defaults to :python:`True`. 466 action_function: (When not used as decorator) The action to be executed. 467 use_self: If not :code:`None`, a :code:`use_self` attribute is set on the function to indicate 468 if the function is a method or not. 469 kwargs : Keyword arguments for the decorated function. 470 471 Returns: 472 Decorator to be used or the decorated function as ConditionFunction. 473 474 Note: 475 Only one action is allowed per key. If an action is already registered for the key, 476 it will be overwritten. 
477 """ 478 479 def decorator(action_function: AnyCallableAction) -> AnyCallableAction: 480 checked_action = self._check_action(action_function, key, use_self=use_self, **kwargs) 481 self.actions[key] = checked_action 482 return checked_action 483 484 return cast("ConditionFunction._RegisterActionDecorator", decorator)
485 486 # no correct overload when dispatching on a callable 487 @register_action.register(collections.abc.Callable) # type: ignore 488 def _register_action_directly( 489 self, action_function: CallableT, key: Hashable = True, *, use_self: Optional[bool] = None, **kwargs: _P.kwargs 490 ) -> CallableT: 491 """ 492 - Case 1 493 @register_action # key is True 494 def action_function(ctx: Context): 495 ... 496 497 - Case 2 498 register_action(action_function, key="very fast", **action_function_kwargs) 499 """ 500 checked_action = self._check_action(action_function, key, use_self=use_self, **kwargs) 501 # Case 1 and 2 (without kwargs): checked_action is the action function, else wrapped. 502 self.actions[key] = checked_action # register action 503 return checked_action 504 505 # ---------------------- 506 507 if not TYPE_CHECKING: 508 # This is shadowed as pyright interprets self.__class__.__name__ as this function 509 @property 510 def __name__(self) -> str: 511 """ 512 Returns the name of the function. 513 """ 514 return self.name 515 516 else: 517 __name__: str 518 519 def __str__(self): 520 return self.name 521 522 def __repr__(self) -> str: 523 if self.name == "ConditionFunction": 524 s = self.__class__.__name__ + f"({self.evaluation_function}" 525 else: 526 s = self.__class__.__name__ + f'(name="{self.name}", evaluation_function={self.evaluation_function}' 527 if self.truthy: 528 s += ", truthy=True" 529 s += ")" 530 return s 531
[docs] 532 @classmethod 533 def AND( 534 cls, func1: "ConditionFunction[_CP, _CH]", func2: "ConditionFunction[_CP, _H]" 535 ) -> "ConditionFunction[_CP, _CH | _H]": 536 """Combine two functions with :python:`and`, i.e. to return True if both return True.""" 537 538 # NOTE: arguments are, lost. For type hints need to make ConditionFunction generic 539 def combined_func(ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs): 540 return func1(ctx, *args, **kwargs) and func2(ctx, *args, **kwargs) 541 542 return cls(combined_func, name=f"{func1.name}_and_{func2.name}") # type: ignore
543
[docs] 544 @classmethod 545 def OR( 546 cls, func1: "ConditionFunction[_CP, _CH]", func2: "ConditionFunction[_CP, _H]" 547 ) -> "ConditionFunction[_CP, _CH | _H]": 548 """Combine two functions to return True if either returns True.""" 549 550 def combined_func(ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs): 551 return func1(ctx, *args, **kwargs) or func2(ctx, *args, **kwargs) 552 553 return cls(combined_func, name=f"{func1.name}_or_{func2.name}") # type: ignore
554
[docs] 555 @classmethod 556 def NOT(cls, func: "ConditionFunction[_CP, _CH]") -> "ConditionFunction[_CP, bool]": 557 """Invert the return value of a function.""" 558 559 def combined_func(ctx: "Context | Rule", *args: _CP.args, **kwargs: _CP.kwargs): 560 return not func(ctx, *args, **kwargs) 561 562 return cls(combined_func, name=f"not_{func.name}") # type: ignore
563
[docs] 564 def __add__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]": 565 """Combine with another function using :py:meth:`AND`.""" 566 return self.AND(self, other)
567
[docs] 568 def __and__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]": 569 """Combine with another function using :py:meth:`AND`.""" 570 return self.AND(self, other)
571
[docs] 572 def __or__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]": 573 """Combine with another function using :py:meth:`OR`.""" 574 return self.OR(self, other)
575
[docs] 576 def __invert__(self) -> "ConditionFunction[_CP, bool]": 577 """ 578 Invert the return value of the function. 579 """ 580 return self.NOT(self)
class ActionFunction(ConditionFunction[_P, _T]):
    """
    A decorator that can be used with :any:`Rule.action`. It is nearly equivalent to :any:`ConditionFunction`,
    only calling the function is more simple, i.e. does not assert a Hashable return type.

    .. deprecated::
        Will likely be removed as no strong use case.

    :meta private:
    """

    def __init__(
        self,
        action_function: CallableAction[RuleT, _P, _T],
        name: str = "ActionFunction",
        *,
        truthy: bool = False,
        use_self: Optional[bool] = None,
    ):
        """
        Parameters:
            action_function: The function to wrap.
            name: The name to represent the function.
            truthy: Forwarded to :py:class:`ConditionFunction`. It must be accepted
                because the inherited ``__new__`` (decorator usage with arguments)
                and the inherited ``copy`` always pass ``truthy=...`` to the
                constructor. It does not affect :py:meth:`__call__` of this class.
            use_self: Forwarded to :py:class:`ConditionFunction`.
        """
        super().__init__(action_function, name, truthy=truthy, use_self=use_self)

    @classmethod
    def NOT(cls, _: Never) -> "NoReturn":  # type: ignore
        """
        Raises:
            NotImplementedError: NOT is not implemented for ActionFunction.
        """
        raise NotImplementedError("NOT is not implemented for ActionFunction")

    def __call__(self, ctx: "Rule | Context", *args: _P.args, **kwargs: _P.kwargs) -> _T:  # type: ignore
        """
        Call the wrapped function and return its result unchanged.

        Unlike the parent implementation the result is not stored on the
        context and no method vs. function handling is done.
        """
        return self.evaluation_function(ctx, *args, **kwargs)  # type: ignore[arg-type]