classes package
This package contains helper classes and constants used in this project.
The classes from carla_originals are mostly identical to the
ones provided in the examples from the CARLA PythonAPI.
Classes Overview
GameFramework
The GameFramework is a helper-class class for a quicker setup.
It manages the game loop of the agent and takes care of:
Initialization of:
, and
interface
for other actors
The agent
Cooldowns of rules
Load the LaunchConfig via Hydra’s compose API
Interface to the CarlaDataProvider from the scenario_runner package.
Handling of special exceptions during the agent’s
run_steploopTicking of the world HUD rendering
GameFramework.render_everything(). Note: This function is the recommended way to render everything.
WorldModel
The WorldModel is a helper-class serving as a interface between the agent,
the simulator and the user. It handles the world ticks of the simulator and rendering of the pygame interface.
It is based on the World classes used in the examples from CARLA.
It is a extension of the class and provides the following functionalities:
Spawning of the ego actor (optional)
Or waits until an external script provides it. Command line argument
external_actor="hero".
Ticking (
sync=True) or waiting for the tick (sync=False) of theHUD management
Camera setup and management
Some sensors for the pygame user interface, displayed on the HUD
Toggling of
rendering
Note
GameFramework.render_everything()extends the rendering of theWorldModeland is the preferred function to call for rendering.
RSS features
Weather change
Cleanup when the program ends
HUD and Camera Manager
The HUD is a modification of the HUD classes used in the manual_control_rss.py example of Carla.
It closely combines with the classes.ui.camera_manager.CameraManager to provide the visual human interface for the pygame window.
Rules
See the Rule documentation for details.
RssSensor and Visualization
See the RSS documentation from CARLA.
KeyboardControls
See RSSKeyboardControl API for a more details.
Constants and Enums
See contents of the classes.constants module.
carla_originals
This package contains (mostly) unmodified classes extracted from examples provided by CARLA. The classes have are imported into this package.
Driver, Vehicle, VehicleSpawner
These are classes that are deprecated or will be merged or are up to major changes.
Rule Interpreter
This class allows for an alternative way to execute rules through different interfaces (python, yaml, xml, etc). It is not yet further integrated and documented.
Exceptions Overview
AgentDoneExceptionRaised when there is no more waypoint in the queue to follow and no rule set a new destination.ContinueLoopExceptionRaise when theagent.run_stepof the agent should not be continued further. The agent then returns the currentctx.controlto the caller ofrun_step.UserInterruptionTerminate therun_steploop if user input is detected, for example or a pygame hotkey like Esc. This allows the scenario runner and leaderboard to exit gracefully.UpdatedPathExceptionShould be raised when the path has been updated and the agent should replan.Phase.DONE | ENDFor more details see the
classes.exceptionsAPI documentation.
- class classes.Any(*args, **kwargs)[source]
Bases:
objectSpecial type indicating an unconstrained type.
Any is compatible with every type.
Any assumed to have all methods.
All values assumed to be instances of Any.
Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance checks.
- classes.overload(func)[source]
Decorator for overloaded functions/methods.
In a stub file, place two or more stub definitions for the same function in a row, each decorated with @overload.
For example:
@overload def utf8(value: None) -> None: ... @overload def utf8(value: bytes) -> bytes: ... @overload def utf8(value: str) -> bytes: ...
In a non-stub file (i.e. a regular .py file), do the same but follow it with an implementation. The implementation should not be decorated with @overload:
@overload def utf8(value: None) -> None: ... @overload def utf8(value: bytes) -> bytes: ... @overload def utf8(value: str) -> bytes: ... def utf8(value): ... # implementation goes here
The overloads for a function can be retrieved at runtime using the get_overloads() function.
- class classes.TypeVar(name, *constraints, bound=None, covariant=False, contravariant=False, default=NoDefault, infer_variance=False)[source]
Bases:
objectType variable.
- classes.final(f)[source]
Decorator to indicate final methods and final classes.
Use this decorator to indicate to type checkers that the decorated method cannot be overridden, and decorated class cannot be subclassed.
For example:
class Base: @final def done(self) -> None: ... class Sub(Base): def done(self) -> None: # Error reported by type checker ... @final class Leaf: ... class Other(Leaf): # Error reported by type checker ...
There is no runtime checking of these properties. The decorator attempts to set the
__final__attribute toTrueon the decorated object to allow runtime introspection.
- class classes.Protocol[source]
Bases:
GenericBase class for protocol classes.
Protocol classes are defined as:
class Proto(Protocol): def meth(self) -> int: ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example:
class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:
class GenProto(Protocol[T]): def meth(self) -> T: ...
Subpackages
Submodules
classes.constants module
In this module defines enums and constants that are used throughout the project.
- classes.constants.AD_RSS_AVAILABLE: bool
Indicator if the
carla.admodule is available, i.e. if the current carla version was build with RSS support. As a rule of thumb: On Windows this will be always false. If this is false some objects will be missing in thecarlamodule, for example thecarla.RssSensor, likewise are some utilities of this project involving RSS not be available.
- class classes.constants.AgentState(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
FlagHigh-level states that the agent currently can be in.
Used with
LunaticAgent.current_statesandInformationManager.state_counter.- DRIVING
- STOPPED
- BLOCKED_BY_VEHICLE
- BLOCKED_RED_LIGHT
- BLOCKED_BY_STATIC
Static obstacle.
Attention
Not updated by the information manager but in the
Phase.DETECT_STATIC_OBSTACLESphase.
- BLOCKED_OTHER
- REVERSE
- OVERTAKING
- AGAINST_LANE_DIRECTION
- BLOCKED
Combination of
BLOCKED_OTHER | BLOCKED_BY_VEHICLE | BLOCKED_RED_LIGHT | BLOCKED_BY_STATIC
- class classes.constants.RulePriority(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
IntEnumPriority of a
Rule. The higher a value, the higher the priority. Rules are sorted by their priority before being applied.- NULL = 0
- LOWEST = 1
- LOW = 2
- NORMAL = 4
- HIGH = 8
- HIGHEST = 16
- class classes.constants.Phase(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
FlagA rough order of the looped through states of the agent is:
<Phases.NONE: 0>, <Phases.UPDATE_INFORMATION|BEGIN: 5>, <Phases.UPDATE_INFORMATION|END: 6>, <Phases.PLAN_PATH|BEGIN: 9>, <Phases.PLAN_PATH|END: 10>, <Phases.DETECT_TRAFFIC_LIGHTS|BEGIN: 17>, <Phases.DETECT_TRAFFIC_LIGHTS|END: 18>, <Phases.DETECT_PEDESTRIANS|BEGIN: 33>, <Phases.DETECT_PEDESTRIANS|END: 34>, <Phases.DETECT_CARS|BEGIN: 65>, <Phases.DETECT_CARS|END: 66>, <Phases.POST_DETECTION_PHASE|BEGIN: 129>, <Phases.POST_DETECTION_PHASE|END: 130>, <Phases.EXECUTION|BEGIN: 1025>, <Phases.EXECUTION|END: 1026>
Note
The above cycle list is not up to date.
- A complete list of all the main phases can be obtained by calling
Phase.get_main_phases()
- NONE = 0
- BEGIN
Indicates the beginning of a phase. Should always be combined with another phase.
- END
Indicates the end of a phase. Should always be combined with another phase.
- UPDATE_INFORMATION
Indicates the execution of the agents
update_information()methodThis is the first Phase of the agent and executed every step.
- PLAN_PATH
Indicates that the planning of a new path has started.
Attention
If the the path is replanned a
UpdatedPathExceptionshould be raised.
- DETECT_TRAFFIC_LIGHTS
Executed during the agents
detect_hazard()method. Can addHazard.TRAFFIC_LIGHT_REDorHazard.TRAFFIC_LIGHT_YELLOWto the agentsLunaticAgent.detected_hazards.
- DETECT_PEDESTRIANS
Executed during the agents
detect_hazard()method. Can addHazard.PEDESTRIANto the agentsLunaticAgent.detected_hazards.
- DETECT_STATIC_OBSTACLES
- Checks for static obstacles in the agents path with
- DETECT_CARS
- Checks for vehicles in the agents path with
Note
If a car was detected executes
Phase.CAR_DETECTED | BEGIN,Phase.DETECT_CARS | BEGINis only executed if no car was detected.
- TAKE_NORMAL_STEP
During this phase the
carla.VehicleControlis calculated by the local planner.
- RSS_EVALUATION
See also
classes.ui.keyboard_controls.RssKeyboardControls.parse_events()
- APPLY_MANUAL_CONTROLS
Applied manually via human user interface.
- EXECUTION
Executed when the control is applied to the agent.
See also
- CAR_DETECTED
The
Phase.CAR_DETECTED | BEGINis executed when a car is detected and follows thePhase.DETECT_CARS | BEGINphase. If no car is detected thePhase.DETECT_CARS | ENDphase is executed.
- TURNING_AT_JUNCTION
Indicates that the agent is turning at a junction.
Warning
This Phase might become obsolete.
- HAZARD
Not implemented. Refer to EMERGENCY | BEGIN
- EMERGENCY = 32768
Special Phase
See also
- COLLISION
Special phase that is executed out-of-order when a
carla.Sensordetects a collision.See also
- DONE
Indicates that the agent is at the end of its path and
agent.done()isTrue.
- TERMINATING
Can be called when the agent is terminating. Must be executed by the user.
- CUSTOM_CYCLE
experimental
Can be used to indicate that the phase change is currently handled by the user.
Warning
LunaticAgent.execute_phase()checks for exact match, i.e. a phaseUPDATE_INFORMATION | BEGIN | ENDwill not be executed in the normal loop.See also
Executed in
BlockingRule.loop_agent
- DETECT_NON_CARS
Combination of
DETECT_STATIC_OBSTACLES | DETECT_TRAFFIC_LIGHTS | DETECT_PEDESTRIANS
- DETECTION_PHASE
Combination of
DETECT_STATIC_OBSTACLES | DETECT_TRAFFIC_LIGHTS | DETECT_PEDESTRIANS | DETECT_CARS
- EXCEPTIONS
Combination of
HAZARD | EMERGENCY | COLLISION | TURNING_AT_JUNCTION | CAR_DETECTED | DONE | TERMINATING
- USER_CONTROLLED
Phases that might or not be went through as they must be implemented manually by the user.
Combination of
APPLY_MANUAL_CONTROLS | EXECUTION | TERMINATING | CUSTOM_CYCLE
- NORMAL_LOOP
Combination of
UPDATE_INFORMATION | PLAN_PATH | DETECTION_PHASE | TAKE_NORMAL_STEP
- IN_LOOP
Phases that are executed in or before the inner step, EMERGENCY is executed, right after the inner step.
Combination of
NORMAL_LOOP | EMERGENCY | COLLISION
- class classes.constants.Hazard(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
FlagValues currently stored in the agent’s
detected_hazards. attribute.- TRAFFIC_LIGHT_RED
- TRAFFIC_LIGHT_YELLOW
- PEDESTRIAN
- CAR
- STATIC_OBSTACLE
Note
These refer to actors and not the environment barriers.
- OTHER
- JUNCTION
- OBSTACLE
Combination of
PEDESTRIAN | CAR | STATIC_OBSTACLE
- TRAFFIC_LIGHT
Combination of
TRAFFIC_LIGHT_RED | TRAFFIC_LIGHT_YELLOW
- class classes.constants.HazardSeverity(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
FlagHigh level descriptions to further weight
Hazard. TheHazardSeverityflags are stored indetected_hazards_info.- UNKNOWN = -1
- NONE = 0
Initial values for
LunaticAgent.detected_hazards_info
- WARNING = 1
- COLLISION
Special case for collision.
- class classes.constants.RoadOption(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
IntEnumRoadOption represents the possible topological configurations when moving from a segment of lane to other.
See also
RoadOptionColorfor the color representation of the road options.- VOID = -1
Indicated by green
- LEFT
Indicated by yellow
- RIGHT
Indicated by cyan
- STRAIGHT
Indicated by gray
- LANEFOLLOW
Indicated by green
- CHANGELANELEFT
Indicated by orange
- CHANGELANERIGHT
Indicated by dark cyan
- class classes.constants.RoadOptionColor(option: RoadOption)[source]
Bases:
objectPoints to a
carla.Colorobject that represents the color of the road option.Supports
__getitem__(name)and__call__(RoadOption)for easy access.- Parameters:
option (RoadOption)
- Return type:
- VOID
Green
- LEFT
Yellow
- RIGHT
Cyan
- STRAIGHT
Gray
- LANEFOLLOW
Green
- CHANGELANELEFT
Orange
- CHANGELANERIGHT
Dark Cyan
- class classes.constants.RssLogLevelStub(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
_CarlaIntEnumEnum declaration used in carla.RssSensor to set the log level.
- trace = 0
- debug = 1
- info = 2
- warn = 3
- err = 4
- critical = 5
- off = 6
- class classes.constants.RssRoadBoundariesModeStub(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
_CarlaIntEnumEnum declaration used in carla.RssSensor to enable or disable the stay on road feature. In summary, this feature considers the road boundaries as virtual objects. The minimum safety distance check is applied to these virtual walls, in order to make sure the vehicle does not drive off the road.
- Off = 0
- On = 1
- class classes.constants.RuleResult(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
EnumSpecial
objectsthat indicate special return values aRule- NO_RESULT
Indicates the the rule returned no result, e.g. because an exception was raised.
- NOT_APPLICABLE
Object that indicates that no action was executed, e.g. because the rule is on cooldown or blocked.
- class classes.constants.StreetType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
-
Used by the
DetectionMatrixto interpret the street type.- ON_HIGHWAY = 'On highway'
- NON_HIGHWAY_STREET = 'Non highway street'
- ON_JUNCTION = 'On junction'
- ON_HIGHWAY_ENTRY = 'On highway entry'
- ON_HIGHWAY_EXIT = 'On highway exit'
- JUNCTION_AHEAD = 'Junction ahead'
- HIGHWAY_TRAFFIC_LIGHT = 'Highway traffic light'
- HIGHWAY_WITH_ENTRY_AND_EXIT = 'Highway with entry/exit'
classes.detection_matrix module
- classes.detection_matrix.matrix_for_actor(ego_vehicle: carla.Actor, road_lane_ids: set[RoadLaneId], radius: float = 100.0, highway_shape: 'HighWayShape' | None = None)[source]
Calculates the detection matrix for the given actor.
- Parameters:
ego_vehicle (carla.Actor) – The ego vehicle
highway_shape (
tuple) – Tuple containing highway_type, number of straight highway lanes, entry waypoint tuple and/ exit waypoint tuple. Format: (highway_type: string, straight_lanes: int, entry_wps: ([wp,..], [wp,..]), exit_wps: ([wp,..], [wp,..]))road_lane_ids (set[RoadLaneId])
radius (float)
Note
CarlaDataProviderneeds to be set up before calling this function.
- class classes.detection_matrix.DetectionMatrix(ego_vehicle: carla.Actor, road_lane_ids: Set[RoadLaneId] | None = None, radius: float = 100.0)[source]
Bases:
objectAutomatically create a matrix representing the lanes around the ego vehicle on each
update()call.- Parameters:
ego_vehicle (carla.Actor)
road_lane_ids (Optional[Set[RoadLaneId]])
radius (float)
- __init__(ego_vehicle: carla.Actor, road_lane_ids: Set[RoadLaneId] | None = None, radius: float = 100.0)[source]
- Parameters:
ego_vehicle (carla.Actor)
road_lane_ids (Optional[Set[RoadLaneId]])
radius (float)
- running
If the matrix will perform updates.
- matrix: Dict[int, List[int]]
An ordered dictionary representing the city matrix. The keys for existing lanes are the lane IDs in the format “road_id_lane_id”. For non-existing lanes different placeholder exist, e.g. left_outer_lane, left_inner_lane, No_4th_lane, No_opposing_direction The values indicate whether a vehicle is present: 0 - No vehicle, 1 - Ego vehicle, 3 - No road. Format example:
{ "left_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3], "left_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3], "1_2": [0, 0, 0, 0, 0, 0, 0, 0], "1_1": [0, 0, 0, 0, 0, 0, 0, 0], "1_-1": [0, 0, 0, 0, 0, 0, 0, 0], "1_-2": [0, 0, 0, 0, 0, 0, 0, 0], "right_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3], "right_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3], }
Attention
Currently the keys are replaces by numbers.
In case of an unsupported layout or before the very first update the matrix can be
None. Be aware of this when usingAsyncDetectionMatrix.
- Type:
- update() Dict[int, List[int]] | None[source]
If the matrix is
running, it will update the matrix and return it, otherwise returnsNone.
- render(display: pygame.Surface, imshow_settings: dict[str, Any] = {'cmap': 'jet'}, vertical: bool = True, draw_values: bool = True, text_settings: dict[str, Any] = {'color': 'orange'}, *, draw: bool = True) None[source]
Renders the matrix on the given surface using
matplotlib.- Parameters:
display (pygame.Surface) – The surface to render the matrix on.
imshow_settings (dict[str, Any]) – The settings for
matplotlib.pyplot.imshow(). Defaults to{'cmap': 'jet'}.vertical (bool) – If the lanes should be displayed vertically. Defaults to
True.draw_values (bool) – If the entries should be displayed as text. Defaults to
True.text_settings (dict[str, Any]) – The settings for
matplotlib.pyplot.text()when draw_values. Defaults to{'color': 'orange'}.draw (bool) – If the matrix should be drawn. If
False, this function will do nothing.
- Return type:
None
- class classes.detection_matrix.AsyncDetectionMatrix(ego_vehicle: carla.Actor, *, road_lane_ids: Set[RoadLaneId] | None = None, sleep_time: float = 0.1)[source]
Bases:
DetectionMatrixAsynchronous version of the
DetectionMatrix.Will calculate the matrix update in a separate thread.
- Parameters:
ego_vehicle (carla.Actor)
road_lane_ids (Optional[Set[RoadLaneId]])
sleep_time (float)
- __init__(ego_vehicle: carla.Actor, *, road_lane_ids: Set[RoadLaneId] | None = None, sleep_time: float = 0.1)[source]
- Parameters:
ego_vehicle (carla.Actor) – The ego vehicle.
sleep_time (float) – The time to sleep between updates. Defaults to 0.1 seconds
road_lane_ids (Optional[Set[RoadLaneId]]) – The road and lane IDs to consider. If not provided, all will be considered.
- render(display: pygame.Surface, imshow_settings: dict[str, Any] = {'cmap': 'jet'}, vertical: bool = True, draw_values: bool = True, text_settings: dict[str, Any] = {'color': 'orange'}, *, draw: bool = True) None
Renders the matrix on the given surface using
matplotlib.- Parameters:
display (pygame.Surface) – The surface to render the matrix on.
imshow_settings (dict[str, Any]) – The settings for
matplotlib.pyplot.imshow(). Defaults to{'cmap': 'jet'}.vertical (bool) – If the lanes should be displayed vertically. Defaults to
True.draw_values (bool) – If the entries should be displayed as text. Defaults to
True.text_settings (dict[str, Any]) – The settings for
matplotlib.pyplot.text()when draw_values. Defaults to{'color': 'orange'}.draw (bool) – If the matrix should be drawn. If
False, this function will do nothing.
- Return type:
None
- matrix: Dict[int, List[int]]
An ordered dictionary representing the city matrix. The keys for existing lanes are the lane IDs in the format “road_id_lane_id”. For non-existing lanes different placeholder exist, e.g. left_outer_lane, left_inner_lane, No_4th_lane, No_opposing_direction The values indicate whether a vehicle is present: 0 - No vehicle, 1 - Ego vehicle, 3 - No road. Format example:
{ "left_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3], "left_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3], "1_2": [0, 0, 0, 0, 0, 0, 0, 0], "1_1": [0, 0, 0, 0, 0, 0, 0, 0], "1_-1": [0, 0, 0, 0, 0, 0, 0, 0], "1_-2": [0, 0, 0, 0, 0, 0, 0, 0], "right_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3], "right_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3], }
Attention
Currently the keys are replaces by numbers.
In case of an unsupported layout or before the very first update the matrix can be
None. Be aware of this when usingAsyncDetectionMatrix.
- Type:
- running
If the matrix will perform updates.
classes.evaluation_function module
- class classes.evaluation_function.ConditionFunction(first_argument: str(name) | ~typing.Callable[[~typing.Concatenate[~classes.type_protocols.RuleT, Context, ...]], ~typing.Hashable] | ~typing.Callable[[~typing.Concatenate[Context, ...]], ~typing.Hashable] | None = None, name: str = 'ConditionFunction', *, truthy: bool = False, use_self: bool | None = None)[source]
Bases:
Generic[_CP,_CH]Implements a decorator to wrap function to be used with
Ruleclasses. The function must return a hashable type, which is used to access the action to be taken by theRule.conditionfunction of the rule.Evaluation functions can be combined using the AND, OR and NOT operators to build up more complex rules from simpler ones. The operators
+, &,|and~are aliases forAND, ORandNOTrespectively. e.g. with these two functions:func1 = ConditionFunction(lambda ctx: ctx.speed > 10)func2 = ConditionFunction(lambda ctx: ctx.speed < 20)- These statements are all equivalent:
ConditionFunction(lambda ctx: 10 < ctx.speed < 20)func1 + func2func1 & func2func1.AND(func2)ConditionFunction.AND(func1, func2)
Hint
ConditionFunctions also allow for more specific returns types
Example:
@ConditionFunction def is_speeding(ctx: Context) -> Hashable: config = ctx.agent.config if config.speed > config.speed_limit+20: return "very fast" elif config.speed > config.speed_limit+5: return "fast" elif: config.speed < config.speed_limit-20: return "very slow" else: return "normal" Rule(is_speeding, action={ "very fast": lambda ctx: ctx.agent.config.follow_speed_limits(), "fast" : lambda ctx: ctx.agent.config.set_target_speed(ctx.speed_limit+5) })
- Parameters:
first_argument (str(name) | Callable[[Concatenate[RuleT, Context, ...]], Hashable] | Callable[[Concatenate[Context, ...]], Hashable] | None) – If
Noneor a string, the class will create a decorator that expects a callable as the first argument. If a string is passed it substitutes as the name argument. Otherwise a callable is expected like in the snippet used above.name (str) – The name to represent the function. Defaults to
"ConditionFunction".truthy (bool) – If True, the function will always cast the return value to a boolean value. Defaults to
False.use_self (Optional[bool]) – If
True, the function will be treated as a method and the first argument will be the instance of theRulethat uses this function. IfNone, the decision depends on the signature of the function, if it has only one argument only theContextobject is passed, if it has two or more arguments the first argument that is passed is the instance of theRule. UseFalseto not use the instance of theRuleas the first argument. Defaults toNone.
- Returns:
A
ConditionFunctionor a partially initialized version to be used as a decorator when the first_argument is not a callable.- Return type:
ConditionFunction | partial[ConditionFunction]
- Generics:
- _Rule :
Generic
Ruletype that could appear in theevaluation_function'ssignature.
_CP :
typing.ParamSpecof the passedevaluation_function._CH : The Hashable return type of the
evaluation_function.
- evaluation_function: CallableConditionT
The function that is wrapped by the
ConditionFunction. Uses the generic type hints_CP,_CHof the class.
- actions: dict[Hashable, CallableActionT] = {}
Mapping of return values to actions to be executed. If this dictionary is not empty it will be used as the
Rule.actionsdictionary.
- copy(copy_actions: bool = False)[source]
Copies the class by creating a new instance.
- Parameters:
copy_actions (bool) – If
True, theConditionFunction.actionsdictionary is copied as well. Defaults toFalse.- Returns:
A new instance, with the same
__init__arguments.- Return type:
Warning
Be aware that when using copy_actions the actions themselves are not copied; they are identical and shared.
- register_action(key: Hashable = True, *, use_self: bool | None = None, **kwargs: ParamSpecKwargs) Callable[[CallableT], CallableT][source]
- register_action(action_function: CallableT, key: Hashable = True, *, use_self: bool | None = None, **kwargs: ParamSpecKwargs) CallableT
Add an action to be executed when the condition function returns a specific value.
This function can be used as a decorator or as a method:
# As decorator @ConditionFunction def is_speeding(ctx: Context) -> Literal["very fast", "fast", "normal", "slow", True]: ... @is_speeding.register_action(key="very fast") # or @is_speeding.register_action # default key is True def very_fast_action(ctx: Context): ctx.agent.set_target_speed(ctx.config.target_speed-5) # Or as function def fast_action(ctx: Context, speed: int): ctx.agent.set_target_speed(speed) is_speeding.register_action("fast", fast_action) # Custom keywords without Rule instance is_speeding.register_action("normal", use_self=False, # no Rule in the signature speed=ctx.config.target_speed) # keyword argument def custom_speed(ctx: Context, speed: int): ctx.agent.set_target_speed(speed)
- Parameters:
key (Hashable) – If the condition function returns this value, this action will be executed. Defaults to
True.action_function – (When not used as decorator) The action to be executed.
use_self (Optional[bool]) – If not
None, ause_selfattribute is set on the function to indicate if the function is a method or not.kwargs (ParamSpecKwargs) – Keyword arguments for the decorated function.
- Returns:
Decorator to be used or the decorated function as ConditionFunction.
- Return type:
_RegisterActionDecorator
Note
Only one action is allowed per key. If an action is already registered for the key, it will be overwritten.
- classmethod AND(func1: ConditionFunction[ParamSpec, _CH], func2: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]
Combine two functions with
and, i.e. to return True if both return True.- Parameters:
func1 (ConditionFunction[ParamSpec, _CH])
func2 (ConditionFunction[ParamSpec, Hashable])
- Return type:
ConditionFunction[ParamSpec, _CH | Hashable]
- classmethod OR(func1: ConditionFunction[ParamSpec, _CH], func2: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]
Combine two functions to return True if either returns True.
- Parameters:
func1 (ConditionFunction[ParamSpec, _CH])
func2 (ConditionFunction[ParamSpec, Hashable])
- Return type:
ConditionFunction[ParamSpec, _CH | Hashable]
- classmethod NOT(func: ConditionFunction[ParamSpec, _CH]) ConditionFunction[ParamSpec, bool][source]
Invert the return value of a function.
- Parameters:
func (ConditionFunction[ParamSpec, _CH])
- Return type:
- __add__(other: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]
Combine with another function using
AND().- Parameters:
other (ConditionFunction[ParamSpec, Hashable])
- Return type:
ConditionFunction[ParamSpec, _CH | Hashable]
- __and__(other: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]
Combine with another function using
AND().- Parameters:
other (ConditionFunction[ParamSpec, Hashable])
- Return type:
ConditionFunction[ParamSpec, _CH | Hashable]
- __or__(other: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]
Combine with another function using
OR().- Parameters:
other (ConditionFunction[ParamSpec, Hashable])
- Return type:
ConditionFunction[ParamSpec, _CH | Hashable]
- __invert__() ConditionFunction[ParamSpec, bool][source]
Invert the return value of the function.
- Return type:
classes.exceptions module
Helper module that contains all the custom exceptions used in the project
- exception classes.exceptions.AgentDoneException[source]
Bases:
LunaticAgentExceptionRaised when there is no more waypoint in the queue to follow and no rule set a new destination.
When the a
GameFrameworkinstance is used as context manager will setgame_framework.continue_looptoFalse.
- exception classes.exceptions.ContinueLoopException[source]
Bases:
LunaticAgentExceptionRaise when
LunaticAgent.run_step()action of the agent should not be continued further.The agent returns the current
ctx.controlto the caller ofrun_step.Note
Handled in
LunaticAgent.run_step(), this exception should not propagate outside. It can be caught byGameFrameworkand skip the current loop and not apply any controls, an error will be logged.
- exception classes.exceptions.DoNotEvaluateChildRules(result: Any = RuleResult.NO_RESULT, *args: object)[source]
Bases:
_RuleResultExceptionCan be raised in a
MultiRuleto prevent the evaluation of child rules.Can also be raised by child rules to prevent the evaluation of further child rules.
- exception classes.exceptions.EmergencyStopException(hazards: set[Hazard], *args: object)[source]
Bases:
LunaticAgentException
- exception classes.exceptions.LunaticAgentException[source]
Bases:
ExceptionBase class for all custom exceptions that influence the Workflow of the
LunaticAgent.
- exception classes.exceptions.NoFurtherRulesException(result: Any = RuleResult.NO_RESULT, *args: object)[source]
Bases:
_RuleResultExceptionRaised when no further rules should be executed in this phase.
Caught by
LunaticAgent.execute_phase().The agent will continue at the phase where the
BlockedRulewas triggered.
- exception classes.exceptions.SkipInnerLoopException(planned_control: carla.VehicleControl, *args: object)[source]
Bases:
LunaticAgentExceptionCan be raised in LunaticAgent._inner_step. A new control object must be provided.
- Parameters:
planned_control (carla.VehicleControl)
args (object)
- Return type:
None
- __init__(planned_control: carla.VehicleControl, *args: object) None[source]
- Parameters:
planned_control (carla.VehicleControl)
args (object)
- Return type:
None
- planned_control: carla.VehicleControl
- exception classes.exceptions.UnblockRuleException(result: Any = RuleResult.NO_RESULT, *args: object)[source]
Bases:
_RuleResultExceptionCan be raised in a
BlockedRuleto end it.The agent will continue at the phase where the
BlockedRulewas triggered.Note
Further rules that are in this phase can still be executed. Alternatively, consider raising a
NoFurtherRulesException.
- exception classes.exceptions.UpdatedPathException[source]
Bases:
LunaticAgentExceptionShould be raised when the path has been updated and the agent should replan.
Rules that replan on Phase.DONE | END, should throw this exception at the end.
- exception classes.exceptions.UserInterruption[source]
Bases:
ExceptionTerminate the loop if user input is detected. Allows the scenario runner and Leaderboard to exit gracefully, if handled appropriately, e.g. by directly returning.
Thrown by
LunaticAgent.parse_keyboard_input.Note
Is not a
LunaticAgentException.
- exception classes.exceptions._RuleResultException(result: Any = RuleResult.NO_RESULT, *args: object)[source]
Bases:
LunaticAgentExceptionAbstract class for exceptions that can be raised by rules that still are able to return a result.
classes.information_manager module
Aim of this module is to provide a less convoluted access to information, i.e. distill the information from the data and return high level information
- class classes.information_manager.InformationManager(agent: LunaticAgent, update_information: bool = True)[source]
Bases:
objectTracks global information, e.g. all actors, traffic lights, etc. as well as agent specific information, e.g. current speed, current location, and nearby actors in relation to the agent.
Tip
global information is attributed by
ClassVarandstaticmethodand is shared across all instances, this information is constant for the current tick.- Parameters:
agent (LunaticAgent)
update_information (bool)
- relevant_traffic_light: carla.TrafficLight | None = None
Result of
CarlaDataProvider.get_next_traffic_light()
- relevant_traffic_light_distance: float = inf
Distance to the
relevant_traffic_lightIs float(‘inf’) ifrelevant_traffic_lightis None.
- gathered_information: InformationManager.Information
InformationManager.Informationgathered duringtick()
- vehicles: ClassVar['list[carla.Vehicle]']
List of all tracked vehicles
- walkers: ClassVar['list[carla.Walker]']
List of all tracked pedestrians
- static_obstacles: ClassVar['list[carla.Actor]']
List of all tracked static obstacles
- obstacles: ClassVar['list[carla.Actor]']
Union of
vehicles, py:attr:walkers and py:attr:static_obstacles
- lights_map: ClassVar['Dict[int, carla.Waypoint]'] = {}
Map of traffic lights to their trigger waypoints
- frame: ClassVar['int | None'] = None
Last frame the InformationManager was updated.
The current frame of the world should be passed to the global_tick method.
- __init__(agent: LunaticAgent, update_information: bool = True)[source]
- Parameters:
agent (LunaticAgent)
update_information (bool)
- state_counter: Dict[AgentState, int]
Tracks different
AgentStateand the amount of ticks the agent is in this state.
- detect_next_traffic_light()[source]
Set the
relevant_traffic_lightandrelevant_traffic_light_distanceif not set.Note
Does not check for planned path but current route along waypoints, might not be exact.
This function is automatically called in :py:meth:`tick`
- tick()[source]
Tick the information manager and update the information for the corresponding agent.
- class Information(current_waypoint: carla.Waypoint, current_speed: float, current_states: Dict[AgentState, int], relevant_traffic_light: carla.TrafficLight | None, relevant_traffic_light_distance: float, vehicles: List[carla.Vehicle], walkers: List[carla.Walker], static_obstacles: List[carla.Actor], obstacles: List[carla.Actor], walkers_nearby: List[carla.Walker], vehicles_nearby: List[carla.Vehicle], static_obstacles_nearby: List[carla.Actor], obstacles_nearby: List[carla.Actor], traffic_lights_nearby: List[carla.TrafficLight], distances: Dict[carla.Actor, float])[source]
Bases:
NamedTupleData gathered by the InformationManager which is passed to the agent.
- Parameters:
current_waypoint (carla.Waypoint)
current_speed (float)
current_states (Dict[AgentState, int])
relevant_traffic_light (Optional[carla.TrafficLight])
relevant_traffic_light_distance (float)
vehicles (List[carla.Vehicle])
walkers (List[carla.Walker])
static_obstacles (List[carla.Actor])
obstacles (List[carla.Actor])
walkers_nearby (List[carla.Walker])
vehicles_nearby (List[carla.Vehicle])
static_obstacles_nearby (List[carla.Actor])
obstacles_nearby (List[carla.Actor])
traffic_lights_nearby (List[carla.TrafficLight])
distances (Dict[carla.Actor, float])
- current_waypoint: carla.Waypoint
Alias for field number 0
- current_states: Dict[AgentState, int]
Alias for field number 2
- relevant_traffic_light: carla.TrafficLight | None
Alias for field number 3
- vehicles: List[carla.Vehicle]
Alias for field number 5
- walkers: List[carla.Walker]
Alias for field number 6
- static_obstacles: List[carla.Actor]
Filtered obstacles by InformationManager.OBSTACLE_FILTER
- count(value, /)
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
- obstacles: List[carla.Actor]
Union of vehicles, walkers and static_obstacles
- walkers_nearby: List[carla.Walker]
Alias for field number 9
- vehicles_nearby: List[carla.Vehicle]
Alias for field number 10
- static_obstacles_nearby: List[carla.Actor]
Alias for field number 11
- obstacles_nearby: List[carla.Actor]
Alias for field number 12
- traffic_lights_nearby: List[carla.TrafficLight]
Alias for field number 13
- distances: Dict[carla.Actor, float]
Distances to all actors in
obstacles
- OBSTACLE_FILTER: str = 'static.prop.[cistmw]*'
fnmatch for obstacles that the agent will consider in its path. https://carla.readthedocs.io/en/latest/bp_library/#static
- static get_traffic_lights() Dict[carla.TrafficLight, carla.Transform][source]
- Return type:
- static get_trafficlight_trigger_waypoint(traffic_light: carla.TrafficLight) carla.Waypoint[source]
Get the location where the traffic light is triggered.
- Parameters:
traffic_light (carla.TrafficLight)
- Return type:
- static global_tick(frame: int | None = None) None[source]
Update global information that is constant for the current tick and not agent specific.
- Updates:
- Parameters:
frame (Optional[int]) – The id of the current frame. If None retrieves the id from the current
carla.WorldSnapshot. Multiple calls with the same frame are ignored. (default: None)- Return type:
None
- static get_vehicles() List[carla.Vehicle][source]
- Return type:
List[carla.Vehicle]
- static get_walkers() List[carla.Walker][source]
- Return type:
List[carla.Walker]
classes.rule module
- class classes.rule.Context(*args: Any, **kwargs: Any)[source]
Bases:
CarlaDataProviderObject to be passed as the first argument (instead of self) to rules, actions and evaluation functions.
The
Contextclass derives from the scenario runner’sCarlaDataProviderto allow access to the world, map, etc.Tip
There is normally no need to initialize the context object manually. Its recommended to initialize the context object with
LunaticAgent._make_context().- Parameters:
agent (LunaticAgent)
kwargs (Any)
- last_context: 'Context' | None
The context object of the last tick. Used to access the last phase’s results.
- second_pass: bool | None = None
Whether or not the run_step function performs a second pass, i.e. after the route has been replanned.
Warning
The correctness should not be assumed. The user is responsible for setting this value to True if a second pass is required.
- PHASE_NOT_EXECUTED: object
Value in
phase_resultsto indicate thatagent.execute_phase(phase)was called for the respective phase
- agent: LunaticAgent
Backreference to the agent.
- phase_results: dict[Phase, Any]
Stores the results of the phases the agent has been in. By default the keys are set to
Context.PHASE_NOT_EXECUTED.
- config: ContextSettings
A copy of the agents config. Overwritten by the condition’s settings.
- detected_hazards_info: dict[Hazard, HazardSeverity | Any]
Information about the detected hazards.
add_hazard()inserts the givenHazardSeverityas value, however, note that the values are can be arbitrary if used otherwise.
- evaluation_results: dict[Phase, Hashable]
Stores the result from the
Rule.condition()of the last rule that was evaluated in a phase.Deprecated since version in: consideration
- action_results: dict[Phase, Any]
Stores the result from the
action()of the last rule that was applicable in a phase.Deprecated since version in: consideration
- property control: carla.VehicleControl | None
Current control the agent should use. Set by
execute_phase(update_controls=...).Note
Safeguarded to be not set to None. Setting it to
Noneis discouraged. Useset_control()if setting it toNoneis really needed.
- set_control(control: carla.VehicleControl | None)[source]
Set the control, allows to set it to None.
- Parameters:
control (Optional[carla.VehicleControl])
- get_or_calculate_control() carla.VehicleControl[source]
Get the control if it is set, otherwise calculate it by executing the local planner.
- Returns:
The control the agent should use.
- Return type:
Note
Use this function inside rules to acquire a control object-
Warning
This is equivalent to ending the inner step of the agent.
See also
LunaticAgent._calculate_control()
- property detected_hazards: Set[Hazard]
Detected hazards in the current phase.
If not empty at the end of the inner step an EmergencyStopException is raised.
- add_hazard(hazard: Hazard, hazard_level: HazardSeverity = HazardSeverity.EMERGENCY)[source]
Add the specified hazard to the detected hazards, in parallel the hazard_level can be set which which is stored in
detected_hazards_info.- Parameters:
hazard (Hazard)
hazard_level (HazardSeverity)
- discard_hazard(hazard: Hazard, match: Literal['exact', 'subset', 'intersection'] = 'subset')[source]
Discards a hazard from the detected hazards.
- Parameters:
hazard (Hazard) – Hazard to remove from
detected_hazards.match (Literal['exact', 'subset', 'intersection']) –
How to match the hazard to remove.
”exact” removes if the exact
Hazardflag is present.”subset” removes if the hazard is a subset of the detected hazard, e.g.:
discard_hazard(Hazard.VEHICLE, match="subset")would removeHazard.OBSTACLE=Hazard.VEHICLE | PEDESTRIAN | STATIC_OBSTACLE.discard_hazard(Hazard.TRAFFIC_LIGHT, match="subset")would not removeHazard.TRAFFIC_LIGHT_RED.
- has_hazard(hazard: Hazard, match: Literal['exact', 'subset', 'intersection'] = 'intersection') bool[source]
Checks if the hazard intersects with any of the detected hazards.
See
discard_hazard()for the different matching options.
- max_detection_distance(lane: Literal['same_lane', 'other_lane', 'overtaking', 'tailgating']) float
Convenience function to be used with
lunatic_agent_tools.detect_vehicles()andLunaticAgent.detect_obstacles_in_path.The max distance to consider an obstacle is calculated as:
max(obstacles.min_proximity_threshold, live_info.current_speed_limit / obstacles.speed_detection_downscale.[same|other]_lane)
- Parameters:
self (HasConfig['BehaviorAgentSettings | LunaticAgentSettings']) – An object that implements the config and live_info attributes
lane (Literal['same_lane', 'other_lane', 'overtaking', 'tailgating']) – The lane to consider.
- Return type:
Note
lane must be a key in
BehaviorAgentObstacleSettings.SpeedLimitDetectionDownscale.
- property live_info: LiveInfo
- property active_blocking_rules: Set[BlockingRule]
- classes.rule.always_execute()[source]
This is an
ConditionFunctionthat always returnsTrue. It can be used to always execute an action.
- class classes.rule.Rule(phases: Phase | Iterable[Phase] | 'type[Any]' | Self | str | None | None = None, bases: tuple[type[Any], ...] | None = None, clsdict: dict[str, Any] | None = None, **kwargs: Any)[source]
Bases:
_GroupRule- Parameters:
- _auto_init_: ClassVar[bool] = True
If set to False the automatic
__init__creation is disabled when subclassing. This automatic__init__will fix parameters likephasesandconditionto the class.Declaring an
__init__method in the class has the same effect as setting_auto_init_to False.Note
Using
class NewRuleType(metarule=Rule)is nearly equivalent to_auto_init_=False, but is not inherited.
- NOT_APPLICABLE: ClassVar[Literal[RuleResult.NOT_APPLICABLE]]
Unique object
RuleResult.NOT_APPLICABLEthat indicates that no action was executed.Deprecated since version Use:
RuleResult.NOT_APPLICABLEdirectly
- NO_RESULT: ClassVar[Literal[RuleResult.NO_RESULT]]
Unique object
RuleResult.NO_RESULTthat indicates that the rulesaction()did not return a result, e.g. because an exception was raised.Deprecated since version Use:
RuleResult.NOT_APPLICABLEdirectly
- false_action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']
Action that should be executed if the rule is False. May not be set if actions is set.
- action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']
Action that should be executed if the rule is True. If actions is set, this is ignored.
- clone()[source]
Create a new instance of the rule with the same settings.
Note
The current cooldown is not taken into account.
The current enabled state is taken into account.
- class CooldownFramework
Bases:
objectContext manager that can reduce all cooldowns at the end of a with statement.
- static tick()
Update all cooldowns and unblock all rules.
- DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0
Value the cooldown is reset to when
reset_cooldown()is called without a value.Used only when
cooldown_reset_valueis not set.
- blocked: bool = False
Indicates if the rule is blocked for this tick only. Is reset to False after the tick.
- property cooldown: int
Cooldown of the rule in ticks until it can be executed again after its action was executed. If 0 the rule is ready to be executed.
- property enabled: bool
If
Falsethe rule will not be evaluated. Contrary toblockedthis permanently disables the rule.
- group: str | None = None
Group name of rules that should share their cooldown.
None for a rule to not share its cooldown.
- reset_cooldown(value: int | None = None)
Reset or set the cooldown; for a group rule it resets the group cooldown.
- Parameters:
value (Optional[int])
- set_active(value: bool)
Enables or disables the rule. Contrary to
blockedit will not be reset after the tick.- Parameters:
value (bool)
- classmethod set_cooldown_of_group(group: str, value: int)
Updates the cooldown of the specified group to a specific value
- set_my_group_cooldown(value: int | None = None)
Update the cooldown of the group this rule belong to
- Parameters:
value (Optional[int])
- start_cooldown: ClassVar[int] = 0
Initial
cooldownwhen initialized. if >0 the rule will not be ready for the first start_cooldown ticks.
- classmethod unblock_all_rules()
Unblocks all rules
- classmethod update_all_cooldowns()
Globally updates the cooldown of all rules.
- phases: frozenset[Phase]
The phase or phases in which the rule should be evaluated. For instantiation the phases attribute can be any Iterable [
Phase].
- condition: Any
The condition that determines if the rule’s actions should be executed.
- Simple Variant:
return True if the action should be executed, False otherwise. if
Rule.false_actionis defined, False will executeRule.false_action.- Advanced Variant:
return a Hashable value that is used as key in the
actionsdict.
- actions: dict[Any, CallableActionT]
Dictionary that maps rule results to the action that should be executed.
- priority: float | int | RulePriority = 4
Rules are executed in order of their priority, from high to low.
- overwrite_settings: dict[str, Any]
Settings that should overwrite the agent’s settings for this rule.
Note
The overwrite settings are primitive
dictobjects,omegaconf.DictConfigobjects are converted.
- self_config: RuleConfig
A custom sub-config for the rule that is not included in the agents settings. Automatically gets a instance key added with the rule instance.
Can be accessed via ctx.config.current_rule or self.config.self.
Note
Internally self.config and ctx.config is the same object, which makes interpolations to the agent’s settings possible.
Attention
The self_config object is *not* constant it is recreated each time the rule is evaluated to have the current context available.
- execute_phase(phase: Phase, *, prior_results: Any = None, update_controls: carla.VehicleControl | None = None)[source]
Attention
Use with care to avoid loops or recursions.
If a
Contextis available prefer usingctx.agent.execute_phaseinstead.
Helper function to execute a phase from within a rule, wrapper of
agents.lunatic_agent.LunaticAgent.execute_phase().- Warns:
ReferenceError – If the weak proxy pointing to the
Contextobject has been deleted. The phase will not be executed.- Parameters:
phase (Phase)
prior_results (Any)
update_controls (Optional[carla.VehicleControl])
See also
- evaluate_children(ctx: Context) NoReturn[source]
Not implemented for this rule class.
Note
This is an interface function of meta rules that is executed during
__call__()to call further rules.
- __call__(ctx: Context, overwrite: Dict[str, Any] | None = None, *, ignore_phase: bool = False, ignore_cooldown: bool = False) Any | Literal[RuleResult.NOT_APPLICABLE][source]
First checks if the rule is applicable, i.e. is its
cooldown == 0, if not returnsNOT_APPLICABLE.- Afterwards evaluates the rules
condition()function. if the result is not in
actionsreturnsNOT_APPLICABLE.otherwise merges the
overwrite_settingswith the py:attr:.Context.config and executes the action.
- Afterwards evaluates the rules
- Parameters:
ctx (Context) – The context object that is passed to the condition function.
overwrite (Optional[Dict[str, Any]]) – Extends
overwrite_settingsfor this call only. Defaults toNone.ignore_phase (bool) – If
Truethe phase check is skipped. Defaults toFalse.ignore_cooldown (bool) – If True the cooldown check is skipped. Defaults to
False.
- Return type:
Union[Any, Literal[RuleResult.NOT_APPLICABLE]]
- class classes.rule.BlockingRule(phases: Phase | Iterable[Phase] | 'type[Any]' | Self | str | None | None = None, bases: tuple[type[Any], ...] | None = None, clsdict: dict[str, Any] | None = None, **kwargs: Any)[source]
Bases:
RuleThis meta rule allows to define rules that are able to takeover the agent’s workflow and apply the
VehicleControldirectly from withhin the rule.- Parameters:
condition (Optional[ConditionFunctionLikeT])
action (Optional[Union[CallableActionT, Dict[Any, CallableActionT]]])
false_action (Optional[CallableActionT])
gameframework (Optional[GameFramework])
actions (Optional[Dict[Any, CallableActionT]])
description (str)
priority (RulePriority)
cooldown_reset_value (Optional[int])
group (Optional[str])
enabled (bool)
- DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0
Value the cooldown is reset to when
reset_cooldown()is called without a value.Used only when
cooldown_reset_valueis not set.
- _auto_init_: ClassVar[bool] = True
If set to False the automatic
__init__creation is disabled when subclassing. This automatic__init__will fix parameters likephasesandconditionto the class.Declaring an
__init__method in the class has the same effect as setting_auto_init_to False.Note
Using
class NewRuleType(metarule=Rule)is nearly equivalent to_auto_init_=False, but is not inherited.
- blocked: bool = False
Indicates if the rule is blocked for this tick only. Is reset to False after the tick.
- property cooldown: int
Cooldown of the rule in ticks until it can be executed again after its action was executed. If 0 the rule is ready to be executed.
- property enabled: bool
If
Falsethe rule will not be evaluated. Contrary toblockedthis permanently disables the rule.
- evaluate_children(ctx: Context) NoReturn
Not implemented for this rule class.
Note
This is an interface function of meta rules that is executed during
__call__()to call further rules.
- execute_phase(phase: Phase, *, prior_results: Any = None, update_controls: carla.VehicleControl | None = None)
Attention
Use with care to avoid loops or recursions.
If a
Contextis available prefer usingctx.agent.execute_phaseinstead.
Helper function to execute a phase from within a rule, wrapper of
agents.lunatic_agent.LunaticAgent.execute_phase().- Warns:
ReferenceError – If the weak proxy pointing to the
Contextobject has been deleted. The phase will not be executed.- Parameters:
phase (Phase)
prior_results (Any)
update_controls (Optional[carla.VehicleControl])
See also
- group: str | None = None
Group name of rules that should share their cooldown.
None for a rule to not share its cooldown.
- priority: float | int | RulePriority = 4
Rules are executed in order of their priority, from high to low.
- reset_cooldown(value: int | None = None)
Reset or set the cooldown; for a group rule it resets the group cooldown.
- Parameters:
value (Optional[int])
- set_active(value: bool)
Enables or disables the rule. Contrary to
blockedit will not be reset after the tick.- Parameters:
value (bool)
- set_my_group_cooldown(value: int | None = None)
Update the cooldown of the group this rule belong to
- Parameters:
value (Optional[int])
- start_cooldown: ClassVar[int] = 0
Initial
cooldownwhen initialized. if >0 the rule will not be ready for the first start_cooldown ticks.
- phases: frozenset[Phase]
The phase or phases in which the rule should be evaluated. For instantiation the phases attribute can be any Iterable [
Phase].
- actions: dict[Any, CallableActionT]
Dictionary that maps rule results to the action that should be executed.
- action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']
Action that should be executed if the rule is True. If actions is set, this is ignored.
- false_action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']
Action that should be executed if the rule is False. May not be set if actions is set.
- overwrite_settings: dict[str, Any]
Settings that should overwrite the agent’s settings for this rule.
Note
The overwrite settings are primitive
dictobjects,omegaconf.DictConfigobjects are converted.
- self_config: RuleConfig
A custom sub-config for the rule that is not included in the agents settings. Automatically gets a instance key added with the rule instance.
Can be accessed via ctx.config.current_rule or self.config.self.
Note
Internally self.config and ctx.config is the same object, which makes interpolations to the agent’s settings possible.
Attention
The self_config object is *not* constant it is recreated each time the rule is evaluated to have the current context available.
- condition: Any
The condition that determines if the rule’s actions should be executed.
- Simple Variant:
return True if the action should be executed, False otherwise. if
Rule.false_actionis defined, False will executeRule.false_action.- Advanced Variant:
return a Hashable value that is used as key in the
actionsdict.
- MAX_TICKS = 5000
The amount of ticks that can be performed by this rule before it is automatically disabled. If the rule has looped for this amount of ticks if will then call
max_tick_callbackand raise anUnblockRuleExceptionafterwards.As a hack
max_tick_callbackcan changeticks_passedto prevent the exception and continue the rule.
- max_tick_callback: Callable[[Self, Context], Any] | None = None
An optional callback that is executed when
ticks_passedreachesMAX_TICKS.
- loop_agent(ctx: Context, control: carla.VehicleControl | None = None, *, execute_planner: bool, execute_phases: Any = True) carla.VehicleControl | None[source]
A combination of LunaticAgent.parse_keyboard_input, LunaticAgent.apply_control, BlockingRule.update_world, and Context.get_or_calculate_control to advance agent and world.
- Parameters:
- Return type:
carla.VehicleControl | None
See also
Executes the following methods:
BlockingRule.update_world()ticks thecarla.Worldand renders everything.Context.get_or_calculate_control()to acquire thecarla.VehicleControlobject.
- static get_world() carla.World[source]
Method to access the world object
- Return type:
- update_world(ctx: Context, *, execute_phases: bool | Container[Phase] = True) carla.VehicleControl | None[source]
Ticks the world and takes care of the rendering.
- Will call
ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=<this Rule instance>)LunaticAgent.update_information(), with or without executing the phases, depending on execute_phases.
- Parameters:
- Raises:
UnblockRuleException – If the ticks passed are over
MAX_TICKS- Return type:
carla.VehicleControl | None
Attention
The usage with Leaderboard is working but experimental. The scenario expects the agent to return a control object in every step, however as this rule takes over the ticks completely an outside ScenarioManager might not work as expected.
- __call__(ctx: Context, overwrite: Dict[str, Any] | None = None, in_loop: bool = False, *, ignore_phase: bool = False, ignore_cooldown: bool = False) Any | Literal[RuleResult.NOT_APPLICABLE][source]
First checks if the rule is applicable, i.e. is its
cooldown == 0, if not returnsNOT_APPLICABLE.- Afterwards evaluates the rules
condition()function. if the result is not in
actionsreturnsNOT_APPLICABLE.otherwise merges the
overwrite_settingswith the py:attr:.Context.config and executes the action.
- Afterwards evaluates the rules
- Parameters:
ctx (Context) – The context object that is passed to the condition function.
overwrite (Optional[Dict[str, Any]]) – Extends
overwrite_settingsfor this call only. Defaults toNone.ignore_phase (bool) – If
Truethe phase check is skipped. Defaults toFalse.ignore_cooldown (bool) – If True the cooldown check is skipped. Defaults to
False.in_loop (bool)
- Return type:
Union[Any, Literal[RuleResult.NOT_APPLICABLE]]
- class classes.rule.MultiRule(phases: Phase | Iterable[Phase] | 'type[Any]' | Self | str | None | None = None, bases: tuple[type[Any], ...] | None = None, clsdict: dict[str, Any] | None = None, **kwargs: Any)[source]
Bases:
RuleThis metarule allows to execute one or multiple rules if it is applicable. Depending on
execute_all_rulesit will either execute all rules or only the first applicable rule from itsruleslist.- Parameters:
rules (List[Rule])
condition (Optional[ConditionFunctionLikeT])
description (str)
priority (RulePriority)
sort_rules (bool)
execute_all_rules (bool)
ignore_phase (bool)
cooldown_reset_value (Optional[int])
group (Optional[str])
enabled (bool)
- DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0
Value the cooldown is reset to when
reset_cooldown()is called without a value.Used only when
cooldown_reset_valueis not set.
- __call__(ctx: Context, overwrite: Dict[str, Any] | None = None, *, ignore_phase: bool = False, ignore_cooldown: bool = False) Any | Literal[RuleResult.NOT_APPLICABLE]
First checks if the rule is applicable, i.e. is its
cooldown == 0, if not returnsNOT_APPLICABLE.- Afterwards evaluates the rules
condition()function. if the result is not in
actionsreturnsNOT_APPLICABLE.otherwise merges the
overwrite_settingswith the py:attr:.Context.config and executes the action.
- Afterwards evaluates the rules
- Parameters:
ctx (Context) – The context object that is passed to the condition function.
overwrite (Optional[Dict[str, Any]]) – Extends
overwrite_settingsfor this call only. Defaults toNone.ignore_phase (bool) – If
Truethe phase check is skipped. Defaults toFalse.ignore_cooldown (bool) – If True the cooldown check is skipped. Defaults to
False.
- Return type:
Union[Any, Literal[RuleResult.NOT_APPLICABLE]]
- _auto_init_: ClassVar[bool] = True
If set to False the automatic
__init__creation is disabled when subclassing. This automatic__init__will fix parameters likephasesandconditionto the class.Declaring an
__init__method in the class has the same effect as setting_auto_init_to False.Note
Using
class NewRuleType(metarule=Rule)is nearly equivalent to_auto_init_=False, but is not inherited.
- blocked: bool = False
Indicates if the rule is blocked for this tick only. Is reset to False after the tick.
- property cooldown: int
Cooldown of the rule in ticks until it can be executed again after its action was executed. If 0 the rule is ready to be executed.
- property enabled: bool
If
Falsethe rule will not be evaluated. Contrary toblockedthis permanently disables the rule.
- execute_phase(phase: Phase, *, prior_results: Any = None, update_controls: carla.VehicleControl | None = None)
Attention
Use with care to avoid loops or recursions.
If a
Contextis available prefer usingctx.agent.execute_phaseinstead.
Helper function to execute a phase from within a rule, wrapper of
agents.lunatic_agent.LunaticAgent.execute_phase().- Warns:
ReferenceError – If the weak proxy pointing to the
Contextobject has been deleted. The phase will not be executed.- Parameters:
phase (Phase)
prior_results (Any)
update_controls (Optional[carla.VehicleControl])
See also
- group: str | None = None
Group name of rules that should share their cooldown.
None for a rule to not share its cooldown.
- priority: float | int | RulePriority = 4
Rules are executed in order of their priority, from high to low.
- reset_cooldown(value: int | None = None)
Reset or set the cooldown; for a group rule it resets the group cooldown.
- Parameters:
value (Optional[int])
- set_active(value: bool)
Enables or disables the rule. Contrary to
blockedit will not be reset after the tick.- Parameters:
value (bool)
- set_my_group_cooldown(value: int | None = None)
Update the cooldown of the group this rule belong to
- Parameters:
value (Optional[int])
- start_cooldown: ClassVar[int] = 0
Initial
cooldownwhen initialized. if >0 the rule will not be ready for the first start_cooldown ticks.
- phases: frozenset[Phase]
The phase or phases in which the rule should be evaluated. For instantiation the phases attribute can be any Iterable [
Phase].
- actions: dict[Any, CallableActionT]
Dictionary that maps rule results to the action that should be executed.
- action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']
Action that should be executed if the rule is True. If actions is set, this is ignored.
- false_action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']
Action that should be executed if the rule is False. May not be set if actions is set.
- overwrite_settings: dict[str, Any]
Settings that should overwrite the agent’s settings for this rule.
Note
The overwrite settings are primitive
dictobjects,omegaconf.DictConfigobjects are converted.
- self_config: RuleConfig
A custom sub-config for the rule that is not included in the agents settings. Automatically gets a instance key added with the rule instance.
Can be accessed via ctx.config.current_rule or self.config.self.
Note
Internally self.config and ctx.config is the same object, which makes interpolations to the agent’s settings possible.
Attention
The self_config object is *not* constant it is recreated each time the rule is evaluated to have the current context available.
- condition: Any
The condition that determines if the rule’s actions should be executed.
- Simple Variant:
return True if the action should be executed, False otherwise. if
Rule.false_actionis defined, False will executeRule.false_action.- Advanced Variant:
return a Hashable value that is used as key in the
actionsdict.
- evaluate_children(ctx: classes.rule.Context) List[Any] | Any[source]
Evaluates the children rules of the current rule in the given context.
- Parameters:
ctx (classes.rule.Context) – The context in which the child rules are evaluated.
- Returns:
The results of evaluating the children rules. Returns a list of results if execute_all_rules is True, otherwise the result of the first rule that was applied.
- Return type:
- class classes.rule.RandomRule(phases: Phase | Iterable[Phase] | 'type[Any]' | Self | str | None | None = None, bases: tuple[type[Any], ...] | None = None, clsdict: dict[str, Any] | None = None, **kwargs: Any)[source]
Bases:
MultiRuleA rule that selects and evaluates one or more random child rules from a set of rules.
- Parameters:
phases (Union[Phase, Iterable[Phase]]) – The phase or phases in which the rule is applicable.
rules (Union[Dict[Rule, float], List[Rule]]) – The set of rules from which to select random child rules.
repeat_if_not_applicable (bool) – If False, only one rule will be evaluated even if it is not applicable. Defaults to
True.condition (Optional[Callable[[Context], Any]]) – A callable that determines if the rule is applicable in a given context. If None and the rule does not implement a
conditionattribute the rule always executes. Defaults toNone.action (Optional[Callable[[Context], Any]]) – A callable that defines the action to be performed when the rule is applicable. Defaults to
None.ignore_phase (bool) – If True, the rule will be evaluated even if it is not in the specified phase. Defaults to
True.priority (RulePriority) – The priority of the rule.
RulePriority.NORMAL.description (str) – A description of the rule. Defaults to
"If its own condition is true calls one or more random child rules from the passed rules.".overwrite_settings (Optional[Dict[str, Any]]) – A dictionary of settings to overwrite the default settings of the rule. Defaults to
None.cooldown_reset_value (Optional[int]) – The value to reset the cooldown of the rule. Defaults to
None.group (Optional[str]) – The group to which the rule belongs. Defaults to
None.enabled (bool) – If False, the rule will not be evaluated. Defaults to
True.weights (Optional[List[float]]) – The weights associated with each rule when selecting random child rules. Defaults to
None.
- Raises:
ValueError – When passing rules as a dict with weights, the weights argument must be None.
- DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0
Value the cooldown is reset to when
reset_cooldown()is called without a value.Used only when
cooldown_reset_valueis not set.
- __call__(ctx: Context, overwrite: Dict[str, Any] | None = None, *, ignore_phase: bool = False, ignore_cooldown: bool = False) Any | Literal[RuleResult.NOT_APPLICABLE]
First checks if the rule is applicable, i.e. is its
cooldown == 0, if not returnsNOT_APPLICABLE.- Afterwards evaluates the rules
condition()function. if the result is not in
actionsreturnsNOT_APPLICABLE.otherwise merges the
overwrite_settingswith the py:attr:.Context.config and executes the action.
- Afterwards evaluates the rules
- Parameters:
ctx (Context) – The context object that is passed to the condition function.
overwrite (Optional[Dict[str, Any]]) – Extends
overwrite_settingsfor this call only. Defaults toNone.ignore_phase (bool) – If
Truethe phase check is skipped. Defaults toFalse.ignore_cooldown (bool) – If True the cooldown check is skipped. Defaults to
False.
- Return type:
Union[Any, Literal[RuleResult.NOT_APPLICABLE]]
- _auto_init_: ClassVar[bool] = True
If set to False the automatic
__init__creation is disabled when subclassing. This automatic__init__will fix parameters likephasesandconditionto the class.Declaring an
__init__method in the class has the same effect as setting_auto_init_to False.Note
Using
class NewRuleType(metarule=Rule)is nearly equivalent to_auto_init_=False, but is not inherited.
- blocked: bool = False
Indicates if the rule is blocked for this tick only. Is reset to False after the tick.
- property cooldown: int
Cooldown of the rule in ticks until it can be executed again after its action was executed. If 0 the rule is ready to be executed.
- property enabled: bool
If
Falsethe rule will not be evaluated. Contrary toblockedthis permanently disables the rule.
- execute_phase(phase: Phase, *, prior_results: Any = None, update_controls: carla.VehicleControl | None = None)
Attention
Use with care to avoid loops or recursions.
If a
Contextis available prefer usingctx.agent.execute_phaseinstead.
Helper function to execute a phase from within a rule, wrapper of
agents.lunatic_agent.LunaticAgent.execute_phase().- Warns:
ReferenceError – If the weak proxy pointing to the
Contextobject has been deleted. The phase will not be executed.- Parameters:
phase (Phase)
prior_results (Any)
update_controls (Optional[carla.VehicleControl])
See also
- group: str | None = None
Group name of rules that should share their cooldown.
None for a rule to not share its cooldown.
- priority: float | int | RulePriority = 4
Rules are executed in order of their priority, from high to low.
- reset_cooldown(value: int | None = None)
Reset or set the cooldown; for a group rule it resets the group cooldown.
- Parameters:
value (Optional[int])
- set_active(value: bool)
Enables or disables the rule. Contrary to
blockedit will not be reset after the tick.- Parameters:
value (bool)
- set_my_group_cooldown(value: int | None = None)
Update the cooldown of the group this rule belong to
- Parameters:
value (Optional[int])
- start_cooldown: ClassVar[int] = 0
Initial
cooldownwhen initialized. if >0 the rule will not be ready for the first start_cooldown ticks.
- phases: frozenset[Phase]
The phase or phases in which the rule should be evaluated. For instantiation the phases attribute can be any Iterable [
Phase].
- condition: Any
The condition that determines if the rule’s actions should be executed.
- Simple Variant:
return True if the action should be executed, False otherwise. if
Rule.false_actionis defined, False will executeRule.false_action.- Advanced Variant:
return a Hashable value that is used as key in the
actionsdict.
- actions: dict[Any, CallableActionT]
Dictionary that maps rule results to the action that should be executed.
- action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']
Action that should be executed if the rule is True. If actions is set, this is ignored.
- false_action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']
Action that should be executed if the rule is False. May not be set if actions is set.
- overwrite_settings: dict[str, Any]
Settings that should overwrite the agent’s settings for this rule.
Note
The overwrite settings are primitive
dictobjects,omegaconf.DictConfigobjects are converted.
- self_config: RuleConfig
A custom sub-config for the rule that is not included in the agents settings. Automatically gets a instance key added with the rule instance.
Can be accessed via ctx.config.current_rule or self.config.self.
Note
Internally self.config and ctx.config is the same object, which makes interpolations to the agent’s settings possible.
Attention
The self_config object is *not* constant it is recreated each time the rule is evaluated to have the current context available.
- evaluate_children(ctx: Context, overwrite: Dict[str, Any] | None = None) Any[source]
Evaluate a random child rule. If self.repeat_if_not_applicable=False and the randomly chosen rule is not applicable, then no further rules are evaluated For self.repeat_if_not_applicable=False possible rules are evaluated in a random fashion until one rule was applicable.
classes.type_protocols module
Helper library to define type protocols for classes and functions in the project.
Note
That some types in the documentation have been simplified, e.g. some generics have been omitted.
See also
- class classes.type_protocols.AgentConfigT
A
typing.TypeVar: for aAgentConfigtype.alias of TypeVar(‘AgentConfigT’, bound=
AgentConfig)- has_default()
- class classes.type_protocols.RuleT
A
typing.TypeVar: for aRuletype.alias of TypeVar(‘RuleT’, bound=
Rule)- has_default()
- class classes.type_protocols.CallableT
A
typing.TypeVar: for a any callable.alias of TypeVar(‘CallableT’, bound=
Callable[[…],Any])- has_default()
- classes.type_protocols.CallableCondition
A generic type alias for a callable condition function to be used with a
ConditionFunction. Its first arguments must accept aRuleand aContext, or only aContext, additional keyword arguments are allowed. It must return a hashable value.alias of
Callable[[Concatenate[RuleT,Context,_CP]],_CH] |Callable[[Concatenate[Context,_CP]],_CH]
- class classes.type_protocols.CallableConditionT
typing.TypeVarvariant ofAnyCallableCondition.alias of TypeVar(‘CallableConditionT’, bound=
Callable[[Concatenate[RuleT,Context, …]],Hashable] |Callable[[Concatenate[Context, …]],Hashable])- has_default()
- classes.type_protocols.AnyCallableCondition
Non generic variant of
CallableCondition, can use used astyping.TypeAlias.alias of
Callable[[Concatenate[RuleT,Context, …]],Hashable] |Callable[[Concatenate[Context, …]],Hashable]
- classes.type_protocols.ConditionFunctionLike = ConditionFunctionLike
Callable that can be used for
Rule.condition. A callable that uses aContextobject as a single argument, or alternatively aRuleand aContextobject (in this order).The function must return a Hashable value.
- class classes.type_protocols.ConditionFunctionLikeT
TypeVarversion ofConditionFunctionLikealias of TypeVar(‘ConditionFunctionLikeT’, bound=
ConditionFunctionLike[Rule, …,Hashable])- has_default()
- classes.type_protocols.AnyConditionFunctionLike = AnyConditionFunctionLike
A generic type alias for a callable condition function to be used with a
Rule. Its first arguments must accept aRuleand aContext, or only aContext, additional keyword arguments are allowed. It must return a hashable value.
- classes.type_protocols.CallableAction
A generic type alias for a callable action function to be used with a
RuleorConditionFunction.register_action(). Its first arguments must accept aRuleand aContext, or only aContext, additional keyword arguments are allowed. It can return an arbitrary value.alias of
Callable[[Concatenate[RuleT,Context,_P]],_T] |Callable[[Concatenate[Context,_P]],_T]
- class classes.type_protocols.CallableActionT
typing.TypeVarvariant ofAnyCallableAction.alias of TypeVar(‘CallableActionT’, bound=
Callable[[Concatenate[RuleT,Context, …]],Any] |Callable[[Concatenate[Context, …]],Any])- has_default()
- classes.type_protocols.AnyCallableAction
Non generic variant of
CallableAction, can use used astyping.TypeAlias.alias of
Callable[[Concatenate[RuleT,Context, …]],Any] |Callable[[Concatenate[Context, …]],Any]
- class classes.type_protocols.HasBaseSettings(*args, **kwargs)[source]
Bases:
Protocol[AgentConfigT]- BASE_SETTINGS: type[AgentConfigT]
- class classes.type_protocols.HasConfig(*args, **kwargs)[source]
Bases:
Protocol[AgentConfigT]- property config: AgentConfigT
read-only attribute of a
AgentConfigobject; can also be a normal attribute.
- class classes.type_protocols.Has_WorldModel(*args, **kwargs)[source]
Bases:
Protocol- _world_model: WorldModel
- class classes.type_protocols.Has_Vehicle(*args, **kwargs)[source]
Bases:
Protocol- _vehicle: carla.Vehicle
- class classes.type_protocols.HasPlanner(*args, **kwargs)[source]
Bases:
Protocol[LocalPlannerT]Uses a Local planner to calculate controls
- property _local_planner: LocalPlannerT
read-only attribute for a
LocalPlannerobject; can also be a normal attribute.
- _calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl[source]
- Parameters:
debug (bool)
- Return type:
- class classes.type_protocols.HasPlannerWithConfig(*args, **kwargs)[source]
Bases:
HasPlanner[DynamicLocalPlanner],HasConfig[AgentConfigT],ProtocolUses a
DynamicLocalPlannerthat works with aAgentConfig- _calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
- Parameters:
debug (bool)
- Return type:
- property _local_planner: LocalPlannerT
read-only attribute for a
LocalPlannerobject; can also be a normal attribute.
- property config: AgentConfigT
read-only attribute of a
AgentConfigobject; can also be a normal attribute.
- class classes.type_protocols.UseableWithDynamicPlanner(*args, **kwargs)[source]
Bases:
HasPlannerWithConfig,Has_Vehicle,HasContext,ProtocolCan be used with
DynamicLocalPlanner.- _calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
- Parameters:
debug (bool)
- Return type:
- property _local_planner: LocalPlannerT
read-only attribute for a
LocalPlannerobject; can also be a normal attribute.
- property config: AgentConfigT
read-only attribute of a
AgentConfigobject; can also be a normal attribute.
- _vehicle: carla.Vehicle
- class classes.type_protocols.CanDetectObstacles(*args, **kwargs)[source]
Bases:
Has_Vehicle,HasPlanner,HasConfig[BehaviorAgentSettings | LunaticAgentSettings],ProtocolCan be used with
lunatic_agent_tools.detect_obstacles().- _calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
- Parameters:
debug (bool)
- Return type:
- property _local_planner: LocalPlannerT
read-only attribute for a
LocalPlannerobject; can also be a normal attribute.
- property config: AgentConfigT
read-only attribute of a
AgentConfigobject; can also be a normal attribute.
- _vehicle: carla.Vehicle
- class classes.type_protocols.CanDetectNearbyObstacles(*args, **kwargs)[source]
Bases:
CanDetectObstacles,ProtocolCan be used with
lunatic_agent_tools.detect_obstacles_in_path().- _calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
- Parameters:
debug (bool)
- Return type:
- property _local_planner: LocalPlannerT
read-only attribute for a
LocalPlannerobject; can also be a normal attribute.
- property config: AgentConfigT
read-only attribute of a
AgentConfigobject; can also be a normal attribute.
- _vehicle: carla.Vehicle
- all_obstacles_nearby: list[carla.Actor]
Actors that are considered to be near the actor.
- max_detection_distance(lane: Literal['same_lane', 'other_lane']) float[source]
Convenience function to be used with
lunatic_agent_tools.detect_vehicles()andLunaticAgent.detect_obstacles_in_path.The max distance to consider an obstacle in the same lane or in another lane, obstacles further away will be ignored.
- class classes.type_protocols.CanDetectNearbyTrafficLights(*args, **kwargs)[source]
Bases:
CanDetectObstacles,HasStates,Has_WorldModel,ProtocolCan be used with
lunatic_agent_tools.detect_obstacles_in_path().- _calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
- Parameters:
debug (bool)
- Return type:
- property _local_planner: LocalPlannerT
read-only attribute for a
LocalPlannerobject; can also be a normal attribute.
- property config: AgentConfigT
read-only attribute of a
AgentConfigobject; can also be a normal attribute.
- _vehicle: carla.Vehicle
- _world_model: WorldModel
- traffic_lights_nearby: list[carla.TrafficLight]
Actors that are considered to be near the actor.
- _last_traffic_light: carla.TrafficLight | None
Last traffic light that was detected. At a red traffic light it can be checked if this is still red.
Attention
Not necessarily the same as
InformationManager.Information.relevant_traffic_light.
- _current_waypoint: carla.Waypoint
classes.worldmodel module
Interface classes between CARLA, the agent, and the user interface.
- class classes.worldmodel.AccessCarlaMixin[source]
Bases:
objectMixin class that delegates the attributes
client,map, andworldto theCarlaDataProviderto keep them in sync.Note
This mixin only works for instances, they are not class attributes.
- property client: carla.Client
- property world: carla.World
- static get_blueprint_library() carla.BlueprintLibrary[source]
Access to a cached version of the blueprint library
- Raises:
ValueError – If the blueprint library is not set, which happens when the world is not set. The world must be setup before (
CarlaDataProvider.set_world()).- Return type:
- class classes.worldmodel.GameFramework(*args: Any, **kwargs: Any)[source]
Bases:
AccessCarlaMixin,CarlaDataProviderA utility class to setup CARLA, pygame, Hydra, the configuration and parts of the user interface and the agent.
A
GameFrameworkinstance can be used to control the game loop and work as a handler for theexceptionsof this project. Furthermore can it manage the cooldown of theRuleclasses.Note
The GameFramework derives from the
CarlaDataProviderwhich gives this class more utility methods that currently are not included in this part of the documentation.- Parameters:
args (LaunchConfig)
config (Optional[DictConfig])
timeout (float)
worker_threads (int)
- clock: ClassVar[pygame.time.Clock | None] = None
- display: ClassVar[pygame.Surface | Literal[False] | None] = None
The
pygamesurface to render on.Is None before the initialization.
Falseif pygame is disabled.
- property launch_config: LaunchConfig
LaunchConfigobject that was used for the initialization (args)
- property agent_config: LunaticAgentSettings
The configuration of the attached
agent, if it exists otherwise the agent attribute of the storedlaunch_config.
- classmethod quickstart(launch_config: LaunchConfig | None = None, *, logging: bool = False) Self[source]
Initializes Hydra in a limited way, i.e. does not allow for command line overrides.
Sets up the
carla.Clientand related instances as well as pygame.Note
It is recommended that you use a
@hydra.maindecorated main function instead to make full use of the Hydra framework.- Parameters:
launch_config (Optional[LaunchConfig]) – The configuration to use. If
None, will use the default configuration from./conf/launch_config.yaml.logging (bool) – If True, change the how logging is done by applying the logger settings from
./conf/config_extensions/job_logging.yaml. Default isFalse.
- Returns:
The initialized
GameFrameworkinstance.- Return type:
Self
See also
- This function uses:
- static initialize_hydra(config_dir: str = './conf', config_name: str = 'launch_config', version_base=None, *, job_name='LunaticAgentJob', logging=True, structured=True) LaunchConfig[source]
Use this function only if no hydra.main is available.
Usage:
args = GameFramework.initialize_hydra(config_dir=<abs_path_of_conf>, config_name="launch_config") game_framework = GameFramework(args)
- Parameters:
config_dir (str) – The directory where the hydra configuration is stored.
config_name (str) – The name of the configuration file.
version_base – The version base of hydra for the configuration. Default is None.
job_name – The name of the job.
logging – If True, will set up logging.
structured – If True will create the config based on
LaunchConfig, otherwise it will be based on adict. This is useful for runtime type-checks and conversions. If the configsLaunchConfig.strict_configvalue is < 2, this parameter is ignored. Disable if you experience problems. Default isTrue.
- Return type:
See also
- Hydra functions:
hydra.initialize_config_dir()hydra.compose()
- static load_hydra_config(config_name: str = 'conf/launch_config') LaunchConfig[source]
- Parameters:
config_name (str)
- Return type:
- __init__(args: LaunchConfig, config: DictConfig | None = None, timeout: float = 10.0, worker_threads: int = 0, *, map_layers=carla.MapLayer.All)[source]
- Parameters:
args (LaunchConfig) – Configuration for the GameFramework.
config (Optional[DictConfig]) – Optional config for the agent (unused in this project)
timeout (float) – Timeout for the
carla.Client.worker_threads (int) – See
carla.Client.map_layers – See
carla.Client.load_world()
- controller: weakref.ProxyType[RSSKeyboardControl] | RSSKeyboardControl
Parser for keyboard events
- traffic_manager: carla.TrafficManager | None = None
- classmethod init_pygame(launch_config: LaunchConfig | None = None, recreate: bool = False) tuple[pygame.time.Clock, pygame.Surface | Literal[False] | None][source]
- Parameters:
launch_config (Optional[LaunchConfig]) – Will use the
widthandheight<.LaunchConfig.height> attributes of this object if set the :py:mod:`pygamewindows size. Otherwise will use(1280, 720). Defaults toNone.recreate (bool) – If
True, will reinitialize pygame a second time if this function is called.
- Return type:
tuple[pygame.time.Clock, pygame.Surface | Literal[False] | None]
- static init_carla(args: LaunchConfig | None = None, timeout: float | None = None, worker_threads: int = 0, *, map_layers: carla.MapLayer = carla.MapLayer.All) carla.WorldSettings[source]
Initializes the
carla.Clientand the connects it to the simulator.- Parameters:
args (Optional[LaunchConfig]) – The configuration for the GameFramework.
timeout (Optional[float]) –
The timeout for the
carla.Client. Default is 10.0.Deprecated since version _: Use the
LaunchConfig.timeoutattribute instead.worker_threads (int) – See
carla.Client.map_layers (carla.MapLayer) – The map layers to load. Default is
carla.MapLayer.All.
- Returns:
The settings of the loaded world.
- Return type:
Note
This function has to be called before
client,worldormapcan be accessed. Alternatively the client andCarlaDataProviderneed to be setup in a different way beforehand.See also
carla.Client.load_world()
- static setup_client_map_and_world(map_name: str = 'Town04', ip: str = '127.0.0.1', port: int = 2000, *, timeout: float = 10.0, worker_threads: int = 0, reload_world: bool = False, reset_settings: bool = True, map_layers: carla.MapLayer = carla.MapLayer.All, sync: bool | None = True, fps: int = 20) tuple[carla.Client, carla.World, carla.Map][source]
Loads the
carla.Client, thecarla.World, and thecarla.Map.This is a subfunction of
init_carla()that can be used without aLaunchConfig.See also
init_carla()carla.Client.set_timeout()carla.Client.reload_world()- Parameters:
- Return type:
- init_traffic_manager(port: int | None = None) carla.TrafficManager[source]
Returns an instance of the
carla.TrafficManagerrelated to the specified port. If it does not exist, this will be created.See also
carla.Client.get_trafficmanager()- Parameters:
port (Optional[int]) – The port to use. If
None, will use the port fromCarlaDataProvider.get_traffic_manager_port(), which defaults to8000.- Return type:
- init_agent_and_interface(ego: carla.Vehicle | None, agent_class: type[LunaticAgent], config: LunaticAgentSettings | None = None, overwrites: dict[str, Any] | None = None) tuple[LunaticAgent, WorldModel, GlobalRoutePlanner, RSSKeyboardControl][source]
Quick setup for the agent and the world model.
- Among others this executes:
from agents.lunatic_agent import LunaticAgent, LunaticAgentSettings ego = world.spawn_actor(world.get_blueprint_library().find("vehicle.audi.tt")) agent, world_model, global_planner, controller = ( game_framework.init_agent_and_interface(ego, LunaticAgent) )
- Parameters:
ego (Optional[carla.Vehicle]) – The ego vehicle. Can be
Noneif the agent is set to use an external actor (LaunchConfig.externalActor).agent_class (type[LunaticAgent]) – The agent class to instantiate.
config (Optional[LunaticAgentSettings]) – The configuration of the agent. If
Nonethe.agent_class.BASE_SETTINGSare used.overwrites (Optional[dict[str, Any]]) – Additional overwrites to the configuration.
- Return type:
tuple[LunaticAgent, WorldModel, GlobalRoutePlanner, RSSKeyboardControl]
- make_world_model(config: LunaticAgentSettings, player: carla.Vehicle | None = None) WorldModel[source]
Creates a
WorldModelwith a backreference to the GameFramework.- Parameters:
config (LunaticAgentSettings)
player (Optional[carla.Vehicle])
- Return type:
- make_controller(world_model: WorldModel, controller_class: type[classes.type_protocols.ControllerClassT] = RSSKeyboardControl, **kwargs: Any) classes.type_protocols.ControllerClassT[source]
Creates a keyboard controller and attaches it to the world model.
- Parameters:
world_model (WorldModel) – The world model to attach the controller to.
controller_class (type[classes.type_protocols.ControllerClassT]) – The controller class to instantiate. Defaults to
RSSKeyboardControl.**kwargs (Any) – Additional arguments to pass to the controller.
- Return type:
classes.type_protocols.ControllerClassT
- static hydra_initialized() bool[source]
Checks wether Hydra is initialized. This is normally only done when the
@hydra.main()decorator is used.- Return type:
- static get_hydra_config(raw: Literal[False] = False) HydraConf[source]
- static get_hydra_config(raw: Literal[True]) hydra.core.hydra_config.HydraConfig
Retrieves the Hydra configuration object.
- Parameters:
raw – If
True, returns thehydra.conf.HydraConfdataclass, otherwise thehydra.core.hydra_config.HydraConfigsingleton. Default isFalse.- Raises:
ValueError – If the HydraConfig was not set up yet and
raw=True.
- parse_controller_events(final_controls: carla.VehicleControl | None)[source]
Parses the keyboard events with the
controller.- Raises:
AttributeError – If neither
controlnorworld_model.controllerare set.- Parameters:
final_controls (Optional[carla.VehicleControl])
- render_everything()[source]
Update render and hud
Note
This is the preferred method to update the world and render the camera.
- static skip_rest_of_loop(message: str = 'GameFramework.end_loop') NoReturn[source]
Terminates the current iteration and exits the GameFramework by raising a
ContinueLoopException.Note
It is the users responsibility to manage the agent & local planner before calling this function, i.e. that the agent has a
carla.VehicleControlset.- Raises:
ContinueLoopException – With the given message.
- Parameters:
message (str)
- Return type:
- static destroy_actors(actors: Iterable[carla.Actor | CustomSensorInterface])[source]
Destroys the given actors and customs sensors implemented in this package.
Removes destroyed actors from the
CarlaDataProvideractor pool.- Parameters:
actors (Iterable[carla.Actor | CustomSensorInterface])
- classmethod cleanup(*, disable_sync: bool = True, quit_pygame: bool = True)[source]
Cleans up resources and actors.
- Parameters:
disable_sync (bool) – If True, will disable synchronous mode. This will prevent the freezing of the Unreal Editor. Default is True.
quit_pygame (bool) – If True, will call
pygame.quit(). Default is True.
Note
When called from an instance with an attached agent, the
agent.destroy()method is called.Otherwise will call
CarlaDataProvider.cleanup().
- exceptions
shortcut to
exceptionsmodule containing custom exceptions.
- property client: carla.Client
- static get_blueprint_library() carla.BlueprintLibrary
Access to a cached version of the blueprint library
- Raises:
ValueError – If the blueprint library is not set, which happens when the world is not set. The world must be setup before (
CarlaDataProvider.set_world()).- Return type:
- property world: carla.World
- class classes.worldmodel.WorldModel(*args: Any, **kwargs: Any)[source]
Bases:
AccessCarlaMixin,CarlaDataProviderClass representing the surrounding environment.
This class is the interface between the agent, the , the
HUD, and theKeyboardControl. It handles ticking of the simulator and rendering of the pygame interface.If
LaunchConfig.externalActoris set, it will look for an actor with the role nameLaunchConfig.rolename, if such an actor does not yet exist it will wait for its creation until the calling script continues.- Parameters:
config (LunaticAgentSettings)
args (Union[LaunchConfig, Mapping[str, Any], 'os.PathLike[str]', str])
agent (Optional[LunaticAgent])
carla_world (Optional[carla.World])
player (Optional[carla.Vehicle])
map_inst (Optional[carla.Map])
- controller: RSSKeyboardControl | weakref.ProxyType[RSSKeyboardControl] | None = None
func`weakref.proxy` as backreference. This is not a
weakrefobject, whenwith gameframework(agent)is used.- Type:
Set when controller is created. Uses
- Type:
py
- __init__(config: LunaticAgentSettings, args: LaunchConfig | Mapping[str, Any] | 'os.PathLike[str]' | str = './conf/launch_config.yaml', agent: LunaticAgent | None = None, *, carla_world: carla.World | None = None, player: carla.Vehicle | None = None, map_inst: carla.Map | None = None)[source]
Constructor method
- Parameters:
config (LunaticAgentSettings)
args (Union[LaunchConfig, Mapping[str, Any], 'os.PathLike[str]', str])
agent (Optional[LunaticAgent])
carla_world (Optional[carla.World])
player (Optional[carla.Vehicle])
map_inst (Optional[carla.Map])
- property world: carla.World
- world_settings: carla.WorldSettings
Object containing data about the simulation such as synchrony or rendering mode.
- sync: bool | None
Set from
LaunchConfig.sync
- external_actor: bool
Set from
LaunchConfig.externalActor
- actor_role_name: str | None
Set from
LaunchConfig.rolename
- player: carla.Vehicle
The linked actor. If
external_actoris set this will be the first actor found with that role name.
- camera_manager: CameraManager
Manages cameras for the user interface and
HUD.
- weather: str
Name of currently used weather preset. See also:
CarlaDataProvider.find_weather_presets()
- property client: carla.Client
- static get_blueprint_library() carla.BlueprintLibrary
Access to a cached version of the blueprint library
- Raises:
ValueError – If the blueprint library is not set, which happens when the world is not set. The world must be setup before (
CarlaDataProvider.set_world()).- Return type:
- actors: List[carla.Actor | CustomSensorInterface]
Actors attached to this instance for the user interface and
HUD.
- rss_set_road_boundaries_mode(road_boundaries_mode: 'RssRoadBoundariesModeAlias' | carla.RssRoadBoundariesMode | bool | None = None) None[source]
Choose wether or not to use the RSS road boundaries feature.
Toggles:
RssSettings.use_stay_on_road_feature- Parameters:
road_boundaries_mode (Optional[Union['RssRoadBoundariesModeAlias', carla.RssRoadBoundariesMode, bool]]) – If
None, uses the value from the config. IfTrue, sets tocarla.RssRoadBoundariesMode.On. IfFalse, sets tocarla.RssRoadBoundariesMode.Off.- Return type:
None
See also
- pause_simulation(pause: bool)[source]
Pauses the simulation by setting the world to synchronous mode.
Attention
Only works reliable in asynchronous mode (
sync=False) and might lead to unexpected behavior in synchronous mode.- Parameters:
pause (bool)
- restart()[source]
Restart the world and sets up the
HUDsensors. Ifplayeris not set orexternal_actoris set, looks for an actor with the role name, or spawns a new actor.Note
Called during
__init__().
- tick_server_world() int | carla.WorldSnapshot | None[source]
When
LaunchConfig.handle_ticksisTrueuses or depending onLaunchConfig.sync.- Return type:
int | carla.WorldSnapshot | None
- tick(clock: pygame.time.Clock)[source]
Method for every tick
- Parameters:
clock (pygame.time.Clock)
- next_weather(reverse: bool = False) None[source]
Get next weather setting
- Parameters:
reverse (bool)
- Return type:
None
- toggle_recording()[source]
Start recording images from the camera output.
Saved in
LaunchConfig.camera.recorder.output_pathwith the current frame number.
- modify_vehicle_physics(actor: carla.Vehicle)[source]
- Parameters:
actor (carla.Vehicle)
- finalize_render(display: pygame.Surface)[source]
Draws the HUD and saves the image if recording is enabled.
Attention
Assumes that
render(..., finalize=False)was called before and something should be rendered in between. Usefinalize_render()as the very last step in the render process.- Parameters:
display (pygame.Surface)
- render(display: pygame.Surface, finalize: bool = True)[source]
Render the world and draw it to the
pygame.Surface. This function is part ofGameFramework.render_everything()which is the recommended way to handle the rendering process.Hint
Recording of the fully rendered output should be done at the end of the render method, however
CameraManager.render()is called first to render the camera.Call with finalize=False to only render the camera but not the
HUD.Afterwards apply other render features and call
finalize_render()to draw the HUD and save the image if recording is enabled.Note
This renders the
CameraMangerand the RSS bounding boxes. If finalize isTrue, it will also render theHUDby callingfinalize_render().- Parameters:
display (pygame.Surface)
finalize (bool)
- rss_check_control(vehicle_control: carla.VehicleControl) carla.VehicleControl | None[source]
Checks the vehicle control against the RSS restrictions and possibly proposes an alternative.
This is called during
Phase.RSS_EVALUATION.Todo
This should be an agent method, but currently the
RssSensoris tied to this class and not the agent.- Parameters:
vehicle_control (carla.VehicleControl)
- Return type:
carla.VehicleControl | None