1# pyright: strict
2# pyright: reportIndexIssue=information, reportCallIssue=information
3# pyright: reportGeneralTypeIssues=warning
4# pyright: reportUnusedFunction=information, reportUnusedImport=information
5# pyright: reportUnnecessaryIsInstance=false, reportUnnecessaryComparison=false
6# pyright: reportPrivateUsage=none
7
8from __future__ import annotations
9
10import inspect
11
12import random
13from collections.abc import Mapping
14from dataclasses import is_dataclass
15from functools import partial, update_wrapper, wraps
16from inspect import isclass
17from itertools import accumulate
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Callable,
22 ClassVar,
23 Container,
24 Dict,
25 Hashable,
26 Iterable,
27 List,
28 NoReturn,
29 Optional,
30 Set,
31 TypeVar,
32 Union,
33 cast,
34)
35from weakref import CallableProxyType, WeakSet, proxy
36
37import pygame
38from omegaconf import DictConfig, OmegaConf
39from typing_extensions import (
40 Annotated,
41 Concatenate,
42 Literal,
43 NotRequired,
44 ParamSpec,
45 Required,
46 Self,
47 TypedDict,
48 Unpack,
49 overload,
50)
51
52from agents.tools.logs import logger
53from classes.constants import READTHEDOCS, Hazard, HazardSeverity, Phase, RulePriority, RuleResult
54from classes.evaluation_function import ConditionFunction
55from classes.exceptions import DoNotEvaluateChildRules, UnblockRuleException
56from classes.worldmodel import GameFramework
57from classes.information_manager import InformationManager
58from launch_tools import CarlaDataProvider, singledispatchmethod
59
60if TYPE_CHECKING:
61 import carla
62 from classes.type_protocols import CallableAction, CallableActionT, ConditionFunctionLike, ConditionFunctionLikeT
63 from agents.lunatic_agent import LunaticAgent
64 from agents.tools.config_creation import ContextSettings, LiveInfo, RuleConfig
65 # NOTE: gameframework.py adds GameFramework to this module's variables
66 # at this position it would be a circular import
67
68_T = TypeVar("_T")
69_P = ParamSpec("_P")
70_Rule = TypeVar("_Rule", bound="Rule")
71
72
[docs]
class Context(CarlaDataProvider):
    """
    Object to be passed as the first argument (instead of self) to rules, actions and evaluation functions.

    The :py:class:`Context` class derives from the scenario runner's :py:class:`CarlaDataProvider` to allow access to the world, map, etc.

    Tip:
        There is normally no need to initialize the context object manually.
        It's recommended to initialize the context object with :py:meth:`.LunaticAgent._make_context`.
    """

    agent: "LunaticAgent"
    """Backreference to the agent."""

    config: "ContextSettings"
    """A copy of the agents config. Overwritten by the condition's settings."""

    evaluation_results: dict[Phase, Hashable]  # ambiguous wording, which result? here evaluation result
    """
    Stores the result from the :py:meth:`Rule.condition` of the last rule that was evaluated in a phase.

    .. deprecated:: in consideration
    """

    action_results: dict[Phase, Any]
    """
    Stores the result from the :py:meth:`action` of the last rule that was applicable in a phase.

    .. deprecated:: in consideration
    """

    _control: Optional[carla.VehicleControl]
    """
    Current control the agent should use.

    Accessible from the :py:attr:`control` property.
    """

    prior_result: Optional[Any]  # TODO: maybe rename
    """Result of the current phase."""

    phase_results: dict[Phase, Any]
    """
    Stores the results of the phases the agent has been in.
    By default the values are set to :py:attr:`Context.PHASE_NOT_EXECUTED`.
    """

    last_context: Optional["Context"]
    """The context object of the last tick. Used to access the last phase's results."""

    second_pass: Optional[bool] = None
    """
    Whether or not the run_step function performs a second pass, i.e.
    after the route has been replanned.

    Warning:
        The correctness should *not* be assumed.
        The user is responsible for setting this value to True if a second pass is required.
    """

    _detected_hazards: Set[Hazard]
    """
    Detected hazards in the current phase.

    If not empty at the end of the inner step an EmergencyStopException is raised.
    """

    detected_hazards_info: dict[Hazard, Union[HazardSeverity, Any]]
    """
    Information about the detected hazards. :py:meth:`add_hazard` inserts the given :py:class:`.HazardSeverity` as value,
    however, note that the values can be arbitrary if used otherwise.
    """

    PHASE_NOT_EXECUTED: object = object()
    """
    Initial value of every entry in :py:attr:`phase_results`. Indicates that no result has been
    stored for the respective phase, i.e. :python:`agent.execute_phase(phase)` was *not* (yet) called for it.

    :meta hide-value:
    """

    def __init__(self, agent: "LunaticAgent", **kwargs: Any):
        """
        It's recommended to initialize the context object with :py:meth:`.LunaticAgent._make_context`.

        Parameters:
            agent: The agent this context belongs to.
            **kwargs: ``control`` is consumed as the initial control (default: None);
                all remaining entries are stored as instance attributes.
        """
        self.agent = agent
        self._control = kwargs.pop("control", None)
        self._init_arguments = kwargs  # without "control", which was popped above
        self.phase_results = dict.fromkeys(Phase.get_phases(), Context.PHASE_NOT_EXECUTED)

        self.config: "ContextSettings" = agent.config.copy()  # type: ignore
        self.config._content["live_info"] = agent.live_info  # not a copy! # pyright: ignore

        self.detected_hazards = set()
        self.detected_hazards_info = dict.fromkeys(Hazard, HazardSeverity.NONE)
        # Expose the remaining keyword arguments as plain instance attributes.
        self.__dict__.update(kwargs)

        # Less used attributes
        self.evaluation_results = {}
        self.action_results = {}

    @property
    def current_phase(self) -> Phase:
        """Current :py:class:`Phase` the agent is in."""
        return self.agent.current_phase

    @property
    def control(self) -> Union[carla.VehicleControl, None]:
        """
        Current control the agent should use. Set by :py:meth:`execute_phase(update_controls=...) <.LunaticAgent.execute_phase>`.

        Note:
            Safeguarded to be not set to None. Setting it to :code:`None` is discouraged.
            Use :py:meth:`set_control` if setting it to :code:`None` is really needed.

        Raises:
            ValueError: When assigning :code:`None` through the property setter.
        """
        return self._control

    @control.setter
    def control(self, control: carla.VehicleControl):
        if control is None:  # type: ignore[comparison]
            raise ValueError("Context.control must not be None. "
                             "To set it to None explicitly use set_control.")
        self._control = control

    def set_control(self, control: Optional[carla.VehicleControl]):
        """Set the control, allows to set it to None."""
        self._control = control

    def get_or_calculate_control(self) -> carla.VehicleControl:
        """
        Get the control if it is set, otherwise calculate it by
        executing the local planner.

        Returns:
            The control the agent should use.

        Raises:
            ValueError: If the calculated control is :code:`None`
                (raised by the :py:attr:`control` setter).

        Note:
            Use this function inside rules to acquire a control object.

        Warning:
            This is equivalent to ending the inner step of the agent.

        See Also:
            :py:meth:`LunaticAgent._calculate_control`
        """
        # Explicit None check: "is a control set" must not depend on the truthiness of the control object.
        if self._control is not None:
            return self._control
        calculated = self.agent._calculate_control()  # pyright: ignore[reportPrivateUsage]
        self.control = calculated  # goes through the setter, which rejects None
        return calculated

    @property
    def detected_hazards(self) -> Set[Hazard]:
        """
        Detected hazards in the current phase.

        If not empty at the end of the inner step an EmergencyStopException is raised.

        Raises:
            TypeError: When assigning a value that is not a :py:class:`set`.
        """
        return self._detected_hazards

    @detected_hazards.setter
    def detected_hazards(self, hazards: Set[Hazard]):
        if not isinstance(hazards, set):  # type: ignore
            raise TypeError("detected_hazards must be a set of Hazards.")
        self._detected_hazards = hazards

    def add_hazard(self, hazard: Hazard, hazard_level: HazardSeverity = HazardSeverity.EMERGENCY):
        """
        Add the specified hazard to the detected hazards, in parallel the `hazard_level` can be set
        which is stored in :any:`detected_hazards_info`.

        A warning is logged if `hazard` is not a member of the :py:class:`Hazard` enum; it is added nevertheless.
        """
        if hazard not in Hazard:
            logger.warning(f"Adding {hazard} to the detected hazards which is not a member of the Hazard enum.")
        self.detected_hazards.add(hazard)
        self.detected_hazards_info[hazard] = hazard_level

    def discard_hazard(self, hazard: Hazard, match: Literal["exact", "subset", "intersection"] = "subset"):
        """
        Discards a hazard from the detected hazards.

        Parameters:
            hazard: Hazard to remove from :py:attr:`detected_hazards`.
            match: How to match the hazard to remove.

                - "exact" removes if the exact :py:class:`~classes.constants.Hazard` flag is present.
                - "subset" removes if the hazard is a subset of the detected hazard, e.g.:

                    - :python:`discard_hazard(Hazard.VEHICLE, match="subset")` would remove
                      :any:`Hazard.OBSTACLE` = ``Hazard.VEHICLE | PEDESTRIAN | STATIC_OBSTACLE``.

                    - :python:`discard_hazard(Hazard.TRAFFIC_LIGHT, match="subset")` would *not* remove
                      :any:`Hazard.TRAFFIC_LIGHT_RED`.

                - "intersection" removes every detected hazard that shares at least one flag with `hazard`.

        Raises:
            ValueError: If `match` is none of the supported options.

        Note:
            "subset" and "intersection" replace :py:attr:`detected_hazards` with a new set,
            whereas "exact" modifies the existing set in place.
        """
        if match == "subset":
            self.detected_hazards = {h for h in self.detected_hazards if hazard not in h}  # supports flags
        elif match == "exact":
            self.detected_hazards.discard(hazard)
        elif match == "intersection":
            self.detected_hazards = {h for h in self.detected_hazards if not hazard & h}
        else:
            raise ValueError(f"match must be 'exact', 'subset' or 'intersection', not {match}.")

    def has_hazard(self, hazard: Hazard, match: Literal["exact", "subset", "intersection"] = "intersection") -> bool:
        """
        Checks if the hazard matches any of the detected hazards; by default an intersection is sufficient.

        See :py:meth:`discard_hazard` for the different matching options.

        Raises:
            ValueError: If `match` is none of the supported options.
        """
        if match == "exact":
            return hazard in self.detected_hazards
        if match == "subset":
            return any(hazard in h for h in self.detected_hazards)
        if match == "intersection":
            return any(hazard & h for h in self.detected_hazards)
        raise ValueError(f"match must be 'exact', 'subset' or 'intersection', not {match}.")

    # Convenience function when using detect_vehicles
    from agents.tools.lunatic_agent_tools import max_detection_distance  # noqa

    @property
    def live_info(self) -> "LiveInfo":
        """Shortcut for :python:`self.config.live_info`."""
        return self.config.live_info

    @property
    def active_blocking_rules(self) -> Set["BlockingRule"]:
        """The agent's set of currently active blocking rules."""
        return self.agent._active_blocking_rules  # pyright: ignore[reportPrivateUsage]
298
299
300#@ConditionFunction["Rule", [], Literal[True]] # python 3.9+ syntax
[docs]
@ConditionFunction
def always_execute(ctx: Context) -> Literal[True]:  # pylint: disable=unused-argument, # noqa: ARG001
    """
    A :py:class:`.ConditionFunction` whose result is unconditionally :python:`True`.

    The passed context is ignored. Use it as the condition of a rule whose action
    should always be executed.
    """
    return True
