1# pyright: strict
2# pyright: reportInconsistentConstructor=information
3# pyright: reportGeneralTypeIssues=warning
4# pyright: reportAttributeAccessIssue=warning
5
6from __future__ import annotations
7
8import collections.abc
9import inspect
10import typing
11from functools import partial, update_wrapper, wraps
12from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Generic, Hashable, Optional, Union, cast
13
14from typing_extensions import Annotated, Concatenate, Literal, Never, ParamSpec, Self, TypeGuard, TypeVar, overload
15
16from classes.constants import READTHEDOCS
17from classes.type_protocols import (
18 AnyCallableAction,
19 AnyCallableCondition,
20 CallableAction,
21 CallableCondition,
22 CallableConditionT,
23 CallableT,
24 RuleT,
25)
26from launch_tools import singledispatchmethod
27
28if TYPE_CHECKING:
29 from functools import _Wrapped # type: ignore
30 from typing import NoReturn
31
32 # NOTE: to prevent this circular import when classes.rule are imported Rule and Context are set accordingly for this module
33 from classes.rule import Context, Rule # circular import # noqa: TC004,RUF100
34
35
36_T = TypeVar("_T")
37_H = TypeVar("_H", bound=Hashable) # Free
38_CH = TypeVar("_CH", bound=Hashable) # Generic of ConditionFunction
39
40_P = ParamSpec("_P") # Free, e.g. for action function.
41_CP = ParamSpec("_CP") # Generic of ConditionFunction
42
43
44if TYPE_CHECKING:
45 # Condition.functions might be wrapped, this is reminder.
46 _W = Union[
47 _Wrapped[Concatenate["Rule", "Context", ...], Any, Concatenate["Rule", "Context", ...], Any],
48 _Wrapped[Concatenate["Context", ...], Any, Concatenate["Context", ...], Any],
49 ]
50 _ActionsDictValues = Union[AnyCallableAction, _W]
51else:
52 _ActionsDictValues = AnyCallableAction
53
54
[docs]
55class ConditionFunction(Generic[_CP, _CH]):
56 """
57 Implements a decorator to wrap function to be used with :any:`Rule` classes.
58 The function must return a hashable type, which is used to access the action to be taken
59 by the :any:`Rule.condition` function of the rule.
60
61 Evaluation functions can be combined using the AND, OR and NOT operators to build up more complex rules
62 from simpler ones.
63 The operators :code:`+, &`, :code:`|` and :code:`~` are aliases for :code:`AND, OR` and :code:`NOT` respectively.
64 e.g. with these two functions:
65 :python:`func1 = ConditionFunction(lambda ctx: ctx.speed > 10)`
66 :python:`func2 = ConditionFunction(lambda ctx: ctx.speed < 20)`
67
68 These statements are all equivalent:
69 * :python:`ConditionFunction(lambda ctx: 10 < ctx.speed < 20)`
70 * :python:`func1 + func2`
71 * :python:`func1 & func2`
72 * :python:`func1.AND(func2)`
73 * :python:`ConditionFunction.AND(func1, func2)`
74
75 Hint:
76 ConditionFunctions also allow for more specific returns types
77
78 Example:
79
80 .. code-block:: python
81
82 @ConditionFunction
83 def is_speeding(ctx: Context) -> Hashable:
84 config = ctx.agent.config
85 if config.speed > config.speed_limit+20:
86 return "very fast"
87 elif config.speed > config.speed_limit+5:
88 return "fast"
89 elif: config.speed < config.speed_limit-20:
90 return "very slow"
91 else:
92 return "normal"
93
94 Rule(is_speeding,
95 action={
96 "very fast": lambda ctx: ctx.agent.config.follow_speed_limits(),
97 "fast" : lambda ctx: ctx.agent.config.set_target_speed(ctx.speed_limit+5)
98 })
99
100 Parameters:
101 first_argument :
102 If :code:`None` or a string, the class will create a :term:`decorator`
103 that expects a callable as the first argument.
104 If a string is passed it substitutes as the **name** argument.
105 Otherwise a callable is expected like in the snippet used above.
106 name : The name to represent the function. Defaults to :python:`"ConditionFunction"`.
107 truthy : If True, the function will always cast the return value to a boolean value. Defaults to :code:`False`.
108 use_self : If :python:`True`, the function will be treated as a method and the first argument will be the instance of the :py:class:`.Rule` that uses this function.
109 If :code:`None`, the decision depends on the signature of the function, if it has only one argument only the :py:class:`.Context` object is passed,
110 if it has two or more arguments the first argument that is passed is the instance of the :py:class:`.Rule`.
111 Use :code:`False` to not use the instance of the :py:class:`.Rule` as the first argument.
112 Defaults to :code:`None`.
113
114 Returns:
115 ConditionFunction | partial[ConditionFunction] :
116 A :py:class:`ConditionFunction` or a partially initialized version to be used as a decorator
117 when the **first_argument** is not a callable.
118
119 Generics:
120 - _Rule :
121 Generic :py:class:`.Rule` type that could appear in the
122 :py:attr:`evaluation_function's <evaluation_function>` signature.
123 - _CP : :py:class:`typing.ParamSpec` of the passed :py:attr:`evaluation_function`.
124 - _CH : The :term:`Hashable` return type of the :py:attr:`evaluation_function`.
125 """
126
127 _default_actions: ClassVar[Dict[Hashable, _ActionsDictValues]] = {}
128 """
129 Default values for the :py:attr:`actions` dictionary. Could be used for subclassing.
130 By default :py:attr:`actions` is a shallow copy of this one.
131 """
132
133 actions: Dict[Hashable, _ActionsDictValues]
134 """
135 Mapping of return values to actions to be executed.
136 If this dictionary is not empty it will be used as the :py:attr:`.Rule.actions` dictionary.
137 """
138
139 if READTHEDOCS and not TYPE_CHECKING:
140 actions: dict[Hashable, CallableActionT] = {} # noqa: F821
141
142 _INVALID_NAMES: ClassVar[set[str]] = {"action", "actions", "false_action"}
143 """Forbidden names for action functions."""
144
145 @overload
146 def __new__(
147 cls,
148 first_argument: Optional[Annotated[str, "name"]] = None,
149 name: str = "ConditionFunction",
150 *,
151 truthy: Literal[False] = False,
152 use_self: Optional[bool] = None,
153 ) -> Callable[[CallableCondition[RuleT, _P, _H]], ConditionFunction[_P, _H]]: ...
154
155 @overload
156 def __new__(
157 cls,
158 first_argument: Optional[Annotated[str, "name"]] = None,
159 name: str = "ConditionFunction",
160 *,
161 truthy: Literal[True],
162 use_self: Optional[bool] = None,
163 ) -> Callable[[CallableCondition[RuleT, _P, Any]], ConditionFunction[_P, bool]]: ...
164
165 @overload
166 def __new__(
167 cls,
168 first_argument: CallableCondition[RuleT, _CP, _CH],
169 name: str = "ConditionFunction",
170 *,
171 truthy: bool = False,
172 use_self: Optional[bool] = None,
173 ) -> Self: ...
174
175 def __new__(
176 cls,
177 first_argument: Optional[Annotated[str, "name"] | CallableCondition[RuleT, _CP, _CH]] = None,
178 name: str = "ConditionFunction",
179 *,
180 truthy: bool = False,
181 use_self: Optional[bool] = None,
182 ) -> (
183 Callable[[CallableCondition[RuleT, _P, _H]], ConditionFunction[_P, _H]] # default decorator
184 | Callable[[CallableCondition[RuleT, _P, Any]], ConditionFunction[_P, bool]] # truthy=True
185 | Self # function way
186 ):
187 # example usage: @ConditionFunction("name")
188 if isinstance(first_argument, str):
189 # Calling decorator with a string @ConditionFunction("name")
190 assert name == "ConditionFunction", "The `name` argument must be the default."
191 return partial(cls, name=first_argument, truthy=truthy, use_self=use_self) # type: ignore[return-value]
192
193 # example_usage: @ConditionFunction(name="name") or @ConditionFunction(truthy=True)
194 if first_argument is None:
195 return partial(cls, name=name, truthy=truthy, use_self=use_self) # type: ignore[return-value]
196 assert isinstance(first_argument, (Callable, staticmethod)), (
197 f"First argument must be a callable, not {type(first_argument)}"
198 )
199 # @ConditionFunction or ConditionFunction(function)
200 return super().__new__(cls) # new instance
201
202 if READTHEDOCS and not TYPE_CHECKING:
203 __new__.__annotations__["first_argument"] = Optional["str(name)" | AnyCallableCondition] # noqa: F821, TC010
204
205 def __init__(
206 self,
207 evaluation_function: CallableCondition[RuleT, _CP, _CH],
208 name: str = "ConditionFunction",
209 *,
210 truthy: bool = False,
211 use_self: Optional[bool] = None,
212 ):
213 update_wrapper(
214 self,
215 evaluation_function, # pyright: ignore[reportArgumentType]
216 assigned=("__qualname__", "__module__", "__annotations__", "__doc__"),
217 )
218 if isinstance(evaluation_function, staticmethod):
219 evaluation_function = evaluation_function.__func__ # type: ignore
220
221 self.evaluation_function = evaluation_function
222 """
223 The function that is wrapped by the :py:class:`ConditionFunction`.
224 Uses the generic type hints :py:obj:`_CP`, :py:obj:`_CH` of the class.
225 """
226 if READTHEDOCS and not TYPE_CHECKING:
227 self.evaluation_function: CallableConditionT
228
229 self.truthy: bool = truthy
230 if name != "ConditionFunction":
231 self.name = name
232 elif hasattr(evaluation_function, "__name__"):
233 self.name = evaluation_function.__name__
234 else:
235 self.name = str(evaluation_function)
236 self.use_self: bool | None = use_self
237 self.actions = self._default_actions.copy()
238
239 # Possible Pyright <1.1.377 : bug when this is active
240 @overload
241 def __call__(self, ctx: "Rule", _: "Context", *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...
242
243 @overload
244 def __call__(self, ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...
245
246 @overload
247 def __call__(self, ctx: Union["Rule", "Context"], *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: ...
248
249 def __call__(self, ctx: Union["Rule", "Context"], *args: _CP.args, **kwargs: _CP.kwargs) -> _CH: # pyright: ignore[reportInconsistentOverload]
250 """
251 Note:
252 To handle the method vs. function difference depending on __get__
253 `ctx` can be either a Context (condition as function) or a Rule (condition as method),
254 In the method case the real Context object is args[0]!
255 """
256 try:
257 rule_result = self.evaluation_function(ctx, *args, **kwargs) # type: ignore[arg-type]
258 except Exception:
259 print(f"ERROR: in Rule {self.name} with function {self.evaluation_function}")
260 raise
261 # Handle function vs. method
262 if not isinstance(ctx, Context):
263 ctx = args[0] # type: ignore
264 assert isinstance(ctx, Context), (
265 f"This should not happen: In the method case the argument must be a Context object, not {type(ctx)}"
266 )
267 ctx.rule_result = rule_result
268 # if self.truthy:
269 # return bool(rule_result) # type: ignore
270 # assert isinstance(rule_result, Hashable), f"evaluation_function must return a hashable type, not {type(rule_result)}" # type: ignore
271 return rule_result
272
273 @overload
274 def __get__(self, instance: "Rule", objtype: Any = None) -> "Self": ...
275
276 @overload
277 def __get__(self, instance: None, objtype: Optional["type[Rule]"]) -> "partial[_CH]": ...
278
279 @overload
280 def __get__(self, instance: None, objtype: "type[Rule]") -> "partial[_CH]": ...
281
282 def __get__(self, instance: "Optional[Rule]", objtype: Optional["type[Rule]"] = None) -> "Self | partial[_CH]":
283 """
284 :term:`Descriptor Protocol <descriptor>`, for in class usage like Rule.condition
285 """
286 # NOTE: instance.condition is not an ConditionFunction, it is a partial of one.
287 if instance is None:
288 return self # called on class Rule.condition
289 return partial(
290 self, instance
291 ) # NOTE: This fixes "ctx" to instance in __call__, the real "ctx" in __call__ is provided through *args
292
[docs]
293 def copy(self, copy_actions: bool = False):
294 """
295 Copies the class by creating a new instance.
296
297 Parameters:
298 copy_actions: If :python:`True`, the :py:attr:`.ConditionFunction.actions`
299 dictionary is copied as well.
300 Defaults to :code:`False`.
301
302 Returns:
303 ConditionFunction: A new instance, with the same :python:`__init__` arguments.
304
305 Warning:
306 Be aware that when using **copy_actions** the actions themselves are not copied;
307 they are identical and shared.
308 """
309 instance = super().__new__(self.__class__)
310 self.__class__.__init__(
311 instance, self.evaluation_function, self.name, truthy=self.truthy, use_self=self.use_self
312 )
313 if copy_actions:
314 instance.actions = self.actions.copy()
315 return instance
316
317 # Helpers to extract a useful string representation of the function
318 @staticmethod
319 def _complete_func_to_string(func: Callable[..., Any]) -> str:
320 func_lines = inspect.getsourcelines(func)[0]
321 return "".join(func_lines).strip() # as string
322
323 @staticmethod
324 def _func_to_string(func: Callable[..., Any]) -> str:
325 if not hasattr(func, "__name__"):
326 return str(func)
327 if func.__name__ == "<lambda>":
328 return ConditionFunction._complete_func_to_string(func)
329 return func.__name__
330
331 @staticmethod
332 def _is_partial_action(action_function: Callable[_P, _T]) -> TypeGuard["partial[_T] | Callable[_P, _T]"]:
333 return isinstance(action_function, partial)
334
335 @overload
336 def _check_action(
337 self, action_function: CallableT, key: Hashable, *, use_self: Optional[bool] = None
338 ) -> CallableT: ...
339
340 @overload
341 def _check_action(
342 self,
343 action_function: Callable[_P, _T],
344 key: Hashable,
345 *,
346 use_self: Optional[bool] = None,
347 **preset_kwargs: _P.kwargs,
348 ) -> "_Wrapped[_P, _T, _P, _T]": ...
349
350 def _check_action(
351 self,
352 action_function: Callable[_P, _T],
353 key: Hashable,
354 *,
355 use_self: Optional[bool] = None,
356 **preset_kwargs: _P.kwargs,
357 ) -> "Callable[_P, _T] | _Wrapped[_P, _T, _P, _T]":
358 """
359 Checks if an action has an invalid name, a key is already registered.
360 If kwargs are passed these will be fixed for the returned action.
361
362 Parameters:
363 action_function: The action function to be checked.
364 key: The key to be checked if it is already registered.
365 use_self: If not :code:`None`, a `use_self` attribute is set on the function to indicate
366 if the function is a method or not.
367 preset_kwargs: The kwargs to be preset.
368 """
369 if action_function.__name__ in ConditionFunction._INVALID_NAMES:
370 msg = (
371 "When using ConditionFunction.add_action, the action function's name "
372 f"may not be in {ConditionFunction._INVALID_NAMES}, got '{action_function.__name__}'."
373 )
374 raise ValueError(msg)
375 if key in self.actions:
376 print(
377 "Warning: Overwriting already registered action",
378 self.actions[key],
379 "with key",
380 f"'{key}'",
381 "in",
382 self.name,
383 )
384 if hasattr(action_function, "use_self") and action_function.use_self is not use_self: # pyright: ignore[reportFunctionMemberAccess]
385 print(
386 "Warning: Registering action:",
387 action_function.__name__,
388 "with a different use_self value than before. old:",
389 action_function.use_self, # pyright: ignore[reportFunctionMemberAccess]
390 "new:",
391 use_self,
392 )
393 if not preset_kwargs:
394 # NOTE: This does not set in on a wrapper, if the function is used multiple times
395 # the value might be overwritten and above warning is shown.
396 if use_self is not None:
397 action_function.use_self = use_self # type: ignore[attr-defined]
398 return action_function # return original function
399
400 # Preset kwargs.
401 @wraps(action_function)
402 def action_function_with_kwargs(*args: _P.args, **kwargs: _P.kwargs) -> _T:
403 # inserts args when called and kwargs from check action
404 # allows overrides
405 return action_function(*args, **preset_kwargs, **kwargs) # type: ignore
406
407 if use_self is not None:
408 action_function_with_kwargs.use_self = use_self # type: ignore[attr-defined]
409 return action_function_with_kwargs
410
411 if TYPE_CHECKING:
412
413 class _RegisterActionDecorator:
414 """
415 Helper that describes the return type of :py:meth:`.ConditionFunction.register_action`.
416 """
417
418 @overload
419 def __call__(
420 self, func: Callable[Concatenate[RuleT, Context, ...], _T]
421 ) -> Callable[[RuleT, Context], _T]: ...
422
423 @overload
424 def __call__(self, func: Callable[Concatenate[Context, ...], _T]) -> Callable[[Context], _T]: ...
425
426 def __call__(self, func: CallableAction[RuleT, _P, _T]) -> CallableAction[RuleT, [], _T]: ...
427
[docs]
428 @singledispatchmethod
429 def register_action(
430 self, key: typing.Hashable = True, *, use_self: Optional[bool] = None, **kwargs: _P.kwargs
431 ) -> "_RegisterActionDecorator":
432 """
433 Add an action to be executed when the condition function returns a specific value.
434
435 This function can be used as a decorator or as a method:
436
437 .. code-block:: python
438
439 # As decorator
440 @ConditionFunction
441 def is_speeding(ctx: Context) -> Literal["very fast", "fast", "normal", "slow", True]:
442 ...
443
444 @is_speeding.register_action(key="very fast")
445 # or
446 @is_speeding.register_action # default key is True
447 def very_fast_action(ctx: Context):
448 ctx.agent.set_target_speed(ctx.config.target_speed-5)
449
450 # Or as function
451 def fast_action(ctx: Context, speed: int):
452 ctx.agent.set_target_speed(speed)
453
454 is_speeding.register_action("fast", fast_action)
455
456 # Custom keywords without Rule instance
457 is_speeding.register_action("normal",
458 use_self=False, # no Rule in the signature
459 speed=ctx.config.target_speed) # keyword argument
460 def custom_speed(ctx: Context, speed: int):
461 ctx.agent.set_target_speed(speed)
462
463 Parameters:
464 key: If the condition function returns this value, this action will be executed.
465 Defaults to :python:`True`.
466 action_function: (When not used as decorator) The action to be executed.
467 use_self: If not :code:`None`, a :code:`use_self` attribute is set on the function to indicate
468 if the function is a method or not.
469 kwargs : Keyword arguments for the decorated function.
470
471 Returns:
472 Decorator to be used or the decorated function as ConditionFunction.
473
474 Note:
475 Only one action is allowed per key. If an action is already registered for the key,
476 it will be overwritten.
477 """
478
479 def decorator(action_function: AnyCallableAction) -> AnyCallableAction:
480 checked_action = self._check_action(action_function, key, use_self=use_self, **kwargs)
481 self.actions[key] = checked_action
482 return checked_action
483
484 return cast("ConditionFunction._RegisterActionDecorator", decorator)
485
486 # no correct overload when dispatching on a callable
487 @register_action.register(collections.abc.Callable) # type: ignore
488 def _register_action_directly(
489 self, action_function: CallableT, key: Hashable = True, *, use_self: Optional[bool] = None, **kwargs: _P.kwargs
490 ) -> CallableT:
491 """
492 - Case 1
493 @register_action # key is True
494 def action_function(ctx: Context):
495 ...
496
497 - Case 2
498 register_action(action_function, key="very fast", **action_function_kwargs)
499 """
500 checked_action = self._check_action(action_function, key, use_self=use_self, **kwargs)
501 # Case 1 and 2 (without kwargs): checked_action is the action function, else wrapped.
502 self.actions[key] = checked_action # register action
503 return checked_action
504
505 # ----------------------
506
507 if not TYPE_CHECKING:
508 # This is shadowed as pyright interprets self.__class__.__name__ as this function
509 @property
510 def __name__(self) -> str:
511 """
512 Returns the name of the function.
513 """
514 return self.name
515
516 else:
517 __name__: str
518
519 def __str__(self):
520 return self.name
521
522 def __repr__(self) -> str:
523 if self.name == "ConditionFunction":
524 s = self.__class__.__name__ + f"({self.evaluation_function}"
525 else:
526 s = self.__class__.__name__ + f'(name="{self.name}", evaluation_function={self.evaluation_function}'
527 if self.truthy:
528 s += ", truthy=True"
529 s += ")"
530 return s
531
[docs]
532 @classmethod
533 def AND(
534 cls, func1: "ConditionFunction[_CP, _CH]", func2: "ConditionFunction[_CP, _H]"
535 ) -> "ConditionFunction[_CP, _CH | _H]":
536 """Combine two functions with :python:`and`, i.e. to return True if both return True."""
537
538 # NOTE: arguments are, lost. For type hints need to make ConditionFunction generic
539 def combined_func(ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs):
540 return func1(ctx, *args, **kwargs) and func2(ctx, *args, **kwargs)
541
542 return cls(combined_func, name=f"{func1.name}_and_{func2.name}") # type: ignore
543
[docs]
544 @classmethod
545 def OR(
546 cls, func1: "ConditionFunction[_CP, _CH]", func2: "ConditionFunction[_CP, _H]"
547 ) -> "ConditionFunction[_CP, _CH | _H]":
548 """Combine two functions to return True if either returns True."""
549
550 def combined_func(ctx: "Context", *args: _CP.args, **kwargs: _CP.kwargs):
551 return func1(ctx, *args, **kwargs) or func2(ctx, *args, **kwargs)
552
553 return cls(combined_func, name=f"{func1.name}_or_{func2.name}") # type: ignore
554
[docs]
555 @classmethod
556 def NOT(cls, func: "ConditionFunction[_CP, _CH]") -> "ConditionFunction[_CP, bool]":
557 """Invert the return value of a function."""
558
559 def combined_func(ctx: "Context | Rule", *args: _CP.args, **kwargs: _CP.kwargs):
560 return not func(ctx, *args, **kwargs)
561
562 return cls(combined_func, name=f"not_{func.name}") # type: ignore
563
[docs]
564 def __add__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]":
565 """Combine with another function using :py:meth:`AND`."""
566 return self.AND(self, other)
567
[docs]
568 def __and__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]":
569 """Combine with another function using :py:meth:`AND`."""
570 return self.AND(self, other)
571
[docs]
572 def __or__(self, other: "ConditionFunction[_CP, _H]") -> "ConditionFunction[_CP, _CH | _H]":
573 """Combine with another function using :py:meth:`OR`."""
574 return self.OR(self, other)
575
[docs]
576 def __invert__(self) -> "ConditionFunction[_CP, bool]":
577 """
578 Invert the return value of the function.
579 """
580 return self.NOT(self)
581
582
583class ActionFunction(ConditionFunction[_P, _T]):
584 """
585 A decorator that can be used with :any:`Rule.action`. It is nearly equivalent to :any:`ConditionFunction`,
586 only calling the function is more simple, i.e. does not assert a Hashable return type.
587
588 .. deprecated::
589 Will likely be removed as no strong use case.
590
591 :meta private:
592 """
593
594 def __init__(
595 self,
596 action_function: CallableAction[RuleT, _P, _T],
597 name: str = "ActionFunction",
598 *,
599 use_self: Optional[bool] = None,
600 ):
601 super().__init__(action_function, name, use_self=use_self)
602
603 @classmethod
604 def NOT(cls, _: Never) -> "NoReturn": # type: ignore
605 """
606 Raises:
607 NotImplementedError: NOT is not implemented for ActionFunction.
608 """
609 raise NotImplementedError("NOT is not implemented for ActionFunction")
610
611 def __call__(self, ctx: "Rule | Context", *args: _P.args, **kwargs: _P.kwargs) -> _T: # type: ignore
612 return self.evaluation_function(ctx, *args, **kwargs) # type: ignore[arg-type]