1# pyright: strict
2
3# pyright: reportUnnecessaryIsInstance=false, reportUnnecessaryComparison=false
4
5
6from __future__ import annotations
7
8import inspect
9
10import random
11from collections.abc import Mapping
12from dataclasses import is_dataclass
13from functools import partial, update_wrapper, wraps
14from inspect import isclass
15from itertools import accumulate
16from typing import (
17 TYPE_CHECKING,
18 Any,
19 Callable,
20 ClassVar,
21 Container,
22 Dict,
23 Hashable,
24 Iterable,
25 List,
26 NoReturn,
27 Optional,
28 Set,
29 TypeVar,
30 Union,
31 cast,
32)
33from weakref import CallableProxyType, WeakSet, proxy
34
35import pygame
36from omegaconf import DictConfig, OmegaConf
37from typing_extensions import (
38 Annotated,
39 Concatenate,
40 Literal,
41 NotRequired,
42 ParamSpec,
43 Required,
44 Self,
45 TypedDict,
46 Unpack,
47 overload,
48)
49
50from agents.tools.logs import logger
51from classes.constants import READTHEDOCS, Hazard, HazardSeverity, Phase, RulePriority, RuleResult
52from classes.evaluation_function import ConditionFunction
53from classes.exceptions import DoNotEvaluateChildRules, UnblockRuleException
54from classes.worldmodel import GameFramework
55from classes.information_manager import InformationManager
56from launch_tools import CarlaDataProvider, singledispatchmethod
57
58if TYPE_CHECKING:
59 import carla
60 from classes.type_protocols import CallableAction, CallableActionT, ConditionFunctionLike, ConditionFunctionLikeT
61 from classes.detection_matrix import DetectionMatrix
62 from agents.lunatic_agent import LunaticAgent
63 from agents.tools.config_creation import ContextSettings, LiveInfo, RuleConfig
64 # NOTE: gameframework.py adds GameFramework to this module's variables
65 # at this position it would be a circular import
66
67_T = TypeVar("_T")
68_P = ParamSpec("_P")
69_Rule = TypeVar("_Rule", bound="Rule")
70
71
[docs]
72class Context(CarlaDataProvider):
73 """
74 Object to be passed as the first argument (instead of self) to rules, actions and evaluation functions.
75
76 The :py:class:`Context` class derives from the scenario runner's :py:class:`CarlaDataProvider` to allow access to the world, map, etc.
77
78 Tip:
79 There is normally no need to initialize the context object manually.
80 Its recommended to initialize the context object with :py:meth:`.LunaticAgent._make_context`.
81 """
82
83 agent: "LunaticAgent"
84 """Backreference to the agent."""
85
86 config: "ContextSettings"
87 """A copy of the agents config. Overwritten by the condition's settings."""
88
89 evaluation_results: dict[Phase, Hashable] # ambiguous wording, which result? here evaluation result
90 """
91 Stores the result from the :py:meth:`Rule.condition` of the last rule that was evaluated in a phase.
92
93 .. deprecated:: in consideration
94 """
95
96 action_results: dict[Phase, Any]
97 """
98 Stores the result from the :py:meth:`action` of the last rule that was applicable in a phase.
99
100 .. deprecated:: in consideration
101 """
102
103 _control: Optional[carla.VehicleControl]
104 """
105 Current control the agent should use.
106
107 Accessible from the :py:attr:`control` property.
108 """
109
110 prior_result: Optional[Any] # TODO: maybe rename
111 """Result of the current phase."""
112
113 phase_results: dict[Phase, Any]
114 """
115 Stores the results of the phases the agent has been in.
116 By default the keys are set to :py:attr:`Context.PHASE_NOT_EXECUTED`.
117 """
118
119 last_context: Optional["Context"]
120 """The context object of the last tick. Used to access the last phase's results."""
121
122 second_pass: Optional[bool] = None
123 """
124 Whether or not the run_step function performs a second pass, i.e.
125 after the route has been replanned.
126
127 Warning:
128 The correctness should *not* be assumed.
129 The user is responsible for setting this value to True if a second pass is required.
130 """
131
132 _detected_hazards: Set[Hazard]
133 """
134 Detected hazards in the current phase.
135
136 If not empty at the end of the inner step an EmergencyStopException is raised.
137 """
138
139 detected_hazards_info: dict[Hazard, Union[HazardSeverity, Any]]
140 """
141 Information about the detected hazards. :py:meth:`add_hazard` inserts the given :py:class:`.HazardSeverity` as value,
142 however, note that the values are can be arbitrary if used otherwise.
143 """
144
145 PHASE_NOT_EXECUTED: object = object()
146 """
147 Value in :py:attr:`phase_results` to indicate that :python:`agent.execute_phase(phase)` was called for the respective phase
148
149 :meta hide-value:
150 """
151
152 def __init__(self, agent: "LunaticAgent", **kwargs: Any):
153 """
154 Its recommended to initialize the context object with :py:meth:`.LunaticAgent._make_context`.
155 """
156 self.agent = agent
157 self._control = kwargs.pop("control", None)
158 self._init_arguments = kwargs
159 self.phase_results = dict.fromkeys(Phase.get_phases(), Context.PHASE_NOT_EXECUTED)
160
161 self.config: "ContextSettings" = agent.config.copy() # type: ignore
162 self.config._content["live_info"] = agent.live_info # not a copy! # pyright: ignore
163
164 self.detected_hazards = set()
165 self.detected_hazards_info = dict.fromkeys(Hazard, HazardSeverity.NONE)
166 self.__dict__.update(kwargs)
167
168 # Less used attributes
169 self.evaluation_results = {}
170 self.action_results = {}
171
172 @property
173 def current_phase(self) -> Phase:
174 """Current :py:class:`Phase` the agent is in."""
175 return self.agent.current_phase
176
177 @property
178 def control(self) -> Union[carla.VehicleControl, None]:
179 """
180 Current control the agent should use. Set by :py:meth:`execute_phase(update_controls=...) <.LunaticAgent.execute_phase>`.
181
182 Note:
183 Safeguarded to be not set to None. Setting it to :code:`None` is discouraged.
184 Use :py:meth:`set_control` if setting it to :code:`None` is really needed.
185 """
186 return self._control
187
188 @control.setter
189 def control(self, control: carla.VehicleControl):
190 if control is None: # type: ignore[comparison]
191 raise ValueError("Context.control must not be None. To set it to None explicitly use set_control.")
192 self._control = control
193
[docs]
194 def set_control(self, control: Optional[carla.VehicleControl]):
195 """Set the control, allows to set it to None."""
196 self._control = control
197
[docs]
198 def get_or_calculate_control(self) -> carla.VehicleControl:
199 """
200 Get the control if it is set, otherwise calculate it by
201 executing the local planner.
202
203 Returns:
204 The control the agent should use.
205
206 Note:
207 Use this function inside rules to acquire a control object-
208
209 Warning:
210 This is equivalent to ending the inner step of the agent.
211
212 See Also:
213 :py:meth:`LunaticAgent._calculate_control`
214 """
215 if self.control:
216 return self.control
217 self.control = self.agent._calculate_control() # pyright: ignore[reportPrivateUsage]
218 return self.control # pyright: ignore[reportReturnType]
219
220 @property
221 def detected_hazards(self) -> Set[Hazard]:
222 """
223 Detected hazards in the current phase.
224
225 If not empty at the end of the inner step an EmergencyStopException is raised.
226 """
227 return self._detected_hazards
228
229 @detected_hazards.setter
230 def detected_hazards(self, hazards: Set[Hazard]):
231 if not isinstance(hazards, set): # type: ignore
232 raise TypeError("detected_hazards must be a set of Hazards.")
233 self._detected_hazards = hazards
234
[docs]
235 def add_hazard(self, hazard: Hazard, hazard_level: HazardSeverity = HazardSeverity.EMERGENCY):
236 """
237 Add the specified hazard to the detected hazards, in parallel the `hazard_level` can be set which
238 which is stored in :any:`detected_hazards_info`.
239 """
240 if hazard not in Hazard:
241 logger.warning(f"Adding {hazard} to the detected hazards which is not a member of the Hazard enum.")
242 self.detected_hazards.add(hazard)
243 self.detected_hazards_info[hazard] = hazard_level
244
[docs]
245 def discard_hazard(self, hazard: Hazard, match: Literal["exact", "subset", "intersection"] = "subset"):
246 """
247 Discards a hazard from the detected hazards.
248
249 Parameters:
250 hazard: Hazard to remove from :py:attr:`detected_hazards`.
251 match: How to match the hazard to remove.
252
253 - "exact" removes if the exact :py:class:`~classes.constants.Hazard` flag is present.
254 - "subset" removes if the hazard is a subset of the detected hazard, e.g.:
255
256 - :python:`discard_hazard(Hazard.VEHICLE, match="subset")` would remove
257 :any:`Hazard.OBSTACLE` = ``Hazard.VEHICLE | PEDESTRIAN | STATIC_OBSTACLE``.
258
259 - :python:`discard_hazard(Hazard.TRAFFIC_LIGHT, match="subset")` would *not* remove
260 :any:`Hazard.TRAFFIC_LIGHT_RED`.
261
262 """
263 if match == "subset":
264 self.detected_hazards = {h for h in self.detected_hazards if hazard not in h} # supports flags
265 elif match == "exact":
266 self.detected_hazards.discard(hazard)
267 elif match == "intersection":
268 self.detected_hazards = {h for h in self.detected_hazards if not hazard & h}
269 else:
270 msg = f"match must be 'exact', 'subset' or 'intersection', not {match}."
271 raise ValueError(msg)
272
[docs]
273 def has_hazard(self, hazard: Hazard, match: Literal["exact", "subset", "intersection"] = "intersection") -> bool:
274 """
275 Checks if the hazard intersects with any of the detected hazards.
276
277 See :py:meth:`discard_hazard` for the different matching options.
278 """
279 if match == "exact":
280 return hazard in self.detected_hazards
281 if match == "subset":
282 return any(hazard in h for h in self.detected_hazards)
283 if match == "intersection":
284 return any(hazard & h for h in self.detected_hazards)
285 msg = f"match must be 'exact', 'subset' or 'intersection', not {match}."
286 raise ValueError(msg)
287
288 # Convenience function when using detect_vehicles
289 from agents.tools.lunatic_agent_tools import max_detection_distance # noqa
290
291 @property
292 def live_info(self) -> "LiveInfo":
293 return self.config.live_info
294
295 @property
296 def active_blocking_rules(self) -> Set["BlockingRule"]:
297 return self.agent._active_blocking_rules # pyright: ignore[reportPrivateUsage]
298
299
300# @ConditionFunction["Rule", [], Literal[True]] # python 3.9+ syntax
[docs]
301@ConditionFunction
302def always_execute(ctx: Context) -> Literal[True]: # pylint: disable=unused-argument, # noqa: ARG001
303 """
304 This is an :py:class:`.ConditionFunction` that always returns :python:`True`. It can be used to always execute an action.
305 """
306 return True
307
308
309def _use_temporary_config(
310 func: Callable[Concatenate[_Rule, "Context", Optional[Dict[str, Any]], _P], _T],
311) -> Callable[Concatenate[_Rule, "Context", Optional[Dict[str, Any]], _P], _T]:
312 """
313 During the condition evaluation the ctx.config should have the overwrite settings applied
314 but not in a permanent way.
315 """
316 # TODO: To avoid unused argument error, consume the dict; however this might be harder to understand
317
318 @wraps(func)
319 def wrapper(
320 self: _Rule, ctx: Context, overwrite: Optional[Dict[str, Any]] = None, *args: _P.args, **kwargs: _P.kwargs
321 ) -> _T:
322 settings = self.overwrite_settings.copy() # Dict with "self" : SelfConfig
323 if overwrite:
324 settings.update(overwrite)
325
326 original_ctx_config = ctx.config
327 ctx.config["self"] = "???" # do not merge old self_config
328
329 temp: "ContextSettings" = OmegaConf.merge(ctx.config, settings) # type: ignore
330
331 OmegaConf.set_readonly(temp, True)
332 ctx.config = temp
333 self.self_config = self.overwrite_settings["self"] = ctx.config["self"]
334 OmegaConf.set_readonly(
335 self.self_config, False
336 ) # The Rule's settings should still be dynamic; expected in overwrite
337 try:
338 return func(self, ctx, overwrite, *args, **kwargs)
339 finally:
340 ctx.config = original_ctx_config
341
342 return wrapper
343
344
345class _CountdownRule:
346 # TODO: low prio: make cooldown dependant of tickrate or add a conversion from seconds to ticks OR make time-based
347 tickrate: ClassVar[int] = NotImplemented
348 """
349 :meta private:
350 """
351
352 DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0
353 """
354 Value the cooldown is reset to when :py:meth:`reset_cooldown` is called without a value.
355
356 Used *only* when :py:attr:`cooldown_reset_value` is not set.
357 """
358
359 start_cooldown: ClassVar[int] = 0
360 """Initial :py:attr:`cooldown` when initialized. if >0 the rule will not be ready for the first **start_cooldown** ticks."""
361
362 _instances: ClassVar["WeakSet[_CountdownRule]"] = WeakSet()
363 """Keep track of all Rule instances for the cooldowns"""
364
365 _cooldown: int
366 """If 0 the rule is ready to be executed."""
367
368 blocked: bool = False # NOTE: not a property
369 """Indicates if the rule is blocked for this tick only. Is reset to False after the tick."""
370
371 if TYPE_CHECKING:
372
373 class _InitParameters(TypedDict):
374 """Dict describing the parameters of the __init__ of the class method."""
375
376 cooldown_reset_value: NotRequired[Optional[int]]
377 enabled: NotRequired[bool]
378
379 def __init__(self, cooldown_reset_value: Optional[int] = None, enabled: bool = True):
380 self._instances.add(self)
381 self._cooldown = self.start_cooldown
382 self.max_cooldown = cooldown_reset_value if cooldown_reset_value is not None else self.DEFAULT_COOLDOWN_RESET
383 self._enabled = enabled
384
385 def is_ready(self) -> bool:
386 """Group aware check if a rule is ready."""
387 return (
388 self.cooldown == 0 and self.enabled and not self.blocked
389 ) # Note: uses property getters. Group aware for GroupRules
390
391 def reset_cooldown(self, value: Optional[int] = None):
392 if value is None:
393 self._cooldown = self.max_cooldown
394 elif value >= 0:
395 self._cooldown = int(value)
396 else:
397 raise ValueError("Cooldown value must be a None or a non-negative integer.")
398
399 @property
400 def cooldown(self) -> int:
401 """
402 Cooldown of the rule in ticks until it can be executed again. Only if 0 the rule can be executed.
403 """
404 return self._cooldown
405
406 @cooldown.setter
407 def cooldown(self, value: int):
408 self._cooldown = value
409
410 def update_cooldown(self):
411 """
412 Update the :py:attr:`cooldown` of *this* rule.
413
414 :meta private:
415 """
416 if self._cooldown > 0:
417 self._cooldown -= 1
418
419 @classmethod
420 def update_all_cooldowns(cls):
421 """Updates the cooldowns of **all** rules."""
422 for instance in cls._instances:
423 instance.update_cooldown()
424
425 @classmethod
426 def unblock_all_rules(cls):
427 """Unblocks all rules"""
428 for instance in cls._instances:
429 instance.blocked = False
430
431 @property
432 def enabled(self) -> bool:
433 """If :code:`False` the rule will not be evaluated. Contrary to :py:attr:`blocked` this permanently disables the rule."""
434 return self._enabled
435
436 @enabled.setter
437 def enabled(self, value: bool):
438 self._enabled = value
439
440 def set_active(self, value: bool):
441 """Enables or disables the rule. Contrary to :py:attr:`blocked` it will not be reset after the tick."""
442 self._enabled = value
443
444 class CooldownFramework:
445 """
446 Context manager that can reduce all cooldowns at the end of a `with` statement.
447 """
448
449 def __enter__(self):
450 return self
451
452 def __exit__(self, exc_type, exc_value, traceback): # type: ignore
453 self.tick()
454
455 @staticmethod
456 def tick():
457 """
458 Update all cooldowns and unblock all rules.
459
460 Calls:
461 - :py:meth:`Rule.update_all_cooldowns`
462 - :py:meth:`Rule.unblock_all_rules`
463 """
464 Rule.update_all_cooldowns()
465 Rule.unblock_all_rules()
466
467
468_GroupInstanceValues = TypedDict(
469 "_GroupInstanceValues", {"cooldown": int, "max_cooldown": int, "instances": "WeakSet[_GroupRule]"}
470)
471"""TypeHint that describes the values of `_GroupRule._group_instances`."""
472
473
474class _GroupRule(_CountdownRule):
475 group: Optional[str] = None # group of rules that share a cooldown
476 """
477 Group name of rules that should share their cooldown.
478
479 None for a rule to not share its cooldown.
480 """
481
482 # first two values in the list are current and max cooldown, the third is a set of all instances
483 _group_instances: ClassVar[Dict[str, _GroupInstanceValues]] = {}
484 """
485 Dictionary of all group instances.
486 Keys:
487 The group name.
488 Values:
489 Is a list of the *current cooldown*, the *max cooldown for reset* and a *WeakSet of all instances*.
490 """
491
492 if TYPE_CHECKING:
493
494 class _InitParameters(_CountdownRule._InitParameters): # pyright: ignore[reportPrivateUsage]
495 """Dict describing the parameters of the __init__ of the class method."""
496
497 group: NotRequired[Optional[str]]
498
499 def __init__(self, group: Optional[str] = None, cooldown_reset_value: Optional[int] = None, enabled: bool = True):
500 super().__init__(cooldown_reset_value, enabled)
501 self.group = group
502 if group is None:
503 return
504 if group not in self._group_instances:
505 self._group_instances[group] = {"cooldown": 0, "max_cooldown": 0, "instances": WeakSet()}
506 self._group_instances[group]["instances"].add(self) # add to weak set
507
508 @property
509 def cooldown(self) -> int:
510 """
511 Cooldown of the rule in ticks until it can be executed again after its action was executed.
512 If 0 the rule is ready to be executed.
513 """
514 if self.group:
515 return _GroupRule._group_instances[self.group]["cooldown"]
516 return super().cooldown
517
518 @property
519 def has_group(self) -> bool:
520 """Indicates if the rule belongs to a group, i.e. :python:`self.group is not None`."""
521 return self.group is not None
522
523 @cooldown.setter
524 def cooldown(self, value: int):
525 if self.group:
526 _GroupRule._group_instances[self.group]["cooldown"] = value
527 return
528 super().cooldown = value
529
530 def set_my_group_cooldown(self, value: Optional[int] = None):
531 """Update the cooldown of the group this rule belong to"""
532 if self.group is None:
533 logger.warning("Rule does not belong to a group, but set_my_group_cooldown was called. Ignoring.")
534 return
535 if value is None:
536 self._group_instances[self.group]["cooldown"] = self._group_instances[self.group][
537 "max_cooldown"
538 ] # set to max
539 else:
540 self._group_instances[self.group]["cooldown"] = value
541
542 def reset_cooldown(self, value: Optional[int] = None):
543 """Reset or set the cooldown; for a group rule it resets the group cooldown."""
544 if self.group:
545 self.set_my_group_cooldown(value)
546 else:
547 super().reset_cooldown()
548
549 @classmethod
550 def set_cooldown_of_group(cls, group: str, value: int):
551 """Updates the cooldown of the specified group to a specific value"""
552 if group in cls._group_instances:
553 cls._group_instances[group]["cooldown"] = value
554 else:
555 msg = f"Group {group} does not exist."
556 raise ValueError(msg)
557
558 __filter_not_ready_instances: Callable[["_CountdownRule"], bool] = (
559 lambda instance: instance.group is None and instance._cooldown > 0
560 ) # pylint: disable=line-too-long # type: ignore[attr-defined]
561 """
562 Filter function to get all instances that are not ready. see `update_all_cooldowns`
563 Group rules should not be affected by this filter.
564 """
565
566 @classmethod
567 def update_all_cooldowns(cls):
568 """Globally updates the cooldown of *all* rules."""
569 # Update Groups
570 for instance_data in cls._group_instances.values():
571 if instance_data["cooldown"] > 0:
572 instance_data["cooldown"] -= 1
573 for instance in filter(cls.__filter_not_ready_instances, cls._instances):
574 instance._cooldown -= 1
575
576
[docs]
577class Rule(_GroupRule):
578 _auto_init_: ClassVar[bool] = True
579 """
580 If set to False the automatic :code:`__init__` creation is disabled when subclassing.
581 This automatic :code:`__init__` will fix parameters like :any:`phases` and :any:`condition` to the class.
582
583 Declaring an :code:`__init__` method in the class has the same effect as setting :code:`_auto_init_` to False.
584
585 Note:
586 Using :python:`class NewRuleType(metarule=Rule)` is nearly equivalent to :python:`_auto_init_=False`, but is not inherited.
587 """
588
589 NOT_APPLICABLE: ClassVar[Literal[RuleResult.NOT_APPLICABLE]] = RuleResult.NOT_APPLICABLE
590 """
591 Unique object :py:attr:`.RuleResult.NOT_APPLICABLE` that indicates that no action was executed.
592
593 :meta hide-value:
594
595 .. deprecated:: Use :py:attr:`.RuleResult.NOT_APPLICABLE` directly
596 """
597
598 NO_RESULT: ClassVar[Literal[RuleResult.NO_RESULT]] = RuleResult.NO_RESULT
599 """
600 Unique object :py:attr:`.RuleResult.NO_RESULT` that indicates that the rules :py:meth:`action` did not return a result,
601 e.g. because an exception was raised.
602
603 :meta hide-value:
604
605 .. deprecated:: Use :py:attr:`.RuleResult.NOT_APPLICABLE` directly
606 """
607
608 _PROPERTY_MEMBERS: ClassVar[Set[str]] = {"cooldown", "has_group", "enabled"}
609 """
610 A subclass can only overwrite these attributes with properties. This prevents a user accidentally overwriting the property,
611 e.g. `cooldown = 20` with a method or variable.
612
613 TODO:
614 Consider making enabled a variable and not a property.
615 """
616
617 description: str
618 """Description of what this rule should do"""
619
620 phases: frozenset[Phase]
621 """
622 The phase or phases in which the rule should be evaluated.
623 For instantiation the phases attribute can be any :term:`Iterable` [:py:class:`.Phase`].
624 """
625
626 phase: Phase
627 """For the Class API the phase attribute be set to a single Phase object."""
628
629 if TYPE_CHECKING:
630 condition: ConditionFunctionLike[Self, [], Hashable]
631 """
632 The condition that determines if the rule's actions should be executed.
633
634 Simple Variant:
635 return True if the action should be executed, False otherwise.
636 if :py:attr:`.Rule.false_action` is defined, False will execute :py:attr:`.Rule.false_action`.
637
638 Advanced Variant:
639 return a :term:`Hashable` value that is used as key in the :py:attr:`actions` dict.
640
641 NOTE:
642 The readthedocs description is set in the __init__ function.
643 """
644
645 actions: dict[Any, CallableAction[Self, [], Any]]
646 """Dictionary that maps rule results to the action that should be executed."""
647
648 action: Annotated[CallableAction[Self, [], Any], "attribute not available on instance -> merged into `actions`"]
649 """
650 Action that should be executed if the rule is True. If `actions` is set, this is ignored.
651 """
652
653 false_action: Annotated[
654 CallableAction[Self, [], Any], "attribute not available on instance -> merged into `actions`"
655 ]
656 """Action that should be executed if the rule is False. May not be set if `actions` is set."""
657
658 if READTHEDOCS and not TYPE_CHECKING:
659 false_action: Annotated[CallableActionT, "attribute not available on instance -> merged into `actions`"]
660 action: Annotated[CallableActionT, "attribute not available on instance -> merged into `actions`"]
661 actions: dict[Any, CallableActionT]
662
663 # group : Optional[str]
664 # """Group name for rules that should share their cooldown."""
665
666 overwrite_settings: dict[str, Any]
667 """
668 Settings that should overwrite the agent's settings for this rule.
669
670 Note:
671 The overwrite settings are primitive :py:class:`dict` objects,
672 :py:class:`omegaconf.DictConfig` objects are converted.
673 """
674
675 self_config: "RuleConfig"
676 """
677 A custom sub-config for the rule that is not included in the agents settings.
678 Automatically gets a `instance` key added with the rule instance.
679
680 Can be accessed via `ctx.config.current_rule` or `self.config.self`.
681
682 Note:
683 Internally `self.config` and `ctx.config` is the same object, which makes
684 interpolations to the agent's settings possible.
685
686 Attention:
687 The **self_config object is *not* constant** it is recreated each time the
688 rule is evaluated to have the current context available.
689 """
690
691 priority: Union[float, int, RulePriority] = RulePriority.NORMAL
692 """Rules are executed in order of their priority, from high to low."""
693
694 # Initialization functions
695
696 _ctx: Optional[Context] = None
697 """No hard attachment, to not keep the context objects alive, use with care. Check where it is set in a rule."""
698
[docs]
699 def clone(self):
700 """
701 Create a new instance of the rule with the same settings.
702
703 Note:
704 - The current cooldown **is not** taken into account.
705 - The current enabled state **is** taken into account.
706 """
707 return self.__class__(self) # Make use over overloaded __init__
708
709 @overload
710 def __new__(cls, phases: Union[Phase, Iterable[Phase], None] = None) -> "Self":
711 ...
712 # No argument call; should be redundant
713
714 @overload
715 def __new__(cls, phases: Union[Phase, Iterable[Phase], None] = None, **kwargs: Unpack[_InitParameters]) -> "Self":
716 ...
717 # argument call
718
719 @overload
720 def __new__(cls, **kwargs: Unpack[_InitParametersComplete]) -> "Self":
721 ...
722 # Normal init
723
724 @overload
725 def __new__(cls, phases: "type[Any] | Self") -> "Self":
726 ...
727 # Rule.copy(other_rule)
728
729 def __new__(
730 cls,
731 phases: Optional[Union[Phase, Iterable[Phase], "type[Any]", Self, str, None]] = None,
732 bases: Optional[tuple[type[Any], ...]] = None,
733 clsdict: Optional[dict[str, Any]] = None,
734 **kwargs: Any,
735 ):
736 """
737 The @Rule decorator allows to instantiate a Rule class directly for easier out-of-the-box usage.
738 Further check for metaclass initialization, else this is a normal instance creation.
739
740 Parameters:
741 kwargs: These will be passed to :python:`__init_subclass__`.
742 """
743 # @Rule
744 # class NewRuleInstance:
745 # or Rule(other_rule) -> copy
746 if isclass(phases):
747 if issubclass(phases, _CountdownRule):
748 raise ValueError(
749 "When using @Rule the class may not be a subclass of Rule. Do not inherit from rule or subclass from Rule instead with the decorator."
750 "Do not do:\n\t@Rule\n\tclass MyRule(>>Rule<<): ...\n"
751 )
752 # cls is the Rule used to decorate the decorated_class
753 decorated_class = phases
754
755 # Create the new class, # NOTE: goes to __init_subclass_
756 new_decorated_class = type(
757 decorated_class.__name__, (cls,), decorated_class.__dict__.copy(), _init_by_decorator=True, **kwargs
758 ) # > calls init_subclass; copy() for correct type!
759 if TYPE_CHECKING:
760 assert issubclass(new_decorated_class, cls) and issubclass(new_decorated_class, decorated_class) # noqa: PT018
761
762 return super().__new__(new_decorated_class)
763
764 # class NewRuleType(metaclass=Rule) # deprecated
765 if isinstance(phases, str):
766 try:
767 logger.warning("Using NewRule(metaclass=Rule) is deprecated. Use NewRule(Rule, metarule=True) instead.")
768 assert clsdict
769 assert isinstance(bases, tuple)
770 class_name = phases
771 new_rule_class = type(class_name, (cls, *bases), clsdict, metarule=True, **kwargs)
772 except Exception:
773 print(
774 "ERROR: If you want to initialize a rule be sure that you pass a Phase object as the first argument."
775 "A string assumes that you've used class NewSubclass(metaclass=Rule) "
776 "This is DEPRECATED use class NewSubclass(Rule, metarule=True) instead."
777 )
778 raise
779 else:
780 assert issubclass(new_rule_class, cls) # for type-hints
781 return new_rule_class
782 # Normal instance
783 return super().__new__(cls)
784
785 if TYPE_CHECKING:
786
787 class _InitParameters(_GroupRule._InitParameters, total=False, closed=False): # pyright: ignore[reportPrivateUsage]
788 """Dict describing the parameters of the __init__ of the class method."""
789
790 # phases: Union[Phase, Iterable[Phase]] # leave this and use explicitly later on
791 # condition: Optional[ConditionFunctionLike[Rule, ..., Hashable]]
792 action: Optional[Union[CallableAction[Rule, [], Any], Dict[Any, CallableAction[Rule, [], Any]]]]
793 false_action: Optional[CallableAction[Rule, [], Any]]
794 actions: Optional[Dict[Any, CallableAction[Rule, [], Any]]]
795 description: str
796 overwrite_settings: Optional[Dict[str, Any]]
797 self_config: Optional[Dict[str, Any]]
798 priority: RulePriority
799
800 class _InitParametersComplete(_InitParameters):
801 """Dict describing the parameters of the __init__ of the class method."""
802
803 phases: Required[Union[Phase, Iterable[Phase]]]
804 condition: NotRequired[Optional[ConditionFunctionLike[Rule, ..., Hashable]]]
805
806 class _CallKwargs(TypedDict, closed=True):
807 ignore_phase: bool
808 """Default False"""
809 ignore_cooldown: bool
810 """Default False"""
811
812 @singledispatchmethod
813 def __init__(
814 self,
815 phases: Union[Phase, Iterable[Phase]], # iterable of Phases
816 # /, # phases must be positional; python3.8+ only
817 condition: Optional[ConditionFunctionLikeT] = None,
818 action: Optional[Union[CallableAction[Self, []], Dict[Any, CallableAction[Self, []]]]] = None,
819 false_action: Optional[CallableAction[Self, []]] = None,
820 *,
821 actions: Optional[Dict[Any, CallableAction[Self, []]]] = None,
822 description: str = "What does this rule do?",
823 overwrite_settings: Optional[Dict[str, Any]] = None,
824 self_config: Optional[Dict[str, Any]] = None,
825 priority: RulePriority = RulePriority.NORMAL,
826 cooldown_reset_value: Optional[int] = None,
827 group: Optional[str] = None,
828 enabled: bool = True,
829 ):
830 """
831 Initializes a Rule object.
832
833 Parameters:
834 phases: The phase(s) when the rule should be evaluated.
835 An iterable of Phase objects or a single Phase object.
836 condition: A function that takes a Context object as input and returns a Hashable value.
837 If not provided, the class must implement a `condition` function.
838 action: A function or a dictionary of functions that take a Context object as input.
839 If `action` behaves like `actions`.
840 Only one of `action` and `actions` can be set.
841 false_action: A function that takes a Context object as input and returns any value.
842 It is used when `action` is a single function and represents the action to be taken when the condition is False.
843 actions: A dictionary of `action` functions
844 It should map the return values of `condition` to the corresponding action function.
845 If `action` is None, `actions` must be provided.
846 description: A string that describes what this rule does.
847 overwrite_settings: A dictionary of settings that will overwrite the
848 agent's setting for this Rule.
849 priority: The priority of the rule. It can be a float, an integer, or a RulePriority enum value.
850 cooldown_reset_value: An optional integer value that represents the cooldown reset value for the rule.
851 If not provided falls back to the class attribute :py:attr:`.Rule.DEFAULT_COOLDOWN_RESET`.
852 group: An optional string that specifies the group to which this rule belongs.
853 enabled: A boolean value indicating whether the rule is enabled or not.
854
855 Raises:
856 ValueError: If ``phases`` is empty or None, or if ``phases`` contains an object that is not of type Phase.
857 TypeError: If ``condition`` is None and the class does not implement a :py:meth:`condition` function,
858 or if both ``action`` and ``actions`` are None and the class does not have an :py:attr:`actions` attribute or an :py:meth:`action` function.
859 TypeError: if ``actions`` is not a Mapping object.
860 ValueError: If both ``action`` and ``actions`` are provided.
861 ValueError: If ``action`` is a Mapping and either ``false_action`` or ``actions`` is not None.
862 ValueError: If an ``action`` function is not callable.
863 ValueError: If ``description`` is not a string.
864 """
865
866 # Check phases
867 if not phases:
868 raise ValueError("phases must not be empty or None")
869 if not isinstance(phases, frozenset):
870 phases = frozenset(phases) if isinstance(phases, Iterable) else frozenset([phases])
871 for p in phases:
872 if not isinstance(p, Phase):
873 msg = f"phase must be of type Phases, not {type(p)}"
874 raise TypeError(msg)
875 self.phases = phases
876
877 # Check Rule
878 if condition is None and not hasattr(self, "condition"):
879 msg = (
880 f"{self.__class__.__name__}.__init__() missing 1 required positional argument: 'condition'. "
881 "Alternatively the class must implement a `condition` function."
882 )
883 raise TypeError(msg)
884
885 if False and TYPE_CHECKING:
886 # Idea have type hint here
887 self.condition: Any = ...
888 """
889 The condition that determines if the rule's actions should be executed.
890
891 Simple Variant:
892 return True if the action should be executed, False otherwise.
893 if :py:attr:`.Rule.false_action` is defined, False will execute :py:attr:`.Rule.false_action`.
894
895 Advanced Variant:
896 return a :term:`Hashable` value that is used as key in the :py:attr:`actions` dict.
897 """
898 if condition is not None:
899 self.condition = condition
900 else:
901 self.condition = self.__class__.condition
902
903 if condition is not None:
904 if not hasattr(self, "condition"):
905 self.condition = condition
906 else:
907 # Warn if cls.condition and passes condition are different
908 self_func = getattr(self.condition, "__func__", getattr(self.condition, "func", self.condition))
909 if self_func != condition: # Compare method with function
910 logger.warning(
911 f"Warning 'condition' argument passed but class {self.__class__.__name__} "
912 "already implements a different function 'self.condition'. "
913 f"Overwriting {self.condition} with passed condition {getattr(condition, '__name__', str(condition))}. "
914 "This might lead to undesired results."
915 )
916
917 # NOTE: IMPORTANT: self.condition = condition overwrites methods with functions
918 # To keep methods as methods the condition parameter is removed with
919 # `do_not_overwrite.append("condition")` used during __init_subclass__
920 # Could move this check also to here, however, it will then not be checked during class creation
921 self.condition = condition
922 # else: self.__class__.condition must be true already, which was checked in the subclass initialization
923
924 # Check Actions
925 if action is not None and actions is not None:
926 try:
927 if len(actions) == 1 and action is actions[True]:
928 logger.info(
929 "`action` and `actions` have been both been used when initializing %s. "
930 "Did you use `condition.register_action`? Then you can omit the action parameter.",
931 self,
932 )
933 action = None
934 else:
935 raise ValueError("Only one of 'action' and 'actions' can be set.")
936 except (KeyError, ValueError) as e:
937 raise ValueError(
938 "Either only one of 'action' and 'actions' can be set, or actions[True] must be the same as action "
939 "- other actions are currently not supported."
940 ) from e
941 if action is None and actions is None and not hasattr(self, "actions"):
942 # NOTE: the k in params check below is essential for this to work correctly.
943 msg = (
944 f"{self.__class__.__name__}.__init__() arguments `action` and `actions` are both None. "
945 "Provide at least one argument alternatively the class must have an `actions` "
946 "attribute or an `action` function."
947 )
948 raise TypeError(msg)
949
950 if action is None:
951 if not isinstance(actions, Mapping):
952 msg = f"actions must be a Mapping, not {type(actions)}"
953 raise TypeError(msg)
954
955 self.actions = actions
956 elif isinstance(action, Mapping):
957 self.actions = action
958 if false_action is not None or actions is not None:
959 raise ValueError("When passing a dict to action, false_action and actions must be None")
960 else:
961 # NOTE: Might overwrite actions attribute
962 self.actions = {} # type: ignore
963 if action is not None:
964 self.actions[True] = action
965 if false_action is not None:
966 self.actions[False] = false_action
967 # actions can be the dict of the class, we do not want the same dict instance
968 self.actions = dict(self.actions)
969
970 # Assure that method(self, ctx) like functions are accessible like them
971 for key, func in self.actions.items():
972 if not callable(func):
973 msg = f"Action for key {key} must be callable, not {type(func)}"
974 raise TypeError(msg)
975 multiple_parameters = len(inspect.signature(func).parameters) >= 2
976 if multiple_parameters and getattr(func, "use_self", True):
977 # NOTE: could use types.MethodType
978 self.actions[key] = partial(func, self) # bind to self
979
980 # Check Description
981 if not isinstance(description, str):
982 msg = f"description must be of type str, not {type(description)}"
983 raise TypeError(msg)
984 self.description = description
985 super().__init__(group or self.group, cooldown_reset_value, enabled) # or self.group for subclassing
986 self.priority: float | int | RulePriority = priority # used by agent.add_rule
987
988 self.overwrite_settings = overwrite_settings or {}
989 if not isinstance(self.overwrite_settings, dict):
990 # NOTE: If DictConfig only the outermost will be a dict,
991 # i.e. this could be dict[str, DictConfig]
992 self.overwrite_settings = dict(self.overwrite_settings)
993 if self_config and "self" in self.overwrite_settings and self.overwrite_settings["self"] != self_config:
994 logger.debug("Warning: self_config and self.overwrite_settings['self'] must be the same object.")
995
996 default_self_config: "RuleConfig" = cast(
997 "RuleConfig", getattr(self, "self_config", getattr(self, "SelfConfig", {}))
998 )
999 if isclass(default_self_config):
1000 if not is_dataclass(default_self_config):
1001 logger.warning(
1002 f"Class {self.__class__.__name__} has a self_config class that is not a dataclass. "
1003 "This might lead to undesired results, i.e. missing keys in the config."
1004 )
1005 default_self_config = default_self_config()
1006 if not isinstance(default_self_config, DictConfig):
1007 default_self_config = cast(
1008 "RuleConfig", OmegaConf.create(default_self_config, flags={"allow_objects": True})
1009 ) # type: ignore
1010 if self_config:
1011 self.self_config = cast("RuleConfig", OmegaConf.merge(default_self_config, self_config))
1012 else:
1013 self.self_config = default_self_config
1014 assert self.self_config._get_flag("allow_objects"), "self_config must allow objects to be used as values." # pyright: ignore[reportPrivateUsage]
1015
1016 self.overwrite_settings["self"] = self.self_config
1017 self.overwrite_settings["self"]["instance"] = self
1018
1019 # Called on subclass creation. Can be used for class API
1020 def __init_subclass__(cls, _init_by_decorator: bool = False, metarule: bool = False):
1021 """
1022 Automatically creates a :python:`__init__` function to allow for a simple to use
1023 class-interface to create rule classes.
1024
1025 By setting :python:`_auto_init_ = False` in the class definition, the automatic __init__
1026 creation is disabled. Similarly, this is also the case if :python:`metarule=Rule` is used
1027 for the class creation.
1028 """
1029 if hasattr(cls, "phases") and hasattr(cls, "phase") and cls.phases and cls.phase:
1030 raise ValueError(
1031 f"Both 'phases' and 'phase' are set in class {cls.__name__}. Use only one. %s, %s"
1032 % (cls.phases, cls.phase)
1033 )
1034
1035 for attr in cls._PROPERTY_MEMBERS:
1036 if hasattr(cls, attr) and not hasattr(getattr(cls, attr), "__get__"):
1037 msg = (
1038 f"Class {cls.__name__} has overwritten property {attr} with {getattr(cls, attr)}."
1039 " You may only overwrite the following attributes with properties: {cls._PROPERTY_MEMBERS}."
1040 "Did you mean `start_cooldown` or `cooldown_reset_value` instead of `cooldown`?"
1041 )
1042 raise ValueError(msg)
1043 if not cls._auto_init_ or metarule:
1044 return
1045
1046 custom_init = cls.__dict__.get("__init__", False)
1047
1048 # Members that are not overwritten and passed as default arguments (None) into the __init__
1049 do_not_overwrite = ["phases"]
1050
1051 if hasattr(cls, "condition"):
1052 # Check if the condition should be treated as a method or function
1053 rule_func: ConditionFunctionLike[Self, ..., Hashable]
1054 if isinstance(cls.condition, ConditionFunction):
1055 rule_func = cls.condition.evaluation_function # pyright: ignore[reportUnknownMemberType, reportAssignmentType]
1056 else:
1057 rule_func = cls.condition
1058 if isinstance(rule_func, staticmethod):
1059 rule_func = rule_func.__func__
1060
1061 # Decide method(self, ctx) vs. function(ctx)
1062 if hasattr(cls.condition, "use_self") and cls.condition.use_self is not None: # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportFunctionMemberAccess, reportAttributeAccessIssue]
1063 condition_as_method: bool = cls.condition.use_self # type: ignore[attr-defined]
1064 else:
1065 params = len(inspect.signature(rule_func).parameters)
1066 if params >= 2:
1067 if params > 2:
1068 logger.warning(
1069 f"Rule {getattr(cls.condition, '__name__', cls.condition)}" # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
1070 "has more than 2 parameters. "
1071 "Treating it as a method(self, ctx, **kwargs) with self argument!"
1072 "To avoid this message or to use it as a function use"
1073 "ConditionFunction(use_self=True|False) explicitly."
1074 )
1075 condition_as_method = True
1076 else:
1077 condition_as_method = False
1078 if condition_as_method:
1079 logger.debug(
1080 "Implementing %s as method(self, ctx) - "
1081 "If you need it as a function(ctx, *args) decorate it with "
1082 "@ConditionFunction(use_self=False).",
1083 getattr(cls.condition, "__name__", cls.condition),
1084 ) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
1085 do_not_overwrite.append("condition")
1086 else:
1087 # If the signature has only one parameter its a function; else its user decision.
1088 logger.debug(
1089 "Implementing %s as function(ctx) without a self argument "
1090 "- If you need it as a method(self, ctx, *args) decorate it with "
1091 "@ConditionFunction(use_self=True).",
1092 getattr(cls.condition, "__name__", cls.condition),
1093 ) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
1094
1095 # Actions provided by condition.actions, e.g. ConditionFunction.register_action
1096 if hasattr(cls.condition, "actions") and cls.condition.actions: # type: ignore[attr-defined]
1097 if hasattr(cls, "actions") and cls.actions:
1098 msg = (
1099 f"Class {cls.__name__} already has an 'actions' attribute. "
1100 "It will be overwritten by the 'actions' attribute of the condition."
1101 " This is the case if ConditionFunction.register_action has been used."
1102 )
1103 raise ValueError(msg)
1104 cls.actions = cls.condition.actions # type: ignore[attr-defined]
1105
1106 if not hasattr(cls, "description"):
1107 cls.description = cls.__doc__ or "No description provided."
1108
1109 if hasattr(cls, "self_config"):
1110 do_not_overwrite.append("self_config")
1111
1112 # Create a __init__ function that sets some of the parameters.
1113 # find overlapping parameters
1114 params = inspect.signature(cls.__init__).parameters
1115
1116 # TODO: to be lazy Rule arguments need to be added to params so that "k in params" works
1117
1118 def partial_init(self: Self, phases: Optional[Iterable[Phase]] = None, *args, **kwargs: Rule._InitParameters): # pyright: ignore[reportMissingParameterType, reportUnknownParameterType]
1119 # Need phases as first argument
1120 cls_phases = getattr(cls, "phases", None) # allow for both wordings
1121 cls_phase = getattr(cls, "phase", None)
1122 if _init_by_decorator:
1123 # Using @Rule phases as first argument is the class
1124 phases = cls_phases or cls_phase
1125 else:
1126 if phases is None: # NOTE: Could be Phase.NONE
1127 phases = cls_phases or cls_phase
1128 if phases is None:
1129 msg = f"`phases` or `phase` must be provided for class {cls.__name__}"
1130 raise ValueError(msg)
1131 # Removing condition to not overwrite it
1132 kwargs.update({k: v for k, v in cls.__dict__.items() if k in params and k not in do_not_overwrite}) # type: ignore
1133 try:
1134 if custom_init:
1135 custom_init(self, phases, *args, **kwargs)
1136 else:
1137 # note: could be single dispatch function
1138 # super will not be _GroupRule but at least Rule
1139 super(cls, self).__init__(phases, *args, **kwargs) # type: ignore[arg-type]
1140 except IndexError: # functools <= python3.10
1141 logger.error(
1142 "\nError in __init__ of %s. Possible reason: Check if the __init__ method has the correct signature. `phases` must be a positional argument.\n",
1143 cls.__name__,
1144 )
1145 raise
1146 except TypeError as e:
1147 # e.g. forgot a action, or rules attribute (MultiRule)
1148 if "missing" in str(e):
1149 logger.error(
1150 "Class %s has likely missing attributes that cannot be passed to init. Check if all required attributes are set in the class definition.",
1151 cls.__name__,
1152 )
1153 raise
1154
1155 update_wrapper(partial_init, cls.__init__) # pyright: ignore[reportUnknownArgumentType]
1156 cls.__init__ = partial_init # pyright: ignore[reportIncompatibleMethodOverride, reportAttributeAccessIssue]
1157
1158 @__init__.register(_CountdownRule)
1159 @__init__.register(type)
1160 def __init_by_decorating_class(self, cls: "Rule"): # pyright: ignore[reportUnusedFunction]
1161 """
1162 Initialize by passing a Rule or class object to the __init__ method.
1163
1164 This allows the usage of the @Rule decorator and easy copying.
1165 """
1166 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow for spelling mistake
1167 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None))
1168
1169 # TODO: Automate this via inspect.signature or TypedDict of the __init__ signature
1170 self.__init__(
1171 phases,
1172 cls.condition,
1173 getattr(cls, "action", None),
1174 getattr(cls, "false_action", None),
1175 actions=getattr(cls, "actions", None),
1176 description=cls.description,
1177 overwrite_settings=getattr(cls, "overwrite_settings", None),
1178 self_config=getattr(cls, "self_config", None),
1179 priority=getattr(cls, "priority", RulePriority.NORMAL),
1180 cooldown_reset_value=cooldown_reset_value,
1181 group=getattr(cls, "group", None),
1182 enabled=getattr(cls, "enabled", True),
1183 )
1184
1185 @__init__.register(Mapping)
1186 def __init_from_mapping(self, source: "_InitParametersComplete"): # pyright: ignore[reportUnusedFunction]
1187 # NOTE: This is weakly tested and not much supported.
1188 condition = source.get("condition", None)
1189 if condition is not None:
1190 condition = getattr(self, "condition", None)
1191 self.__init__(
1192 source.get("phases", source.get("phase")),
1193 condition,
1194 source.get("action"),
1195 source.get("false_action"),
1196 actions=source.get("actions"),
1197 description=source.get("description", self.description),
1198 overwrite_settings=source.get("overwrite_settings"),
1199 self_config=source.get("self_config"),
1200 priority=source.get("priority", RulePriority.NORMAL),
1201 cooldown_reset_value=source.get("cooldown_reset_value"),
1202 group=source.get("group"),
1203 enabled=source.get("enabled", True),
1204 )
1205
1206 # -----------------------
1207
1208 @classmethod
1209 def _get_init_signature(cls):
1210 """
1211 Get the signature of the __init__ function.
1212
1213 Returns:
1214 The signature of the __init__ function.
1215
1216 :meta private:
1217 """
1218 return inspect.signature(cls.__init__).parameters.keys()
1219
[docs]
1220 def execute_phase(
1221 self, phase: Phase, *, prior_results: Any = None, update_controls: Optional[carla.VehicleControl] = None
1222 ):
1223 """
1224 Attention:
1225 - **Use with care to avoid loops or recursions.**
1226
1227 - If a :py:class:`.Context` is available prefer using
1228 :py:meth:`ctx.agent.execute_phase <.LunaticAgent.execute_phase>` instead.
1229
1230 Helper function to execute a phase from within a rule,
1231 wrapper of :py:meth:`agents.lunatic_agent.LunaticAgent.execute_phase`.
1232
1233 Warns:
1234 ReferenceError: If the weak proxy pointing to the :py:class:`Context` object has been
1235 deleted. The phase will not be executed.
1236
1237 See Also:
1238 - :py:meth:`.LunaticAgent.execute_phase`
1239 """
1240 try:
1241 self._ctx.agent.execute_phase(
1242 phase=phase, # pyright: ignore[reportOptionalMemberAccess]
1243 prior_results=prior_results,
1244 update_controls=update_controls,
1245 )
1246 except ReferenceError:
1247 logger.error("ReferenceError in Rule.execute_phase. Weakproxy deleted.")
1248
1249 # -----------------------
1250 # Evaluation functions
1251 # -----------------------
1252
1253 @_use_temporary_config
1254 def evaluate(
1255 self,
1256 ctx: Context,
1257 overwrite: Optional[Dict[str, Any]] = None, # pylint: ignore=unused-argument # noqa: ARG002
1258 ) -> Union[bool, Hashable, "Literal[RuleResult.NO_RESULT]"]:
1259 """
1260 Note:
1261 This is an **interface** function of rules that is executed during :py:meth:`__call__`.
1262 This method does not check if a rule is applicable.
1263 i.e. if the rule is in the correct phase of if it py:meth:`is_ready`
1264 this is done in :py:meth:`__call__`.
1265
1266 Meta rules with children should overwrite this method and call
1267 :py:meth:`evaluate_children` from here.
1268
1269 Executes the :py:attr:`condition` function of the rule.
1270 The decorator automatically takes care of temporarily setting the :py:attr:`overwrite_settings`.
1271
1272 Parameters:
1273 ctx: The context object that is passed to the condition function.
1274 overwrite: A dictionary of settings that will overwrite the agent's setting for this Rule.
1275 Used by the :code:`@_use_temporary_config` decorator.
1276
1277 :meta private:
1278 """
1279 self._ctx = proxy(ctx) # use with care and access over function
1280 return self.condition(ctx) # pyright: ignore[reportCallIssue]
1281
[docs]
1282 def evaluate_children(self, ctx: Context) -> "NoReturn": # pylint: disable=unused-argument
1283 """
1284 Not implemented for this rule class.
1285
1286 Note:
1287 This is an **interface** function of meta rules
1288 that is executed during :py:meth:`__call__` to call further rules.
1289 """
1290 raise NotImplementedError("This method should be implemented in a subclass")
1291
[docs]
1292 def __call__(
1293 self,
1294 ctx: Context,
1295 overwrite: Optional[Dict[str, Any]] = None,
1296 *,
1297 ignore_phase: bool = False,
1298 ignore_cooldown: bool = False,
1299 ) -> Union[Any, Literal[RuleResult.NOT_APPLICABLE]]:
1300 """
1301 1. First checks if the rule is *applicable*, i.e. is its :py:attr:`cooldown == 0 <cooldown>`,
1302 if not returns :py:attr:`NOT_APPLICABLE`.
1303 2. Afterwards evaluates the rules :py:meth:`condition` function.
1304 - if the result is not in :py:attr:`actions` returns :py:attr:`NOT_APPLICABLE`.
1305 - otherwise merges the :py:attr:`overwrite_settings` with the py:attr:`.Context.config` and executes the action.
1306
1307 Parameters:
1308 ctx: The context object that is passed to the condition function.
1309 overwrite: Extends :py:attr:`overwrite_settings` for this call only. Defaults to :code:`None`.
1310 ignore_phase: If :python:`True` the phase check is skipped. Defaults to :code:`False`.
1311 ignore_cooldown: If True the cooldown check is skipped. Defaults to :code:`False`.
1312 """
1313 # Check phase
1314 assert ignore_phase or ctx.agent.current_phase in self.phases
1315
1316 if not self.is_ready() and not ignore_cooldown:
1317 return RuleResult.NOT_APPLICABLE
1318 if (
1319 not ignore_phase and ctx.agent.current_phase not in self.phases
1320 ): # NOTE: This is currently never False as checked in execute_phase and the agents dictionary.
1321 return RuleResult.NOT_APPLICABLE # not applicable for this phase
1322
1323 exception = None
1324 result = Rule.NO_RESULT
1325 try:
1326 result = self.evaluate(ctx, overwrite)
1327 except BaseException as e:
1328 exception = e
1329 else:
1330 ctx.evaluation_results[ctx.agent.current_phase] = result
1331 if result in self.actions:
1332 self.reset_cooldown()
1333 # Apply overwrite settings permanently
1334 ctx.config.merge_with(self.overwrite_settings)
1335 if overwrite:
1336 ctx.config.merge_with(overwrite)
1337
1338 action_result = self.actions[result](
1339 ctx
1340 ) # todo allow priority, random chance # pyright: ignore[reportCallIssue]
1341 ctx.action_results[ctx.agent.current_phase] = action_result
1342 return action_result
1343 return RuleResult.NOT_APPLICABLE # No action was executed
1344 finally:
1345 self._ctx = None
1346 if exception:
1347 self.reset_cooldown() #
1348 raise exception
1349
1350 def __str__(self) -> str:
1351 try:
1352 if isinstance(self.condition, partial):
1353 return (
1354 self.__class__.__name__ + f"(description='{self.description}', "
1355 f"phases={self.phases}, group={self.group}, "
1356 f"priority={self.priority}, "
1357 f"actions={self.actions}, "
1358 f"condition={self.condition.func}, " # pyright: ignore[reportUnknownMemberType]
1359 f"cooldown={self.cooldown})"
1360 )
1361 return (
1362 self.__class__.__name__ + f"(description='{self.description}', "
1363 f"phases={self.phases}, group={self.group}, "
1364 f"priority={self.priority}, "
1365 f"actions={self.actions}, "
1366 f"condition={self.condition.__name__}, "
1367 f"cooldown={self.cooldown})"
1368 )
1369 except AttributeError as e:
1370 logger.info("Error during string creation: " + str(e))
1371 return (
1372 self.__class__.__name__ + "(Error in condition.__str__: Rule has not been initialized correctly. "
1373 "Missing attributes: " + str(e) + ")"
1374 )
1375
1376 def __repr__(self) -> str:
1377 return str(self)
1378
1379
[docs]
1380class MultiRule(Rule, metarule=True):
1381 """
1382 This metarule allows to execute one or multiple rules if it is applicable.
1383 Depending on :py:attr:`execute_all_rules` it will either execute all rules or only the first
1384 applicable rule from its :py:attr:`rules` list.
1385 """
1386
1387 rules: List[Rule]
1388 """The list of child rules to be called if this rule's condition is true."""
1389
1390 def _wrap_action(self, action: Callable[[Context], Any]):
1391 """
1392 Wrap the passed action.
1393 First the action is executed afterwards the child rules are evaluated.
1394
1395 Note:
1396 There is no extra condition that is checked between the two actions.
1397 """
1398
1399 @wraps(action)
1400 def wrapper(ctx: Context, *args: Any, **kwargs: Any) -> Any:
1401 try:
1402 result = action(ctx, *args, **kwargs)
1403 except DoNotEvaluateChildRules as e:
1404 return e, None
1405 else:
1406 results = self.evaluate_children(ctx) # execute given rules as well
1407 return result, results
1408
1409 return wrapper
1410
1411 if TYPE_CHECKING:
1412
1413 class _InitParameters(Rule._InitParameters): # pyright: ignore[reportPrivateUsage,reportGeneralTypeIssues]
1414 rules: Required[List[Rule]]
1415 sort_rules: NotRequired[bool]
1416 execute_all_rules: NotRequired[bool]
1417
1418 @singledispatchmethod
1419 def __init__(
1420 self,
1421 phases: Union[Phase, Iterable[Phase]],
1422 # /, # phases must be positional; python3.8+ only
1423 rules: List[Rule],
1424 condition: Optional[ConditionFunctionLikeT] = None,
1425 *,
1426 description: str = "If its own condition is true calls the passed rules.",
1427 priority: RulePriority = RulePriority.NORMAL,
1428 sort_rules: bool = True,
1429 execute_all_rules: bool = False,
1430 action: Optional[Callable[[Context], Any]] = None,
1431 ignore_phase: bool = True,
1432 overwrite_settings: Optional[Dict[str, Any]] = None,
1433 self_config: Optional[Dict[str, Any]] = None,
1434 cooldown_reset_value: Optional[int] = None,
1435 group: Optional[str] = None,
1436 enabled: bool = True,
1437 ):
1438 """
1439 Initializes a Rule object that can have further rules as children.
1440
1441 Args:
1442 phases (Union[Phase, Iterable]): The phase or phases in which the rule should be active.
1443 rules (List[Rule]): The list of child rules to be called if the rule's condition is true.
1444 condition (Callable[[Context]], optional): The condition that determines if the rules should be evaluated. Defaults to :py:func:`always_execute`.
1445 execute_all_rules (bool, optional):
1446 If False will only execute the first rule with a applicable condition, i.e. this MultiRule is like a node in a decision tree.
1447 If True all rules are evaluated, unless one raises a `DoNotEvaluateChildRules` exception.
1448 Defaults to :python:`False`.
1449 sort_rules (bool, optional): Flag indicating whether to sort the rules by priority. Defaults to :python:`True`.
1450 action (Callable[[Context]], optional): The action to be executed before the passed rules are evaluated. Defaults to :python:`None`.
1451 ignore_phase (bool, optional): Flag indicating whether to ignore the Phase of the passed child rules. Defaults to :python:`True`.
1452 overwrite_settings (Dict[str, Any], optional): Additional settings to overwrite the agent's settings. Defaults to :python:`None`.
1453 priority (RulePriority, optional): The priority of the rule. :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`.
1454 description (str, optional): The description of the rule. Defaults to :python:`"If its own rule is true calls the passed rules."`.
1455 group (str | None, optional): The group name of the rule. Defaults to :python:`None`.
1456 enabled (bool, optional): Flag indicating whether the rule is enabled after creation. Defaults to :python:`True`.
1457 """
1458 self.ignore_phase = ignore_phase
1459 if rules is None: # type: ignore
1460 logger.warning(
1461 "Warning: No rules passed to %s: %s. You can still add rules to the rules attribute later.",
1462 self.__class__.mro()[1].__name__,
1463 self.__class__.__name__,
1464 )
1465 rules = []
1466 if len(rules) == 0:
1467 # NOTE: This will be logged twice, once more by
1468 logger.warning("Rules list is empty. %s will always return NOT_APPLICABLE.", self.__class__.__name__)
1469 self.rules = rules
1470 self.execute_all_rules = execute_all_rules
1471 if sort_rules:
1472 self.rules.sort(key=lambda r: r.priority, reverse=True)
1473 # if an action is passed to be executed before the passed rules it is wrapped to execute both
1474 if action is not None:
1475 action = self._wrap_action(action)
1476 else:
1477 action = self.evaluate_children # will be called elsewhere
1478 if condition is None and not hasattr(self, "condition"):
1479 condition_arg = always_execute
1480 else:
1481 condition_arg = condition
1482 super().__init__(
1483 phases,
1484 condition=condition_arg,
1485 action=action,
1486 description=description,
1487 overwrite_settings=overwrite_settings,
1488 self_config=self_config,
1489 priority=priority,
1490 cooldown_reset_value=cooldown_reset_value,
1491 enabled=enabled,
1492 group=group,
1493 )
1494
1495 @__init__.register(_CountdownRule) # For similar_rule = Rule(some_rule), easier cloning
1496 @__init__.register(type) # For @Rule class MyRule: ...
1497 def __init_by_decorating_class(self, cls: "MultiRule"): # pyright: ignore[reportUnusedFunction]
1498 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow both
1499 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None))
1500 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided."
1501 self.__init__(
1502 phases,
1503 cls.rules,
1504 cls.condition, # type: ignore
1505 description=cls.description,
1506 overwrite_settings=getattr(cls, "overwrite_settings", None),
1507 self_config=getattr(cls, "self_config", None),
1508 priority=getattr(cls, "priority", RulePriority.NORMAL),
1509 cooldown_reset_value=cooldown_reset_value,
1510 group=getattr(cls, "group", None),
1511 enabled=getattr(cls, "enabled", True),
1512 sort_rules_by_priority=getattr(cls, "sort_rules_by_priority", True),
1513 execute_all_rules=getattr(cls, "execute_all_rules", False),
1514 prior_action=getattr(cls, "prior_action", None),
1515 ignore_phase=getattr(cls, "ignore_phase", True),
1516 )
1517
1518 @__init__.register(Mapping)
1519 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction]
1520 self.__init__(
1521 cls.get("phases", cls.get("phase")),
1522 cls["rules"],
1523 cls.get("condition"),
1524 description=cls.get("description", self.description),
1525 overwrite_settings=cls.get("overwrite_settings"),
1526 self_config=cls.get("self_config"),
1527 priority=cls.get("priority", RulePriority.NORMAL),
1528 cooldown_reset_value=cls.get("cooldown_reset_value"),
1529 group=cls.get("group"),
1530 enabled=cls.get("enabled", True),
1531 sort_rules_by_priority=cls.get("sort_rules_by_priority", True),
1532 execute_all_rules=cls.get("execute_all_rules", False),
1533 prior_action=cls.get("prior_action", None),
1534 ignore_phase=cls.get("ignore_phase", True),
1535 )
1536
[docs]
1537 def evaluate_children(self, ctx: Context) -> Union[List[Any], Any]: # pyright: ignore[reportIncompatibleMethodOverride]
1538 """
1539 Evaluates the children rules of the current rule in the given context.
1540
1541 Args:
1542 ctx : The context in which the child rules are evaluated.
1543
1544 Returns:
1545 The results of evaluating the children rules.
1546 Returns a list of results if execute_all_rules is True,
1547 otherwise the result of the first rule that was applied.
1548 """
1549 results: List[Any] = []
1550 for rule in self.rules:
1551 try:
1552 result = rule(ctx, ignore_phase=self.ignore_phase)
1553 except DoNotEvaluateChildRules:
1554 return results
1555 if not self.execute_all_rules and result is not Rule.NOT_APPLICABLE: # one rule was applied end.
1556 return result
1557 results.append(result)
1558 if not self.execute_all_rules:
1559 return Rule.NOT_APPLICABLE # does not expect a list
1560 return results
1561
1562
[docs]
1563class RandomRule(MultiRule, metarule=True):
1564 """
1565 A rule that selects and evaluates one or more random child rules from a set of rules.
1566
1567 Args:
1568 phases : The phase or phases in which the rule is applicable.
1569 rules : The set of rules from which to select random child rules.
1570 repeat_if_not_applicable : If False, only one rule will be evaluated even if it is not applicable. Defaults to :python:`True`.
1571 condition :
1572 A callable that determines if the rule is applicable in a given context.
1573 If None and the rule does not implement a :py:attr:`condition` attribute the rule always executes.
1574 Defaults to :python:`None`.
1575 action : A callable that defines the action to be performed when the rule is applicable. Defaults to :python:`None`.
1576 ignore_phase : If True, the rule will be evaluated even if it is not in the specified phase. Defaults to :python:`True`.
1577 priority : The priority of the rule. :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`.
1578 description : A description of the rule. Defaults to :python:`"If its own condition is true calls one or more random child rules from the passed rules."`.
1579 overwrite_settings : A dictionary of settings to overwrite the default settings of the rule. Defaults to :python:`None`.
1580 cooldown_reset_value : The value to reset the cooldown of the rule. Defaults to :python:`None`.
1581 group : The group to which the rule belongs. Defaults to :python:`None`.
1582 enabled : If False, the rule will not be evaluated. Defaults to :python:`True`.
1583 weights : The weights associated with each rule when selecting random child rules. Defaults to :python:`None`.
1584
1585 Raises:
1586 ValueError: When passing **rules** as a dict with weights, the **weights** argument must be None.
1587 """
1588
1589 # TODO: add a dummy attribute for one additional weight, that skips the evaluation. Should only considered once.
1590
1591 if TYPE_CHECKING:
1592
1593 class _InitParameters(MultiRule._InitParameters): # pyright: ignore[reportPrivateUsage]
1594 repeat_if_not_applicable: NotRequired[bool]
1595 weights: NotRequired[Optional[List[float]]]
1596
1597 @singledispatchmethod
1598 def __init__(
1599 self,
1600 phases: Union[Phase, Iterable[Phase]], # /, # phases must be positional; python3.8+ only
1601 rules: Union[Dict[Rule, float], List[Rule]],
1602 repeat_if_not_applicable: bool = True,
1603 condition: Optional[Callable[[Context], Any]] = None,
1604 *,
1605 action: Optional[Callable[[Context], Any]] = None,
1606 ignore_phase: bool = True,
1607 priority: RulePriority = RulePriority.NORMAL,
1608 description: str = "If its own condition is true calls one or more random child rules from the passed rules.",
1609 overwrite_settings: Optional[Dict[str, Any]] = None,
1610 self_config: Optional[Dict[str, Any]] = None,
1611 cooldown_reset_value: Optional[int] = None,
1612 group: Optional[str] = None,
1613 enabled: bool = True,
1614 weights: Optional[List[float]] = None,
1615 ):
1616 """
1617 Initializes a Rule object that can trigger one or more random child rules.
1618
1619 Args:
1620 phases : The phase or phases in which the rule is applicable.
1621 rules : The set of rules from which to select random child rules.
1622 repeat_if_not_applicable : If False, only one rule will be evaluated even if it is not applicable. Defaults to :python:`True`.
1623 condition :
1624 A callable that determines if the rule is applicable in a given context.
1625 If None and the rule does not implement a `condition` attribute the rule always executes.
1626 Defaults to :python:`None`.
1627 action : A callable that defines the action to be performed when the rule is applicable. Defaults to :python:`None`.
1628 ignore_phase : If True, the rule will be evaluated even if it is not in the specified phase. Defaults to :python:`True`.
1629 priority : The priority of the rule. Defaults to :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`.
1630 description : A description of the rule. Defaults to :python:`"If its own condition is true calls one or more random child rules from the passed rules."`.
1631 overwrite_settings : A dictionary of settings to overwrite the default settings of the rule. Defaults to :python:`None`.
1632 cooldown_reset_value : The value to reset the cooldown of the rule. Defaults to :python:`None`.
1633 group : The group to which the rule belongs. Defaults to :python:`None`.
1634 enabled : If False, the rule will not be evaluated. Defaults to :python:`True`.
1635 weights : The weights associated with each rule when selecting random child rules. Defaults to :python:`None`.
1636 """
1637 if isinstance(rules, dict):
1638 if weights is not None:
1639 raise ValueError("When passing rules as a dict with weights, the weights argument must be None")
1640 self.weights = list(accumulate(rules.values())) # cumulative weights for random.choices are more efficient
1641 self.rules = list(rules.keys())
1642 else:
1643 self.weights = weights or list(accumulate(r.priority for r in rules))
1644 self.rules = rules
1645 self.repeat_if_not_applicable = repeat_if_not_applicable
1646 super().__init__(
1647 phases,
1648 rules,
1649 condition=condition,
1650 action=action,
1651 description=description,
1652 priority=priority,
1653 ignore_phase=ignore_phase,
1654 overwrite_settings=overwrite_settings,
1655 self_config=self_config,
1656 cooldown_reset_value=cooldown_reset_value,
1657 enabled=enabled,
1658 group=group,
1659 )
1660
1661 @__init__.register(_CountdownRule)
1662 @__init__.register(type)
1663 def __init_by_decorating_class(self, cls: "RandomRule"): # pyright: ignore[reportUnusedFunction]
1664 phases = getattr(cls, "phases", getattr(cls, "phase", None))
1665 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None))
1666
1667 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided."
1668 self.__init__(
1669 phases,
1670 cls.rules,
1671 repeat_if_not_applicable=cls.repeat_if_not_applicable,
1672 condition=cls.condition,
1673 description=cls.description,
1674 overwrite_settings=getattr(cls, "overwrite_settings", None),
1675 self_config=getattr(cls, "self_config", None),
1676 priority=getattr(cls, "priority", RulePriority.NORMAL),
1677 cooldown_reset_value=cooldown_reset_value,
1678 group=getattr(cls, "group", None),
1679 enabled=getattr(cls, "enabled", True),
1680 prior_action=getattr(cls, "prior_action", None),
1681 ignore_phase=getattr(cls, "ignore_phase", True),
1682 )
1683
1684 @__init__.register(Mapping)
1685 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction]
1686 self.__init__(
1687 cls.get("phases", cls.get("phase")),
1688 cls["rules"],
1689 repeat_if_not_applicable=cls.get("repeat_if_not_applicable", True),
1690 condition=cls.get("condition"),
1691 description=cls.get("description", self.description),
1692 overwrite_settings=cls.get("overwrite_settings"),
1693 self_config=cls.get("self_config"),
1694 priority=cls.get("priority", RulePriority.NORMAL),
1695 cooldown_reset_value=cls.get("cooldown_reset_value"),
1696 group=cls.get("group"),
1697 enabled=cls.get("enabled", True),
1698 sort_rules_by_priority=cls.get("sort_rules_by_priority", True),
1699 execute_all_rules=cls.get("execute_all_rules", False),
1700 prior_action=cls.get("prior_action", None),
1701 ignore_phase=cls.get("ignore_phase", True),
1702 )
1703
[docs]
1704 def evaluate_children(self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None) -> Any:
1705 """
1706 Evaluate a random child rule.
1707 If `self.repeat_if_not_applicable=False` and the randomly chosen rule is not applicable,
1708 then no further rules are evaluated
1709 For `self.repeat_if_not_applicable=False` possible rules are evaluated in a random fashion
1710 until one rule was applicable.
1711 """
1712 if len(self.rules) == 0:
1713 logger.warning("No rules to evaluate in %s. Returning NOT_APPLICABLE.", self.__class__.__name__)
1714 return RuleResult.NOT_APPLICABLE
1715 if self.repeat_if_not_applicable:
1716 rules = self.rules.copy()
1717 weights = self.weights.copy()
1718 else:
1719 rules = self.rules
1720 weights = self.weights
1721
1722 result = RuleResult.NOT_APPLICABLE
1723 while rules:
1724 rule = random.choices(rules, cum_weights=weights, k=1)[0]
1725 try:
1726 result = rule(
1727 ctx, overwrite, ignore_phase=self.ignore_phase
1728 ) # NOTE: Context action/evaluation results only store result of LAST rule
1729 except DoNotEvaluateChildRules:
1730 return None
1731 if not self.repeat_if_not_applicable or result is not Rule.NOT_APPLICABLE:
1732 # break after the first rule of `self.repeat_if_not_applicable=False`
1733 # else break if it was applicable.
1734 break
1735 rules.remove(rule)
1736 weights = list(accumulate(r.priority for r in rules))
1737 return result
1738
1739
[docs]
1740class BlockingRule(Rule, metarule=True):
1741 """
1742 This meta rule allows to define rules that are able to takeover the agent's workflow and
1743 apply the :py:class:`.VehicleControl` directly from withhin the rule.
1744
1745 """
1746
1747 _gameframework: ClassVar[Union["GameFramework", "CallableProxyType[GameFramework]", None]] = None
1748 """
1749 Set when a :py:class:`GameFramework` is initialized.
1750 Alternatively can be set when any BlockingRule is created.
1751 """
1752
1753 ticks_passed: int
1754 """Count how many ticks have been performed by this rule and blocked the agent."""
1755
1756 MAX_TICKS = 5000 # 5000 * 1/20 = 250 seconds
1757 """
1758 The amount of ticks that can be performed by this rule before it is automatically disabled.
1759 If the rule has looped for this amount of ticks if will then call :py:attr:`max_tick_callback`
1760 and raise an :py:exc:`.UnblockRuleException` afterwards.
1761
1762 As a hack :py:attr:`max_tick_callback` can change :py:attr:`ticks_passed` to prevent the
1763 exception and continue the rule.
1764 """
1765
1766 max_tick_callback: Optional[Callable[[Self, Context], Any]] = None
1767 """
1768 An optional callback that is executed when :py:attr:`ticks_passed` reaches :py:attr:`MAX_TICKS`.
1769 """
1770
1771 if TYPE_CHECKING:
1772
1773 class _InitParameters(Rule._InitParameters): # pyright: ignore[reportPrivateUsage]
1774 gameframework: Required[Optional[GameFramework]]
1775
1776 @singledispatchmethod
1777 def __init__(
1778 self,
1779 phases: Union[Phase, Iterable[Phase]], # iterable of Phases
1780 # /, # phases must be positional; python3.8+ only
1781 condition: Optional[ConditionFunctionLikeT] = None,
1782 action: Optional[Union[CallableAction[Self, []], Dict[Any, CallableAction[Self, []]]]] = None,
1783 false_action: Optional[CallableAction[Self, []]] = None,
1784 *,
1785 gameframework: Optional[GameFramework],
1786 actions: Optional[Dict[Any, CallableAction[Rule, []]]] = None,
1787 description: str = "What does this rule do?",
1788 overwrite_settings: Optional[Dict[str, Any]] = None,
1789 self_config: Optional[Dict[str, Any]] = None,
1790 priority: RulePriority = RulePriority.NORMAL,
1791 cooldown_reset_value: Optional[int] = None,
1792 group: Optional[str] = None,
1793 enabled: bool = True,
1794 ):
1795 super().__init__(
1796 phases,
1797 condition,
1798 action,
1799 false_action,
1800 actions=actions,
1801 description=description,
1802 overwrite_settings=overwrite_settings,
1803 self_config=self_config,
1804 priority=priority,
1805 cooldown_reset_value=cooldown_reset_value,
1806 group=group,
1807 enabled=enabled,
1808 )
1809 if gameframework:
1810 BlockingRule._gameframework = gameframework
1811 if not GameFramework.clock or GameFramework.display is None:
1812 # Not much we can do about it
1813 # logger.info("%s : GameFramework should be initialized before using this rule.", self.__class__.__name__)
1814 # NOTE: This now does not GameFramework.display
1815 GameFramework.init_pygame()
1816 self.ticks_passed = 0
1817
1818 @__init__.register(_CountdownRule)
1819 @__init__.register(type)
1820 def __init_by_decorating_class(self, cls: "BlockingRule"): # pyright: ignore[reportUnusedFunction]
1821 """
1822 Initialize by passing a Rule or class object to the __init__ method.
1823
1824 This allows the usage of the @Rule decorator and easy copying.
1825 """
1826 phases = getattr(cls, "phases", getattr(cls, "phase", None)) # allow for both
1827 cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None))
1828 assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided."
1829 self.__init__(
1830 phases,
1831 cls.condition, # type: ignore
1832 action=getattr(cls, "action", None),
1833 false_action=getattr(cls, "false_action", None),
1834 gameframework=getattr(cls, "gameframework", BlockingRule._gameframework),
1835 actions=getattr(cls, "actions", None),
1836 description=cls.description,
1837 overwrite_settings=getattr(cls, "overwrite_settings", None),
1838 self_config=getattr(cls, "self_config", None),
1839 priority=getattr(cls, "priority", RulePriority.NORMAL),
1840 cooldown_reset_value=cooldown_reset_value,
1841 group=getattr(cls, "group", None),
1842 enabled=getattr(cls, "enabled", True),
1843 )
1844
1845 @__init__.register(Mapping)
1846 def __init_from_mapping(self, cls: "_InitParameters"): # pyright: ignore[reportUnusedFunction]
1847 super().__init__(cls.get("phases", cls.get("phase")), **cls)
1848
1849 def _render_everything(self, ctx: Context):
1850 if self._gameframework:
1851 self._gameframework.render_everything()
1852 else:
1853 world_model = ctx.agent._world_model # pyright: ignore[reportPrivateUsage]
1854 display = GameFramework.display
1855 world_model.tick(GameFramework.clock) # does not tick the world! # pyright: ignore[reportArgumentType]
1856 if display:
1857 world_model.render(display, finalize=False) # pyright: ignore[reportArgumentType]
1858 try:
1859 world_model.controller.render(display) # type: ignore[attr-defined] # noqa: SIM105
1860 except AttributeError: # in case not available for controller class
1861 pass
1862
1863 dm_render_conf: "DetectionMatrix.RenderOptions" = world_model._args.camera.hud.detection_matrix # pyright: ignore[reportPrivateUsage, reportAssignmentType]
1864 if dm_render_conf and ctx.agent:
1865 ctx.agent.render_detection_matrix(display, **dm_render_conf)
1866 world_model.finalize_render(display) # pyright: ignore[reportArgumentType]
1867 if GameFramework.display:
1868 pygame.display.flip()
1869
1870 @overload
1871 def loop_agent(
1872 self,
1873 ctx: Context,
1874 control: Optional[carla.VehicleControl] = None,
1875 *,
1876 execute_planner: Literal[True],
1877 execute_phases: Any,
1878 ) -> carla.VehicleControl: ...
1879
1880 @overload
1881 def loop_agent(
1882 self,
1883 ctx: Context,
1884 control: Optional[carla.VehicleControl] = None,
1885 *,
1886 execute_planner: Literal[False],
1887 execute_phases: Any,
1888 ) -> None: ...
1889
[docs]
1890 def loop_agent(
1891 self,
1892 ctx: Context,
1893 control: Optional[carla.VehicleControl] = None,
1894 *,
1895 execute_planner: bool,
1896 execute_phases: Any = True,
1897 ) -> "carla.VehicleControl | None":
1898 """
1899 A combination of `LunaticAgent.parse_keyboard_input`, `LunaticAgent.apply_control`, `BlockingRule.update_world`,
1900 and `Context.get_or_calculate_control` to advance agent and world.
1901
1902 Args:
1903 ctx (Context): The current context object
1904 control (Optional[carla.VehicleControl], optional): The control to apply; will overwrite the context's control.
1905 If None takes the context's control.
1906 Defaults to :python:`None`.
1907
1908 See Also:
1909 Executes the following methods:
1910
1911 1. :py:meth:`.LunaticAgent.parse_keyboard_input`
1912 2. :py:meth:`.LunaticAgent.apply_control`
1913 3. :py:meth:`.BlockingRule.update_world` ticks the :py:class:`carla.World` and renders everything.
1914 4. :py:meth:`.Context.get_or_calculate_control` to acquire the :py:class:`carla.VehicleControl` object.
1915 """
1916 ctx.agent.parse_keyboard_input(control=control) # NOTE: if skipped the user has no option to stop the agent
1917 ctx.agent.apply_control(control)
1918
1919 # NOTE: This ticks the world forward by one step
1920 # The ctx.control is reset to None; execute_plan
1921 # > Phase.UPDATE_INFORMATION | Phase.BEGIN
1922 self.update_world(ctx, execute_phases=execute_phases)
1923 if execute_planner:
1924 return ctx.get_or_calculate_control()
1925 return None
1926
[docs]
1927 @staticmethod
1928 def get_world() -> carla.World:
1929 """Method to access the world object"""
1930 return CarlaDataProvider.get_world()
1931
1932 def _begin_tick(self, ctx: Context):
1933 self._gameframework.clock.tick() # self.args.fps) # type: ignore[attr-defined,union-type]
1934 frame = None
1935 if (
1936 self._gameframework and self._gameframework.launch_config.handle_ticks
1937 ): # i.e. no scenario runner doing it for us
1938 if CarlaDataProvider.is_sync_mode():
1939 frame = self.get_world().tick()
1940 else:
1941 frame = self.get_world().wait_for_tick().frame
1942 CarlaDataProvider.on_carla_tick()
1943 else:
1944 # CRITICAL: The framework expects a return value before it ticks the world,
1945 # however the blocking rules should take over the ticks; so it should still do this!
1946 # TODO: Should implement some return <-> pass trough mechanism; maybe implement a
1947 # generator-like variant of the rule.
1948 if CarlaDataProvider.is_sync_mode():
1949 frame = self.get_world().tick()
1950 else:
1951 frame = self.get_world().wait_for_tick().frame
1952 CarlaDataProvider.on_carla_tick()
1953
1954 if CarlaDataProvider.is_sync_mode():
1955 # We do this only in sync mode as frames could pass between gathering this information
1956 # and an agent calling InformationManager.tick(), which in turn calls global_tick
1957 # with possibly a DIFFERENT frame wasting computation.
1958 if frame is None:
1959 frame = self.get_world().get_snapshot().frame
1960 InformationManager.global_tick(frame)
1961
1962 # Tick world and render everything
1963 # control should be reset, we are at the "start of the tick again"
1964 ctx.set_control(None)
1965 self.ticks_passed += 1
1966
[docs]
1967 def update_world(
1968 self, ctx: Context, *, execute_phases: Union[bool, Container[Phase]] = True
1969 ) -> "carla.VehicleControl | None":
1970 """
1971 Ticks the world and takes care of the rendering.
1972
1973 Will call
1974 - :python:`ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=<this Rule instance>)`
1975 - :py:meth:`.LunaticAgent.update_information`, with or without executing the phases, depending on **execute_phases**.
1976
1977 Args:
1978 ctx: The context to use
1979 execute_update_information:
1980 Whether to execute the :python:`Phase.UPDATE_INFORMATION | Phase.[BEGIN|END]` with this function.
1981 Can also be a container containing one of both of these phases.
1982 Defaults to :python:`True`.
1983
1984 Raises:
1985 UnblockRuleException: If the ticks passed are over :py:attr:`MAX_TICKS`
1986
1987 Attention:
1988 The usage with Leaderboard_ is working but experimental.
1989 The scenario expects the agent to return a control object in every step, however as this rule
1990 takes over the ticks completely an outside ScenarioManager might not work as expected.
1991 """
1992 self._begin_tick(ctx)
1993 ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=self)
1994
1995 # Update the agent's information
1996 if execute_phases and (execute_phases is True or Phase.UPDATE_INFORMATION | Phase.BEGIN in execute_phases):
1997 ctx.agent.execute_phase(Phase.UPDATE_INFORMATION | Phase.BEGIN, prior_results=self)
1998 ctx.agent._update_information() # pyright: ignore[reportPrivateUsage]
1999 if execute_phases and Phase.UPDATE_INFORMATION | Phase.END not in self.phases:
2000 ctx.agent.execute_phase(Phase.UPDATE_INFORMATION | Phase.END, prior_results=self)
2001 if self.ticks_passed > self.MAX_TICKS:
2002 logger.info(
2003 "Rule %s has passed its max_ticks %s, calling max_tick_callback and unblocking it", self, self.MAX_TICKS
2004 )
2005 if self.max_tick_callback:
2006 self.max_tick_callback(ctx) # pyright: ignore[reportCallIssue]
2007 if self.ticks_passed > self.MAX_TICKS:
2008 raise UnblockRuleException
2009 # max_tick_callback can override the ticks passed
2010 else:
2011 raise UnblockRuleException
2012
2013 self._render_everything(ctx)
2014
[docs]
2015 def __call__(
2016 self,
2017 ctx: Context,
2018 overwrite: Optional[Dict[str, Any]] = None,
2019 in_loop: bool = False,
2020 *, # Kwargs from Rule.__call__
2021 ignore_phase: bool = False,
2022 ignore_cooldown: bool = False,
2023 ) -> Union[Any, Literal[RuleResult.NOT_APPLICABLE]]:
2024 if not in_loop:
2025 self.ticks_passed = 0
2026 else:
2027 ignore_phase = True
2028 ignore_cooldown = True
2029 if self in ctx.agent._active_blocking_rules: # pyright: ignore[reportPrivateUsage]
2030 logger.warning(
2031 "Rule %s is already blocking the agent, this is a recursive call. Not executing the rule.", self
2032 )
2033 return Rule.NOT_APPLICABLE
2034 try:
2035 return super().__call__(ctx, overwrite, ignore_phase=ignore_phase, ignore_cooldown=ignore_cooldown)
2036 except UnblockRuleException as e:
2037 return e.result
2038 finally:
2039 ctx.agent._active_blocking_rules.discard(self) # pyright: ignore[reportPrivateUsage]
2040
2041 def evaluate(
2042 self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None
2043 ) -> Union[bool, Hashable, Literal[RuleResult.NO_RESULT]]:
2044 result = super().evaluate(ctx, overwrite)
2045 if result in self.actions:
2046 ctx.agent._active_blocking_rules.add(self) # pyright: ignore[reportPrivateUsage]
2047 return result
2048
2049
2050# Provide necessary imports for the evaluation_function module and prevents circular imports
2051
2052import classes.evaluation_function as __evaluation_function # noqa
2053
2054__evaluation_function.Rule = Rule
2055__evaluation_function.Context = Context
2056del __evaluation_function