307
308
def _use_temporary_config(func: Callable[Concatenate[_Rule,
                                                     "Context",
                                                     Optional[Dict[str, Any]], _P], _T]
                          ) -> Callable[Concatenate[_Rule, "Context", Optional[Dict[str, Any]], _P], _T]:
    """
    Decorator for rule methods that run with the rule's overwrite settings applied to ``ctx.config``.

    During the condition evaluation the ctx.config should have the overwrite settings applied
    but not in a permanent way.

    For the duration of the wrapped call ``ctx.config`` is replaced by a read-only merge of the
    current config with the rule's ``overwrite_settings`` (updated by the ``overwrite`` argument).
    Afterwards the original config object is put back, also when an exception is raised.

    Parameters:
        func: Method with the signature ``(self, ctx, overwrite, *args, **kwargs)``.

    Returns:
        The wrapped method; ``overwrite`` defaults to None in the wrapper.
    """
    # TODO: To avoid unused argument error, consume the dict; however this might be harder to understand

    @wraps(func)
    def wrapper(self: _Rule, ctx: Context, overwrite: Optional[Dict[str, Any]] = None, *args: _P.args, **kwargs: _P.kwargs) -> _T:
        settings = self.overwrite_settings.copy()  # Dict with "self" : SelfConfig
        if overwrite:
            settings.update(overwrite)

        original_ctx_config = ctx.config
        # "???" is OmegaConf's marker for a missing value.
        ctx.config["self"] = "???"  # do not merge old self_config

        temp: "ContextSettings" = OmegaConf.merge(ctx.config, settings)  # type: ignore

        OmegaConf.set_readonly(temp, True)
        # The swap happens inside the try block, so that the original config is restored
        # even if installing the rule's self-config fails before func is called.
        try:
            ctx.config = temp
            self.self_config = self.overwrite_settings["self"] = ctx.config["self"]
            OmegaConf.set_readonly(self.self_config, False)  # The Rule's settings should still be dynamic; expected in overwrite
            return func(self, ctx, overwrite, *args, **kwargs)
        finally:
            ctx.config = original_ctx_config
    return wrapper
339
340
341class _CountdownRule:
342
343 # TODO: low prio: make cooldown dependant of tickrate or add a conversion from seconds to ticks OR make time-based
344 tickrate: ClassVar[int] = NotImplemented
345 """
346 :meta private:
347 """
348
349 DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0
350 """
351 Value the cooldown is reset to when :py:meth:`reset_cooldown` is called without a value.
352
353 Used *only* when :py:attr:`cooldown_reset_value` is not set.
354 """
355
356 start_cooldown: ClassVar[int] = 0
357 """Initial :py:attr:`cooldown` when initialized. if >0 the rule will not be ready for the first **start_cooldown** ticks."""
358
359 _instances: ClassVar["WeakSet[_CountdownRule]"] = WeakSet()
360 """Keep track of all Rule instances for the cooldowns"""
361
362 _cooldown: int
363 """If 0 the rule is ready to be executed."""
364
365 blocked: bool = False # NOTE: not a property
366 """Indicates if the rule is blocked for this tick only. Is reset to False after the tick."""
367
368 if TYPE_CHECKING:
369 class _InitParameters(TypedDict):
370 """Dict describing the parameters of the __init__ of the class method."""
371 cooldown_reset_value: NotRequired[Optional[int]]
372 enabled: NotRequired[bool]
373
374 def __init__(self, cooldown_reset_value: Optional[int] = None, enabled: bool = True):
375 self._instances.add(self)
376 self._cooldown = self.start_cooldown
377 self.max_cooldown = cooldown_reset_value if cooldown_reset_value is not None else self.DEFAULT_COOLDOWN_RESET
378 self._enabled = enabled
379
380 def is_ready(self) -> bool:
381 """Group aware check if a rule is ready."""
382 return self.cooldown == 0 and self.enabled and not self.blocked # Note: uses property getters. Group aware for GroupRules
383
384 def reset_cooldown(self, value: Optional[int] = None):
385 if value is None:
386 self._cooldown = self.max_cooldown
387 elif value >= 0:
388 self._cooldown = int(value)
389 else:
390 raise ValueError("Cooldown value must be a None or a non-negative integer.")
391
392 @property
393 def cooldown(self) -> int:
394 """
395 Cooldown of the rule in ticks until it can be executed again. Only if 0 the rule can be executed.
396 """
397 return self._cooldown
398
399 @cooldown.setter
400 def cooldown(self, value: int):
401 self._cooldown = value
402
403 def update_cooldown(self):
404 """
405 Update the :py:attr:`cooldown` of *this* rule.
406
407 :meta private:
408 """
409 if self._cooldown > 0:
410 self._cooldown -= 1
411
412 @classmethod
413 def update_all_cooldowns(cls):
414 """Updates the cooldowns of **all** rules."""
415 for instance in cls._instances:
416 instance.update_cooldown()
417
418 @classmethod
419 def unblock_all_rules(cls):
420 """Unblocks all rules"""
421 for instance in cls._instances:
422 instance.blocked = False
423
424 @property
425 def enabled(self) -> bool:
426 """If :code:`False` the rule will not be evaluated. Contrary to :py:attr:`blocked` this permanently disables the rule."""
427 return self._enabled
428
429 @enabled.setter
430 def enabled(self, value: bool):
431 self._enabled = value
432
433 def set_active(self, value: bool):
434 """Enables or disables the rule. Contrary to :py:attr:`blocked` it will not be reset after the tick."""
435 self._enabled = value
436
437 class CooldownFramework:
438 """
439 Context manager that can reduce all cooldowns at the end of a `with` statement.
440 """
441
442 def __enter__(self):
443 return self
444
445 def __exit__(self, exc_type, exc_value, traceback): # type: ignore
446 self.tick()
447
448 @staticmethod
449 def tick():
450 """
451 Update all cooldowns and unblock all rules.
452
453 Calls:
454 - :py:meth:`Rule.update_all_cooldowns`
455 - :py:meth:`Rule.unblock_all_rules`
456 """
457 Rule.update_all_cooldowns()
458 Rule.unblock_all_rules()
459
460
461_GroupInstanceValues = TypedDict('_GroupInstanceValues', {"cooldown": int, "max_cooldown": int, "instances": "WeakSet[_GroupRule]"})
462"""TypeHint that describes the values of `_GroupRule._group_instances`."""
463
464
class _GroupRule(_CountdownRule):
    """
    Extends :py:class:`_CountdownRule` with optional groups: all rules that were created
    with the same ``group`` name share one cooldown value.
    """

    group: Optional[str] = None  # group of rules that share a cooldown
    """
    Group name of rules that should share their cooldown.

    None for a rule to not share its cooldown.
    """

    _group_instances: ClassVar[Dict[str, _GroupInstanceValues]] = {}
    """
    Dictionary of all group instances.
    Keys:
        The group name.
    Values:
        A dict with the keys ``"cooldown"`` (*current cooldown*), ``"max_cooldown"``
        (*max cooldown for reset*) and ``"instances"`` (a *WeakSet of all instances*).
    """

    if TYPE_CHECKING:
        class _InitParameters(_CountdownRule._InitParameters):  # pyright: ignore[reportPrivateUsage]
            """Dict describing the parameters of the __init__ of the class method."""
            group: NotRequired[Optional[str]]

    def __init__(self, group: Optional[str] = None, cooldown_reset_value: Optional[int] = None, enabled: bool = True):
        """
        Parameters:
            group: Name of the group to join; None to not share the cooldown.
            cooldown_reset_value: Passed on to :py:class:`_CountdownRule`.
            enabled: Passed on to :py:class:`_CountdownRule`.
        """
        super().__init__(cooldown_reset_value, enabled)
        self.group = group
        if group is None:
            return
        if group not in self._group_instances:
            # NOTE(review): a new group always starts with max_cooldown 0 and cooldown_reset_value
            # is not propagated to the group - confirm that this is intended.
            self._group_instances[group] = {'cooldown': 0, 'max_cooldown': 0, 'instances': WeakSet()}
        self._group_instances[group]["instances"].add(self)  # add to weak set

    @property
    def cooldown(self) -> int:
        """
        Cooldown of the rule in ticks until it can be executed again after its action was executed.
        If 0 the rule is ready to be executed.

        For a rule with a group this is the shared cooldown of the group.
        """
        if self.group is not None:
            return _GroupRule._group_instances[self.group]["cooldown"]
        return super().cooldown

    @cooldown.setter
    def cooldown(self, value: int):
        if self.group is not None:
            _GroupRule._group_instances[self.group]["cooldown"] = value
            return
        # `super().cooldown = value` raises an AttributeError, super() objects do not
        # support attribute assignment; invoke the parent's property setter explicitly.
        _CountdownRule.cooldown.__set__(self, value)

    @property
    def has_group(self) -> bool:
        """Indicates if the rule belongs to a group, i.e. :python:`self.group is not None`."""
        return self.group is not None

    def set_my_group_cooldown(self, value: Optional[int] = None):
        """
        Update the cooldown of the group this rule belongs to.

        Parameters:
            value: New cooldown of the group; None sets it to the group's ``max_cooldown``.

        A warning is logged and nothing is changed if the rule has no group.
        """
        if self.group is None:
            logger.warning("Rule does not belong to a group, but set_my_group_cooldown was called. Ignoring.")
            return
        group_data = self._group_instances[self.group]
        if value is None:
            group_data["cooldown"] = group_data["max_cooldown"]  # set to max
        else:
            group_data["cooldown"] = value

    def reset_cooldown(self, value: Optional[int] = None):
        """
        Reset or set the cooldown; for a group rule it resets the group cooldown.

        Parameters:
            value: Cooldown to set; None resets to the respective max cooldown.

        Raises:
            ValueError: For a rule without a group if ``value`` is negative.
        """
        if self.group is not None:
            self.set_my_group_cooldown(value)
        else:
            # Forward the value, otherwise an explicitly passed cooldown would be ignored.
            super().reset_cooldown(value)

    @classmethod
    def set_cooldown_of_group(cls, group: str, value: int):
        """
        Updates the cooldown of the specified group to a specific value

        Raises:
            ValueError: If no group with this name exists.
        """
        if group in cls._group_instances:
            cls._group_instances[group]["cooldown"] = value
        else:
            raise ValueError(f"Group {group} does not exist.")

    __filter_not_ready_instances: Callable[["_CountdownRule"], bool] = lambda instance: instance.group is None and instance._cooldown > 0  # pylint: disable=line-too-long # type: ignore[attr-defined]
    """
    Filter function to get all instances that are not ready. see `update_all_cooldowns`
    Group rules should not be affected by this filter.
    """

    @classmethod
    def update_all_cooldowns(cls):
        """Globally updates the cooldown of *all* rules."""
        # Update Groups: every group cooldown is reduced once, independent of its number of members
        for instance_data in cls._group_instances.values():
            if instance_data["cooldown"] > 0:
                instance_data["cooldown"] -= 1
        # Update the rules without a group that still have a positive cooldown
        for instance in filter(cls.__filter_not_ready_instances, cls._instances):
            instance._cooldown -= 1
