Source code for classes.evaluation_function

  1# pyright: strict
  2# pyright: reportInconsistentConstructor=information
  3# pyright: reportGeneralTypeIssues=warning
  4# pyright: reportAttributeAccessIssue=warning
  5
  6from __future__ import annotations
  7
  8import collections.abc
  9import inspect
 10import typing
 11from functools import partial, update_wrapper, wraps
 12from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Generic, Hashable, Optional, Union, cast
 13
 14from typing_extensions import Annotated, Concatenate, Literal, Never, ParamSpec, Self, TypeGuard, TypeVar, overload
 15
 16from classes.constants import READTHEDOCS
 17from classes.type_protocols import (
 18    AnyCallableAction,
 19    AnyCallableCondition,
 20    CallableAction,
 21    CallableCondition,
 22    CallableConditionT,
 23    CallableT,
 24    RuleT,
 25)
 26from launch_tools import singledispatchmethod
 27
 28if TYPE_CHECKING:
 29    from functools import _Wrapped  # type: ignore
 30    from typing import NoReturn
 31
 32    # NOTE: To avoid a circular import, Rule and Context are set on this module by classes.rule when classes.rule is imported.
 33    from classes.rule import Context, Rule  # circular import  # noqa: TCH004,RUF100
 34
 35
# Free type variables (not bound to a class): generic return type and a hashable return type.
_T = TypeVar("_T")
_H = TypeVar("_H", bound=Hashable)   # Free
# Hashable return type used as the second class parameter of ConditionFunction.
_CH = TypeVar("_CH", bound=Hashable)  # Generic of ConditionFunction

_P = ParamSpec("_P")   # Free, e.g. for action function.
# Parameter specification used as the first class parameter of ConditionFunction.
_CP = ParamSpec("_CP")  # Generic of ConditionFunction


if TYPE_CHECKING:
    # Reminder for type checkers: functions stored as actions may be
    # functools.wraps-wrapped callables taking (Rule, Context, ...) or (Context, ...).
    _W = Union[_Wrapped[Concatenate["Rule", "Context", ...], Any,
                        Concatenate["Rule", "Context", ...], Any],
               _Wrapped[Concatenate["Context", ...], Any,
                        Concatenate["Context", ...], Any]]
    _ActionsDictValues = Union[AnyCallableAction, _W]
else:
    # At runtime `_Wrapped` is not imported, so only the plain alias is used.
    _ActionsDictValues = AnyCallableAction
 53
 54