559
560
[docs]
561class Rule(_GroupRule):
562 _auto_init_: ClassVar[bool] = True
563 """
564 If set to False the automatic :code:`__init__` creation is disabled when subclassing.
565 This automatic :code:`__init__` will fix parameters like :any:`phases` and :any:`condition` to the class.
566
567 Declaring an :code:`__init__` method in the class has the same effect as setting :code:`_auto_init_` to False.
568
569 Note:
570 Using :python:`class NewRuleType(metarule=Rule)` is nearly equivalent to :python:`_auto_init_=False`, but is not inherited.
571 """
572
573 NOT_APPLICABLE: ClassVar[Literal[RuleResult.NOT_APPLICABLE]] = RuleResult.NOT_APPLICABLE
574 """
575 Unique object :py:attr:`.RuleResult.NOT_APPLICABLE` that indicates that no action was executed.
576
577 :meta hide-value:
578
579 .. deprecated:: Use :py:attr:`.RuleResult.NOT_APPLICABLE` directly
580 """
581
582 NO_RESULT: ClassVar[Literal[RuleResult.NO_RESULT]] = RuleResult.NO_RESULT
583 """
584 Unique object :py:attr:`.RuleResult.NO_RESULT` that indicates that the rules :py:meth:`action` did not return a result,
585 e.g. because an exception was raised.
586
587 :meta hide-value:
588
589 .. deprecated:: Use :py:attr:`.RuleResult.NOT_APPLICABLE` directly
590 """
591
592 _PROPERTY_MEMBERS: ClassVar[Set[str]] = {"cooldown", "has_group", "enabled"}
593 """
594 A subclass can only overwrite these attributes with properties. This prevents a user accidentally overwriting the property,
595 e.g. `cooldown = 20` with a method or variable.
596
597 TODO:
598 Consider making enabled a variable and not a property.
599 """
600
601 description: str
602 """Description of what this rule should do"""
603
604 phases: frozenset[Phase]
605 """
606 The phase or phases in which the rule should be evaluated.
607 For instantiation the phases attribute can be any :term:`Iterable` [:py:class:`.Phase`].
608 """
609
610 phase: Phase
611 """For the Class API the phase attribute be set to a single Phase object."""
612
613 if TYPE_CHECKING:
614 condition: ConditionFunctionLike[Self, [], Hashable]
615 """
616 The condition that determines if the rule's actions should be executed.
617
618 Simple Variant:
619 return True if the action should be executed, False otherwise.
620 if :py:attr:`.Rule.false_action` is defined, False will execute :py:attr:`.Rule.false_action`.
621
622 Advanced Variant:
623 return a :term:`Hashable` value that is used as key in the :py:attr:`actions` dict.
624
625 NOTE:
626 The readthedocs description is set in the __init__ function.
627 """
628
629 actions: dict[Any, CallableAction[Self, [], Any]]
630 """Dictionary that maps rule results to the action that should be executed."""
631
632 action: Annotated[CallableAction[Self, [], Any], "attribute not available on instance -> merged into `actions`"]
633 """
634 Action that should be executed if the rule is True. If `actions` is set, this is ignored.
635 """
636
637 false_action: Annotated[CallableAction[Self, [], Any], "attribute not available on instance -> merged into `actions`"]
638 """Action that should be executed if the rule is False. May not be set if `actions` is set."""
639
640 if READTHEDOCS and not TYPE_CHECKING:
641 false_action: Annotated[CallableActionT, "attribute not available on instance -> merged into `actions`"]
642 action: Annotated[CallableActionT, "attribute not available on instance -> merged into `actions`"]
643 actions: dict[Any, CallableActionT]
644
645 #group : Optional[str]
646 #"""Group name for rules that should share their cooldown."""
647
648 overwrite_settings: dict[str, Any]
649 """
650 Settings that should overwrite the agent's settings for this rule.
651
652 Note:
653 The overwrite settings are primitive :py:class:`dict` objects,
654 :py:class:`omegaconf.DictConfig` objects are converted.
655 """
656
657 self_config: "RuleConfig"
658 """
659 A custom sub-config for the rule that is not included in the agents settings.
660 Automatically gets a `instance` key added with the rule instance.
661
662 Can be accessed via `ctx.config.current_rule` or `self.config.self`.
663
664 Note:
665 Internally `self.config` and `ctx.config` is the same object, which makes
666 interpolations to the agent's settings possible.
667
668 Attention:
669 The **self_config object is *not* constant** it is recreated each time the
670 rule is evaluated to have the current context available.
671 """
672
673 priority: Union[float, int, RulePriority] = RulePriority.NORMAL
674 """Rules are executed in order of their priority, from high to low."""
675
676 # Initialization functions
677
678 _ctx: Optional[Context] = None
679 """No hard attachment, to not keep the context objects alive, use with care. Check where it is set in a rule."""
680
[docs]
681 def clone(self):
682 """
683 Create a new instance of the rule with the same settings.
684
685 Note:
686 - The current cooldown **is not** taken into account.
687 - The current enabled state **is** taken into account.
688 """
689 return self.__class__(self) # Make use over overloaded __init__
690
691 @overload
692 def __new__(cls, phases: Union[Phase, Iterable[Phase], None] = None) -> "Self": ...
693 # No argument call; should be redundant
694
695 @overload
696 def __new__(cls, phases: Union[Phase, Iterable[Phase], None] = None, **kwargs: Unpack[_InitParameters]) -> "Self": ...
697 # argument call
698
699 @overload
700 def __new__(cls, **kwargs: Unpack[_InitParametersComplete]) -> "Self": ...
701 # Normal init
702
703 @overload
704 def __new__(cls, phases: "type[Any] | Self") -> "Self": ...
705 # Rule.copy(other_rule)
706
707 def __new__(cls,
708 phases: Optional[Union[Phase, Iterable[Phase], None, "type[Any]", Self, str]] = None,
709 bases: Optional[tuple[type[Any], ...]] = None,
710 clsdict: Optional[dict[str, Any]] = None,
711 **kwargs: Any):
712 """
713 The @Rule decorator allows to instantiate a Rule class directly for easier out-of-the-box usage.
714 Further check for metaclass initialization, else this is a normal instance creation.
715
716 Parameters:
717 kwargs: These will be passed to :python:`__init_subclass__`.
718 """
719 # @Rule
720 # class NewRuleInstance:
721 # or Rule(other_rule) -> copy
722 if isclass(phases):
723 if issubclass(phases, _CountdownRule):
724 raise ValueError("When using @Rule the class may not be a subclass of Rule. Do not inherit from rule or subclass from Rule instead with the decorator."
725 "Do not do:\n\t@Rule\n\tclass MyRule(>>Rule<<): ...\n")
726 # cls is the Rule used to decorate the decorated_class
727 decorated_class = phases
728
729 # Create the new class, # NOTE: goes to __init_subclass_
730 new_decorated_class = type(decorated_class.__name__, (cls, ), decorated_class.__dict__.copy(),
731 _init_by_decorator=True, **kwargs) # > calls init_subclass; copy() for correct type!
732 if TYPE_CHECKING:
733 assert issubclass(new_decorated_class, cls) and issubclass(new_decorated_class, decorated_class)
734
735 return super().__new__(new_decorated_class)
736
737 # class NewRuleType(metaclass=Rule) # deprecated
738 if isinstance(phases, str):
739 try:
740 logger.warning("Using NewRule(metaclass=Rule) is deprecated. Use NewRule(Rule, metarule=True) instead.")
741 assert clsdict and isinstance(bases, tuple)
742 class_name = phases
743 new_rule_class = type(class_name, (cls, *bases), clsdict, metarule=True, **kwargs)
744 except Exception:
745 print("ERROR: If you want to initialize a rule be sure that you pass a Phase object as the first argument."
746 "A string assumes that you've used class NewSubclass(metaclass=Rule) "
747 "This is DEPRECATED use class NewSubclass(Rule, metarule=True) instead.")
748 raise
749 else:
750 assert issubclass(new_rule_class, cls) # for type-hints
751 return new_rule_class
752 # Normal instance
753 return super().__new__(cls)
754
755 if TYPE_CHECKING:
756 class _InitParameters(_GroupRule._InitParameters, total=False, closed=False): # pyright: ignore[reportPrivateUsage]
757 """Dict describing the parameters of the __init__ of the class method."""
758 #phases: Union[Phase, Iterable[Phase]] # leave this and use explicitly later on
759 #condition: Optional[ConditionFunctionLike[Rule, ..., Hashable]]
760 action: Optional[Union[CallableAction[Rule, [], Any], Dict[Any, CallableAction[Rule, [], Any]]]]
761 false_action: Optional[CallableAction[Rule, [], Any]]
762 actions: Optional[Dict[Any, CallableAction[Rule, [], Any]]]
763 description: str
764 overwrite_settings: Optional[Dict[str, Any]]
765 self_config: Optional[Dict[str, Any]]
766 priority: RulePriority
767
768 class _InitParametersComplete(_InitParameters):
769 """Dict describing the parameters of the __init__ of the class method."""
770 phases: Required[Union[Phase, Iterable[Phase]]]
771 condition: NotRequired[Optional[ConditionFunctionLike[Rule, ..., Hashable]]]
772
773 class _CallKwargs(TypedDict, closed=True):
774 ignore_phase: bool
775 """Default False"""
776 ignore_cooldown: bool
777 """Default False"""
778
779 @singledispatchmethod
780 def __init__(self,
781 phases: Union[Phase, Iterable[Phase]], # iterable of Phases
782 #/, # phases must be positional; python3.8+ only
783 condition: Optional[ConditionFunctionLikeT] = None,
784 action: Optional[Union[CallableAction[Self, []], Dict[Any, CallableAction[Self, []]]]] = None,
785 false_action: Optional[CallableAction[Self, []]] = None,
786 *,
787 actions: Optional[Dict[Any, CallableAction[Self, []]]] = None,
788 description: str = "What does this rule do?",
789 overwrite_settings: Optional[Dict[str, Any]] = None,
790 self_config: Optional[Dict[str, Any]] = None,
791 priority: RulePriority = RulePriority.NORMAL,
792 cooldown_reset_value: Optional[int] = None,
793 group: Optional[str] = None,
794 enabled: bool = True,
795 ):
796 """
797 Initializes a Rule object.
798
799 Parameters:
800 phases: The phase(s) when the rule should be evaluated.
801 An iterable of Phase objects or a single Phase object.
802 condition: A function that takes a Context object as input and returns a Hashable value.
803 If not provided, the class must implement a `condition` function.
804 action: A function or a dictionary of functions that take a Context object as input.
805 If `action` behaves like `actions`.
806 Only one of `action` and `actions` can be set.
807 false_action: A function that takes a Context object as input and returns any value.
808 It is used when `action` is a single function and represents the action to be taken when the condition is False.
809 actions: A dictionary of `action` functions
810 It should map the return values of `condition` to the corresponding action function.
811 If `action` is None, `actions` must be provided.
812 description: A string that describes what this rule does.
813 overwrite_settings: A dictionary of settings that will overwrite the
814 agent's setting for this Rule.
815 priority: The priority of the rule. It can be a float, an integer, or a RulePriority enum value.
816 cooldown_reset_value: An optional integer value that represents the cooldown reset value for the rule.
817 If not provided falls back to the class attribute :py:attr:`.Rule.DEFAULT_COOLDOWN_RESET`.
818 group: An optional string that specifies the group to which this rule belongs.
819 enabled: A boolean value indicating whether the rule is enabled or not.
820
821 Raises:
822 ValueError: If ``phases`` is empty or None, or if ``phases`` contains an object that is not of type Phase.
823 TypeError: If ``condition`` is None and the class does not implement a :py:meth:`condition` function,
824 or if both ``action`` and ``actions`` are None and the class does not have an :py:attr:`actions` attribute or an :py:meth:`action` function.
825 TypeError: if ``actions`` is not a Mapping object.
826 ValueError: If both ``action`` and ``actions`` are provided.
827 ValueError: If ``action`` is a Mapping and either ``false_action`` or ``actions`` is not None.
828 ValueError: If an ``action`` function is not callable.
829 ValueError: If ``description`` is not a string.
830 """
831
832 # Check phases
833 if not phases:
834 raise ValueError("phases must not be empty or None")
835 if not isinstance(phases, frozenset):
836 phases = frozenset(phases) if isinstance(phases, Iterable) else frozenset([phases])
837 for p in phases:
838 if not isinstance(p, Phase):
839 raise TypeError(f"phase must be of type Phases, not {type(p)}")
840 self.phases = phases
841
842 # Check Rule
843 if condition is None and not hasattr(self, "condition"):
844 raise TypeError(
845 f"{self.__class__.__name__}.__init__() missing 1 required positional argument: 'condition'. "
846 "Alternatively the class must implement a `condition` function.")
847
848 if False and TYPE_CHECKING:
849 # Idea have type hint here
850 self.condition: Any = ...
851 """
852 The condition that determines if the rule's actions should be executed.
853
854 Simple Variant:
855 return True if the action should be executed, False otherwise.
856 if :py:attr:`.Rule.false_action` is defined, False will execute :py:attr:`.Rule.false_action`.
857
858 Advanced Variant:
859 return a :term:`Hashable` value that is used as key in the :py:attr:`actions` dict.
860 """
861 if condition is not None:
862 self.condition = condition
863 else:
864 self.condition = self.__class__.condition
865
866 if condition is not None:
867 if not hasattr(self, "condition"):
868 self.condition = condition
869 else:
870 # Warn if cls.condition and passes condition are different
871 self_func = getattr(self.condition,
872 "__func__",
873 getattr(self.condition, "func", self.condition))
874 if self_func != condition: # Compare method with function
875 logger.warning(
876 f"Warning 'condition' argument passed but class {self.__class__.__name__} "
877 "already implements a different function 'self.condition'. "
878 f"Overwriting {self.condition} with passed condition {getattr(condition, '__name__', str(condition))}. "
879 "This might lead to undesired results.")
880
881 # NOTE: IMPORTANT: self.condition = condition overwrites methods with functions
882 # To keep methods as methods the condition parameter is removed with
883 # `do_not_overwrite.append("condition")` used during __init_subclass__
884 # Could move this check also to here, however, it will then not be checked during class creation
885 self.condition = condition
886 # else: self.__class__.condition must be true already, which was checked in the subclass initialization
887
888 # Check Actions
889 if action is not None and actions is not None:
890 try:
891 if len(actions) == 1 and action is actions[True]:
892 logger.info(
893 "`action` and `actions` have been both been used when initializing %s. "
894 "Did you use `condition.register_action`? Then you can omit the action parameter.",
895 self)
896 action = None
897 else:
898 raise ValueError("Only one of 'action' and 'actions' can be set.")
899 except (KeyError, ValueError) as e:
900 raise ValueError(
901 "Either only one of 'action' and 'actions' can be set, or actions[True] must be the same as action "
902 "- other actions are currently not supported.") from e
903 if action is None and actions is None and not hasattr(self, "actions"):
904 # NOTE: the k in params check below is essential for this to work correctly.
905 raise TypeError(
906 f"{self.__class__.__name__}.__init__() arguments `action` and `actions` are both None. "
907 "Provide at least one argument alternatively the class must have an `actions` "
908 "attribute or an `action` function.")
909
910 if action is None:
911 if not isinstance(actions, Mapping):
912 raise TypeError(f"actions must be a Mapping, not {type(actions)}")
913
914 self.actions = actions
915 elif isinstance(action, Mapping):
916 self.actions = action
917 if false_action is not None or actions is not None:
918 raise ValueError("When passing a dict to action, false_action and actions must be None")
919 else:
920 # NOTE: Might overwrite actions attribute
921 self.actions = {} # type: ignore
922 if action is not None:
923 self.actions[True] = action
924 if false_action is not None:
925 self.actions[False] = false_action
926 # actions can be the dict of the class, we do not want the same dict instance
927 self.actions = dict(self.actions)
928
929 # Assure that method(self, ctx) like functions are accessible like them
930 for key, func in self.actions.items():
931 if not callable(func):
932 raise TypeError(f"Action for key {key} must be callable, not {type(func)}")
933 multiple_parameters = len(inspect.signature(func).parameters) >= 2
934 if multiple_parameters and getattr(func, "use_self", True):
935 # NOTE: could use types.MethodType
936 self.actions[key] = partial(func, self) # bind to self
937
938 # Check Description
939 if not isinstance(description, str):
940 raise TypeError(f"description must be of type str, not {type(description)}")
941 self.description = description
942 super().__init__(group or self.group, cooldown_reset_value, enabled) # or self.group for subclassing
943 self.priority: float | int | RulePriority = priority # used by agent.add_rule
944
945 self.overwrite_settings = overwrite_settings or {}
946 if not isinstance(self.overwrite_settings, dict):
947 # NOTE: If DictConfig only the outermost will be a dict,
948 # i.e. this could be dict[str, DictConfig]
949 self.overwrite_settings = dict(self.overwrite_settings)
950 if self_config and "self" in self.overwrite_settings and self.overwrite_settings["self"] != self_config:
951 logger.debug("Warning: self_config and self.overwrite_settings['self'] must be the same object.")
952
953 default_self_config = cast("RuleConfig", getattr(self, "self_config", getattr(self, "SelfConfig", {})))
954 if isclass(default_self_config):
955 if not is_dataclass(default_self_config):
956 logger.warning(
957 f"Class {self.__class__.__name__} has a self_config class that is not a dataclass. "
958 "This might lead to undesired results, i.e. missing keys in the config.")
959 default_self_config = default_self_config()
960 if not isinstance(default_self_config, DictConfig):
961 default_self_config = cast("RuleConfig", OmegaConf.create(default_self_config, flags={"allow_objects": True})) # type: ignore
962 if self_config:
963 self.self_config = cast("RuleConfig", OmegaConf.merge(default_self_config, self_config))
964 else:
965 self.self_config = default_self_config
966 assert self.self_config._get_flag("allow_objects"), "self_config must allow objects to be used as values." # pyright: ignore[reportPrivateUsage]
967
968 self.overwrite_settings["self"] = self.self_config
969 self.overwrite_settings["self"]["instance"] = self
970
971 # Called on subclass creation. Can be used for class API
def __init_subclass__(cls, _init_by_decorator: bool = False, metarule: bool = False):
    """
    Automatically creates a :python:`__init__` function to allow for a simple to use
    class-interface to create rule classes.

    By setting :python:`_auto_init_ = False` in the class definition, the automatic __init__
    creation is disabled. Similarly, this is also the case if :python:`metarule=True` is used
    for the class creation.

    The generated ``__init__`` forwards every class attribute whose name is a parameter of
    the class' ``__init__`` as keyword argument. These class attributes take precedence
    over keyword arguments of the same name that are passed by the caller.

    Parameters:
        _init_by_decorator: If True the generated ``__init__`` ignores its ``phases``
            argument and uses the ``phases`` / ``phase`` class attribute instead.
        metarule: If True only the sanity checks below are executed and no ``__init__``
            is generated.

    Raises:
        ValueError: If both ``phases`` and ``phase`` are set to truthy values,
            if a member of ``_PROPERTY_MEMBERS`` is overwritten by an object that is not
            a descriptor, or if the class and its condition both provide ``actions``.
    """
    if hasattr(cls, "phases") and hasattr(cls, "phase") and cls.phases and cls.phase:
        raise ValueError(
            f"Both 'phases' and 'phase' are set in class {cls.__name__}. "
            f"Use only one. {cls.phases!s}, {cls.phase!s}")

    for attr in cls._PROPERTY_MEMBERS:
        if hasattr(cls, attr) and not hasattr(getattr(cls, attr), "__get__"):
            raise ValueError(
                f"Class {cls.__name__} has overwritten property {attr} with {getattr(cls, attr)}. "
                f"You may only overwrite the following attributes with properties: {cls._PROPERTY_MEMBERS}. "
                "Did you mean `start_cooldown` or `cooldown_reset_value` instead of `cooldown`?")
    if not cls._auto_init_ or metarule:
        return

    # Only an __init__ defined directly on this class counts as custom.
    custom_init = cls.__dict__.get("__init__", False)

    # Members that are not overwritten and passed as default arguments (None) into the __init__
    do_not_overwrite = ["phases"]

    if hasattr(cls, "condition"):
        # Check if the condition should be treated as a method or function
        rule_func: ConditionFunctionLike[Self, ..., Hashable]
        if isinstance(cls.condition, ConditionFunction):
            rule_func = cls.condition.evaluation_function  # pyright: ignore[reportUnknownMemberType, reportAssignmentType]
        else:
            rule_func = cls.condition
        if isinstance(rule_func, staticmethod):
            rule_func = rule_func.__func__

        condition_name = getattr(cls.condition, '__name__', cls.condition)  # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]

        # Decide method(self, ctx) vs. function(ctx)
        if hasattr(cls.condition, "use_self") and cls.condition.use_self is not None:  # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportFunctionMemberAccess, reportAttributeAccessIssue]
            # An explicit user decision always wins over the signature heuristic.
            condition_as_method: bool = cls.condition.use_self  # type: ignore[attr-defined]
        else:
            param_count = len(inspect.signature(rule_func).parameters)
            if param_count > 2:
                logger.warning(
                    "Rule %s has more than 2 parameters. "
                    "Treating it as a method(self, ctx, **kwargs) with self argument! "
                    "To avoid this message or to use it as a function use "
                    "ConditionFunction(use_self=True|False) explicitly.",
                    condition_name)
            # If the signature has only one parameter its a function; else its user decision.
            condition_as_method = param_count >= 2

        if condition_as_method:
            logger.debug("Implementing %s as method(self, ctx) - "
                         "If you need it as a function(ctx, *args) decorate it with "
                         "@ConditionFunction(use_self=False).",
                         condition_name)
            # Keep the method bound: the class attribute must not be passed to __init__.
            do_not_overwrite.append("condition")
        else:
            logger.debug("Implementing %s as function(ctx) without a self argument "
                         "- If you need it as a method(self, ctx, *args) decorate it with "
                         "@ConditionFunction(use_self=True).",
                         condition_name)

        # Actions provided by condition.actions, e.g. ConditionFunction.register_action
        if hasattr(cls.condition, "actions") and cls.condition.actions:  # type: ignore[attr-defined]
            if hasattr(cls, "actions") and cls.actions:
                raise ValueError(f"Class {cls.__name__} already has an 'actions' attribute. "
                                 "It will be overwritten by the 'actions' attribute of the condition."
                                 " This is the case if ConditionFunction.register_action has been used.")
            cls.actions = cls.condition.actions  # type: ignore[attr-defined]

    if not hasattr(cls, "description"):
        cls.description = cls.__doc__ or "No description provided."

    if hasattr(cls, "self_config"):
        do_not_overwrite.append("self_config")

    # Create a __init__ function that sets some of the parameters.
    # find overlapping parameters
    init_params = inspect.signature(cls.__init__).parameters

    # TODO: to be lazy Rule arguments need to be added to init_params so that "k in init_params" works

    def partial_init(self: Self, phases: Optional[Iterable[Phase]] = None, *args, **kwargs: Rule._InitParameters):  # pyright: ignore[reportMissingParameterType, reportUnknownParameterType]
        # Need phases as first argument
        cls_phases = getattr(cls, "phases", None)  # allow for both wordings
        cls_phase = getattr(cls, "phase", None)
        if _init_by_decorator:
            # Using @Rule phases as first argument is the class
            phases = cls_phases or cls_phase
        else:
            if phases is None:  # NOTE: Could be Phase.NONE
                phases = cls_phases or cls_phase
        if phases is None:
            raise ValueError(f"`phases` or `phase` must be provided for class {cls.__name__}")
        # Removing condition to not overwrite it
        kwargs.update({k: v for k, v in cls.__dict__.items()
                       if k in init_params and k not in do_not_overwrite})  # type: ignore
        try:
            if custom_init:
                custom_init(self, phases, *args, **kwargs)
            else:
                # note: could be single dispatch function
                # super will not be _GroupRule but at least Rule
                super(cls, self).__init__(phases, *args, **kwargs)  # type: ignore[arg-type]
        except IndexError:  # functools <= python3.10
            logger.error("\nError in __init__ of %s. Possible reason: Check if the __init__ method has the correct signature. `phases` must be a positional argument.\n", cls.__name__)
            raise
        except TypeError as e:
            # e.g. forgot a action, or rules attribute (MultiRule)
            if "missing" in str(e):
                logger.error("Class %s has likely missing attributes that cannot be passed to init. Check if all required attributes are set in the class definition.", cls.__name__)
            # The hint above is only additional information; the error is always propagated.
            raise

    update_wrapper(partial_init, cls.__init__)  # pyright: ignore[reportUnknownArgumentType]
    cls.__init__ = partial_init  # pyright: ignore[reportIncompatibleMethodOverride, reportAttributeAccessIssue]
1087
@__init__.register(_CountdownRule)
@__init__.register(type)
def __init_by_decorating_class(self, cls: "Rule"):  # pyright: ignore[reportUnusedFunction]
    """
    Initialize from another Rule instance or from a class object.

    Registered for ``type`` and ``_CountdownRule`` first arguments, this enables
    the @Rule decorator as well as copying an existing rule. All ``__init__``
    arguments are read from attributes of ``cls``; ``condition`` and ``description``
    must exist, every other attribute is optional.
    """
    # Both spellings are accepted; `phases` wins over `phase`.
    phase_fallback = getattr(cls, "phase", None)
    phases = getattr(cls, "phases", phase_fallback)
    # `max_cooldown` is only consulted when `cooldown_reset_value` is absent.
    cooldown_fallback = getattr(cls, "max_cooldown", None)
    cooldown_reset_value = getattr(cls, "cooldown_reset_value", cooldown_fallback)

    # TODO: Automate this via inspect.signature or TypedDict of the __init__ signature
    positional = (
        phases,
        cls.condition,
        getattr(cls, "action", None),
        getattr(cls, "false_action", None),
    )
    keywords = {
        "actions": getattr(cls, "actions", None),
        "description": cls.description,
        "overwrite_settings": getattr(cls, "overwrite_settings", None),
        "self_config": getattr(cls, "self_config", None),
        "priority": getattr(cls, "priority", RulePriority.NORMAL),
        "cooldown_reset_value": cooldown_reset_value,
        "group": getattr(cls, "group", None),
        "enabled": getattr(cls, "enabled", True),
    }
    self.__init__(*positional, **keywords)