class ConditionFunction(Generic[_CP, _CH]):
    """
    Implements a decorator to wrap function to be used with :any:`Rule` classes.
    The function must return a hashable type, which is used to access the action to be taken
    by the :any:`Rule.condition` function of the rule.

    Evaluation functions can be combined using the AND, OR and NOT operators to build up more complex rules
    from simpler ones.
    The operators :code:`+, &`, :code:`|` and :code:`~` are aliases for :code:`AND, OR` and :code:`NOT` respectively.
    e.g. with these two functions:
        :python:`func1 = ConditionFunction(lambda ctx: ctx.speed > 10)`
        :python:`func2 = ConditionFunction(lambda ctx: ctx.speed < 20)`

    These statements are all equivalent:
        * :python:`ConditionFunction(lambda ctx: 10 < ctx.speed < 20)`
        * :python:`func1 + func2`
        * :python:`func1 & func2`
        * :python:`func1.AND(func2)`
        * :python:`ConditionFunction.AND(func1, func2)`

    Hint:
        ConditionFunctions also allow for more specific returns types

    Example:

        .. code-block:: python

            @ConditionFunction
            def is_speeding(ctx: Context) -> Hashable:
                config = ctx.agent.config
                if config.speed > config.speed_limit+20:
                    return "very fast"
                elif config.speed > config.speed_limit+5:
                    return "fast"
                elif config.speed < config.speed_limit-20:
                    return "very slow"
                else:
                    return "normal"

            Rule(is_speeding,
                 action={
                    "very fast": lambda ctx: ctx.agent.config.follow_speed_limits(),
                    "fast" : lambda ctx: ctx.agent.config.set_target_speed(ctx.speed_limit+5)
                })

    Parameters:
        first_argument :
            If :code:`None` or a string, the class will create a :term:`decorator`
            that expects a callable as the first argument.
            If a string is passed it substitutes as the **name** argument.
            Otherwise a callable is expected like in the snippet used above.
        name : The name to represent the function. If left at the default
            :python:`"ConditionFunction"`, the wrapped function's :code:`__name__`
            (or its string representation) is used instead.
        truthy : If True, the return value is meant to be interpreted as a boolean.
            The flag is stored on the instance; the cast itself is not performed in
            :py:meth:`__call__`. Defaults to :code:`False`.
        use_self : If :python:`True`, the function will be treated as a method and the first argument will be the instance of the :py:class:`.Rule` that uses this function.
            If :code:`None`, the decision depends on the signature of the function, if it has only one argument only the :py:class:`.Context` object is passed,
            if it has two or more arguments the first argument that is passed is the instance of the :py:class:`.Rule`.
            Use :code:`False` to not use the instance of the :py:class:`.Rule` as the first argument.
            Defaults to :code:`None`.

    Returns:
        ConditionFunction | partial[ConditionFunction] :
            A :py:class:`ConditionFunction` or a partially initialized version to be used as a decorator
            when the **first_argument** is not a callable.

    Generics:
        - _CP : :py:class:`typing.ParamSpec` of the passed :py:attr:`evaluation_function`.
        - _CH : The :term:`Hashable` return type of the :py:attr:`evaluation_function`.

        ``RuleT`` is a free type variable (not a class parameter) for the
        :py:class:`.Rule` type that may appear in the
        :py:attr:`evaluation_function's <evaluation_function>` signature.
    """

    _default_actions: ClassVar[Dict[Hashable, _ActionsDictValues]] = {}
    """
    Default values for the :py:attr:`actions` dictionary. Could be used for subclassing.
    By default :py:attr:`actions` is a shallow copy of this one.
    """

    actions: Dict[Hashable, _ActionsDictValues]
    """
    Mapping of return values to actions to be executed.
    If this dictionary is not empty it will be used as the :py:attr:`.Rule.actions` dictionary.
    """

    if READTHEDOCS and not TYPE_CHECKING:
        actions: dict[Hashable, CallableActionT] = {}  # noqa: F821

    _INVALID_NAMES: ClassVar[set[str]] = {'action', 'actions', 'false_action'}
    """Forbidden names for action functions."""

    @overload
    def __new__(cls, first_argument: Optional[Annotated[str, "name"]] = None, name: str = "ConditionFunction", *,
                truthy: Literal[False] = False, use_self: Optional[bool] = None) -> Callable[[CallableCondition[RuleT, _P, _H]], ConditionFunction[_P, _H]]: ...

    @overload
    def __new__(cls, first_argument: Optional[Annotated[str, "name"]] = None, name: str = "ConditionFunction", *,
                truthy: Literal[True], use_self: Optional[bool] = None) -> Callable[[CallableCondition[RuleT, _P, Any]], ConditionFunction[_P, bool]]: ...

    @overload
    def __new__(cls, first_argument: CallableCondition[RuleT, _CP, _CH], name: str = "ConditionFunction", *,
                truthy: bool = False, use_self: Optional[bool] = None) -> Self: ...

    def __new__(cls,
                first_argument: Optional[Annotated[str, "name"] | CallableCondition[RuleT, _CP, _CH]] = None,
                name: str = "ConditionFunction",
                *,
                truthy: bool = False,
                use_self: Optional[bool] = None) \
            -> (
                Callable[[CallableCondition[RuleT, _P, _H]], ConditionFunction[_P, _H]]  # default decorator
                | Callable[[CallableCondition[RuleT, _P, Any]], ConditionFunction[_P, bool]]  # truthy=True
                | Self  # function way
            ):
        """
        Create either a new instance or a decorator factory.

        When **first_argument** is a string or :code:`None`, a
        :py:class:`functools.partial` of the class is returned that still expects
        the function to wrap. Because that partial is not an instance of the class,
        :py:meth:`__init__` is not run for it. Otherwise a bare instance is created
        and initialised by :py:meth:`__init__`.
        """
        # example usage: @ConditionFunction("name")
        if isinstance(first_argument, str):
            # Calling decorator with a string @ConditionFunction("name")
            assert name == "ConditionFunction", "The `name` argument must be the default."
            return partial(cls, name=first_argument, truthy=truthy, use_self=use_self)  # type: ignore[return-value]

        # example_usage: @ConditionFunction(name="name") or @ConditionFunction(truthy=True)
        if first_argument is None:
            return partial(cls, name=name, truthy=truthy, use_self=use_self)  # type: ignore[return-value]
        assert isinstance(first_argument, (Callable, staticmethod)), f"First argument must be a callable, not {type(first_argument)}"
        # @ConditionFunction or ConditionFunction(function)
        return super().__new__(cls)  # new instance

    if READTHEDOCS and not TYPE_CHECKING:
        __new__.__annotations__["first_argument"] = Optional["str(name)" | AnyCallableCondition]  # noqa: F821, TCH010

    def __init__(self,
                 evaluation_function: CallableCondition[RuleT, _CP, _CH],
                 name: str = "ConditionFunction",
                 *,
                 truthy: bool = False,
                 use_self: Optional[bool] = None):
        """
        Wrap **evaluation_function**.

        Parameters:
            evaluation_function: The function (or :code:`staticmethod`) to wrap.
            name: Display name. The default value acts as a sentinel meaning
                "derive the name from the function".
            truthy: Stored as :py:attr:`truthy`.
            use_self: Stored as :py:attr:`use_self`.
        """
        # Copy identifying metadata (and __dict__) of the wrapped callable onto this instance.
        update_wrapper(self,
                       evaluation_function,  # pyright: ignore[reportArgumentType]
                       assigned=("__qualname__", "__module__", "__annotations__", "__doc__"))
        if isinstance(evaluation_function, staticmethod):
            # Unwrap so that the plain function is what gets called.
            evaluation_function = evaluation_function.__func__  # type: ignore

        self.evaluation_function = evaluation_function
        """
        The function that is wrapped by the :py:class:`ConditionFunction`.
        Uses the generic type hints :py:obj:`_CP`, :py:obj:`_CH` of the class.
        """
        if READTHEDOCS and not TYPE_CHECKING:
            self.evaluation_function: CallableConditionT

        self.truthy: bool = truthy
        if name != "ConditionFunction":
            self.name = name
        elif hasattr(evaluation_function, "__name__"):
            self.name = evaluation_function.__name__
        else:
            self.name = str(evaluation_function)
        self.use_self: bool | None = use_self
        # Each instance gets its own (shallow) copy of the class-level defaults.
        self.actions = self._default_actions.copy()

    # Possible Pyright <1.1.377 : bug when this is active
    @overload
    def __call__(self, ctx: "Rule", _: "Context", *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...

    @overload
    def __call__(self, ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...

    @overload
    def __call__(self, ctx: Union["Rule", "Context"], *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...

    def __call__(self, ctx: Union["Rule", "Context"], *args: _CP.args, **kwargs: _CP.kwargs) -> _CH:  # pyright: ignore[reportInconsistentOverload]
        """
        Evaluate the wrapped function and store its result on the context.

        The result is written to ``ctx.rule_result`` of the Context object and
        returned unchanged. If the wrapped function raises, a short error line is
        printed and the exception is re-raised.

        Note:
            To handle the method vs. function difference depending on __get__
            `ctx` can be either a Context (condition as function) or a Rule (condition as method),
            In the method case the real Context object is args[0]!
        """
        try:
            rule_result = self.evaluation_function(ctx, *args, **kwargs)  # type: ignore[arg-type]
        except Exception:
            print(f"ERROR: in Rule {self.name} with function {self.evaluation_function}")
            raise
        # Handle function vs. method
        if not isinstance(ctx, Context):
            ctx = args[0]  # type: ignore
            assert isinstance(ctx, Context), f"This should not happen: In the method case the argument must be a Context object, not {type(ctx)}"
        ctx.rule_result = rule_result
        # NOTE(review): `self.truthy` is not applied here; a bool cast of the result
        # existed but was disabled. Confirm whether the consumer of this function
        # is expected to apply it.
        return rule_result

    # NOTE(review): the overloads below declare `Self` for instance access and
    # `partial` for class access, while the implementation does the reverse.
    # They are left unchanged because this may be intentional for type checkers;
    # confirm before relying on them.
    @overload
    def __get__(self, instance: "Rule", objtype: Any = None) -> "Self": ...

    @overload
    def __get__(self, instance: None, objtype: Optional["type[Rule]"]) -> "partial[_CH]": ...

    @overload
    def __get__(self, instance: None, objtype: "type[Rule]") -> "partial[_CH]": ...

    def __get__(self, instance: "Optional[Rule]", objtype: Optional["type[Rule]"] = None) -> "Self | partial[_CH]":
        """
        :term:`Descriptor Protocol <descriptor>`, for in class usage like Rule.condition

        Returns this object itself when accessed on the class, and a
        :py:class:`functools.partial` with the instance bound as first argument
        when accessed on an instance.
        """
        # NOTE: instance.condition is not an ConditionFunction, it is a partial of one.
        if instance is None:
            return self  # called on class Rule.condition
        return partial(self, instance)  # NOTE: This fixes "ctx" to instance in __call__, the real "ctx" in __call__ is provided through *args

    def copy(self, copy_actions: bool = False):
        """
        Copies the class by creating a new instance.

        Parameters:
            copy_actions: If :python:`True`, the :py:attr:`.ConditionFunction.actions`
                dictionary is copied as well.
                Defaults to :code:`False`.

        Returns:
            ConditionFunction: A new instance, with the same :python:`__init__` arguments.

        Warning:
            Be aware that when using **copy_actions** the actions themselves are not copied;
            they are identical and shared.
        """
        # Bypass the overloaded __new__ of this class; __init__ is invoked explicitly.
        instance = super().__new__(self.__class__)
        self.__class__.__init__(instance, self.evaluation_function, self.name, truthy=self.truthy, use_self=self.use_self)
        if copy_actions:
            instance.actions = self.actions.copy()
        return instance

    # Helpers to extract a useful string representation of the function
    @staticmethod
    def _complete_func_to_string(func: Callable[..., Any]) -> str:
        """
        Return the stripped source text of **func**.

        Falls back to :code:`repr(func)` when the source cannot be retrieved
        (:py:func:`inspect.getsourcelines` raises :code:`OSError` or :code:`TypeError`).
        """
        try:
            func_lines = inspect.getsourcelines(func)[0]
        except (OSError, TypeError):
            return repr(func)
        return "".join(func_lines).strip()  # as string

    @staticmethod
    def _func_to_string(func: Callable[..., Any]) -> str:
        """
        Return a readable name for **func**: its :code:`__name__`, the source text
        for lambdas, or :code:`str(func)` if it has no :code:`__name__`.
        """
        if not hasattr(func, "__name__"):
            return str(func)
        if func.__name__ == "<lambda>":
            return ConditionFunction._complete_func_to_string(func)
        return func.__name__

    @staticmethod
    def _is_partial_action(action_function: Callable[_P, _T]) -> TypeGuard["partial[_T] | Callable[_P, _T]"]:
        """Return whether **action_function** is a :py:class:`functools.partial`."""
        return isinstance(action_function, partial)

    @overload
    def _check_action(self, action_function: CallableT, key: Hashable,
                      *, use_self: Optional[bool] = None) -> CallableT: ...

    @overload
    def _check_action(self, action_function: Callable[_P, _T], key: Hashable,
                      *, use_self: Optional[bool] = None,
                      **preset_kwargs: _P.kwargs
                      ) -> "_Wrapped[_P, _T, _P, _T]": ...

    def _check_action(self, action_function: Callable[_P, _T], key: Hashable,
                      *, use_self: Optional[bool] = None,
                      **preset_kwargs: _P.kwargs
                      ) -> "Callable[_P, _T] | _Wrapped[_P, _T, _P, _T]":
        """
        Checks if an action has an invalid name, a key is already registered.
        If kwargs are passed these will be fixed for the returned action.

        Parameters:
            action_function: The action function to be checked. Callables without
                a :code:`__name__` (e.g. :py:class:`functools.partial`) are accepted.
            key: The key to be checked if it is already registered.
            use_self: If not :code:`None`, a `use_self` attribute is set on the function to indicate
                if the function is a method or not.
            preset_kwargs: The kwargs to be preset. Keyword arguments given when the
                returned action is called take precedence over these.

        Returns:
            The original function if no **preset_kwargs** are given, otherwise a
            wrapper that supplies them.

        Raises:
            ValueError: If the function's name is one of :py:attr:`_INVALID_NAMES`.
        """
        # Not every callable has a __name__ (functools.partial, callable instances).
        func_name = getattr(action_function, "__name__", None)
        display_name = func_name if func_name is not None else str(action_function)
        if func_name in ConditionFunction._INVALID_NAMES:
            raise ValueError("When using ConditionFunction.add_action, the action function's name "
                             f"may not be in {ConditionFunction._INVALID_NAMES}, "
                             f"got '{func_name}'.")
        if key in self.actions:
            print("Warning: Overwriting already registered action", self.actions[key],
                  "with key", f"'{key}'", "in", self.name)
        # Only warn when the attribute is actually going to be changed below.
        if (use_self is not None
                and hasattr(action_function, "use_self")
                and action_function.use_self is not use_self):  # pyright: ignore[reportFunctionMemberAccess]
            print("Warning: Registering action:", display_name,
                  "with a different use_self value than before. old:", action_function.use_self,  # pyright: ignore[reportFunctionMemberAccess]
                  "new:", use_self)
        if not preset_kwargs:
            # NOTE: This does not set in on a wrapper, if the function is used multiple times
            # the value might be overwritten and above warning is shown.
            if use_self is not None:
                action_function.use_self = use_self  # type: ignore[attr-defined]
            return action_function  # return original function

        # Preset kwargs.
        @wraps(action_function)
        def action_function_with_kwargs(*args: _P.args, **kwargs: _P.kwargs) -> _T:
            # Merge instead of double-unpacking: passing the same keyword twice to a
            # call raises TypeError, merging lets call-time kwargs override the presets.
            merged_kwargs = {**preset_kwargs, **kwargs}
            return action_function(*args, **merged_kwargs)  # type: ignore
        if use_self is not None:
            action_function_with_kwargs.use_self = use_self  # type: ignore[attr-defined]
        return action_function_with_kwargs

    if TYPE_CHECKING:
        class _RegisterActionDecorator:
            """
            Helper that describes the return type of :py:meth:`.ConditionFunction.register_action`.
            """
            @overload
            def __call__(self, func: Callable[Concatenate[RuleT, Context, ...], _T]) -> Callable[[RuleT, Context], _T]: ...

            @overload
            def __call__(self, func: Callable[Concatenate[Context, ...], _T]) -> Callable[[Context], _T]: ...

            def __call__(self, func: CallableAction[RuleT, _P, _T]) -> CallableAction[RuleT, [], _T]:
                ...

    @singledispatchmethod
    def register_action(self,
                        key: typing.Hashable = True,
                        *,
                        use_self: Optional[bool] = None,
                        **kwargs: _P.kwargs) \
            -> "_RegisterActionDecorator":
        """
        Add an action to be executed when the condition function returns a specific value.

        This function can be used as a decorator or as a method:

        .. code-block:: python

            # As decorator
            @ConditionFunction
            def is_speeding(ctx: Context) -> Literal["very fast", "fast", "normal", "slow", True]:
                ...

            @is_speeding.register_action(key="very fast")
            # or
            @is_speeding.register_action  # default key is True
            def very_fast_action(ctx: Context):
                ctx.agent.set_target_speed(ctx.config.target_speed-5)

            # Or as function: the action comes first, then the key
            def fast_action(ctx: Context, speed: int):
                ctx.agent.set_target_speed(speed)

            is_speeding.register_action(fast_action, "fast", speed=60)

            # Custom keywords without Rule instance
            @is_speeding.register_action("normal",
                                         use_self=False,  # no Rule in the signature
                                         speed=50)        # keyword argument
            def custom_speed(ctx: Context, speed: int):
                ctx.agent.set_target_speed(speed)

        Parameters:
            key: If the condition function returns this value, this action will be executed.
                Defaults to :python:`True`.
            action_function: (When not used as decorator) The action to be executed;
                passed as the first positional argument.
            use_self: If not :code:`None`, a :code:`use_self` attribute is set on the function to indicate
                if the function is a method or not.
            kwargs : Keyword arguments for the decorated function.

        Returns:
            A decorator that registers the decorated function, or, when a callable
            is passed directly, the registered action itself.

        Note:
            Only one action is allowed per key. If an action is already registered for the key,
            it will be overwritten.
        """
        def decorator(action_function: AnyCallableAction) -> AnyCallableAction:
            checked_action = self._check_action(action_function, key,
                                                use_self=use_self, **kwargs)
            self.actions[key] = checked_action
            return checked_action
        return cast("ConditionFunction._RegisterActionDecorator", decorator)

    # no correct overload when dispatching on a callable
    @register_action.register(collections.abc.Callable)  # type: ignore
    def _register_action_directly(self,
                                  action_function: CallableT,
                                  key: Hashable = True,
                                  *,
                                  use_self: Optional[bool] = None,
                                  **kwargs: _P.kwargs) -> CallableT:
        """
        Register **action_function** under **key** without an intermediate decorator.

        - Case 1
            @register_action # key is True
            def action_function(ctx: Context):
                ...

        - Case 2
            register_action(action_function, key="very fast", **action_function_kwargs)
        """
        checked_action = self._check_action(action_function, key,
                                            use_self=use_self, **kwargs)
        # Case 1 and 2 (without kwargs): checked_action is the action function, else wrapped.
        self.actions[key] = checked_action  # register action
        return checked_action

    # ----------------------

    if not TYPE_CHECKING:
        # This is shadowed as pyright interprets self.__class__.__name__ as this function
        @property
        def __name__(self) -> str:
            """
            Returns the name of the function.
            """
            return self.name
    else:
        __name__: str

    def __str__(self):
        """Return :py:attr:`name`."""
        return self.name

    def __repr__(self) -> str:
        """Return a constructor-like representation including name, function and ``truthy``."""
        if self.name == "ConditionFunction":
            s = self.__class__.__name__ + f"({self.evaluation_function}"
        else:
            s = self.__class__.__name__ + f'(name="{self.name}", evaluation_function={self.evaluation_function}'
        if self.truthy:
            s += ", truthy=True"
        s += ")"
        return s

    @classmethod
    def AND(cls, func1: "ConditionFunction[_CP, _CH]", func2: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]":
        """
        Combine two functions with :python:`and`, i.e. to return True if both return True.

        The combined function short-circuits: **func2** is only evaluated if the
        result of **func1** is truthy, and the value of the :python:`and`
        expression is returned as is.
        """
        # NOTE: Only the name is derived from the operands; their `truthy`,
        # `use_self` and `actions` are not carried over to the new instance.
        def combined_func(ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs):
            return func1(ctx, *args, **kwargs) and func2(ctx, *args, **kwargs)
        return cls(combined_func, name=f"{func1.name}_and_{func2.name}")  # type: ignore

    @classmethod
    def OR(cls, func1: "ConditionFunction[_CP, _CH]", func2: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]":
        """
        Combine two functions to return True if either returns True.

        The combined function short-circuits: **func2** is only evaluated if the
        result of **func1** is falsy, and the value of the :python:`or`
        expression is returned as is.
        """
        def combined_func(ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs):
            return func1(ctx, *args, **kwargs) or func2(ctx, *args, **kwargs)

        return cls(combined_func, name=f"{func1.name}_or_{func2.name}")  # type: ignore

    @classmethod
    def NOT(cls, func: "ConditionFunction[_CP, _CH]") -> "ConditionFunction[_CP, bool]":
        """Invert the return value of a function."""
        def combined_func(ctx: "Context | Rule", *args: _CP.args, **kwargs: _CP.kwargs):
            return not func(ctx, *args, **kwargs)
        return cls(combined_func, name=f"not_{func.name}")  # type: ignore

    def __add__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]":
        """Combine with another function using :py:meth:`AND`."""
        return self.AND(self, other)

    def __and__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]":
        """Combine with another function using :py:meth:`AND`."""
        return self.AND(self, other)

    def __or__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]":
        """Combine with another function using :py:meth:`OR`."""
        return self.OR(self, other)

    def __invert__(self) -> "ConditionFunction[_CP, bool]":
        """
        Invert the return value of the function.
        """
        return self.NOT(self)