1112
@__init__.register(Mapping)
def __init_from_mapping(self, source: "_InitParametersComplete"):  # pyright: ignore[reportUnusedFunction]
    """
    Initialize the rule from a mapping of ``__init__`` arguments.

    Missing keys fall back to the defaults used below; ``phases`` may also be
    given under the key ``phase``.

    Note:
        This is weakly tested and not much supported.

    Parameters:
        source: Mapping from argument names to values.
    """
    # A condition from the mapping is forwarded unchanged. If it is missing, None is
    # passed and the regular __init__ decides how to handle the absent argument.
    self.__init__(source.get("phases", source.get("phase")),
                  source.get("condition"),
                  source.get("action"),
                  source.get("false_action"),
                  actions=source.get("actions"),
                  description=source.get("description", self.description),
                  overwrite_settings=source.get("overwrite_settings"),
                  self_config=source.get("self_config"),
                  priority=source.get("priority", RulePriority.NORMAL),
                  cooldown_reset_value=source.get("cooldown_reset_value"),
                  group=source.get("group"),
                  enabled=source.get("enabled", True))
1131
1132 # -----------------------
1133
@classmethod
def _get_init_signature(cls):
    """
    Return the parameter names of this class' ``__init__``.

    Returns:
        A keys view over the parameters reported by :py:func:`inspect.signature`
        for ``cls.__init__``.

    :meta private:
    """
    init_signature = inspect.signature(cls.__init__)
    return init_signature.parameters.keys()
1145
[docs]
def execute_phase(self,
                  phase: "Phase", *,
                  prior_results: Any = None,
                  update_controls: "Optional[carla.VehicleControl]" = None):
    """
    Helper function to execute a phase from within a rule,
    wrapper of :py:meth:`agents.lunatic_agent.LunaticAgent.execute_phase`.

    Attention:
        - **Use with care to avoid loops or recursions.**

        - If a :py:class:`.Context` is available prefer using
          :py:meth:`ctx.agent.execute_phase <.LunaticAgent.execute_phase>` instead.

    Parameters:
        phase: Forwarded as ``phase``.
        prior_results: Forwarded as ``prior_results``.
        update_controls: Forwarded as ``update_controls``.

    Warns:
        ReferenceError: If the weak proxy pointing to the :py:class:`Context` object has been
            deleted. The error is logged and the phase will not be executed.

    See Also:
        - :py:meth:`.LunaticAgent.execute_phase`
    """
    forwarded = {
        "phase": phase,
        "prior_results": prior_results,
        "update_controls": update_controls,
    }
    try:
        # Dereferencing the proxy has to happen inside the try block as well.
        self._ctx.agent.execute_phase(**forwarded)  # pyright: ignore[reportOptionalMemberAccess]
    except ReferenceError:
        logger.error("ReferenceError in Rule.execute_phase. Weakproxy deleted.")
1173
1174 # -----------------------
1175 # Evaluation functions
1176 # -----------------------
1177
@_use_temporary_config
def evaluate(self, ctx: Context,
             overwrite: Optional[Dict[str, Any]] = None  # pylint: disable=unused-argument # noqa: ARG002
             ) -> Union[bool, Hashable, "Literal[RuleResult.NO_RESULT]"]:
    """
    Executes the :py:attr:`condition` function of the rule.

    Note:
        This is an **interface** function of rules that is executed during :py:meth:`__call__`.
        This method does not check if a rule is applicable,
        i.e. if the rule is in the correct phase or if it :py:meth:`is_ready`;
        this is done in :py:meth:`__call__`.

        Meta rules with children should overwrite this method and call
        :py:meth:`evaluate_children` from here.

    The decorator automatically takes care of temporarily setting the :py:attr:`overwrite_settings`.

    Parameters:
        ctx: The context object that is passed to the condition function.
        overwrite: A dictionary of settings that will overwrite the agent's setting for this Rule.
            Not read in the body; used by the :code:`@_use_temporary_config` decorator.

    Returns:
        The return value of the condition function.

    :meta private:
    """
    # Only a weak reference is stored on the rule; use with care and access over function.
    weak_ctx = proxy(ctx)
    self._ctx = weak_ctx
    outcome = self.condition(ctx)  # pyright: ignore[reportCallIssue]
    return outcome
1204
[docs]
def evaluate_children(self, ctx: "Context") -> "NoReturn":  # pylint: disable=unused-argument
    """
    Not implemented for this rule class.

    Note:
        This is an **interface** function of meta rules
        that is executed during :py:meth:`__call__` to call further rules.

    Raises:
        NotImplementedError: Always.
    """
    message = "This method should be implemented in a subclass"
    raise NotImplementedError(message)
1214
[docs]
def __call__(self,
             ctx: Context,
             overwrite: Optional[Dict[str, Any]] = None,
             *,
             ignore_phase: bool = False,
             ignore_cooldown: bool = False) -> Union[Any, Literal[RuleResult.NOT_APPLICABLE]]:
    """
    1. First checks if the rule is *applicable*, i.e. if it :py:meth:`is_ready` and if the
       agent's current phase is in :py:attr:`phases`; if not returns :py:attr:`NOT_APPLICABLE`.
    2. Afterwards evaluates the rule via :py:meth:`evaluate`.
        - if the result is not in :py:attr:`actions` returns :py:attr:`NOT_APPLICABLE`.
        - otherwise resets the cooldown, merges the :py:attr:`overwrite_settings`
          (and ``overwrite``) into :py:attr:`.Context.config` and executes the action.

    The evaluation result is stored in ``ctx.evaluation_results`` and the result of an
    executed action in ``ctx.action_results``, each under the agent's current phase.

    Parameters:
        ctx: The context object that is passed to the condition function.
        overwrite: Extends :py:attr:`overwrite_settings` for this call only. Defaults to :code:`None`.
        ignore_phase: If :python:`True` the phase check is skipped. Defaults to :code:`False`.
        ignore_cooldown: If True the cooldown check is skipped. Defaults to :code:`False`.

    Returns:
        The return value of the executed action, or :py:attr:`NOT_APPLICABLE`
        if no action was executed.

    Raises:
        BaseException: Any exception raised by :py:meth:`evaluate` is re-raised after the
            cooldown has been reset.
    """
    if not self.is_ready() and not ignore_cooldown:
        return RuleResult.NOT_APPLICABLE
    # No assert here: a phase mismatch must return NOT_APPLICABLE in every interpreter
    # mode, e.g. when a parent rule calls its children with ignore_phase=False.
    if not ignore_phase and ctx.agent.current_phase not in self.phases:
        return RuleResult.NOT_APPLICABLE  # not applicable for this phase

    exception = None
    result = Rule.NO_RESULT
    try:
        result = self.evaluate(ctx, overwrite)
    except BaseException as e:
        # Re-raised in the finally block after the clean-up.
        exception = e
    else:
        # NOTE: current_phase is read anew each time; it is not cached across the calls.
        ctx.evaluation_results[ctx.agent.current_phase] = result
        if result in self.actions:
            self.reset_cooldown()
            # Apply overwrite settings permanently
            ctx.config.merge_with(self.overwrite_settings)
            if overwrite:
                ctx.config.merge_with(overwrite)

            action_result = self.actions[result](ctx)  # todo allow priority, random chance # pyright: ignore[reportCallIssue]
            ctx.action_results[ctx.agent.current_phase] = action_result
            return action_result
        return RuleResult.NOT_APPLICABLE  # No action was executed
    finally:
        # Drop the context reference that evaluate() stored on the rule.
        self._ctx = None
        # Identity check: do not rely on the truthiness of the exception object.
        if exception is not None:
            self.reset_cooldown()
            raise exception
1266
def __str__(self) -> str:
    """
    Return a one-line summary of the rule.

    Lists description, phases, group, priority, actions, condition and cooldown.
    For a :py:class:`functools.partial` condition the wrapped function is shown,
    otherwise the ``__name__`` of the condition. If an attribute is missing,
    the problem is logged and a fallback text naming the error is returned.
    """
    try:
        # Checked first so that a missing condition is reported before anything else.
        wraps_partial = isinstance(self.condition, partial)
        pieces = [
            f"description='{self.description}'",
            f"phases={self.phases}",
            f"group={self.group}",
            f"priority={self.priority}",
            f"actions={self.actions}",
            f"condition={self.condition.func if wraps_partial else self.condition.__name__}",  # pyright: ignore[reportUnknownMemberType]
            f"cooldown={self.cooldown}",
        ]
        return self.__class__.__name__ + "(" + ", ".join(pieces) + ")"
    except AttributeError as e:
        reason = str(e)
        logger.info("Error during string creation: " + reason)
        return (self.__class__.__name__
                + "(Error in condition.__str__: Rule has not been initialized correctly. "
                "Missing attributes: " + reason + ")")
1291
def __repr__(self) -> str:
    """Return the same text as :py:meth:`__str__`."""
    text = str(self)
    return text
1294
1295
[docs]
class MultiRule(Rule, metarule=True):
    """
    This metarule allows to execute one or multiple rules if it is applicable.
    Depending on :py:attr:`execute_all_rules` it will either execute all rules or only the first
    applicable rule from its :py:attr:`rules` list.
    """

    rules: List[Rule]
    """The list of child rules to be called if this rule's condition is true."""

    def _wrap_action(self, action: Callable[[Context], Any]):
        """
        Wrap the passed action.
        First the action is executed afterwards the child rules are evaluated.

        Note:
            There is no extra condition that is checked between the two actions.

        Returns:
            A function returning the tuple ``(action_result, children_results)``.
            If the action raises :py:exc:`DoNotEvaluateChildRules` the children are
            skipped and ``(exception, None)`` is returned instead.
        """
        @wraps(action)
        def wrapper(ctx: Context, *args: Any, **kwargs: Any) -> Any:
            try:
                result = action(ctx, *args, **kwargs)
            except DoNotEvaluateChildRules as e:
                return e, None
            else:
                results = self.evaluate_children(ctx)  # execute given rules as well
                return result, results
        return wrapper

    if TYPE_CHECKING:
        class _InitParameters(Rule._InitParameters):  # pyright: ignore[reportPrivateUsage,reportGeneralTypeIssues]
            rules: Required[List[Rule]]
            sort_rules: NotRequired[bool]
            execute_all_rules: NotRequired[bool]

    @singledispatchmethod
    def __init__(self,
                 phases: Union[Phase, Iterable[Phase]],
                 #/, # phases must be positional; python3.8+ only
                 rules: List[Rule],
                 condition: Optional[ConditionFunctionLikeT] = None,
                 *,
                 description: str = "If its own condition is true calls the passed rules.",
                 priority: RulePriority = RulePriority.NORMAL,
                 sort_rules: bool = True,
                 execute_all_rules: bool = False,
                 action: Optional[Callable[[Context], Any]] = None,
                 ignore_phase: bool = True,
                 overwrite_settings: Optional[Dict[str, Any]] = None,
                 self_config: Optional[Dict[str, Any]] = None,
                 cooldown_reset_value: Optional[int] = None,
                 group: Optional[str] = None,
                 enabled: bool = True,
                 ):
        """
        Initializes a Rule object that can have further rules as children.

        Args:
            phases (Union[Phase, Iterable]): The phase or phases in which the rule should be active.
            rules (List[Rule]): The list of child rules to be called if the rule's condition is true.
                The list is stored as is and, if ``sort_rules`` is set, sorted in place.
            condition (Callable[[Context]], optional): The condition that determines if the rules should be evaluated.
                Defaults to :py:func:`always_execute` if the class does not provide a condition.
            execute_all_rules (bool, optional):
                If False will only execute the first rule with a applicable condition, i.e. this MultiRule is like a node in a decision tree.
                If True all rules are evaluated, unless one raises a `DoNotEvaluateChildRules` exception.
                Defaults to :python:`False`.
            sort_rules (bool, optional): Flag indicating whether to sort the rules by priority. Defaults to :python:`True`.
            action (Callable[[Context]], optional): The action to be executed before the passed rules are evaluated. Defaults to :python:`None`.
            ignore_phase (bool, optional): Flag indicating whether to ignore the Phase of the passed child rules. Defaults to :python:`True`.
            overwrite_settings (Dict[str, Any], optional): Additional settings to overwrite the agent's settings. Defaults to :python:`None`.
            self_config (Dict[str, Any], optional): Forwarded to the parent ``__init__``. Defaults to :python:`None`.
            priority (RulePriority, optional): The priority of the rule. :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`.
            description (str, optional): The description of the rule. Defaults to :python:`"If its own condition is true calls the passed rules."`.
            cooldown_reset_value (int, optional): Forwarded to the parent ``__init__``. Defaults to :python:`None`.
            group (str | None, optional): The group name of the rule. Defaults to :python:`None`.
            enabled (bool, optional): Flag indicating whether the rule is enabled after creation. Defaults to :python:`True`.
        """
        self.ignore_phase = ignore_phase
        if rules is None:  # type: ignore
            logger.warning("Warning: No rules passed to %s: %s. You can still add rules to the rules attribute later.", self.__class__.mro()[1].__name__, self.__class__.__name__)
            rules = []
        if len(rules) == 0:
            # NOTE: For rules=None this is logged in addition to the warning above.
            logger.warning("Rules list is empty. %s will always return NOT_APPLICABLE.", self.__class__.__name__)
        self.rules = rules
        self.execute_all_rules = execute_all_rules
        if sort_rules:
            # Highest priority first
            self.rules.sort(key=lambda r: r.priority, reverse=True)
        # if an action is passed to be executed before the passed rules it is wrapped to execute both
        if action is not None:
            action = self._wrap_action(action)
        else:
            action = self.evaluate_children  # will be called elsewhere
        if condition is None and not hasattr(self, "condition"):
            condition_arg = always_execute
        else:
            condition_arg = condition
        super().__init__(phases,
                         condition=condition_arg,
                         action=action,
                         description=description,
                         overwrite_settings=overwrite_settings,
                         self_config=self_config,
                         priority=priority,
                         cooldown_reset_value=cooldown_reset_value,
                         enabled=enabled,
                         group=group)

    @__init__.register(_CountdownRule)  # For similar_rule = Rule(some_rule), easier cloning
    @__init__.register(type)  # For @Rule class MyRule: ...
    def __init_by_decorating_class(self, cls: "MultiRule"):  # pyright: ignore[reportUnusedFunction]
        """
        Initialize from another rule instance or from a class object.

        All ``__init__`` arguments are read from attributes of ``cls``. ``rules``,
        ``condition`` and ``description`` must exist; all others are optional.
        The sort flag is read from ``sort_rules`` or, as fallback, from
        ``sort_rules_by_priority``; the prior action from ``prior_action``.
        """
        phases = getattr(cls, "phases", getattr(cls, "phase", None))  # allow both
        cooldown_reset_value = getattr(cls, "cooldown_reset_value",
                                       getattr(cls, "max_cooldown", None))
        assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided."
        # The keyword names must be those of __init__ (`sort_rules`, `action`);
        # any other keyword is rejected with a TypeError.
        self.__init__(phases, cls.rules,
                      cls.condition,  # type: ignore
                      description=cls.description,
                      overwrite_settings=getattr(cls, "overwrite_settings", None),
                      self_config=getattr(cls, "self_config", None),
                      priority=getattr(cls, "priority", RulePriority.NORMAL),
                      cooldown_reset_value=cooldown_reset_value,
                      group=getattr(cls, "group", None),
                      enabled=getattr(cls, "enabled", True),
                      sort_rules=getattr(cls, "sort_rules",
                                         getattr(cls, "sort_rules_by_priority", True)),
                      execute_all_rules=getattr(cls, "execute_all_rules", False),
                      action=getattr(cls, "prior_action", None),
                      ignore_phase=getattr(cls, "ignore_phase", True))

    @__init__.register(Mapping)
    def __init_from_mapping(self, cls: "_InitParameters"):  # pyright: ignore[reportUnusedFunction]
        """
        Initialize the rule from a mapping of ``__init__`` arguments.

        The key ``rules`` is required. The keys ``sort_rules`` and ``action`` are
        preferred; ``sort_rules_by_priority`` and ``prior_action`` are used as fallback.

        Raises:
            KeyError: If the mapping has no ``rules`` key.
        """
        self.__init__(cls.get("phases", cls.get("phase")),
                      cls["rules"],
                      cls.get("condition"),
                      description=cls.get("description", self.description),
                      overwrite_settings=cls.get("overwrite_settings"),
                      self_config=cls.get("self_config"),
                      priority=cls.get("priority", RulePriority.NORMAL),
                      cooldown_reset_value=cls.get("cooldown_reset_value"),
                      group=cls.get("group"),
                      enabled=cls.get("enabled", True),
                      sort_rules=cls.get("sort_rules", cls.get("sort_rules_by_priority", True)),
                      execute_all_rules=cls.get("execute_all_rules", False),
                      action=cls.get("action", cls.get("prior_action", None)),
                      ignore_phase=cls.get("ignore_phase", True))

    def evaluate_children(self, ctx: Context) -> Union[List[Any], Any]:  # pyright: ignore[reportIncompatibleMethodOverride]
        """
        Evaluates the children rules of the current rule in the given context.

        Args:
            ctx : The context in which the child rules are evaluated.

        Returns:
            The results of evaluating the children rules.
            Returns a list of results if execute_all_rules is True,
            otherwise the result of the first rule that was applied, or
            :py:attr:`NOT_APPLICABLE` if none was applied.
            If a child raises :py:exc:`DoNotEvaluateChildRules` the list of the
            results collected so far is returned.
        """
        results: List[Any] = []
        for rule in self.rules:
            try:
                result = rule(ctx, ignore_phase=self.ignore_phase)
            except DoNotEvaluateChildRules:
                return results
            if not self.execute_all_rules and result is not Rule.NOT_APPLICABLE:  # one rule was applied end.
                return result
            results.append(result)
        if not self.execute_all_rules:
            return Rule.NOT_APPLICABLE  # does not expect a list
        return results