class ActionFunction(ConditionFunction[_P, _T]):
    """
    A decorator that can be used with :any:`Rule.action`. It is nearly equivalent to :any:`ConditionFunction`,
    only calling the function is more simple, i.e. does not assert a Hashable return type.

    .. deprecated::
        Will likely be removed as no strong use case.

    :meta private:
    """

    def __init__(self, action_function: CallableAction[RuleT, _P, _T], name: str = "ActionFunction", *,
                 truthy: bool = False, use_self: Optional[bool] = None):
        """
        Wrap **action_function**.

        Parameters:
            action_function: The function to wrap.
            name: Display name. If left at the default, the name is derived from
                the wrapped function by the parent class.
            truthy: Accepted for compatibility with the inherited decorator
                factory (``__new__``) and ``copy``, which always pass this keyword;
                it is forwarded to the parent initialiser.
            use_self: Forwarded to the parent initialiser.
        """
        if name == "ActionFunction":
            # The parent only falls back to the function's own name when it
            # receives its sentinel default, so translate our default to it.
            name = "ConditionFunction"
        super().__init__(action_function, name, truthy=truthy, use_self=use_self)

    @classmethod
    def NOT(cls, _: Never) -> "NoReturn":  # type: ignore
        """
        Raises:
            NotImplementedError: NOT is not implemented for ActionFunction.
        """
        raise NotImplementedError("NOT is not implemented for ActionFunction")

    def __call__(self, ctx: "Rule | Context", *args: _P.args, **kwargs: _P.kwargs) -> _T:
        """
        Call the wrapped function and return its result unchanged.

        Unlike the parent implementation, the result is not stored on the
        context and errors are not reported before propagating.
        """
        return self.evaluation_function(ctx, *args, **kwargs)  # type: ignore[arg-type]