1462
1463
[docs]
1464class RandomRule(MultiRule, metarule=True):
1465 """
1466 A rule that selects and evaluates one or more random child rules from a set of rules.
1467
1468 Args:
1469 phases : The phase or phases in which the rule is applicable.
1470 rules : The set of rules from which to select random child rules.
1471 repeat_if_not_applicable : If False, only one rule will be evaluated even if it is not applicable. Defaults to :python:`True`.
1472 condition :
1473 A callable that determines if the rule is applicable in a given context.
1474 If None and the rule does not implement a :py:attr:`condition` attribute the rule always executes.
1475 Defaults to :python:`None`.
1476 action : A callable that defines the action to be performed when the rule is applicable. Defaults to :python:`None`.
1477 ignore_phase : If True, the rule will be evaluated even if it is not in the specified phase. Defaults to :python:`True`.
1478 priority : The priority of the rule. :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`.
1479 description : A description of the rule. Defaults to :python:`"If its own condition is true calls one or more random child rules from the passed rules."`.
1480 overwrite_settings : A dictionary of settings to overwrite the default settings of the rule. Defaults to :python:`None`.
1481 cooldown_reset_value : The value to reset the cooldown of the rule. Defaults to :python:`None`.
1482 group : The group to which the rule belongs. Defaults to :python:`None`.
1483 enabled : If False, the rule will not be evaluated. Defaults to :python:`True`.
1484 weights : The weights associated with each rule when selecting random child rules. Defaults to :python:`None`.
1485
1486 Raises:
1487 ValueError: When passing **rules** as a dict with weights, the **weights** argument must be None.
1488 """
1489
1490 # TODO: add a dummy attribute for one additional weight, that skips the evaluation. Should only considered once.
1491
1492 if TYPE_CHECKING:
1493 class _InitParameters(MultiRule._InitParameters): # pyright: ignore[reportPrivateUsage]
1494 repeat_if_not_applicable: NotRequired[bool]
1495 weights: NotRequired[Optional[List[float]]]
1496
@singledispatchmethod
def __init__(self,
             phases: Union[Phase, Iterable[Phase]],  # /, # phases must be positional; python3.8+ only
             rules: Union[Dict[Rule, float], List[Rule]],
             repeat_if_not_applicable: bool = True,
             condition: Optional[Callable[[Context], Any]] = None,
             *,
             action: Optional[Callable[[Context], Any]] = None,
             ignore_phase: bool = True,
             priority: RulePriority = RulePriority.NORMAL,
             description: str = "If its own condition is true calls one or more random child rules from the passed rules.",
             overwrite_settings: Optional[Dict[str, Any]] = None,
             self_config: Optional[Dict[str, Any]] = None,
             cooldown_reset_value: Optional[int] = None,
             group: Optional[str] = None,
             enabled: bool = True,
             weights: Optional[List[float]] = None
             ):
    """
    Initializes a Rule object that can trigger one or more random child rules.

    Args:
        phases : The phase or phases in which the rule is applicable.
        rules : The set of rules from which to select random child rules.
            Either a list of rules or a dict mapping each rule to its (non-cumulative) weight.
        repeat_if_not_applicable : If False, only one rule will be evaluated even if it is not applicable. Defaults to :python:`True`.
        condition :
            A callable that determines if the rule is applicable in a given context.
            If None and the rule does not implement a `condition` attribute the rule always executes.
            Defaults to :python:`None`.
        action : A callable that defines the action to be performed when the rule is applicable. Defaults to :python:`None`.
        ignore_phase : If True, the rule will be evaluated even if it is not in the specified phase. Defaults to :python:`True`.
        priority : The priority of the rule. Defaults to :py:attr:`RulePriority.NORMAL <classes.constants.RulePriority.NORMAL>`.
        description : A description of the rule. Defaults to :python:`"If its own condition is true calls one or more random child rules from the passed rules."`.
        overwrite_settings : A dictionary of settings to overwrite the default settings of the rule. Defaults to :python:`None`.
        self_config : Forwarded unchanged to the parent initializer. Defaults to :python:`None`.
        cooldown_reset_value : The value to reset the cooldown of the rule. Defaults to :python:`None`.
        group : The group to which the rule belongs. Defaults to :python:`None`.
        enabled : If False, the rule will not be evaluated. Defaults to :python:`True`.
        weights : The weights associated with each rule when selecting random child rules.
            The list is stored as given and later handed to :py:func:`random.choices` as
            ``cum_weights``. If None (or empty), the cumulative rule priorities are used.
            Defaults to :python:`None`.

    Raises:
        ValueError: When passing **rules** as a dict with weights, the **weights** argument must be None.
        ValueError: When **weights** does not contain exactly one entry per rule.
    """
    if isinstance(rules, dict):
        if weights is not None:
            raise ValueError("When passing rules as a dict with weights, the weights argument must be None")
        self.weights = list(accumulate(rules.values()))  # cumulative weights for random.choices are more efficient
        self.rules = list(rules.keys())
    else:
        if weights and len(weights) != len(rules):
            # Fail at construction instead of at the first random draw.
            raise ValueError(
                f"Expected one weight per rule: got {len(weights)} weights for {len(rules)} rules"
            )
        self.weights = weights or list(accumulate(r.priority for r in rules))
        self.rules = rules
    self.repeat_if_not_applicable = repeat_if_not_applicable
    # Forward the rule *list*; a dict with weights must not reach the parent initializer.
    # NOTE(review): if the parent initializer reorders the rules, `self.weights`
    # no longer lines up with `self.rules` - confirm against MultiRule.__init__.
    super().__init__(
        phases,
        self.rules,
        condition=condition,
        action=action,
        description=description,
        priority=priority,
        ignore_phase=ignore_phase,
        overwrite_settings=overwrite_settings,
        self_config=self_config,
        cooldown_reset_value=cooldown_reset_value,
        enabled=enabled,
        group=group,
    )
1546
@__init__.register(_CountdownRule)
@__init__.register(type)
def __init_by_decorating_class(self, cls: "RandomRule"):  # pyright: ignore[reportUnusedFunction]
    """
    Initialize from a class or an existing rule object by reading its attributes.

    ``rules``, ``repeat_if_not_applicable``, ``condition`` and ``description`` must be
    present on **cls**; all other attributes are optional and fall back to defaults.

    Args:
        cls : The decorated class or rule instance that provides the attributes.
    """
    phases = getattr(cls, "phases", getattr(cls, "phase", None))  # allow both spellings
    cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None))

    assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided."
    # Only pass keywords that __init__ accepts: there is no `prior_action` parameter,
    # forwarding it raised a TypeError on every call.
    # NOTE(review): a `prior_action` attribute is treated as a fallback for `action` -
    # confirm that both names describe the same callable.
    self.__init__(phases, cls.rules, repeat_if_not_applicable=cls.repeat_if_not_applicable,
                  condition=cls.condition,
                  action=getattr(cls, "action", getattr(cls, "prior_action", None)),
                  description=cls.description,
                  overwrite_settings=getattr(cls, "overwrite_settings", None),
                  self_config=getattr(cls, "self_config", None),
                  priority=getattr(cls, "priority", RulePriority.NORMAL),
                  cooldown_reset_value=cooldown_reset_value,
                  group=getattr(cls, "group", None),
                  enabled=getattr(cls, "enabled", True),
                  ignore_phase=getattr(cls, "ignore_phase", True))
1564
@__init__.register(Mapping)
def __init_from_mapping(self, cls: "_InitParameters"):  # pyright: ignore[reportUnusedFunction]
    """
    Initialize from a mapping of keyword parameters.

    ``rules`` is the only required key. The phases are read from ``phases`` or,
    if that key is missing, from ``phase``.

    Args:
        cls : Mapping with the initialization parameters.

    Raises:
        KeyError: If the ``rules`` key is missing.
    """
    # Only pass keywords that __init__ accepts: `sort_rules_by_priority`,
    # `execute_all_rules` and `prior_action` are not parameters of __init__ and
    # raised a TypeError on every call.
    # NOTE(review): a `prior_action` key is treated as a fallback for `action` -
    # confirm that both names describe the same callable.
    self.__init__(cls.get("phases", cls.get("phase")),
                  cls["rules"],
                  repeat_if_not_applicable=cls.get("repeat_if_not_applicable", True),
                  condition=cls.get("condition"),
                  action=cls.get("action", cls.get("prior_action")),
                  description=cls.get("description", self.description),
                  overwrite_settings=cls.get("overwrite_settings"),
                  self_config=cls.get("self_config"),
                  priority=cls.get("priority", RulePriority.NORMAL),
                  cooldown_reset_value=cls.get("cooldown_reset_value"),
                  group=cls.get("group"),
                  enabled=cls.get("enabled", True),
                  weights=cls.get("weights"),  # declared in _InitParameters but was never forwarded
                  ignore_phase=cls.get("ignore_phase", True))
1582
[docs]
def evaluate_children(self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None) -> Any:
    """
    Evaluate a random child rule.

    If `self.repeat_if_not_applicable=False` and the randomly chosen rule is not applicable,
    then no further rules are evaluated.
    For `self.repeat_if_not_applicable=True` possible rules are evaluated in a random fashion
    until one rule was applicable. A rule that was not applicable is not drawn again and
    the remaining rules keep their configured weights.

    Args:
        ctx : The context that is passed to the chosen child rule.
        overwrite : Optional settings that are passed on to the chosen child rule.

    Returns:
        The result of the last evaluated child rule,
        :py:attr:`RuleResult.NOT_APPLICABLE` if there are no child rules,
        or None if a child raised :py:exc:`DoNotEvaluateChildRules`.
    """
    if not self.rules:
        logger.warning("No rules to evaluate in %s. Returning NOT_APPLICABLE.", self.__class__.__name__)
        return RuleResult.NOT_APPLICABLE

    # Work on copies, candidates are removed while drawing.
    candidates = list(self.rules)
    cum_weights = list(self.weights)

    result = RuleResult.NOT_APPLICABLE
    while candidates:
        # Draw an index instead of the rule itself, so that the matching weight can be removed.
        index = random.choices(range(len(candidates)), cum_weights=cum_weights, k=1)[0]
        rule = candidates[index]
        try:
            result = rule(ctx, overwrite, ignore_phase=self.ignore_phase)  # NOTE: Context action/evaluation results only store result of LAST rule
        except DoNotEvaluateChildRules:
            return None
        if not self.repeat_if_not_applicable or result is not Rule.NOT_APPLICABLE:
            # break after the first rule of `self.repeat_if_not_applicable=False`
            # else break if it was applicable.
            break
        # Recover the individual weights from the cumulative ones, drop the tried rule
        # and accumulate again. Rebuilding the weights from the rule priorities
        # would discard weights that were passed explicitly.
        single_weights = cum_weights[:1] + [after - before for before, after in zip(cum_weights, cum_weights[1:])]
        del candidates[index]
        del single_weights[index]
        cum_weights = list(accumulate(single_weights))
        if cum_weights and cum_weights[-1] <= 0:
            # Only rules with zero weight are left; random.choices cannot draw from them.
            break
    return result
1615
1616
[docs]
class BlockingRule(Rule, metarule=True):
    """
    This meta rule allows to define rules that are able to take over the agent's workflow and
    apply the :py:class:`.VehicleControl` directly from within the rule.
    """

    _gameframework: ClassVar[Union["GameFramework", "CallableProxyType[GameFramework]", None]] = None
    """
    Set when a :py:class:`GameFramework` is initialized.
    Alternatively can be set when any BlockingRule is created.
    """

    ticks_passed: int
    """Count how many ticks have been performed by this rule and blocked the agent."""

    MAX_TICKS = 5000  # 5000 * 1/20 = 250 seconds
    """
    The amount of ticks that can be performed by this rule before it is automatically disabled.
    If the rule has looped for this amount of ticks it will then call :py:attr:`max_tick_callback`
    and raise an :py:exc:`.UnblockRuleException` afterwards.

    As a hack :py:attr:`max_tick_callback` can change :py:attr:`ticks_passed` to prevent the
    exception and continue the rule.
    """

    max_tick_callback: Optional[Callable[[Self, Context], Any]] = None
    """
    An optional callback that is executed when :py:attr:`ticks_passed` reaches :py:attr:`MAX_TICKS`.
    """

    if TYPE_CHECKING:
        class _InitParameters(Rule._InitParameters):  # pyright: ignore[reportPrivateUsage]
            gameframework: Required[Optional[GameFramework]]

    @singledispatchmethod
    def __init__(self,
                 phases: Union[Phase, Iterable[Phase]],  # iterable of Phases
                 # /, # phases must be positional; python3.8+ only
                 condition: Optional[ConditionFunctionLikeT] = None,
                 action: Optional[Union[CallableAction[Self, []],
                                        Dict[Any, CallableAction[Self, []]]]] = None,
                 false_action: Optional[CallableAction[Self, []]] = None,
                 *,
                 gameframework: Optional[GameFramework],
                 actions: Optional[Dict[Any, CallableAction[Rule, []]]] = None,
                 description: str = "What does this rule do?",
                 overwrite_settings: Optional[Dict[str, Any]] = None,
                 self_config: Optional[Dict[str, Any]] = None,
                 priority: RulePriority = RulePriority.NORMAL,
                 cooldown_reset_value: Optional[int] = None,
                 group: Optional[str] = None,
                 enabled: bool = True,
                 ):
        """
        Initialize the rule and register the game framework used for ticking and rendering.

        All arguments except **gameframework** are forwarded to the parent initializer.

        Args:
            gameframework : If truthy it is stored on the :py:class:`BlockingRule` class and
                therefore shared by all blocking rules. Pass None to keep the current one.
        """
        super().__init__(phases, condition, action, false_action,
                         actions=actions,
                         description=description, overwrite_settings=overwrite_settings,
                         self_config=self_config,
                         priority=priority,
                         cooldown_reset_value=cooldown_reset_value,
                         group=group,
                         enabled=enabled)
        if gameframework:
            BlockingRule._gameframework = gameframework
        if not GameFramework.clock or not GameFramework.display:
            # Not much we can do about it; make sure pygame is usable for the rule.
            GameFramework.init_pygame()
        self.ticks_passed = 0

    @__init__.register(_CountdownRule)
    @__init__.register(type)
    def __init_by_decorating_class(self, cls: "BlockingRule"):  # pyright: ignore[reportUnusedFunction]
        """
        Initialize by passing a Rule or class object to the __init__ method.

        This allows the usage of the @Rule decorator and easy copying.
        """
        phases = getattr(cls, "phases", getattr(cls, "phase", None))  # allow for both
        cooldown_reset_value = getattr(cls, "cooldown_reset_value", getattr(cls, "max_cooldown", None))
        assert hasattr(cls, "condition"), f"Class {cls} has no condition attribute. It must be provided."
        self.__init__(phases,
                      cls.condition,  # type: ignore
                      action=getattr(cls, "action", None), false_action=getattr(cls, "false_action", None),
                      gameframework=getattr(cls, "gameframework", BlockingRule._gameframework),
                      actions=getattr(cls, "actions", None), description=cls.description,
                      overwrite_settings=getattr(cls, "overwrite_settings", None),
                      self_config=getattr(cls, "self_config", None),
                      priority=getattr(cls, "priority", RulePriority.NORMAL),
                      cooldown_reset_value=cooldown_reset_value,
                      group=getattr(cls, "group", None), enabled=getattr(cls, "enabled", True))

    @__init__.register(Mapping)
    def __init_from_mapping(self, cls: "_InitParameters"):  # pyright: ignore[reportUnusedFunction]
        """
        Initialize from a mapping of keyword parameters.

        The phases are read from ``phases`` or, if that key is missing, from ``phase``.
        A missing ``gameframework`` key falls back to the framework that is already
        registered on :py:class:`BlockingRule`.
        """
        params = dict(cls)
        # Take the phases out of the keywords; passing them positionally *and* via
        # ** raises "got multiple values for argument 'phases'".
        phase = params.pop("phase", None)
        phases = params.pop("phases", phase)
        params.setdefault("gameframework", BlockingRule._gameframework)
        # Go through this class' initializer (not the parent's) so that the
        # gameframework is registered and `ticks_passed` is set.
        self.__init__(phases, **params)

    def _render_everything(self, ctx: Context):
        """Render world, controller and HUD, then flip the pygame display."""
        if self._gameframework:
            self._gameframework.render_everything()
        else:
            # Without a framework instance render by hand using the class-level pygame objects.
            world_model = ctx.agent._world_model  # pyright: ignore[reportPrivateUsage]
            display = GameFramework.display
            world_model.tick(GameFramework.clock)  # does not tick the world!
            world_model.render(display, finalize=False)
            try:
                world_model.controller.render(display)  # type: ignore[attr-defined] # noqa: SIM105
            except AttributeError:
                # best effort: the controller is optional here
                pass

            dm_render_conf = world_model._args.camera.hud.detection_matrix  # pyright: ignore[reportPrivateUsage]
            if dm_render_conf and ctx.agent:
                ctx.agent.render_detection_matrix(display, **dm_render_conf)  # pyright: ignore[reportPrivateUsage]
            world_model.finalize_render(display)
        pygame.display.flip()

    # The overloads carry the same default for `execute_phases` as the implementation,
    # so that type checkers accept calls that omit it.
    @overload
    def loop_agent(self, ctx: Context, control: Optional[carla.VehicleControl] = None,
                   *, execute_planner: Literal[True], execute_phases: Any = ...) -> carla.VehicleControl:
        ...

    @overload
    def loop_agent(self, ctx: Context, control: Optional[carla.VehicleControl] = None,
                   *, execute_planner: Literal[False], execute_phases: Any = ...) -> None:
        ...

    def loop_agent(self, ctx: Context, control: Optional[carla.VehicleControl] = None,
                   *, execute_planner: bool, execute_phases: Any = True) -> "carla.VehicleControl | None":
        """
        A combination of `LunaticAgent.parse_keyboard_input`, `LunaticAgent.apply_control`, `BlockingRule.update_world`,
        and `Context.get_or_calculate_control` to advance agent and world.

        Args:
            ctx (Context): The current context object
            control (Optional[carla.VehicleControl], optional): The control to apply; will overwrite the context's control.
                If None takes the context's control.
                Defaults to :python:`None`.
            execute_planner (bool): If True the next control is acquired through
                :py:meth:`.Context.get_or_calculate_control` and returned.
            execute_phases: Passed on to :py:meth:`update_world`. Defaults to :python:`True`.

        Returns:
            The next control if **execute_planner** is True, otherwise None.

        See Also:
            Executes the following methods:

            1. :py:meth:`.LunaticAgent.parse_keyboard_input`
            2. :py:meth:`.LunaticAgent.apply_control`
            3. :py:meth:`.BlockingRule.update_world` ticks the :py:class:`carla.World` and renders everything.
            4. :py:meth:`.Context.get_or_calculate_control` to acquire the :py:class:`carla.VehicleControl` object.
        """
        ctx.agent.parse_keyboard_input(control=control)  # NOTE: if skipped the user has no option to stop the agent
        ctx.agent.apply_control(control)

        # NOTE: This ticks the world forward by one step
        # The ctx.control is reset to None; execute_plan
        # > Phase.UPDATE_INFORMATION | Phase.BEGIN
        self.update_world(ctx, execute_phases=execute_phases)
        if execute_planner:
            return ctx.get_or_calculate_control()
        return None

    @staticmethod
    def get_world() -> carla.World:
        """Method to access the world object"""
        return CarlaDataProvider.get_world()

    def _begin_tick(self, ctx: Context):
        """
        Tick the pygame clock and the CARLA world, then reset the control of **ctx**.

        Increases :py:attr:`ticks_passed` by one.
        """
        # `_gameframework` may be None (see `_render_everything`); use the class-level
        # clock in that case instead of failing with an AttributeError.
        clock = self._gameframework.clock if self._gameframework else GameFramework.clock
        clock.tick()

        # The world is ticked here in every configuration.
        # CRITICAL: Without `handle_ticks` the framework expects a return value before it
        # ticks the world, however the blocking rules should take over the ticks;
        # so it should still do this!
        # TODO: Should implement some return <-> pass through mechanism; maybe implement a
        #   generator-like variant of the rule.
        sync_mode = CarlaDataProvider.is_sync_mode()
        if sync_mode:
            frame = self.get_world().tick()
        else:
            frame = self.get_world().wait_for_tick().frame
        CarlaDataProvider.on_carla_tick()

        if sync_mode:
            # We do this only in sync mode as frames could pass between gathering this information
            # and an agent calling InformationManager.tick(), which in turn calls global_tick
            # with possibly a DIFFERENT frame wasting computation.
            InformationManager.global_tick(frame)

        # control should be reset, we are at the "start of the tick again"
        ctx.set_control(None)
        self.ticks_passed += 1

    def update_world(self, ctx: Context, *, execute_phases: Union[bool, Container[Phase]] = True) -> "carla.VehicleControl | None":
        """
        Ticks the world and takes care of the rendering.

        Will call
            - :python:`ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=<this Rule instance>)`
            - :py:meth:`.LunaticAgent.update_information`, with or without executing the phases, depending on **execute_phases**.

        Args:
            ctx: The context to use
            execute_phases:
                Whether to execute the :python:`Phase.UPDATE_INFORMATION | Phase.[BEGIN|END]` with this function.
                Can also be a container containing one or both of these phases.
                The END phase is never executed from here if this rule itself is registered for it.
                Defaults to :python:`True`.

        Returns:
            None

        Raises:
            UnblockRuleException: If the ticks passed are over :py:attr:`MAX_TICKS`

        Attention:
            The usage with Leaderboard_ is working but experimental.
            The scenario expects the agent to return a control object in every step, however as this rule
            takes over the ticks completely an outside ScenarioManager might not work as expected.
        """
        self._begin_tick(ctx)
        ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=self)

        begin_phase = Phase.UPDATE_INFORMATION | Phase.BEGIN
        end_phase = Phase.UPDATE_INFORMATION | Phase.END
        run_all = execute_phases is True

        # Update the agent's information
        if run_all or (execute_phases and begin_phase in execute_phases):
            ctx.agent.execute_phase(begin_phase, prior_results=self)
        ctx.agent._update_information()
        # A container passed as `execute_phases` selects the END phase as well;
        # before, END was executed for every truthy value.
        if (run_all or (execute_phases and end_phase in execute_phases)) and end_phase not in self.phases:
            ctx.agent.execute_phase(end_phase, prior_results=self)

        if self.ticks_passed > self.MAX_TICKS:
            logger.info("Rule %s has passed its max_ticks %s, calling max_tick_callback and unblocking it", self, self.MAX_TICKS)
            if self.max_tick_callback:
                self.max_tick_callback(ctx)
            # max_tick_callback can override the ticks passed to keep the rule running
            if self.ticks_passed > self.MAX_TICKS:
                raise UnblockRuleException

        self._render_everything(ctx)

    def __call__(self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None, in_loop: bool = False,
                 *,  # Kwargs from Rule.__call__
                 ignore_phase: bool = False,
                 ignore_cooldown: bool = False) -> Union[Any, Literal[RuleResult.NOT_APPLICABLE]]:
        """
        Execute the rule, unless it is already blocking the agent.

        Args:
            ctx: The context to use
            overwrite: Passed on to the parent ``__call__``.
            in_loop: If False :py:attr:`ticks_passed` is reset to 0.
                If True phase and cooldown are ignored.
            ignore_phase: Passed on to the parent ``__call__``.
            ignore_cooldown: Passed on to the parent ``__call__``.

        Returns:
            The result of the parent ``__call__``; the ``result`` of a caught
            :py:exc:`.UnblockRuleException`; or ``Rule.NOT_APPLICABLE`` on a recursive call.
        """
        if not in_loop:
            self.ticks_passed = 0
        else:
            ignore_phase = True
            ignore_cooldown = True
        if self in ctx.agent._active_blocking_rules:
            logger.warning("Rule %s is already blocking the agent, this is a recursive call. Not executing the rule.", self)
            return Rule.NOT_APPLICABLE
        try:
            return super().__call__(ctx, overwrite, ignore_phase=ignore_phase, ignore_cooldown=ignore_cooldown)
        except UnblockRuleException as e:
            return e.result
        finally:
            # The rule no longer blocks the agent, however the call ended.
            ctx.agent._active_blocking_rules.discard(self)  # pyright: ignore[reportPrivateUsage]

    def evaluate(self, ctx: Context, overwrite: Optional[Dict[str, Any]] = None) -> Union[bool, Hashable, Literal[RuleResult.NO_RESULT]]:
        """
        Evaluate the condition and mark this rule as actively blocking if an action exists for the result.

        Returns:
            The unchanged result of the parent ``evaluate``.
        """
        result = super().evaluate(ctx, overwrite)
        if result in self.actions:
            ctx.agent._active_blocking_rules.add(self)  # pyright: ignore[reportPrivateUsage]
        return result
1879
# Provide the evaluation_function module with the names it needs from this module.
# Rule and Context are assigned as attributes of that module here, at the end of this
# file, which prevents circular imports.

import classes.evaluation_function as __evaluation_function # noqa
__evaluation_function.Rule = Rule
__evaluation_function.Context = Context
# Remove the temporary alias from this module's namespace again.
del __evaluation_function