classes package

This package contains helper classes and constants used in this project.

The classes from carla_originals are mostly identical to the ones provided in the examples from the CARLA PythonAPI.

Classes Overview

GameFramework

The GameFramework is a helper class for a quicker setup. It manages the game loop of the agent.

WorldModel

The WorldModel is a helper class serving as an interface between the agent, the simulator, and the user. It handles the world ticks of the simulator and the rendering of the pygame interface.

It is based on the World classes used in the examples from CARLA. It is an extension of the carla.World class and provides the following functionalities:

  • Spawning of the ego actor (optional)

    • Otherwise it waits until an external script provides the actor (command line argument external_actor="hero").

  • Ticking (sync=True) or waiting for the tick (sync=False) of the carla.World

  • HUD management

  • RSS features

  • Weather change

  • Cleanup when the program ends
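The difference between ticking (sync=True) and waiting for the tick (sync=False) can be sketched as follows. This is a minimal illustration and not the WorldModel implementation: advance_world is a hypothetical helper, and FakeWorld is a stand-in so that the snippet runs without a simulator. It only assumes the tick() and wait_for_tick() methods that carla.World provides.

```python
class FakeWorld:
    """Stand-in for carla.World (assumption: only the two methods used here)."""

    def __init__(self):
        self.calls = []

    def tick(self):
        # In synchronous mode the client advances the simulation itself.
        self.calls.append("tick")
        return 1

    def wait_for_tick(self):
        # In asynchronous mode the client blocks until the server has ticked.
        self.calls.append("wait_for_tick")
        return "snapshot"


def advance_world(world, sync: bool):
    """Hypothetical helper: tick in sync mode, otherwise wait for the server."""
    if sync:
        return world.tick()
    return world.wait_for_tick()


world = FakeWorld()
advance_world(world, sync=True)
advance_world(world, sync=False)
```

Passing the world object in keeps the helper testable with any object that offers the two methods.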

HUD and Camera Manager

The HUD is a modification of the HUD classes used in the manual_control_rss.py example of CARLA.

It works closely with the classes.ui.camera_manager.CameraManager to provide the visual human interface for the pygame window.

Rules

See the Rule documentation for details.

RssSensor and Visualization

See the RSS documentation from CARLA.

KeyboardControls

See the RSSKeyboardControl API for more details.

Constants and Enums

See contents of the classes.constants module.

carla_originals

This package contains (mostly) unmodified classes extracted from examples provided by CARLA. The classes are imported into this package.

Driver, Vehicle, VehicleSpawner

These classes are deprecated, will be merged, or are subject to major changes.

Rule Interpreter

This class allows for an alternative way to execute rules through different interfaces (python, yaml, xml, etc). It is not yet further integrated and documented.

Exceptions Overview

  • AgentDoneException Raised when there is no more waypoint in the queue to follow and no rule set a new destination.

  • ContinueLoopException Raised when the agent.run_step of the agent should not be continued further. The agent then returns the current ctx.control to the caller of run_step.

  • UserInterruption Terminate the run_step loop if user input is detected, for example KeyboardInterrupt or a pygame hotkey like Esc. This allows the scenario runner and leaderboard to exit gracefully.

  • UpdatedPathException Should be raised when the path has been updated and the agent should replan. Rules that replan on Phase.DONE | END should raise this exception at the end.

    For more details see the classes.exceptions API documentation.
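How a caller might react to two of these exceptions can be sketched as follows. The exception classes below are local stand-ins with the same names and base classes as documented, and drive is a hypothetical loop; the project's GameFramework may handle these cases differently.

```python
class LunaticAgentException(Exception):
    """Stand-in for the documented base class."""


class AgentDoneException(LunaticAgentException):
    """Stand-in: no waypoint left and no rule set a new destination."""


class UserInterruption(Exception):
    """Stand-in: deliberately not a LunaticAgentException."""


def drive(run_step, max_steps=100):
    """Hypothetical driver loop: call run_step until done or interrupted."""
    steps = 0
    try:
        for _ in range(max_steps):
            run_step()
            steps += 1
    except AgentDoneException:
        return steps, "done"
    except UserInterruption:
        # Return directly so that an outer runner can exit gracefully.
        return steps, "interrupted"
    return steps, "max_steps"
```

Because UserInterruption does not derive from LunaticAgentException, a handler for the base class alone would not swallow a user request to quit.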


class classes.Any(*args, **kwargs)[source]

Bases: object

Special type indicating an unconstrained type.

  • Any is compatible with every type.

  • Any assumed to have all methods.

  • All values assumed to be instances of Any.

Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance checks.

classes.overload(func)[source]

Decorator for overloaded functions/methods.

In a stub file, place two or more stub definitions for the same function in a row, each decorated with @overload.

For example:

@overload
def utf8(value: None) -> None: ...
@overload
def utf8(value: bytes) -> bytes: ...
@overload
def utf8(value: str) -> bytes: ...

In a non-stub file (i.e. a regular .py file), do the same but follow it with an implementation. The implementation should not be decorated with @overload:

@overload
def utf8(value: None) -> None: ...
@overload
def utf8(value: bytes) -> bytes: ...
@overload
def utf8(value: str) -> bytes: ...
def utf8(value):
    ...  # implementation goes here

The overloads for a function can be retrieved at runtime using the get_overloads() function.

class classes.TypeVar(name, *constraints, bound=None, covariant=False, contravariant=False, default=NoDefault, infer_variance=False)[source]

Bases: object

Type variable.

classes.final(f)[source]

Decorator to indicate final methods and final classes.

Use this decorator to indicate to type checkers that the decorated method cannot be overridden, and decorated class cannot be subclassed.

For example:

class Base:
    @final
    def done(self) -> None:
        ...
class Sub(Base):
    def done(self) -> None:  # Error reported by type checker
        ...

@final
class Leaf:
    ...
class Other(Leaf):  # Error reported by type checker
    ...

There is no runtime checking of these properties. The decorator attempts to set the __final__ attribute to True on the decorated object to allow runtime introspection.

class classes.Protocol[source]

Bases: Generic

Base class for protocol classes.

Protocol classes are defined as:

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example:

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...

Subpackages

Submodules

classes.constants module

This module defines enums and constants that are used throughout the project.

classes.constants.AD_RSS_AVAILABLE: bool

Indicates whether the carla.ad module is available, i.e. whether the current CARLA version was built with RSS support. As a rule of thumb, this is always False on Windows. If it is False, some objects are missing from the carla module, for example carla.RssSensor, and some utilities of this project that involve RSS are not available.
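A feature check of this kind can be sketched as below. This is an assumption about how such a flag could be computed, not the project's actual code: rss_available is a hypothetical helper, and the two SimpleNamespace objects stand in for a carla module built with and without RSS support.

```python
from types import SimpleNamespace


def rss_available(carla_module) -> bool:
    """Hypothetical check: treat RSS as available if the module exposes `ad`."""
    return hasattr(carla_module, "ad")


# Stand-ins for the carla module (assumption, no simulator needed).
carla_with_rss = SimpleNamespace(ad=object(), RssSensor=object())
carla_without_rss = SimpleNamespace()

if rss_available(carla_without_rss):
    sensor_type = carla_without_rss.RssSensor
else:
    sensor_type = None  # RSS utilities must be skipped in this case
```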

class classes.constants.AgentState(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Flag

High-level states that the agent currently can be in.

Used with LunaticAgent.current_states and InformationManager.state_counter.

DRIVING
STOPPED
BLOCKED_BY_VEHICLE
BLOCKED_RED_LIGHT
BLOCKED_BY_STATIC

Static obstacle.

Attention

Not updated by the information manager but in the Phase.DETECT_STATIC_OBSTACLES phase.

BLOCKED_OTHER
REVERSE
OVERTAKING
AGAINST_LANE_DIRECTION
PARKED = 6

Includes STOPPED

BLOCKED

Combination of BLOCKED_OTHER | BLOCKED_BY_VEHICLE | BLOCKED_RED_LIGHT | BLOCKED_BY_STATIC

class classes.constants.RulePriority(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Priority of a Rule. The higher a value, the higher the priority. Rules are sorted by their priority before being applied.

NULL = 0
LOWEST = 1
LOW = 2
NORMAL = 4
HIGH = 8
HIGHEST = 16
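Sorting by priority can be sketched as follows. The enum values are copied from the list above; the (name, priority) pairs are hypothetical stand-ins for Rule objects, and applying the highest priority first is an assumption about the sort direction.

```python
from enum import IntEnum


class RulePriority(IntEnum):
    NULL = 0
    LOWEST = 1
    LOW = 2
    NORMAL = 4
    HIGH = 8
    HIGHEST = 16


# Hypothetical (name, priority) pairs standing in for Rule objects.
rules = [
    ("slow_down", RulePriority.LOW),
    ("avoid_collision", RulePriority.HIGHEST),
    ("keep_lane", RulePriority.NORMAL),
]

# Assumption: rules with a higher priority are applied first.
ordered = sorted(rules, key=lambda rule: rule[1], reverse=True)
ordered_names = [name for name, _ in ordered]
```

Since RulePriority is an IntEnum, its members compare like plain integers, which is what makes the sort key work without extra code.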
class classes.constants.Phase(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Flag

A rough order of the phases the agent loops through is:

<Phases.NONE: 0>,
<Phases.UPDATE_INFORMATION|BEGIN: 5>,
<Phases.UPDATE_INFORMATION|END: 6>,
<Phases.PLAN_PATH|BEGIN: 9>,
<Phases.PLAN_PATH|END: 10>,
<Phases.DETECT_TRAFFIC_LIGHTS|BEGIN: 17>,
<Phases.DETECT_TRAFFIC_LIGHTS|END: 18>,
<Phases.DETECT_PEDESTRIANS|BEGIN: 33>,
<Phases.DETECT_PEDESTRIANS|END: 34>,
<Phases.DETECT_CARS|BEGIN: 65>,
<Phases.DETECT_CARS|END: 66>,
<Phases.POST_DETECTION_PHASE|BEGIN: 129>,
<Phases.POST_DETECTION_PHASE|END: 130>,
<Phases.EXECUTION|BEGIN: 1025>,
<Phases.EXECUTION|END: 1026>

Note

The above cycle list is not up to date.

A complete list of all the main phases can be obtained by calling
NONE = 0
BEGIN

Indicates the beginning of a phase. Should always be combined with another phase.

END

Indicates the end of a phase. Should always be combined with another phase.

UPDATE_INFORMATION

Indicates the execution of the agent's update_information() method.

This is the first Phase of the agent and is executed every step.

PLAN_PATH

Indicates that the planning of a new path has started.

Attention

If the path is replanned, an UpdatedPathException should be raised.

DETECT_TRAFFIC_LIGHTS

Executed during the agent's detect_hazard() method. Can add Hazard.TRAFFIC_LIGHT_RED or Hazard.TRAFFIC_LIGHT_YELLOW to the agent's LunaticAgent.detected_hazards.

DETECT_PEDESTRIANS

Executed during the agent's detect_hazard() method. Can add Hazard.PEDESTRIAN to the agent's LunaticAgent.detected_hazards.

DETECT_STATIC_OBSTACLES
Checks for static obstacles in the agent's path with
DETECT_CARS
Checks for vehicles in the agent's path with

Note

If a car was detected, Phase.CAR_DETECTED | BEGIN is executed; Phase.DETECT_CARS | END is only executed if no car was detected.

TAKE_NORMAL_STEP

During this phase the carla.VehicleControl is calculated by the local planner.

RSS_EVALUATION

See also

APPLY_MANUAL_CONTROLS

Applied manually via human user interface.

EXECUTION

Executed when the control is applied to the agent.

CAR_DETECTED

The Phase.CAR_DETECTED | BEGIN is executed when a car is detected and follows the Phase.DETECT_CARS | BEGIN phase. If no car is detected the Phase.DETECT_CARS | END phase is executed.

TURNING_AT_JUNCTION

Indicates that the agent is turning at a junction.

Warning

This Phase might become obsolete.

HAZARD

Not implemented. Refer to EMERGENCY | BEGIN

EMERGENCY = 32768

Special Phase

See also

COLLISION

Special phase that is executed out-of-order when a carla.Sensor detects a collision.

DONE

Indicates that the agent is at the end of its path and agent.done() is True.

TERMINATING

Can be called when the agent is terminating. Must be executed by the user.

CUSTOM_CYCLE

experimental

Can be used to indicate that the phase change is currently handled by the user.

Warning

LunaticAgent.execute_phase() checks for exact match, i.e. a phase UPDATE_INFORMATION | BEGIN | END will not be executed in the normal loop.

See also

Executed in BlockingRule.loop_agent
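The exact-match behaviour described in the warning can be sketched with a reduced flag enum. The Phase class below is a stand-in with only a few members, and registered and rules_for are hypothetical names; the real LunaticAgent.execute_phase() is not reproduced here.

```python
from enum import Flag, auto


class Phase(Flag):
    """Reduced stand-in, not the project's full enum."""
    NONE = 0
    BEGIN = auto()
    END = auto()
    UPDATE_INFORMATION = auto()
    PLAN_PATH = auto()


# Hypothetical registry: rules are stored under one exact flag combination.
registered = {Phase.UPDATE_INFORMATION | Phase.BEGIN: ["rule_a"]}


def rules_for(phase):
    """Exact-match lookup: only the identical combination returns rules."""
    return registered.get(phase, [])


matching = rules_for(Phase.UPDATE_INFORMATION | Phase.BEGIN)
not_matching = rules_for(Phase.UPDATE_INFORMATION | Phase.BEGIN | Phase.END)
```

A dictionary lookup compares the whole flag value, so a combination that merely contains the registered phase does not match.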

DETECT_NON_CARS

Combination of DETECT_STATIC_OBSTACLES | DETECT_TRAFFIC_LIGHTS | DETECT_PEDESTRIANS

DETECTION_PHASE

Combination of DETECT_STATIC_OBSTACLES | DETECT_TRAFFIC_LIGHTS | DETECT_PEDESTRIANS | DETECT_CARS

EXCEPTIONS

Combination of HAZARD | EMERGENCY | COLLISION | TURNING_AT_JUNCTION | CAR_DETECTED | DONE | TERMINATING

USER_CONTROLLED

Phases that might or might not be passed through, as they must be implemented manually by the user.

Combination of APPLY_MANUAL_CONTROLS | EXECUTION | TERMINATING | CUSTOM_CYCLE

NORMAL_LOOP

Combination of UPDATE_INFORMATION | PLAN_PATH | DETECTION_PHASE | TAKE_NORMAL_STEP

IN_LOOP

Phases that are executed in or before the inner step; EMERGENCY is executed right after the inner step.

Combination of NORMAL_LOOP | EMERGENCY | COLLISION

classmethod get_phases()[source]

Get all BEGIN and END phase combinations.

classmethod from_string(string: str) Phase[source]

Utility method that turns a string like Phase.NAME | BEGIN | ... into a Phase.

Note

Only supports the operator |. NAME must be a valid Phase name.

Parameters:

string (str)

Return type:

Phase
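Parsing such a string can be sketched as follows. phase_from_string is a hypothetical re-implementation on a reduced stand-in enum; it only illustrates the documented behaviour (split on |, look up each name) and is not the project's from_string().

```python
from enum import Flag, auto
from functools import reduce
from operator import or_


class Phase(Flag):
    """Reduced stand-in, not the project's full enum."""
    NONE = 0
    BEGIN = auto()
    END = auto()
    UPDATE_INFORMATION = auto()


def phase_from_string(string: str) -> Phase:
    """Hypothetical parser for strings such as 'Phase.UPDATE_INFORMATION | BEGIN'."""
    # Drop an optional "Phase." prefix and surrounding whitespace from each part.
    names = [part.strip().split(".")[-1] for part in string.split("|")]
    # Phase[name] raises KeyError for an unknown name.
    return reduce(or_, (Phase[name] for name in names))


parsed = phase_from_string("Phase.UPDATE_INFORMATION | BEGIN")
```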

class classes.constants.Hazard(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Flag

Values currently stored in the agent’s detected_hazards attribute.

TRAFFIC_LIGHT_RED
TRAFFIC_LIGHT_YELLOW
PEDESTRIAN
CAR
STATIC_OBSTACLE

Note

These refer to actors and not the environment barriers.

OTHER
JUNCTION
OBSTACLE

Combination of PEDESTRIAN | CAR | STATIC_OBSTACLE

TRAFFIC_LIGHT

Combination of TRAFFIC_LIGHT_RED | TRAFFIC_LIGHT_YELLOW
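The combined members can be used to test a whole group of hazards at once. The sketch below uses a reduced stand-in of the enum and assumes that the detected hazards are held in a set; any_obstacle is a hypothetical helper.

```python
from enum import Flag, auto


class Hazard(Flag):
    """Reduced stand-in, not the project's full enum."""
    TRAFFIC_LIGHT_RED = auto()
    TRAFFIC_LIGHT_YELLOW = auto()
    PEDESTRIAN = auto()
    CAR = auto()
    STATIC_OBSTACLE = auto()
    OBSTACLE = PEDESTRIAN | CAR | STATIC_OBSTACLE
    TRAFFIC_LIGHT = TRAFFIC_LIGHT_RED | TRAFFIC_LIGHT_YELLOW


def any_obstacle(detected_hazards) -> bool:
    """Hypothetical helper: True if any detected hazard belongs to OBSTACLE."""
    return any(hazard & Hazard.OBSTACLE for hazard in detected_hazards)


detected = {Hazard.TRAFFIC_LIGHT_RED, Hazard.CAR}
must_react_to_obstacle = any_obstacle(detected)
```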

class classes.constants.HazardSeverity(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Flag

High level descriptions to further weight Hazard. The HazardSeverity flags are stored in detected_hazards_info.

UNKNOWN = -1
NONE = 0

Initial values for LunaticAgent.detected_hazards_info

WARNING = 1
COLLISION

Special case for collision.

CRITICAL = 3

Includes WARNING

EMERGENCY = 7

Includes WARNING and CRITICAL

class classes.constants.RoadOption(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

RoadOption represents the possible topological configurations when moving from one lane segment to another.

See also

RoadOptionColor for the color representation of the road options.

VOID = -1

Indicated by green

LEFT

Indicated by yellow

RIGHT

Indicated by cyan

STRAIGHT

Indicated by gray

LANEFOLLOW

Indicated by green

CHANGELANELEFT

Indicated by orange

CHANGELANERIGHT

Indicated by dark cyan

class classes.constants.RoadOptionColor(option: RoadOption)[source]

Bases: object

Points to a carla.Color object that represents the color of the road option.

Supports __getitem__(name) and __call__(RoadOption) for easy access.

Parameters:

option (RoadOption)

Return type:

carla.Color

VOID

Green

LEFT

Yellow

RIGHT

Cyan

STRAIGHT

Gray

LANEFOLLOW

Green

CHANGELANELEFT

Orange

CHANGELANERIGHT

Dark Cyan

class classes.constants.RssLogLevelStub(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: _CarlaIntEnum

Enum declaration used in carla.RssSensor to set the log level.

Parameters:

names (ClassVar[Dict[str, Self]])

trace = 0
debug = 1
info = 2
warn = 3
err = 4
critical = 5
off = 6
class classes.constants.RssRoadBoundariesModeStub(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: _CarlaIntEnum

Enum declaration used in carla.RssSensor to enable or disable the stay on road feature. In summary, this feature considers the road boundaries as virtual objects. The minimum safety distance check is applied to these virtual walls, in order to make sure the vehicle does not drive off the road.

Parameters:

names (ClassVar[Dict[str, Self]])

Off = 0
On = 1
class classes.constants.RuleResult(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Special objects that indicate special return values of a Rule.

NO_RESULT

Indicates that the rule returned no result, e.g. because an exception was raised.

NOT_APPLICABLE

Object that indicates that no action was executed, e.g. because the rule is on cooldown or blocked.

class classes.constants.StreetType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Used by the DetectionMatrix to interpret the street type.

ON_HIGHWAY = 'On highway'
NON_HIGHWAY_STREET = 'Non highway street'
ON_JUNCTION = 'On junction'
ON_HIGHWAY_ENTRY = 'On highway entry'
ON_HIGHWAY_EXIT = 'On highway exit'
JUNCTION_AHEAD = 'Junction ahead'
HIGHWAY_TRAFFIC_LIGHT = 'Highway traffic light'
HIGHWAY_WITH_ENTRY_AND_EXIT = 'Highway with entry/exit'
class classes.constants.StreetOccupation(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Enum to interpret the results of the DetectionMatrix

NO_CAR = 0
EGO = 1
CAR = 2
NO_ROAD = 3

classes.detection_matrix module

classes.detection_matrix.matrix_for_actor(ego_vehicle: carla.Actor, road_lane_ids: set[RoadLaneId], radius: float = 100.0, highway_shape: 'HighWayShape' | None = None)[source]

Calculates the detection matrix for the given actor.

Parameters:
  • ego_vehicle (carla.Actor) – The ego vehicle

  • highway_shape (tuple) – Tuple containing the highway type, the number of straight highway lanes, the entry waypoint tuple and the exit waypoint tuple. Format: (highway_type: string, straight_lanes: int, entry_wps: ([wp,..], [wp,..]), exit_wps: ([wp,..], [wp,..]))

  • road_lane_ids (set[RoadLaneId])

  • radius (float)

Note

CarlaDataProvider needs to be set up before calling this function.

class classes.detection_matrix.DetectionMatrix(ego_vehicle: carla.Actor, road_lane_ids: Set[RoadLaneId] | None = None, radius: float = 100.0)[source]

Bases: object

Automatically create a matrix representing the lanes around the ego vehicle on each update() call.

Parameters:
  • ego_vehicle (carla.Actor)

  • road_lane_ids (Optional[Set[RoadLaneId]])

  • radius (float)

__init__(ego_vehicle: carla.Actor, road_lane_ids: Set[RoadLaneId] | None = None, radius: float = 100.0)[source]
Parameters:
  • ego_vehicle (carla.Actor)

  • road_lane_ids (Optional[Set[RoadLaneId]])

  • radius (float)

running

If the matrix will perform updates.

matrix: Dict[int, List[int]]

An ordered dictionary representing the city matrix. The keys for existing lanes are the lane IDs in the format “road_id_lane_id”. For non-existing lanes, different placeholders exist, e.g. left_outer_lane, left_inner_lane, No_4th_lane, No_opposing_direction. The values indicate the occupation of each cell: 0 - No vehicle, 1 - Ego vehicle, 2 - Other vehicle, 3 - No road (see StreetOccupation). Format example:

{
    "left_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3],
    "left_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3],
    "1_2": [0, 0, 0, 0, 0, 0, 0, 0],
    "1_1": [0, 0, 0, 0, 0, 0, 0, 0],
    "1_-1": [0, 0, 0, 0, 0, 0, 0, 0],
    "1_-2": [0, 0, 0, 0, 0, 0, 0, 0],
    "right_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3],
    "right_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3],
}

Attention

  • Currently the keys are replaced by numbers.

  • In case of an unsupported layout or before the very first update the matrix can be None. Be aware of this when using AsyncDetectionMatrix.

Type:

A collections.OrderedDict
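Reading such a matrix with the StreetOccupation values can be sketched as follows. The enum values are taken from classes.constants.StreetOccupation as documented; the small matrix and the helpers find_ego and count_cars are hypothetical and only follow the format example above.

```python
from enum import IntEnum


class StreetOccupation(IntEnum):
    NO_CAR = 0
    EGO = 1
    CAR = 2
    NO_ROAD = 3


# Hypothetical matrix in the documented format (lane key -> cells along the lane).
matrix = {
    "left_inner_lane": [3, 3, 3, 3],
    "1_1": [0, 2, 0, 0],
    "1_-1": [0, 0, 1, 2],
    "right_inner_lane": [3, 3, 3, 3],
}


def find_ego(matrix):
    """Return (lane key, cell index) of the ego vehicle, or None."""
    if matrix is None:  # e.g. before the very first update
        return None
    for lane, cells in matrix.items():
        if StreetOccupation.EGO in cells:
            return lane, cells.index(StreetOccupation.EGO)
    return None


def count_cars(matrix) -> int:
    """Count cells occupied by vehicles other than the ego vehicle."""
    return sum(cells.count(StreetOccupation.CAR) for cells in matrix.values())


ego_position = find_ego(matrix)
other_cars = count_cars(matrix)
```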

update() Dict[int, List[int]] | None[source]

If the matrix is running, it will update the matrix and return it, otherwise returns None.

Return type:

Dict[int, List[int]] | None

getMatrix() Dict[int, List[int]][source]
Return type:

Dict[int, List[int]]

to_list() list[list[int]] | None[source]

Returns the values of matrix as a list.

Return type:

list[list[int]] | None

to_numpy() np.ndarray[int, Any] | None[source]

Returns the values of matrix as a numpy array.

Return type:

np.ndarray[int, Any] | None

render(display: pygame.Surface, imshow_settings: dict[str, Any] = {'cmap': 'jet'}, vertical: bool = True, draw_values: bool = True, text_settings: dict[str, Any] = {'color': 'orange'}, *, draw: bool = True) None[source]

Renders the matrix on the given surface using matplotlib.

Parameters:
  • display (pygame.Surface) – The surface to render the matrix on.

  • imshow_settings (dict[str, Any]) – The settings for matplotlib.pyplot.imshow(). Defaults to {'cmap': 'jet'}.

  • vertical (bool) – If the lanes should be displayed vertically. Defaults to True.

  • draw_values (bool) – If the entries should be displayed as text. Defaults to True.

  • text_settings (dict[str, Any]) – The settings for matplotlib.pyplot.text() when draw_values. Defaults to {'color': 'orange'}.

  • draw (bool) – If the matrix should be drawn. If False, this function will do nothing.

Return type:

None

start()[source]

Allows the matrix to update.

stop()[source]

Prevents the matrix from updating.

class classes.detection_matrix.AsyncDetectionMatrix(ego_vehicle: carla.Actor, *, road_lane_ids: Set[RoadLaneId] | None = None, sleep_time: float = 0.1)[source]

Bases: DetectionMatrix

Asynchronous version of the DetectionMatrix.

Will calculate the matrix update in a separate thread.

Parameters:
  • ego_vehicle (carla.Actor)

  • road_lane_ids (Optional[Set[RoadLaneId]])

  • sleep_time (float)

__init__(ego_vehicle: carla.Actor, *, road_lane_ids: Set[RoadLaneId] | None = None, sleep_time: float = 0.1)[source]
Parameters:
  • ego_vehicle (carla.Actor) – The ego vehicle.

  • sleep_time (float) – The time to sleep between updates. Defaults to 0.1 seconds

  • road_lane_ids (Optional[Set[RoadLaneId]]) – The road and lane IDs to consider. If not provided, all will be considered.

update() None[source]

Not available in the async version.

Return type:

None

getMatrix()[source]
render(display: pygame.Surface, imshow_settings: dict[str, Any] = {'cmap': 'jet'}, vertical: bool = True, draw_values: bool = True, text_settings: dict[str, Any] = {'color': 'orange'}, *, draw: bool = True) None

Renders the matrix on the given surface using matplotlib.

Parameters:
  • display (pygame.Surface) – The surface to render the matrix on.

  • imshow_settings (dict[str, Any]) – The settings for matplotlib.pyplot.imshow(). Defaults to {'cmap': 'jet'}.

  • vertical (bool) – If the lanes should be displayed vertically. Defaults to True.

  • draw_values (bool) – If the entries should be displayed as text. Defaults to True.

  • text_settings (dict[str, Any]) – The settings for matplotlib.pyplot.text() when draw_values. Defaults to {'color': 'orange'}.

  • draw (bool) – If the matrix should be drawn. If False, this function will do nothing.

Return type:

None

to_list() list[list[int]] | None

Returns the values of matrix as a list.

Return type:

list[list[int]] | None

to_numpy() np.ndarray[int, Any] | None

Returns the values of matrix as a numpy array.

Return type:

np.ndarray[int, Any] | None

matrix: Dict[int, List[int]]

An ordered dictionary representing the city matrix. The keys for existing lanes are the lane IDs in the format “road_id_lane_id”. For non-existing lanes, different placeholders exist, e.g. left_outer_lane, left_inner_lane, No_4th_lane, No_opposing_direction. The values indicate the occupation of each cell: 0 - No vehicle, 1 - Ego vehicle, 2 - Other vehicle, 3 - No road (see StreetOccupation). Format example:

{
    "left_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3],
    "left_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3],
    "1_2": [0, 0, 0, 0, 0, 0, 0, 0],
    "1_1": [0, 0, 0, 0, 0, 0, 0, 0],
    "1_-1": [0, 0, 0, 0, 0, 0, 0, 0],
    "1_-2": [0, 0, 0, 0, 0, 0, 0, 0],
    "right_inner_lane": [3, 3, 3, 3, 3, 3, 3, 3],
    "right_outer_lane": [3, 3, 3, 3, 3, 3, 3, 3],
}

Attention

  • Currently the keys are replaced by numbers.

  • In case of an unsupported layout or before the very first update the matrix can be None. Be aware of this when using AsyncDetectionMatrix.

Type:

A collections.OrderedDict

running

If the matrix will perform updates.

start()[source]

Allows the matrix to update.

stop(timeout: float | None = None)[source]
Raises:

RuntimeError – if timeout is not None and the thread is still alive after the time.

Parameters:

timeout (float | None)
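The combination of a background update thread, a sleep time between updates, and a stop(timeout) that raises RuntimeError can be sketched as follows. BackgroundUpdater is a hypothetical stand-in that uses only the standard threading module; it is not the AsyncDetectionMatrix implementation.

```python
import threading


class BackgroundUpdater:
    """Hypothetical stand-in for a thread-driven matrix updater."""

    def __init__(self, compute, sleep_time: float = 0.1):
        self._compute = compute
        self._sleep_time = sleep_time
        self._stop_event = threading.Event()
        self._thread = None
        self.matrix = None  # stays None until the first update has finished

    def start(self):
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self):
        while True:
            self.matrix = self._compute()
            # wait() returns True as soon as stop() has set the event.
            if self._stop_event.wait(self._sleep_time):
                break

    def stop(self, timeout=None):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout)
            if timeout is not None and self._thread.is_alive():
                raise RuntimeError("update thread still alive after timeout")


updater = BackgroundUpdater(lambda: {"1_1": [0, 1]}, sleep_time=0.01)
updater.start()
updater.stop(timeout=5.0)
```

Waiting on the event instead of calling time.sleep() lets stop() interrupt the pause immediately, so the join usually returns well before the timeout.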

classes.evaluation_function module

class classes.evaluation_function.ConditionFunction(first_argument: str(name) | ~typing.Callable[[~typing.Concatenate[~classes.type_protocols.RuleT, Context, ...]], ~typing.Hashable] | ~typing.Callable[[~typing.Concatenate[Context, ...]], ~typing.Hashable] | None = None, name: str = 'ConditionFunction', *, truthy: bool = False, use_self: bool | None = None)[source]

Bases: Generic[_CP, _CH]

Implements a decorator that wraps a function to be used with Rule classes. The function must return a hashable value, which is used to look up the action to take for the result of the rule's Rule.condition function.

Evaluation functions can be combined using the AND, OR and NOT operators to build up more complex rules from simpler ones. The operators + and & are aliases for AND, | is an alias for OR, and ~ is an alias for NOT. For example, with these two functions:

func1 = ConditionFunction(lambda ctx: ctx.speed > 10)
func2 = ConditionFunction(lambda ctx: ctx.speed < 20)

These statements are all equivalent:
  • ConditionFunction(lambda ctx: 10 < ctx.speed < 20)

  • func1 + func2

  • func1 & func2

  • func1.AND(func2)

  • ConditionFunction.AND(func1, func2)
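How operator aliases for AND, OR and NOT can be built on top of a wrapped function is sketched below. Condition is a hypothetical minimal class, not the project's ConditionFunction; it casts results to bool and ignores actions, names of rules, and the use_self logic. SimpleNamespace stands in for a Context with a speed attribute.

```python
from types import SimpleNamespace


class Condition:
    """Hypothetical minimal wrapper, not the project's ConditionFunction."""

    def __init__(self, func, name="Condition"):
        self.func = func
        self.name = name

    def __call__(self, ctx):
        return self.func(ctx)

    @staticmethod
    def AND(a, b):
        return Condition(lambda ctx: bool(a(ctx)) and bool(b(ctx)),
                         f"({a.name} AND {b.name})")

    @staticmethod
    def OR(a, b):
        return Condition(lambda ctx: bool(a(ctx)) or bool(b(ctx)),
                         f"({a.name} OR {b.name})")

    @staticmethod
    def NOT(a):
        return Condition(lambda ctx: not a(ctx), f"(NOT {a.name})")

    def __and__(self, other):
        return Condition.AND(self, other)

    __add__ = __and__  # "+" behaves like "&"

    def __or__(self, other):
        return Condition.OR(self, other)

    def __invert__(self):
        return Condition.NOT(self)


func1 = Condition(lambda ctx: ctx.speed > 10, "above_10")
func2 = Condition(lambda ctx: ctx.speed < 20, "below_20")
in_range = func1 & func2
result = in_range(SimpleNamespace(speed=15))
```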

Hint

ConditionFunctions also allow for more specific returns types

Example:

@ConditionFunction
def is_speeding(ctx: Context) -> Hashable:
    config = ctx.agent.config
    if config.speed > config.speed_limit+20:
        return "very fast"
    elif config.speed > config.speed_limit+5:
        return "fast"
    elif config.speed < config.speed_limit-20:
        return "very slow"
    else:
        return "normal"

Rule(is_speeding,
    action={
        "very fast": lambda ctx: ctx.agent.config.follow_speed_limits(),
        "fast" : lambda ctx: ctx.agent.config.set_target_speed(ctx.speed_limit+5)
    })
Parameters:
  • first_argument (str(name) | Callable[[Concatenate[RuleT, Context, ...]], Hashable] | Callable[[Concatenate[Context, ...]], Hashable] | None) – If None or a string, the class will create a decorator that expects a callable as the first argument. If a string is passed it substitutes as the name argument. Otherwise a callable is expected like in the snippet used above.

  • name (str) – The name to represent the function. Defaults to "ConditionFunction".

  • truthy (bool) – If True, the function will always cast the return value to a boolean value. Defaults to False.

  • use_self (Optional[bool]) – If True, the function will be treated as a method and the first argument will be the instance of the Rule that uses this function. If None, the decision depends on the signature of the function, if it has only one argument only the Context object is passed, if it has two or more arguments the first argument that is passed is the instance of the Rule. Use False to not use the instance of the Rule as the first argument. Defaults to None.

Returns:

A ConditionFunction or a partially initialized version to be used as a decorator when the first_argument is not a callable.

Return type:

ConditionFunction | partial[ConditionFunction]
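The pattern of a class that works both as a bare decorator and as a decorator factory can be sketched with functools.partial. Wrapped is a hypothetical class that only mirrors the first_argument, name and truthy parameters described above; the use_self handling is left out.

```python
from functools import partial


class Wrapped:
    """Hypothetical class usable as @Wrapped, @Wrapped("name") or @Wrapped(truthy=True)."""

    def __new__(cls, first_argument=None, name="Wrapped", *, truthy=False):
        if first_argument is None:
            # No callable yet: return a partially initialized decorator.
            return partial(cls, name=name, truthy=truthy)
        if isinstance(first_argument, str):
            # A string substitutes the name argument.
            return partial(cls, name=first_argument, truthy=truthy)
        return super().__new__(cls)

    def __init__(self, first_argument=None, name="Wrapped", *, truthy=False):
        # Only reached when __new__ returned a real instance.
        self.func = first_argument
        self.name = name
        self.truthy = truthy

    def __call__(self, *args, **kwargs):
        result = self.func(*args, **kwargs)
        return bool(result) if self.truthy else result


@Wrapped
def plain(x):
    return x


@Wrapped("is_positive", truthy=True)
def positive(x):
    return x > 0 and "yes"


@Wrapped(truthy=True)
def nonzero(x):
    return x
```

Returning a partial from __new__ skips __init__, because Python only calls __init__ when __new__ returns an instance of the class.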

Generics:
evaluation_function: CallableConditionT

The function that is wrapped by the ConditionFunction. Uses the generic type hints _CP, _CH of the class.

actions: dict[Hashable, CallableActionT] = {}

Mapping of return values to actions to be executed. If this dictionary is not empty it will be used as the Rule.actions dictionary.

copy(copy_actions: bool = False)[source]

Copies the class by creating a new instance.

Parameters:

copy_actions (bool) – If True, the ConditionFunction.actions dictionary is copied as well. Defaults to False.

Returns:

A new instance, with the same __init__ arguments.

Return type:

ConditionFunction

Warning

Be aware that when using copy_actions the actions themselves are not copied; they are identical and shared.

register_action(key: Hashable = True, *, use_self: bool | None = None, **kwargs: ParamSpecKwargs) Callable[[CallableT], CallableT][source]
register_action(action_function: CallableT, key: Hashable = True, *, use_self: bool | None = None, **kwargs: ParamSpecKwargs) CallableT

Add an action to be executed when the condition function returns a specific value.

This function can be used as a decorator or as a method:

# As decorator
@ConditionFunction
def is_speeding(ctx: Context) -> Literal["very fast", "fast", "normal", "slow", True]:
    ...

@is_speeding.register_action(key="very fast")
# or
@is_speeding.register_action  # default key is True
def very_fast_action(ctx: Context):
    ctx.agent.set_target_speed(ctx.config.target_speed-5)

# Or as function
def fast_action(ctx: Context, speed: int):
    ctx.agent.set_target_speed(speed)

is_speeding.register_action(fast_action, key="fast")

# Custom keywords without Rule instance
@is_speeding.register_action("normal",
                            use_self=False,  # no Rule in the signature
                            speed=ctx.config.target_speed)  # keyword argument
def custom_speed(ctx: Context, speed: int):
    ctx.agent.set_target_speed(speed)
Parameters:
  • key (Hashable) – If the condition function returns this value, this action will be executed. Defaults to True.

  • action_function – (When not used as decorator) The action to be executed.

  • use_self (Optional[bool]) – If not None, a use_self attribute is set on the function to indicate if the function is a method or not.

  • kwargs (ParamSpecKwargs) – Keyword arguments for the decorated function.

Returns:

Decorator to be used or the decorated function as ConditionFunction.

Return type:

_RegisterActionDecorator

Note

Only one action is allowed per key. If an action is already registered for the key, it will be overwritten.
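The dual use as decorator and as plain method, together with the one-action-per-key rule, can be sketched as follows. ActionRegistry is a hypothetical reduction; unlike the documented method it takes the key only as a keyword and ignores use_self and extra keyword arguments.

```python
class ActionRegistry:
    """Hypothetical reduction of the register_action mechanism."""

    def __init__(self):
        self.actions = {}

    def register_action(self, action_function=None, key=True):
        def decorator(func):
            # A later registration for the same key overwrites the earlier one.
            self.actions[key] = func
            return func

        if action_function is None:
            return decorator  # used as @registry.register_action(key=...)
        return decorator(action_function)  # used bare or as a plain call


registry = ActionRegistry()


@registry.register_action  # default key is True
def default_action(ctx):
    return "default"


@registry.register_action(key="fast")
def fast_action(ctx):
    return "fast"


def replacement_action(ctx):
    return "replaced"


registry.register_action(replacement_action, key="fast")
```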

classmethod AND(func1: ConditionFunction[ParamSpec, _CH], func2: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]

Combine two functions with and, i.e. to return True if both return True.

Parameters:
Return type:

ConditionFunction[ParamSpec, _CH | Hashable]

classmethod OR(func1: ConditionFunction[ParamSpec, _CH], func2: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]

Combine two functions to return True if either returns True.

Parameters:
Return type:

ConditionFunction[ParamSpec, _CH | Hashable]

classmethod NOT(func: ConditionFunction[ParamSpec, _CH]) ConditionFunction[ParamSpec, bool][source]

Invert the return value of a function.

Parameters:

func (ConditionFunction[ParamSpec, _CH])

Return type:

ConditionFunction[ParamSpec, bool]

__add__(other: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]

Combine with another function using AND().

Parameters:

other (ConditionFunction[ParamSpec, Hashable])

Return type:

ConditionFunction[ParamSpec, _CH | Hashable]

__and__(other: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]

Combine with another function using AND().

Parameters:

other (ConditionFunction[ParamSpec, Hashable])

Return type:

ConditionFunction[ParamSpec, _CH | Hashable]

__or__(other: ConditionFunction[ParamSpec, Hashable]) ConditionFunction[ParamSpec, _CH | Hashable][source]

Combine with another function using OR().

Parameters:

other (ConditionFunction[ParamSpec, Hashable])

Return type:

ConditionFunction[ParamSpec, _CH | Hashable]

__invert__() ConditionFunction[ParamSpec, bool][source]

Invert the return value of the function.

Return type:

ConditionFunction[ParamSpec, bool]

classes.exceptions module

Helper module that contains all the custom exceptions used in the project

exception classes.exceptions.AgentDoneException[source]

Bases: LunaticAgentException

Raised when there is no more waypoint in the queue to follow and no rule set a new destination.

When a GameFramework instance is used as a context manager, it will set game_framework.continue_loop to False.

exception classes.exceptions.ContinueLoopException[source]

Bases: LunaticAgentException

Raised when the LunaticAgent.run_step() action of the agent should not be continued further.

The agent returns the current ctx.control to the caller of run_step.

Note

Handled in LunaticAgent.run_step(); this exception should not propagate outside. If it does, it can be caught by the GameFramework, which then skips the current loop without applying any controls and logs an error.

exception classes.exceptions.DoNotEvaluateChildRules(result: Any = RuleResult.NO_RESULT, *args: object)[source]

Bases: _RuleResultException

Can be raised in a MultiRule to prevent the evaluation of child rules.

Can also be raised by child rules to prevent the evaluation of further child rules.

Parameters:
exception classes.exceptions.EmergencyStopException(hazards: set[Hazard], *args: object)[source]

Bases: LunaticAgentException

Parameters:
Return type:

None

__init__(hazards: set[Hazard], *args: object) None[source]
Parameters:
Return type:

None

hazards_detected: set[Hazard]
exception classes.exceptions.LunaticAgentException[source]

Bases: Exception

Base class for all custom exceptions that influence the workflow of the LunaticAgent.

exception classes.exceptions.NoFurtherRulesException(result: Any = RuleResult.NO_RESULT, *args: object)[source]

Bases: _RuleResultException

Raised when no further rules should be executed in this phase.

Caught by LunaticAgent.execute_phase().

The agent will continue at the phase where the BlockedRule was triggered.

Parameters:
exception classes.exceptions.SkipInnerLoopException(planned_control: carla.VehicleControl, *args: object)[source]

Bases: LunaticAgentException

Can be raised in LunaticAgent._inner_step. A new control object must be provided.

Parameters:
Return type:

None

__init__(planned_control: carla.VehicleControl, *args: object) None[source]
Return type:

None

planned_control: carla.VehicleControl
exception classes.exceptions.UnblockRuleException(result: Any = RuleResult.NO_RESULT, *args: object)[source]

Bases: _RuleResultException

Can be raised in a BlockingRule to end it.

The agent will continue at the phase where the BlockingRule was triggered.

Note

Further rules that are in this phase can still be executed. Alternatively, consider raising a NoFurtherRulesException.

exception classes.exceptions.UpdatedPathException[source]

Bases: LunaticAgentException

Should be raised when the path has been updated and the agent should replan.

Rules that replan on Phase.DONE | END should raise this exception at the end.

exception classes.exceptions.UserInterruption[source]

Bases: Exception

Terminate the loop if user input is detected. Allows the scenario runner and Leaderboard to exit gracefully, if handled appropriately, e.g. by directly returning.

Thrown by LunaticAgent.parse_keyboard_input.

Note

Is not a LunaticAgentException.
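
Because UserInterruption derives directly from Exception, a handler for LunaticAgentException will not catch it. The following is a minimal sketch of that consequence. The two classes are stand-ins that only mirror the documented names and base classes, and run_loop is a hypothetical helper, not part of the project.

```python
class LunaticAgentException(Exception):
    """Stand-in for classes.exceptions.LunaticAgentException."""


class UserInterruption(Exception):
    """Stand-in: deliberately NOT derived from LunaticAgentException."""


def run_loop(step):
    """Hypothetical loop body: workflow exceptions are handled, user input ends the loop."""
    try:
        step()
    except LunaticAgentException:
        return "handled by agent workflow"
    except UserInterruption:
        return "terminated by user"  # e.g. return directly to exit gracefully
    return "step finished"


def user_pressed_escape():
    raise UserInterruption()


def agent_event():
    raise LunaticAgentException()


print(run_loop(user_pressed_escape))
print(run_loop(agent_event))
```

A caller that only handles LunaticAgentException therefore has to handle UserInterruption separately.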

exception classes.exceptions._RuleResultException(result: Any = RuleResult.NO_RESULT, *args: object)[source]

Bases: LunaticAgentException

Abstract class for exceptions raised by rules that are still able to return a result.

__init__(result: Any = RuleResult.NO_RESULT, *args: object)[source]
result: Any = <object object>
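
As an illustration of how an exception can still carry a rule result, here is a minimal sketch. All classes are stand-ins that mirror the documented signatures, and run_rules is a hypothetical executor; how the real LunaticAgent.execute_phase() uses the result is an assumption.

```python
from typing import Any


class RuleResult:
    """Stand-in holding a unique sentinel, mirroring RuleResult.NO_RESULT."""
    NO_RESULT = object()


class LunaticAgentException(Exception):
    pass


class _RuleResultException(LunaticAgentException):
    def __init__(self, result: Any = RuleResult.NO_RESULT, *args: object):
        super().__init__(*args)
        self.result = result


class NoFurtherRulesException(_RuleResultException):
    pass


def run_rules(rules):
    """Hypothetical executor: stops at NoFurtherRulesException but keeps its result."""
    results = []
    for rule in rules:
        try:
            results.append(rule())
        except NoFurtherRulesException as e:
            if e.result is not RuleResult.NO_RESULT:
                results.append(e.result)
            break  # no further rules are executed
    return results


def stop_with_result():
    raise NoFurtherRulesException("slow down")


print(run_rules([lambda: "a", stop_with_result, lambda: "never reached"]))
```

The sentinel default lets the handler distinguish "no result" from a rule that legitimately returned None.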

classes.information_manager module

The aim of this module is to provide less convoluted access to information, i.e. to distill the raw data and return high-level information.

class classes.information_manager.InformationManager(agent: LunaticAgent, update_information: bool = True)[source]

Bases: object

Tracks global information, e.g. all actors, traffic lights, etc. as well as agent specific information, e.g. current speed, current location, and nearby actors in relation to the agent.

Tip

Global information is marked by ClassVar and staticmethod and is shared across all instances; this information is constant for the current tick.

relevant_traffic_light: carla.TrafficLight | None = None

Result of CarlaDataProvider.get_next_traffic_light()

relevant_traffic_light_distance: float = inf

Distance to the relevant_traffic_light. Is float('inf') if relevant_traffic_light is None.

gathered_information: InformationManager.Information

InformationManager.Information gathered during tick()

vehicles: ClassVar['list[carla.Vehicle]']

List of all tracked vehicles

walkers: ClassVar['list[carla.Walker]']

List of all tracked pedestrians

static_obstacles: ClassVar['list[carla.Actor]']

List of all tracked static obstacles

obstacles: ClassVar['list[carla.Actor]']

Union of vehicles, walkers and static_obstacles

lights_map: ClassVar['Dict[int, carla.Waypoint]'] = {}

Map of traffic lights to their trigger waypoints

frame: ClassVar['int | None'] = None

Last frame the InformationManager was updated.

The current frame of the world should be passed to the global_tick method.

__init__(agent: LunaticAgent, update_information: bool = True)[source]
state_counter: Dict[AgentState, int]

Tracks each AgentState and the number of ticks the agent has been in that state.

detect_next_traffic_light()[source]

Set the relevant_traffic_light and relevant_traffic_light_distance if not set.

Note

Does not check the planned path but the current route along waypoints, so the result might not be exact.

This function is automatically called in tick().

tick()[source]

Tick the information manager and update the information for the corresponding agent.

class Information(current_waypoint: carla.Waypoint, current_speed: float, current_states: Dict[AgentState, int], relevant_traffic_light: carla.TrafficLight | None, relevant_traffic_light_distance: float, vehicles: List[carla.Vehicle], walkers: List[carla.Walker], static_obstacles: List[carla.Actor], obstacles: List[carla.Actor], walkers_nearby: List[carla.Walker], vehicles_nearby: List[carla.Vehicle], static_obstacles_nearby: List[carla.Actor], obstacles_nearby: List[carla.Actor], traffic_lights_nearby: List[carla.TrafficLight], distances: Dict[carla.Actor, float])[source]

Bases: NamedTuple

Data gathered by the InformationManager which is passed to the agent.

current_waypoint: carla.Waypoint

Alias for field number 0

current_speed: float

Alias for field number 1

current_states: Dict[AgentState, int]

Alias for field number 2

relevant_traffic_light: carla.TrafficLight | None

Alias for field number 3

relevant_traffic_light_distance: float

Alias for field number 4

vehicles: List[carla.Vehicle]

Alias for field number 5

walkers: List[carla.Walker]

Alias for field number 6

static_obstacles: List[carla.Actor]

Obstacles filtered by InformationManager.OBSTACLE_FILTER

count(value, /)

Return number of occurrences of value.

index(value, start=0, stop=9223372036854775807, /)

Return first index of value.

Raises ValueError if the value is not present.

obstacles: List[carla.Actor]

Union of vehicles, walkers and static_obstacles

walkers_nearby: List[carla.Walker]

Alias for field number 9

vehicles_nearby: List[carla.Vehicle]

Alias for field number 10

static_obstacles_nearby: List[carla.Actor]

Alias for field number 11

obstacles_nearby: List[carla.Actor]

Alias for field number 12

traffic_lights_nearby: List[carla.TrafficLight]

Alias for field number 13

distances: Dict[carla.Actor, float]

Distances to all actors in obstacles
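
Since Information is a NamedTuple, every field is available both by name and by position, which is what the "Alias for field number N" entries express. The following sketch uses a reduced stand-in with only three fields and plain strings instead of CARLA objects, so the field numbers differ from the real class.

```python
from typing import Dict, List, NamedTuple


class Information(NamedTuple):
    """Reduced stand-in with three of the documented fields."""
    current_speed: float
    current_states: Dict[str, int]
    obstacles_nearby: List[str]


info = Information(current_speed=8.5, current_states={"DRIVING": 12}, obstacles_nearby=[])

# Fields are reachable by name and by position.
assert info.current_speed == info[0]

# NamedTuples are immutable; _replace returns a modified copy.
faster = info._replace(current_speed=10.0)
print(info.current_speed, faster.current_speed)
```

The inherited count() and index() methods listed above come from the same tuple base.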

OBSTACLE_FILTER: str = 'static.prop.[cistmw]*'

fnmatch pattern for obstacles that the agent will consider in its path. See https://carla.readthedocs.io/en/latest/bp_library/#static
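
To see which kind of ids such a pattern selects, the sketch below applies it with Python's fnmatch module to a few illustrative type ids. The ids are examples chosen for this sketch; the authoritative list is the linked blueprint library, and how the InformationManager applies the filter internally is not shown here.

```python
from fnmatch import fnmatchcase

OBSTACLE_FILTER = "static.prop.[cistmw]*"

# Illustrative type ids; check the blueprint library for the real list.
type_ids = [
    "static.prop.constructioncone",
    "static.prop.streetbarrier",
    "static.prop.bench01",
    "vehicle.audi.tt",
]

# [cistmw] matches exactly one character out of the set, * matches the rest.
considered = [t for t in type_ids if fnmatchcase(t, OBSTACLE_FILTER)]
print(considered)
```

Only props whose name starts with one of the letters c, i, s, t, m or w pass the filter.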

static get_traffic_lights() Dict[carla.TrafficLight, carla.Transform][source]
Return type:

Dict[carla.TrafficLight, carla.Transform]

static get_trafficlight_trigger_waypoint(traffic_light: carla.TrafficLight) carla.Waypoint[source]

Get the location where the traffic light is triggered.

Parameters:

traffic_light (carla.TrafficLight)

Return type:

carla.Waypoint

static global_tick(frame: int | None = None) None[source]

Update global information that is constant for the current tick and not agent specific.

Updates:
Parameters:

frame (Optional[int]) – The id of the current frame. If None retrieves the id from the current carla.WorldSnapshot. Multiple calls with the same frame are ignored. (default: None)

Return type:

None
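
The frame check described above can be sketched as follows. InformationManagerSketch is a hypothetical stand-in: it receives the vehicles as an argument instead of querying the simulator, and the updates counter exists only for illustration.

```python
from typing import ClassVar, List, Optional


class InformationManagerSketch:
    """Hypothetical stand-in; only the frame guard is modelled."""
    frame: ClassVar[Optional[int]] = None
    vehicles: ClassVar[List[str]] = []
    updates: ClassVar[int] = 0  # counts real updates, for illustration only

    @staticmethod
    def global_tick(frame: int, world_vehicles: List[str]) -> None:
        cls = InformationManagerSketch
        if frame == cls.frame:
            return  # same frame: the global information is already up to date
        cls.frame = frame
        cls.vehicles = list(world_vehicles)
        cls.updates += 1


InformationManagerSketch.global_tick(1, ["car_a"])
InformationManagerSketch.global_tick(1, ["car_a", "car_b"])  # ignored, same frame
InformationManagerSketch.global_tick(2, ["car_a", "car_b"])
print(InformationManagerSketch.updates, InformationManagerSketch.vehicles)
```

Storing the data on the class means several agents can call the update in the same tick without repeating the work.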

static get_vehicles() List[carla.Vehicle][source]
Return type:

List[carla.Vehicle]

static get_walkers() List[carla.Walker][source]
Return type:

List[carla.Walker]

static cleanup() None[source]

Resets the global information.

Return type:

None

classes.rule module

class classes.rule.Context(*args: Any, **kwargs: Any)[source]

Bases: CarlaDataProvider

Object to be passed as the first argument (instead of self) to rules, actions and evaluation functions.

The Context class derives from the scenario runner’s CarlaDataProvider to allow access to the world, map, etc.

Tip

There is normally no need to initialize the context object manually. It is recommended to initialize the context object with LunaticAgent._make_context().

prior_result: Any | None

Result of the current phase.

last_context: 'Context' | None

The context object of the last tick. Used to access the last phase’s results.

second_pass: bool | None = None

Whether or not the run_step function performs a second pass, i.e. after the route has been replanned.

Warning

The correctness should not be assumed. The user is responsible for setting this value to True if a second pass is required.

PHASE_NOT_EXECUTED: object

Value in phase_results to indicate that agent.execute_phase(phase) was not yet called for the respective phase

agent: LunaticAgent

Backreference to the agent.

phase_results: dict[Phase, Any]

Stores the results of the phases the agent has been in. By default, the value of every key is set to Context.PHASE_NOT_EXECUTED.

config: ContextSettings

A copy of the agent’s config. Overwritten by the condition’s settings.

detected_hazards_info: dict[Hazard, HazardSeverity | Any]

Information about the detected hazards. add_hazard() inserts the given HazardSeverity as value; note, however, that the values can be arbitrary if set otherwise.

evaluation_results: dict[Phase, Hashable]

Stores the result from the Rule.condition() of the last rule that was evaluated in a phase.

Deprecation in consideration.

action_results: dict[Phase, Any]

Stores the result from the action() of the last rule that was applicable in a phase.

Deprecation in consideration.

property current_phase: Phase

Current Phase the agent is in.

property control: carla.VehicleControl | None

Current control the agent should use. Set by execute_phase(update_controls=...).

Note

Safeguarded against being set to None; setting it to None is discouraged. Use set_control() if setting it to None is really needed.

set_control(control: carla.VehicleControl | None)[source]

Sets the control; also allows setting it to None.

Parameters:

control (Optional[carla.VehicleControl])

get_or_calculate_control() carla.VehicleControl[source]

Get the control if it is set, otherwise calculate it by executing the local planner.

Returns:

The control the agent should use.

Return type:

carla.VehicleControl

Note

Use this function inside rules to acquire a control object.

Warning

This is equivalent to ending the inner step of the agent.

See also

LunaticAgent._calculate_control()

property detected_hazards: Set[Hazard]

Detected hazards in the current phase.

If not empty at the end of the inner step an EmergencyStopException is raised.

add_hazard(hazard: Hazard, hazard_level: HazardSeverity = HazardSeverity.EMERGENCY)[source]

Add the specified hazard to the detected hazards. In parallel, the hazard_level can be set, which is stored in detected_hazards_info.

discard_hazard(hazard: Hazard, match: Literal['exact', 'subset', 'intersection'] = 'subset')[source]

Discards a hazard from the detected hazards.

Parameters:
  • hazard (Hazard) – Hazard to remove from detected_hazards.

  • match (Literal['exact', 'subset', 'intersection']) –

    How to match the hazard to remove.

    • "exact" removes if the exact Hazard flag is present.

    • "subset" removes if the hazard is a subset of the detected hazard, e.g.:

      • discard_hazard(Hazard.VEHICLE, match="subset") would remove Hazard.OBSTACLE = Hazard.VEHICLE | PEDESTRIAN | STATIC_OBSTACLE.

      • discard_hazard(Hazard.TRAFFIC_LIGHT, match="subset") would not remove Hazard.TRAFFIC_LIGHT_RED.
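
The matching modes can be sketched with a small Flag enum. The Hazard class below is a hypothetical subset with illustrative values, and discard_hazard is written as a free function that returns a new set. The "intersection" mode is an assumption derived from its name and from the description of has_hazard().

```python
from enum import Flag
from typing import Set


class Hazard(Flag):
    """Hypothetical subset of flags; the values are illustrative."""
    VEHICLE = 1
    PEDESTRIAN = 2
    STATIC_OBSTACLE = 4
    OBSTACLE = VEHICLE | PEDESTRIAN | STATIC_OBSTACLE
    TRAFFIC_LIGHT_RED = 8
    TRAFFIC_LIGHT_YELLOW = 16
    TRAFFIC_LIGHT = TRAFFIC_LIGHT_RED | TRAFFIC_LIGHT_YELLOW


def matches(hazard: Hazard, detected: Hazard, match: str) -> bool:
    if match == "exact":
        return hazard == detected
    if match == "subset":  # every bit of `hazard` is contained in `detected`
        return (hazard & detected) == hazard
    if match == "intersection":  # at least one common bit (assumed semantics)
        return bool(hazard & detected)
    raise ValueError(f"unknown match mode: {match}")


def discard_hazard(detected: Set[Hazard], hazard: Hazard, match: str = "subset") -> Set[Hazard]:
    """Return the detected hazards without those matched by `hazard`."""
    return {d for d in detected if not matches(hazard, d, match)}


detected = {Hazard.OBSTACLE, Hazard.TRAFFIC_LIGHT_RED}
print(discard_hazard(detected, Hazard.VEHICLE))        # OBSTACLE is removed
print(discard_hazard(detected, Hazard.TRAFFIC_LIGHT))  # nothing is removed
```

With "subset", a broad flag such as TRAFFIC_LIGHT does not remove the narrower TRAFFIC_LIGHT_RED, which matches the two examples in the list above.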

has_hazard(hazard: Hazard, match: Literal['exact', 'subset', 'intersection'] = 'intersection') bool[source]

Checks if the hazard intersects with any of the detected hazards.

See discard_hazard() for the different matching options.

Parameters:
  • hazard (Hazard)

  • match (Literal['exact', 'subset', 'intersection'])

Return type:

bool

max_detection_distance(lane: Literal['same_lane', 'other_lane', 'overtaking', 'tailgating']) float

Convenience function to be used with lunatic_agent_tools.detect_vehicles() and LunaticAgent.detect_obstacles_in_path.

The max distance to consider an obstacle is calculated as:

max(obstacles.min_proximity_threshold,
    live_info.current_speed_limit / obstacles.speed_detection_downscale.[same|other]_lane)
Parameters:
  • self (HasConfig['BehaviorAgentSettings | LunaticAgentSettings']) – An object that implements the config and live_info attributes

  • lane (Literal['same_lane', 'other_lane', 'overtaking', 'tailgating']) – The lane to consider.

Return type:

float

Note

lane must be a key in BehaviorAgentObstacleSettings.SpeedLimitDetectionDownscale.
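
As a worked example of the formula above, the following sketch evaluates it with plain numbers. The function takes the settings as explicit arguments instead of reading them from config and live_info, and all values are hypothetical.

```python
from typing import Dict


def max_detection_distance(min_proximity_threshold: float,
                           current_speed_limit: float,
                           downscale: Dict[str, float],
                           lane: str) -> float:
    """max(threshold, speed limit / downscale factor of the lane)."""
    return max(min_proximity_threshold, current_speed_limit / downscale[lane])


# Hypothetical settings; the real values come from the agent's config.
downscale = {"same_lane": 3.0, "other_lane": 2.0}

# Speed-dependent term dominates: 50 / 2 = 25 > 12
print(max_detection_distance(12.0, 50.0, downscale, "other_lane"))
# Threshold dominates: 30 / 3 = 10 < 12
print(max_detection_distance(12.0, 30.0, downscale, "same_lane"))
```

The threshold acts as a lower bound, so slow zones never shrink the detection distance below it.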

property live_info: LiveInfo
property active_blocking_rules: Set[BlockingRule]
__call__(*args: Any, **kwargs: Any) Any

Call self as a function.

Return type:

Any

classes.rule.always_execute()[source]

This is a ConditionFunction that always returns True. It can be used to always execute an action.

class classes.rule.Rule(phases: Phase | Iterable[Phase] | 'type[Any]' | Self | str | None | None = None, bases: tuple[type[Any], ...] | None = None, clsdict: dict[str, Any] | None = None, **kwargs: Any)[source]

Bases: _GroupRule

_auto_init_: ClassVar[bool] = True

If set to False the automatic __init__ creation is disabled when subclassing. This automatic __init__ will fix parameters like phases and condition to the class.

Declaring an __init__ method in the class has the same effect as setting _auto_init_ to False.

Note

Using class NewRuleType(metarule=Rule) is nearly equivalent to _auto_init_=False, but is not inherited.

NOT_APPLICABLE: ClassVar[Literal[RuleResult.NOT_APPLICABLE]]

Unique object RuleResult.NOT_APPLICABLE that indicates that no action was executed.

Deprecated: Use RuleResult.NOT_APPLICABLE directly.

NO_RESULT: ClassVar[Literal[RuleResult.NO_RESULT]]

Unique object RuleResult.NO_RESULT that indicates that the rule’s action() did not return a result, e.g. because an exception was raised.

Deprecated: Use RuleResult.NO_RESULT directly.

phase: Phase

For the Class API, the phase attribute can be set to a single Phase object.

false_action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']

Action that should be executed if the rule is False. May not be set if actions is set.

action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']

Action that should be executed if the rule is True. If actions is set, this is ignored.

clone()[source]

Create a new instance of the rule with the same settings.

Note

  • The current cooldown is not taken into account.

  • The current enabled state is taken into account.

class CooldownFramework

Bases: object

Context manager that can reduce all cooldowns at the end of a with statement.

static tick()

Update all cooldowns and unblock all rules.

Calls:
DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0

Value the cooldown is reset to when reset_cooldown() is called without a value.

Used only when cooldown_reset_value is not set.

blocked: bool = False

Indicates if the rule is blocked for this tick only. Is reset to False after the tick.

property cooldown: int

Cooldown of the rule in ticks until it can be executed again after its action was executed. If 0 the rule is ready to be executed.

property enabled: bool

If False the rule will not be evaluated. Contrary to blocked this permanently disables the rule.

group: str | None = None

Group name of rules that should share their cooldown.

None for a rule to not share its cooldown.

property has_group: bool

Indicates if the rule belongs to a group, i.e. self.group is not None.

is_ready() bool

Group aware check if a rule is ready.

Return type:

bool

reset_cooldown(value: int | None = None)

Reset or set the cooldown; for a group rule it resets the group cooldown.

Parameters:

value (Optional[int])

set_active(value: bool)

Enables or disables the rule. Contrary to blocked it will not be reset after the tick.

Parameters:

value (bool)

classmethod set_cooldown_of_group(group: str, value: int)

Updates the cooldown of the specified group to a specific value

set_my_group_cooldown(value: int | None = None)

Update the cooldown of the group this rule belongs to

Parameters:

value (Optional[int])

start_cooldown: ClassVar[int] = 0

Initial cooldown when initialized. If >0, the rule will not be ready for the first start_cooldown ticks.

classmethod unblock_all_rules()

Unblocks all rules

classmethod update_all_cooldowns()

Globally updates the cooldown of all rules.
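
The interplay of cooldown, group, reset_cooldown() and update_all_cooldowns() can be sketched as follows. CooldownSketch is a hypothetical stand-in, not the project's implementation; in particular, the assumption that one update reduces each positive cooldown by one tick is made for this sketch.

```python
from typing import ClassVar, Dict, List, Optional


class CooldownSketch:
    """Hypothetical stand-in for the cooldown handling of a rule."""
    DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0
    _group_cooldowns: ClassVar[Dict[str, int]] = {}
    _instances: ClassVar[List["CooldownSketch"]] = []

    def __init__(self, group: Optional[str] = None,
                 cooldown_reset_value: Optional[int] = None):
        self.group = group
        self.cooldown_reset_value = cooldown_reset_value
        self._cooldown = 0
        if group is not None:
            CooldownSketch._group_cooldowns.setdefault(group, 0)
        CooldownSketch._instances.append(self)

    @property
    def cooldown(self) -> int:
        # Grouped rules read the shared value of their group.
        if self.group is not None:
            return CooldownSketch._group_cooldowns[self.group]
        return self._cooldown

    def is_ready(self) -> bool:
        return self.cooldown == 0

    def reset_cooldown(self, value: Optional[int] = None) -> None:
        if value is None:
            value = (self.cooldown_reset_value
                     if self.cooldown_reset_value is not None
                     else self.DEFAULT_COOLDOWN_RESET)
        if self.group is not None:
            CooldownSketch._group_cooldowns[self.group] = value
        else:
            self._cooldown = value

    @classmethod
    def update_all_cooldowns(cls) -> None:
        """Assumed behaviour: reduce every positive cooldown by one tick."""
        for rule in cls._instances:
            if rule.group is None and rule._cooldown > 0:
                rule._cooldown -= 1
        for group, value in cls._group_cooldowns.items():
            if value > 0:
                cls._group_cooldowns[group] = value - 1


a = CooldownSketch(group="lane_change")
b = CooldownSketch(group="lane_change")
a.reset_cooldown(2)              # puts the whole group on cooldown
print(b.is_ready())              # False, b shares the cooldown of its group
CooldownSketch.update_all_cooldowns()
CooldownSketch.update_all_cooldowns()
print(b.is_ready())              # True after two ticks
```

Keeping the group value in one shared place is what makes is_ready() group aware without the rules referencing each other.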

phases: frozenset[Phase]

The phase or phases in which the rule should be evaluated. For instantiation, the phases attribute can be any Iterable[Phase].

condition: Any

The condition that determines if the rule’s actions should be executed.

Simple Variant:

Return True if the action should be executed, False otherwise. If Rule.false_action is defined, False will execute Rule.false_action.

Advanced Variant:

Return a Hashable value that is used as key in the actions dict.

actions: dict[Any, CallableActionT]

Dictionary that maps rule results to the action that should be executed.
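
The relation between condition and actions can be sketched with plain functions. This is a simplified illustration: evaluate is a hypothetical helper, the context is a plain dict, and NOT_APPLICABLE is a stand-in for RuleResult.NOT_APPLICABLE.

```python
from typing import Any, Callable, Dict, Hashable

NOT_APPLICABLE = object()  # stand-in for RuleResult.NOT_APPLICABLE


def evaluate(condition: Callable[[dict], Hashable],
             actions: Dict[Any, Callable[[dict], Any]],
             ctx: dict) -> Any:
    """Use the condition's result as key into the actions dict."""
    key = condition(ctx)
    if key not in actions:
        return NOT_APPLICABLE
    return actions[key](ctx)


# Simple variant: action and false_action merged into one dict.
def is_close(ctx: dict) -> bool:
    return ctx["distance"] < 10


simple_actions = {True: lambda ctx: "brake", False: lambda ctx: "keep speed"}


# Advanced variant: any hashable result selects the action.
def light_state(ctx: dict) -> str:
    return ctx["light"]


advanced_actions = {"red": lambda ctx: "stop", "green": lambda ctx: "go"}

print(evaluate(is_close, simple_actions, {"distance": 5}))
print(evaluate(light_state, advanced_actions, {"light": "green"}))
print(evaluate(light_state, advanced_actions, {"light": "yellow"}) is NOT_APPLICABLE)
```

A result without a matching key simply means that no action is executed.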

description: str

Description of what this rule should do

priority: float | int | RulePriority = 4

Rules are executed in order of their priority, from high to low.

overwrite_settings: dict[str, Any]

Settings that should overwrite the agent’s settings for this rule.

Note

The overwrite settings are primitive dict objects, omegaconf.DictConfig objects are converted.

self_config: RuleConfig

A custom sub-config for the rule that is not included in the agent’s settings. Automatically gets an instance key added with the rule instance.

Can be accessed via ctx.config.current_rule or self.config.self.

Note

Internally, self.config and ctx.config are the same object, which makes interpolations to the agent’s settings possible.

Attention

The self_config object is *not* constant; it is recreated each time the rule is evaluated to have the current context available.

execute_phase(phase: Phase, *, prior_results: Any = None, update_controls: carla.VehicleControl | None = None)[source]

Attention

Helper function to execute a phase from within a rule, wrapper of agents.lunatic_agent.LunaticAgent.execute_phase().

Warns:

ReferenceError – If the weak proxy pointing to the Context object has been deleted. The phase will not be executed.

evaluate_children(ctx: Context) NoReturn[source]

Not implemented for this rule class.

Note

This is an interface function of meta rules that is executed during __call__() to call further rules.

Parameters:

ctx (Context)

Return type:

NoReturn

__call__(ctx: Context, overwrite: Dict[str, Any] | None = None, *, ignore_phase: bool = False, ignore_cooldown: bool = False) Any | Literal[RuleResult.NOT_APPLICABLE][source]
  1. First checks if the rule is applicable, i.e. whether its cooldown == 0; if not, returns NOT_APPLICABLE.

  2. Afterwards evaluates the rule’s condition() function.
Parameters:
  • ctx (Context) – The context object that is passed to the condition function.

  • overwrite (Optional[Dict[str, Any]]) – Extends overwrite_settings for this call only. Defaults to None.

  • ignore_phase (bool) – If True the phase check is skipped. Defaults to False.

  • ignore_cooldown (bool) – If True the cooldown check is skipped. Defaults to False.

Return type:

Union[Any, Literal[RuleResult.NOT_APPLICABLE]]

class classes.rule.BlockingRule(phases: Phase | Iterable[Phase] | 'type[Any]' | Self | str | None | None = None, bases: tuple[type[Any], ...] | None = None, clsdict: dict[str, Any] | None = None, **kwargs: Any)[source]

Bases: Rule

This meta rule allows defining rules that are able to take over the agent’s workflow and apply the VehicleControl directly from within the rule.

DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0

Value the cooldown is reset to when reset_cooldown() is called without a value.

Used only when cooldown_reset_value is not set.

_auto_init_: ClassVar[bool] = True

If set to False the automatic __init__ creation is disabled when subclassing. This automatic __init__ will fix parameters like phases and condition to the class.

Declaring an __init__ method in the class has the same effect as setting _auto_init_ to False.

Note

Using class NewRuleType(metarule=Rule) is nearly equivalent to _auto_init_=False, but is not inherited.

blocked: bool = False

Indicates if the rule is blocked for this tick only. Is reset to False after the tick.

property cooldown: int

Cooldown of the rule in ticks until it can be executed again after its action was executed. If 0 the rule is ready to be executed.

property enabled: bool

If False the rule will not be evaluated. Contrary to blocked this permanently disables the rule.

evaluate_children(ctx: Context) NoReturn

Not implemented for this rule class.

Note

This is an interface function of meta rules that is executed during __call__() to call further rules.

Parameters:

ctx (Context)

Return type:

NoReturn

execute_phase(phase: Phase, *, prior_results: Any = None, update_controls: carla.VehicleControl | None = None)

Attention

Helper function to execute a phase from within a rule, wrapper of agents.lunatic_agent.LunaticAgent.execute_phase().

Warns:

ReferenceError – If the weak proxy pointing to the Context object has been deleted. The phase will not be executed.

group: str | None = None

Group name of rules that should share their cooldown.

None for a rule to not share its cooldown.

property has_group: bool

Indicates if the rule belongs to a group, i.e. self.group is not None.

is_ready() bool

Group aware check if a rule is ready.

Return type:

bool

priority: float | int | RulePriority = 4

Rules are executed in order of their priority, from high to low.

reset_cooldown(value: int | None = None)

Reset or set the cooldown; for a group rule it resets the group cooldown.

Parameters:

value (Optional[int])

set_active(value: bool)

Enables or disables the rule. Contrary to blocked it will not be reset after the tick.

Parameters:

value (bool)

set_my_group_cooldown(value: int | None = None)

Update the cooldown of the group this rule belongs to

Parameters:

value (Optional[int])

start_cooldown: ClassVar[int] = 0

Initial cooldown when initialized. If >0, the rule will not be ready for the first start_cooldown ticks.

description: str

Description of what this rule should do

phases: frozenset[Phase]

The phase or phases in which the rule should be evaluated. For instantiation, the phases attribute can be any Iterable[Phase].

phase: Phase

For the Class API, the phase attribute can be set to a single Phase object.

actions: dict[Any, CallableActionT]

Dictionary that maps rule results to the action that should be executed.

action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']

Action that should be executed if the rule is True. If actions is set, this is ignored.

false_action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']

Action that should be executed if the rule is False. May not be set if actions is set.

overwrite_settings: dict[str, Any]

Settings that should overwrite the agent’s settings for this rule.

Note

The overwrite settings are primitive dict objects, omegaconf.DictConfig objects are converted.

self_config: RuleConfig

A custom sub-config for the rule that is not included in the agent’s settings. Automatically gets an instance key added with the rule instance.

Can be accessed via ctx.config.current_rule or self.config.self.

Note

Internally, self.config and ctx.config are the same object, which makes interpolations to the agent’s settings possible.

Attention

The self_config object is *not* constant; it is recreated each time the rule is evaluated to have the current context available.

condition: Any

The condition that determines if the rule’s actions should be executed.

Simple Variant:

Return True if the action should be executed, False otherwise. If Rule.false_action is defined, False will execute Rule.false_action.

Advanced Variant:

Return a Hashable value that is used as key in the actions dict.

MAX_TICKS = 5000

The number of ticks that can be performed by this rule before it is automatically disabled. If the rule has looped for this number of ticks, it will call max_tick_callback and raise an UnblockRuleException afterwards.

As a hack max_tick_callback can change ticks_passed to prevent the exception and continue the rule.

max_tick_callback: Callable[[Self, Context], Any] | None = None

An optional callback that is executed when ticks_passed reaches MAX_TICKS.

ticks_passed: int

Count how many ticks have been performed by this rule and blocked the agent.
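
The interaction of MAX_TICKS, max_tick_callback and ticks_passed can be sketched as below. BlockingLoopSketch and run are hypothetical stand-ins with a small limit for readability; whether the real check uses >= or > is an assumption.

```python
from typing import Any, Callable, Optional


class UnblockRuleException(Exception):
    """Stand-in for classes.exceptions.UnblockRuleException."""


class BlockingLoopSketch:
    MAX_TICKS = 5  # small value for the sketch; the documented default is 5000

    def __init__(self, max_tick_callback: Optional[Callable[["BlockingLoopSketch", Any], Any]] = None):
        self.max_tick_callback = max_tick_callback
        self.ticks_passed = 0

    def tick(self, ctx: Any = None) -> None:
        self.ticks_passed += 1
        if self.ticks_passed >= self.MAX_TICKS:
            if self.max_tick_callback is not None:
                self.max_tick_callback(self, ctx)
            # The callback may have lowered ticks_passed to keep the rule alive.
            if self.ticks_passed >= self.MAX_TICKS:
                raise UnblockRuleException()


def run(rule: BlockingLoopSketch, limit: int) -> Optional[int]:
    """Return the tick on which the rule unblocked, or None if it never did."""
    for i in range(limit):
        try:
            rule.tick()
        except UnblockRuleException:
            return i + 1
    return None


def keep_going(rule: BlockingLoopSketch, ctx: Any) -> None:
    rule.ticks_passed = 0  # the documented hack: reset the counter


print(run(BlockingLoopSketch(), 20))
print(run(BlockingLoopSketch(max_tick_callback=keep_going), 20))
```

Without a callback the loop ends after MAX_TICKS ticks; a callback that resets the counter keeps the rule in control.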

loop_agent(ctx: Context, control: carla.VehicleControl | None = None, *, execute_planner: bool, execute_phases: Any = True) carla.VehicleControl | None[source]

A combination of LunaticAgent.parse_keyboard_input, LunaticAgent.apply_control, BlockingRule.update_world, and Context.get_or_calculate_control to advance agent and world.

Parameters:
  • ctx (Context) – The current context object

  • control (Optional[carla.VehicleControl], optional) – The control to apply; will overwrite the context’s control. If None takes the context’s control. Defaults to None.

  • execute_planner (bool)

  • execute_phases (Any)

Return type:

carla.VehicleControl | None

See also

Executes the following methods:

  1. LunaticAgent.parse_keyboard_input()

  2. LunaticAgent.apply_control()

  3. BlockingRule.update_world() ticks the carla.World and renders everything.

  4. Context.get_or_calculate_control() to acquire the carla.VehicleControl object.

static get_world() carla.World[source]

Method to access the world object

Return type:

carla.World

update_world(ctx: Context, *, execute_phases: bool | Container[Phase] = True) carla.VehicleControl | None[source]

Ticks the world and takes care of the rendering.

Will call
  • ctx.agent.execute_phase(Phase.CUSTOM_CYCLE | Phase.BEGIN, prior_results=<this Rule instance>)

  • LunaticAgent.update_information(), with or without executing the phases, depending on execute_phases.

Parameters:
  • ctx (Context) – The context to use

  • execute_update_information – Whether to execute the Phase.UPDATE_INFORMATION | Phase.[BEGIN|END] with this function. Can also be a container containing one or both of these phases. Defaults to True.

  • execute_phases (Union[bool, Container[Phase]])

Raises:

UnblockRuleException – If the ticks passed are over MAX_TICKS

Return type:

carla.VehicleControl | None

Attention

The usage with Leaderboard is working but experimental. The scenario expects the agent to return a control object in every step; however, as this rule takes over the ticks completely, an outside ScenarioManager might not work as expected.

__call__(ctx: Context, overwrite: Dict[str, Any] | None = None, in_loop: bool = False, *, ignore_phase: bool = False, ignore_cooldown: bool = False) Any | Literal[RuleResult.NOT_APPLICABLE][source]
  1. First checks if the rule is applicable, i.e. whether its cooldown == 0; if not, returns NOT_APPLICABLE.

  2. Afterwards evaluates the rule’s condition() function.
    • If the result is not in actions, returns NOT_APPLICABLE.

    • Otherwise merges the overwrite_settings with the Context.config and executes the action.

Parameters:
  • ctx (Context) – The context object that is passed to the condition function.

  • overwrite (Optional[Dict[str, Any]]) – Extends overwrite_settings for this call only. Defaults to None.

  • ignore_phase (bool) – If True the phase check is skipped. Defaults to False.

  • ignore_cooldown (bool) – If True the cooldown check is skipped. Defaults to False.

  • in_loop (bool)

Return type:

Union[Any, Literal[RuleResult.NOT_APPLICABLE]]

class classes.rule.MultiRule(phases: Phase | Iterable[Phase] | 'type[Any]' | Self | str | None | None = None, bases: tuple[type[Any], ...] | None = None, clsdict: dict[str, Any] | None = None, **kwargs: Any)[source]

Bases: Rule

This meta rule executes one or multiple child rules if it is applicable. Depending on execute_all_rules, it will either execute all rules or only the first applicable rule from its rules list.

DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0

Value the cooldown is reset to when reset_cooldown() is called without a value.

Used only when cooldown_reset_value is not set.

__call__(ctx: Context, overwrite: Dict[str, Any] | None = None, *, ignore_phase: bool = False, ignore_cooldown: bool = False) Any | Literal[RuleResult.NOT_APPLICABLE]
  1. First checks if the rule is applicable, i.e. whether its cooldown == 0; if not, returns NOT_APPLICABLE.

  2. Afterwards evaluates the rule’s condition() function.
    • If the result is not in actions, returns NOT_APPLICABLE.

    • Otherwise merges the overwrite_settings with the Context.config and executes the action.

Parameters:
  • ctx (Context) – The context object that is passed to the condition function.

  • overwrite (Optional[Dict[str, Any]]) – Extends overwrite_settings for this call only. Defaults to None.

  • ignore_phase (bool) – If True the phase check is skipped. Defaults to False.

  • ignore_cooldown (bool) – If True the cooldown check is skipped. Defaults to False.

Return type:

Union[Any, Literal[RuleResult.NOT_APPLICABLE]]

_auto_init_: ClassVar[bool] = True

If set to False the automatic __init__ creation is disabled when subclassing. This automatic __init__ will fix parameters like phases and condition to the class.

Declaring an __init__ method in the class has the same effect as setting _auto_init_ to False.

Note

Using class NewRuleType(metarule=Rule) is nearly equivalent to _auto_init_=False, but is not inherited.

blocked: bool = False

Indicates if the rule is blocked for this tick only. Is reset to False after the tick.

property cooldown: int

Cooldown of the rule in ticks until it can be executed again after its action was executed. If 0 the rule is ready to be executed.

property enabled: bool

If False the rule will not be evaluated. Contrary to blocked this permanently disables the rule.

execute_phase(phase: Phase, *, prior_results: Any = None, update_controls: carla.VehicleControl | None = None)

Attention

Helper function to execute a phase from within a rule, wrapper of agents.lunatic_agent.LunaticAgent.execute_phase().

Warns:

ReferenceError – If the weak proxy pointing to the Context object has been deleted. The phase will not be executed.

group: str | None = None

Group name of rules that should share their cooldown.

None for a rule to not share its cooldown.

property has_group: bool

Indicates if the rule belongs to a group, i.e. self.group is not None.

is_ready() bool

Group aware check if a rule is ready.

Return type:

bool

priority: float | int | RulePriority = 4

Rules are executed in order of their priority, from high to low.

reset_cooldown(value: int | None = None)

Reset or set the cooldown; for a group rule it resets the group cooldown.

Parameters:

value (Optional[int])

set_active(value: bool)

Enables or disables the rule. Contrary to blocked it will not be reset after the tick.

Parameters:

value (bool)

set_my_group_cooldown(value: int | None = None)

Update the cooldown of the group this rule belongs to

Parameters:

value (Optional[int])

start_cooldown: ClassVar[int] = 0

Initial cooldown when initialized. If >0, the rule will not be ready for the first start_cooldown ticks.

description: str

Description of what this rule should do

phases: frozenset[Phase]

The phase or phases in which the rule should be evaluated. For instantiation, the phases attribute can be any Iterable[Phase].

phase: Phase

For the Class API, the phase attribute can be set to a single Phase object.

actions: dict[Any, CallableActionT]

Dictionary that maps rule results to the action that should be executed.

action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']

Action that should be executed if the rule is True. If actions is set, this is ignored.

false_action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']

Action that should be executed if the rule is False. May not be set if actions is set.

overwrite_settings: dict[str, Any]

Settings that should overwrite the agent’s settings for this rule.

Note

The overwrite settings are primitive dict objects, omegaconf.DictConfig objects are converted.

self_config: RuleConfig

A custom sub-config for the rule that is not included in the agents settings. Automatically gets a instance key added with the rule instance.

Can be accessed via ctx.config.current_rule or self.config.self.

Note

Internally self.config and ctx.config is the same object, which makes interpolations to the agent’s settings possible.

Attention

The self_config object is *not* constant; it is recreated each time the rule is evaluated so that the current context is available.

condition: Any

The condition that determines if the rule’s actions should be executed.

Simple Variant:

Return True if the action should be executed, False otherwise. If Rule.false_action is defined, False will execute Rule.false_action.

Advanced Variant:

Return a Hashable value that is used as key in the actions dict.
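
The two variants can be pictured as a dictionary lookup on the condition's result. The following is a minimal sketch under stated assumptions: dispatch, the NOT_APPLICABLE sentinel, and the plain dict used as context are hypothetical stand-ins, not the package's implementation.

```python
from typing import Any, Callable, Dict, Hashable

NOT_APPLICABLE = object()  # hypothetical sentinel standing in for RuleResult.NOT_APPLICABLE


def dispatch(condition: Callable[[dict], Hashable],
             actions: Dict[Any, Callable[[dict], Any]],
             ctx: dict) -> Any:
    """Evaluate the condition and run the action registered for its result."""
    result = condition(ctx)
    if result not in actions:
        return NOT_APPLICABLE
    return actions[result](ctx)


# Simple variant: a boolean result; True maps to action, False to false_action
simple_actions = {True: lambda ctx: "brake", False: lambda ctx: "keep_speed"}
simple = dispatch(lambda ctx: ctx["distance"] < 10, simple_actions, {"distance": 5})

# Advanced variant: any hashable result selects the action
advanced_actions = {"red": lambda ctx: "stop", "green": lambda ctx: "go"}
advanced = dispatch(lambda ctx: ctx["light"], advanced_actions, {"light": "green"})
missing = dispatch(lambda ctx: ctx["light"], advanced_actions, {"light": "yellow"})
```

In this sketch a result without a registered action yields the sentinel instead of raising an error.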

rules: List[Rule]

The list of child rules to be called if this rule’s condition is true.

evaluate_children(ctx: classes.rule.Context) List[Any] | Any[source]

Evaluates the children rules of the current rule in the given context.

Parameters:

ctx (classes.rule.Context) – The context in which the child rules are evaluated.

Returns:

The results of evaluating the children rules. Returns a list of results if execute_all_rules is True, otherwise the result of the first rule that was applied.

Return type:

List[Any] | Any
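
The difference between the two return shapes can be sketched as follows. This is an illustration only: children are modeled as plain callables, and both the marker value and the return value when no child applies are assumptions of this sketch.

```python
from typing import Any, Callable, List, Union

NOT_APPLICABLE = "NOT_APPLICABLE"  # hypothetical marker for a rule that did not apply


def evaluate_children_sketch(children: List[Callable[[dict], Any]],
                             ctx: dict,
                             execute_all_rules: bool) -> Union[List[Any], Any]:
    """Return all results, or only the result of the first child that applied."""
    results = []
    for child in children:
        result = child(ctx)
        if execute_all_rules:
            results.append(result)
        elif result != NOT_APPLICABLE:
            return result  # stop at the first rule that was applied
    # Assumption: without any applied child the marker is returned
    return results if execute_all_rules else NOT_APPLICABLE


children = [lambda ctx: NOT_APPLICABLE, lambda ctx: "a", lambda ctx: "b"]
all_results = evaluate_children_sketch(children, {}, execute_all_rules=True)
first_result = evaluate_children_sketch(children, {}, execute_all_rules=False)
```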

class classes.rule.RandomRule(phases: Phase | Iterable[Phase] | 'type[Any]' | Self | str | None | None = None, bases: tuple[type[Any], ...] | None = None, clsdict: dict[str, Any] | None = None, **kwargs: Any)[source]

Bases: MultiRule

A rule that selects and evaluates one or more random child rules from a set of rules.

Parameters:
  • phases (Union[Phase, Iterable[Phase]]) – The phase or phases in which the rule is applicable.

  • rules (Union[Dict[Rule, float], List[Rule]]) – The set of rules from which to select random child rules.

  • repeat_if_not_applicable (bool) – If False, only one rule will be evaluated even if it is not applicable. Defaults to True.

  • condition (Optional[Callable[[Context], Any]]) – A callable that determines if the rule is applicable in a given context. If None and the rule does not implement a condition attribute the rule always executes. Defaults to None.

  • action (Optional[Callable[[Context], Any]]) – A callable that defines the action to be performed when the rule is applicable. Defaults to None.

  • ignore_phase (bool) – If True, the rule will be evaluated even if it is not in the specified phase. Defaults to True.

  • priority (RulePriority) – The priority of the rule. Defaults to RulePriority.NORMAL.

  • description (str) – A description of the rule. Defaults to "If its own condition is true calls one or more random child rules from the passed rules.".

  • overwrite_settings (Optional[Dict[str, Any]]) – A dictionary of settings to overwrite the default settings of the rule. Defaults to None.

  • cooldown_reset_value (Optional[int]) – The value to reset the cooldown of the rule. Defaults to None.

  • group (Optional[str]) – The group to which the rule belongs. Defaults to None.

  • enabled (bool) – If False, the rule will not be evaluated. Defaults to True.

  • weights (Optional[List[float]]) – The weights associated with each rule when selecting random child rules. Defaults to None.

  • self_config (Optional[Dict[str, Any]])

Raises:

ValueError – When passing rules as a dict with weights, the weights argument must be None.
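
The two ways of passing weights, and the documented ValueError, can be sketched with the standard library. split_rules_and_weights is a hypothetical helper, rules are represented by strings, and the uniform default weights are an assumption of this sketch.

```python
import random
from typing import Dict, List, Optional, Sequence, Tuple, Union


def split_rules_and_weights(
    rules: Union[Dict[str, float], Sequence[str]],
    weights: Optional[Sequence[float]] = None,
) -> Tuple[List[str], List[float]]:
    """Normalize both input forms into a (rules, weights) pair."""
    if isinstance(rules, dict):
        if weights is not None:
            raise ValueError("When passing rules as a dict, weights must be None.")
        return list(rules.keys()), list(rules.values())
    rule_list = list(rules)
    if weights is None:
        weights = [1.0] * len(rule_list)  # assumption: uniform selection by default
    return rule_list, list(weights)


names, rule_weights = split_rules_and_weights({"overtake": 3.0, "slow_down": 1.0})
rng = random.Random(0)  # seeded only to make the sketch repeatable
picked = rng.choices(names, weights=rule_weights, k=1)[0]
```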

DEFAULT_COOLDOWN_RESET: ClassVar[int] = 0

Value the cooldown is reset to when reset_cooldown() is called without a value.

Used only when cooldown_reset_value is not set.

__call__(ctx: Context, overwrite: Dict[str, Any] | None = None, *, ignore_phase: bool = False, ignore_cooldown: bool = False) Any | Literal[RuleResult.NOT_APPLICABLE]
  1. First checks if the rule is applicable, i.e. its cooldown is 0. If not, returns NOT_APPLICABLE.

  2. Afterwards evaluates the rule’s condition() function.
    • If the result is not in actions, returns NOT_APPLICABLE.

    • Otherwise merges the overwrite_settings with Context.config and executes the action.

Parameters:
  • ctx (Context) – The context object that is passed to the condition function.

  • overwrite (Optional[Dict[str, Any]]) – Extends overwrite_settings for this call only. Defaults to None.

  • ignore_phase (bool) – If True the phase check is skipped. Defaults to False.

  • ignore_cooldown (bool) – If True the cooldown check is skipped. Defaults to False.

Return type:

Union[Any, Literal[RuleResult.NOT_APPLICABLE]]
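
The call sequence described above can be sketched in a few lines. SketchRule, the dict-based context, and the point at which the cooldown is restarted are assumptions made for illustration; the real class handles phases, groups, and config objects that are omitted here.

```python
from typing import Any, Callable, Dict, Hashable, Optional

NOT_APPLICABLE = "NOT_APPLICABLE"  # hypothetical marker


class SketchRule:
    """Hypothetical, heavily reduced model of the documented call flow."""

    def __init__(self, condition: Callable[[dict], Hashable],
                 actions: Dict[Any, Callable[[dict], Any]],
                 overwrite_settings: Optional[Dict[str, Any]] = None,
                 cooldown_reset_value: int = 0) -> None:
        self.condition = condition
        self.actions = actions
        self.overwrite_settings = overwrite_settings or {}
        self.cooldown_reset_value = cooldown_reset_value
        self.cooldown = 0

    def __call__(self, ctx: dict, overwrite: Optional[Dict[str, Any]] = None,
                 *, ignore_cooldown: bool = False) -> Any:
        # 1. The rule is only applicable if its cooldown is 0
        if self.cooldown != 0 and not ignore_cooldown:
            return NOT_APPLICABLE
        # 2. Evaluate the condition and look up the matching action
        result = self.condition(ctx)
        if result not in self.actions:
            return NOT_APPLICABLE
        # Merge the settings; ``overwrite`` extends them for this call only
        merged = {**ctx["config"], **self.overwrite_settings, **(overwrite or {})}
        self.cooldown = self.cooldown_reset_value  # assumption: cooldown starts after the action
        return self.actions[result]({**ctx, "config": merged})


rule = SketchRule(
    condition=lambda ctx: ctx["config"]["speed"] > 30,
    actions={True: lambda ctx: ctx["config"]["speed"]},
    overwrite_settings={"speed": 20},
    cooldown_reset_value=2,
)
ctx = {"config": {"speed": 50}}
first = rule(ctx)
second = rule(ctx)
third = rule(ctx, {"speed": 10}, ignore_cooldown=True)
```

In this sketch the action sees the merged settings, while the caller's ctx["config"] stays unchanged.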

_auto_init_: ClassVar[bool] = True

If set to False the automatic __init__ creation is disabled when subclassing. This automatic __init__ will fix parameters like phases and condition to the class.

Declaring an __init__ method in the class has the same effect as setting _auto_init_ to False.

Note

Using class NewRuleType(metarule=Rule) is nearly equivalent to _auto_init_=False, but is not inherited.

blocked: bool = False

Indicates if the rule is blocked for this tick only. Is reset to False after the tick.

property cooldown: int

Cooldown of the rule in ticks until it can be executed again after its action was executed. If 0 the rule is ready to be executed.

property enabled: bool

If False the rule will not be evaluated. Contrary to blocked this permanently disables the rule.

execute_phase(phase: Phase, *, prior_results: Any = None, update_controls: carla.VehicleControl | None = None)

Attention

Helper function to execute a phase from within a rule, wrapper of agents.lunatic_agent.LunaticAgent.execute_phase().

Warns:

ReferenceError – If the weak proxy pointing to the Context object has been deleted. The phase will not be executed.

Parameters:
  • phase (Phase)

  • prior_results (Any)

  • update_controls (Optional[carla.VehicleControl])

group: str | None = None

Group name of rules that should share their cooldown.

None for a rule to not share its cooldown.

property has_group: bool

Indicates if the rule belongs to a group, i.e. self.group is not None.

is_ready() bool

Group aware check if a rule is ready.

Return type:

bool
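
How a shared group cooldown makes is_ready() group aware can be sketched as below. GroupedRule and its class-level dictionary are hypothetical; the package may store group cooldowns differently.

```python
from typing import ClassVar, Dict, Optional


class GroupedRule:
    """Hypothetical rule whose cooldown is shared with all rules of its group."""

    _group_cooldowns: ClassVar[Dict[str, int]] = {}

    def __init__(self, group: Optional[str] = None) -> None:
        self.group = group
        self._cooldown = 0
        if group is not None:
            self._group_cooldowns.setdefault(group, 0)

    @property
    def has_group(self) -> bool:
        return self.group is not None

    @property
    def cooldown(self) -> int:
        # A grouped rule reads the shared value, an ungrouped rule its own
        if self.has_group:
            return self._group_cooldowns[self.group]
        return self._cooldown

    def reset_cooldown(self, value: int) -> None:
        if self.has_group:
            self._group_cooldowns[self.group] = value
        else:
            self._cooldown = value

    def is_ready(self) -> bool:
        return self.cooldown == 0


a = GroupedRule(group="lane_change")
b = GroupedRule(group="lane_change")
c = GroupedRule()
a.reset_cooldown(3)  # also blocks ``b`` because both share the group
```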

priority: float | int | RulePriority = 4

Rules are executed in order of their priority, from high to low.

reset_cooldown(value: int | None = None)

Reset or set the cooldown; for a group rule it resets the group cooldown.

Parameters:

value (Optional[int])

set_active(value: bool)

Enables or disables the rule. Contrary to blocked it will not be reset after the tick.

Parameters:

value (bool)

set_my_group_cooldown(value: int | None = None)

Updates the cooldown of the group this rule belongs to.

Parameters:

value (Optional[int])

start_cooldown: ClassVar[int] = 0

Initial cooldown when initialized. If > 0, the rule will not be ready for the first start_cooldown ticks.

description: str

Description of what this rule should do

phases: frozenset[Phase]

The phase or phases in which the rule should be evaluated. For instantiation, the phases attribute can be any Iterable[Phase].

phase: Phase

For the Class API, the phase attribute can be set to a single Phase object.

condition: Any

The condition that determines if the rule’s actions should be executed.

Simple Variant:

Return True if the action should be executed, False otherwise. If Rule.false_action is defined, False will execute Rule.false_action.

Advanced Variant:

Return a Hashable value that is used as key in the actions dict.

actions: dict[Any, CallableActionT]

Dictionary that maps rule results to the action that should be executed.

action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']

Action that should be executed if the rule is True. If actions is set, this is ignored.

false_action: Annotated[CallableActionT, 'attribute not available on instance -> merged into `actions`']

Action that should be executed if the rule is False. May not be set if actions is set.

overwrite_settings: dict[str, Any]

Settings that should overwrite the agent’s settings for this rule.

Note

The overwrite settings are primitive dict objects; omegaconf.DictConfig objects are converted.

self_config: RuleConfig

A custom sub-config for the rule that is not included in the agent’s settings. An instance key holding the rule instance is added automatically.

Can be accessed via ctx.config.current_rule or self.config.self.

Note

Internally, self.config and ctx.config are the same object, which makes interpolations to the agent’s settings possible.

Attention

The self_config object is *not* constant; it is recreated each time the rule is evaluated so that the current context is available.

rules: List[Rule]

The list of child rules to be called if this rule’s condition is true.

evaluate_children(ctx: Context, overwrite: Dict[str, Any] | None = None) Any[source]

Evaluates a random child rule. If self.repeat_if_not_applicable=False and the randomly chosen rule is not applicable, no further rules are evaluated. If self.repeat_if_not_applicable=True, the possible rules are evaluated in random order until one rule is applicable.

Parameters:
  • ctx (Context)

  • overwrite (Optional[Dict[str, Any]])

Return type:

Any
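
The effect of repeat_if_not_applicable can be sketched as a weighted draw that either stops after one attempt or keeps drawing. Children are plain callables here, and drawing without replacement is an assumption of this sketch, not a documented property.

```python
import random
from typing import Any, Callable, List, Sequence

NOT_APPLICABLE = "NOT_APPLICABLE"  # hypothetical marker
calls: List[str] = []  # records which children were evaluated


def make_child(name: str, result: Any) -> Callable[[dict], Any]:
    def child(ctx: dict) -> Any:
        calls.append(name)
        return result
    return child


def evaluate_random_child(children: Sequence[Callable[[dict], Any]],
                          weights: Sequence[float], ctx: dict,
                          repeat_if_not_applicable: bool,
                          rng: random.Random) -> Any:
    candidates = list(children)
    candidate_weights = list(weights)
    while candidates:
        index = rng.choices(range(len(candidates)), weights=candidate_weights, k=1)[0]
        child = candidates.pop(index)  # assumption: each child is tried at most once
        candidate_weights.pop(index)
        result = child(ctx)
        if result != NOT_APPLICABLE or not repeat_if_not_applicable:
            return result
    return NOT_APPLICABLE


children = [make_child("a", NOT_APPLICABLE), make_child("b", NOT_APPLICABLE),
            make_child("c", "applied")]
repeated = evaluate_random_child(children, [1, 1, 1], {}, True, random.Random(0))
calls_when_repeating = list(calls)

calls.clear()
single = evaluate_random_child(children[:2], [1, 1], {}, False, random.Random(0))
calls_when_single = list(calls)
```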

classes.type_protocols module

Helper library to define type protocols for classes and functions in the project.

Note

Some types in the documentation have been simplified, e.g. some generics have been omitted.

class classes.type_protocols.AgentConfigT

A typing.TypeVar for an AgentConfig type.

alias of TypeVar(‘AgentConfigT’, bound=AgentConfig)

has_default()
class classes.type_protocols.RuleT

A typing.TypeVar for a Rule type.

alias of TypeVar(‘RuleT’, bound=Rule)

has_default()
class classes.type_protocols.CallableT

A typing.TypeVar for any callable.

alias of TypeVar(‘CallableT’, bound=Callable[[…], Any])

has_default()
classes.type_protocols.CallableCondition

A generic type alias for a callable condition function to be used with a ConditionFunction. Its first arguments must accept a Rule and a Context, or only a Context; additional keyword arguments are allowed. It must return a hashable value.

alias of Callable[[Concatenate[RuleT, Context, _CP]], _CH] | Callable[[Concatenate[Context, _CP]], _CH]

class classes.type_protocols.CallableConditionT

typing.TypeVar variant of AnyCallableCondition.

alias of TypeVar(‘CallableConditionT’, bound=Callable[[Concatenate[RuleT, Context, …]], Hashable] | Callable[[Concatenate[Context, …]], Hashable])

has_default()
classes.type_protocols.AnyCallableCondition

Non-generic variant of CallableCondition; can be used as a typing.TypeAlias.

alias of Callable[[Concatenate[RuleT, Context, …]], Hashable] | Callable[[Concatenate[Context, …]], Hashable]

classes.type_protocols.ConditionFunctionLike = ConditionFunctionLike

Callable that can be used for Rule.condition. A callable that uses a Context object as a single argument, or alternatively a Rule and a Context object (in this order).

The function must return a Hashable value.
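
Both accepted call forms can be illustrated with a small dispatcher. call_condition is hypothetical: counting required positional parameters is only this sketch's way of choosing the form, and nothing here states how the package decides.

```python
import inspect
from typing import Any, Callable, Hashable


def call_condition(condition: Callable[..., Hashable], rule: Any, ctx: Any) -> Hashable:
    """Call ``condition`` as ``condition(ctx)`` or as ``condition(rule, ctx)``."""
    required = [
        p for p in inspect.signature(condition).parameters.values()
        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) and p.default is p.empty
    ]
    # Assumption of this sketch: two required positionals mean (rule, ctx)
    if len(required) >= 2:
        return condition(rule, ctx)
    return condition(ctx)


def is_fast(ctx: dict) -> bool:
    return ctx["speed"] > 50


def is_fast_for_rule(rule: str, ctx: dict) -> tuple:
    return (rule, ctx["speed"] > 50)


ctx = {"speed": 60}  # a plain dict stands in for a Context object
only_ctx_result = call_condition(is_fast, "demo_rule", ctx)
rule_ctx_result = call_condition(is_fast_for_rule, "demo_rule", ctx)
```

Both results are hashable, which is the only requirement stated for the return value.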

class classes.type_protocols.ConditionFunctionLikeT

TypeVar version of ConditionFunctionLike

alias of TypeVar(‘ConditionFunctionLikeT’, bound=ConditionFunctionLike[Rule, …, Hashable])

has_default()
classes.type_protocols.AnyConditionFunctionLike = AnyConditionFunctionLike

A generic type alias for a callable condition function to be used with a Rule. Its first arguments must accept a Rule and a Context, or only a Context; additional keyword arguments are allowed. It must return a hashable value.

classes.type_protocols.CallableAction

A generic type alias for a callable action function to be used with a Rule or ConditionFunction.register_action(). Its first arguments must accept a Rule and a Context, or only a Context; additional keyword arguments are allowed. It can return an arbitrary value.

alias of Callable[[Concatenate[RuleT, Context, _P]], _T] | Callable[[Concatenate[Context, _P]], _T]

class classes.type_protocols.CallableActionT

typing.TypeVar variant of AnyCallableAction.

alias of TypeVar(‘CallableActionT’, bound=Callable[[Concatenate[RuleT, Context, …]], Any] | Callable[[Concatenate[Context, …]], Any])

has_default()
classes.type_protocols.AnyCallableAction

Non-generic variant of CallableAction; can be used as a typing.TypeAlias.

alias of Callable[[Concatenate[RuleT, Context, …]], Any] | Callable[[Concatenate[Context, …]], Any]

class classes.type_protocols.HasBaseSettings(*args, **kwargs)[source]

Bases: Protocol[AgentConfigT]

BASE_SETTINGS: type[AgentConfigT]
class classes.type_protocols.HasConfig(*args, **kwargs)[source]

Bases: Protocol[AgentConfigT]

property config: AgentConfigT

read-only attribute of an AgentConfig object; can also be a normal attribute.
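
That a protocol property "can also be a normal attribute" follows from structural typing. The sketch below uses only the standard typing module; HasConfigSketch, DemoConfig, and both agent classes are hypothetical names for illustration.

```python
from dataclasses import dataclass
from typing import Protocol, TypeVar, runtime_checkable

ConfigT = TypeVar("ConfigT", covariant=True)


@runtime_checkable
class HasConfigSketch(Protocol[ConfigT]):
    """Hypothetical protocol: anything exposing a readable ``config``."""

    @property
    def config(self) -> ConfigT: ...


@dataclass
class DemoConfig:
    max_speed: float = 50.0


class AgentWithAttribute:
    def __init__(self) -> None:
        self.config = DemoConfig()  # plain attribute


class AgentWithProperty:
    def __init__(self) -> None:
        self._config = DemoConfig()

    @property
    def config(self) -> DemoConfig:  # read-only property
        return self._config


def read_max_speed(obj: HasConfigSketch[DemoConfig]) -> float:
    # Works for both classes; neither inherits from the protocol
    return obj.config.max_speed


speeds = [read_max_speed(AgentWithAttribute()), read_max_speed(AgentWithProperty())]
```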

class classes.type_protocols.HasContext(*args, **kwargs)[source]

Bases: Protocol

ctx: Context
class classes.type_protocols.Has_WorldModel(*args, **kwargs)[source]

Bases: Protocol

_world_model: WorldModel
class classes.type_protocols.HasStates(*args, **kwargs)[source]

Bases: Protocol

current_states: dict['AgentState', int]
class classes.type_protocols.Has_Vehicle(*args, **kwargs)[source]

Bases: Protocol

_vehicle: carla.Vehicle
class classes.type_protocols.HasPlanner(*args, **kwargs)[source]

Bases: Protocol[LocalPlannerT]

Uses a local planner to calculate controls.

property _local_planner: LocalPlannerT

read-only attribute for a LocalPlanner object; can also be a normal attribute.

_calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl[source]
Parameters:

debug (bool)

Return type:

carla.VehicleControl

class classes.type_protocols.HasPlannerWithConfig(*args, **kwargs)[source]

Bases: HasPlanner[DynamicLocalPlanner], HasConfig[AgentConfigT], Protocol

Uses a DynamicLocalPlanner that works with an AgentConfig.

_calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
Parameters:

debug (bool)

Return type:

carla.VehicleControl

property _local_planner: LocalPlannerT

read-only attribute for a LocalPlanner object; can also be a normal attribute.

property config: AgentConfigT

read-only attribute of an AgentConfig object; can also be a normal attribute.

class classes.type_protocols.UseableWithDynamicPlanner(*args, **kwargs)[source]

Bases: HasPlannerWithConfig, Has_Vehicle, HasContext, Protocol

Can be used with DynamicLocalPlanner.

_calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
Parameters:

debug (bool)

Return type:

carla.VehicleControl

property _local_planner: LocalPlannerT

read-only attribute for a LocalPlanner object; can also be a normal attribute.

property config: AgentConfigT

read-only attribute of an AgentConfig object; can also be a normal attribute.

_vehicle: carla.Vehicle
ctx: Context
class classes.type_protocols.CanDetectObstacles(*args, **kwargs)[source]

Bases: Has_Vehicle, HasPlanner, HasConfig[BehaviorAgentSettings | LunaticAgentSettings], Protocol

Can be used with lunatic_agent_tools.detect_obstacles().

_calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
Parameters:

debug (bool)

Return type:

carla.VehicleControl

property _local_planner: LocalPlannerT

read-only attribute for a LocalPlanner object; can also be a normal attribute.

property config: AgentConfigT

read-only attribute of an AgentConfig object; can also be a normal attribute.

_vehicle: carla.Vehicle
class classes.type_protocols.CanDetectNearbyObstacles(*args, **kwargs)[source]

Bases: CanDetectObstacles, Protocol

Can be used with lunatic_agent_tools.detect_obstacles_in_path().

_calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
Parameters:

debug (bool)

Return type:

carla.VehicleControl

property _local_planner: LocalPlannerT

read-only attribute for a LocalPlanner object; can also be a normal attribute.

property config: AgentConfigT

read-only attribute of an AgentConfig object; can also be a normal attribute.

_vehicle: carla.Vehicle
all_obstacles_nearby: list[carla.Actor]

Actors that are considered to be near the actor.

max_detection_distance(lane: Literal['same_lane', 'other_lane']) float[source]

Convenience function to be used with lunatic_agent_tools.detect_vehicles() and LunaticAgent.detect_obstacles_in_path.

The maximum distance at which an obstacle in the same lane or in another lane is considered; obstacles further away are ignored.

Parameters:
  • self – An object that implements the config and live_info attributes

  • lane (Literal['same_lane', 'other_lane']) – The lane to consider.

Return type:

float

class classes.type_protocols.CanDetectNearbyTrafficLights(*args, **kwargs)[source]

Bases: CanDetectObstacles, HasStates, Has_WorldModel, Protocol

Can be used with lunatic_agent_tools.detect_obstacles_in_path().

_calculate_control(debug: bool = False, *args, **kwargs) carla.VehicleControl
Parameters:

debug (bool)

Return type:

carla.VehicleControl

property _local_planner: LocalPlannerT

read-only attribute for a LocalPlanner object; can also be a normal attribute.

property config: AgentConfigT

read-only attribute of an AgentConfig object; can also be a normal attribute.

_vehicle: carla.Vehicle
current_states: dict['AgentState', int]
_world_model: WorldModel
traffic_lights_nearby: list[carla.TrafficLight]

Actors that are considered to be near the actor.

_last_traffic_light: carla.TrafficLight | None

Last traffic light that was detected. At a red traffic light it can be checked if this is still red.

Attention

Not necessarily the same as InformationManager.Information.relevant_traffic_light.

_current_waypoint: carla.Waypoint

classes.worldmodel module

Interface classes between CARLA, the agent, and the user interface.

class classes.worldmodel.AccessCarlaMixin[source]

Bases: object

Mixin class that delegates the attributes client, map, and world to the CarlaDataProvider to keep them in sync.

Note

This mixin only works for instances; client, map, and world are not class attributes.
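
The delegation idea can be sketched without CARLA. ProviderSketch stands in for CarlaDataProvider, and the property setter is an assumption of this sketch; only the principle of keeping one shared value behind instance properties is taken from the description above.

```python
from typing import Any, Optional


class ProviderSketch:
    """Hypothetical stand-in for CarlaDataProvider with class-level storage."""

    _world: Optional[Any] = None

    @classmethod
    def set_world(cls, world: Any) -> None:
        cls._world = world

    @classmethod
    def get_world(cls) -> Optional[Any]:
        return cls._world


class AccessSketchMixin:
    """Delegates ``world`` to the provider so every instance sees the same value."""

    @property
    def world(self) -> Optional[Any]:
        return ProviderSketch.get_world()

    @world.setter
    def world(self, value: Any) -> None:  # assumption: writes are forwarded as well
        ProviderSketch.set_world(value)


class ModelA(AccessSketchMixin):
    pass


class ModelB(AccessSketchMixin):
    pass


a, b = ModelA(), ModelB()
a.world = "Town04"            # a string stands in for a carla.World object
seen_by_b = b.world           # the other instance sees the same value
class_level = ModelA.world    # on the class this is the property object itself
```

The last line shows why the delegation only works on instances: class access returns the property object, not the delegated value.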

property client: carla.Client
property world: carla.World
property map: carla.Map
static get_blueprint_library() carla.BlueprintLibrary[source]

Access to a cached version of the blueprint library

Raises:

ValueError – If the blueprint library is not set, which happens when the world is not set. The world must be set up beforehand (CarlaDataProvider.set_world()).

Return type:

carla.BlueprintLibrary

class classes.worldmodel.GameFramework(*args: Any, **kwargs: Any)[source]

Bases: AccessCarlaMixin, CarlaDataProvider

A utility class to set up CARLA, pygame, Hydra, the configuration, and parts of the user interface and the agent.

A GameFramework instance can be used to control the game loop and work as a handler for the exceptions of this project. Furthermore, it can manage the cooldown of the Rule classes.

Note

The GameFramework derives from the CarlaDataProvider which gives this class more utility methods that currently are not included in this part of the documentation.

Parameters:
clock: ClassVar[pygame.time.Clock | None] = None
display: ClassVar[pygame.Surface | Literal[False] | None] = None

The pygame surface to render on.

Is None before the initialization. False if pygame is disabled.

property launch_config: LaunchConfig

LaunchConfig object that was used for the initialization (args)

property agent_config: LunaticAgentSettings

The configuration of the attached agent if it exists; otherwise the agent attribute of the stored launch_config.

classmethod quickstart(launch_config: LaunchConfig | None = None, *, logging: bool = False) Self[source]

Initializes Hydra in a limited way, i.e. does not allow for command line overrides.

Sets up the carla.Client and related instances as well as pygame.

Note

It is recommended that you use a @hydra.main decorated main function instead to make full use of the Hydra framework.

Parameters:
  • launch_config (Optional[LaunchConfig]) – The configuration to use. If None, will use the default configuration from ./conf/launch_config.yaml.

  • logging (bool) – If True, changes how logging is done by applying the logger settings from ./conf/config_extensions/job_logging.yaml. Default is False.

Returns:

The initialized GameFramework instance.

Return type:

Self

See also

This function uses:
static initialize_hydra(config_dir: str = './conf', config_name: str = 'launch_config', version_base=None, *, job_name='LunaticAgentJob', logging=True, structured=True) LaunchConfig[source]

Use this function only if no hydra.main is available.

Usage:

args = GameFramework.initialize_hydra(config_dir=<abs_path_of_conf>, config_name="launch_config")
game_framework = GameFramework(args)
Parameters:
  • config_dir (str) – The directory where the hydra configuration is stored.

  • config_name (str) – The name of the configuration file.

  • version_base – The version base of hydra for the configuration. Default is None.

  • job_name – The name of the job.

  • logging – If True, will set up logging.

  • structured – If True, creates the config based on LaunchConfig, otherwise based on a dict. This is useful for runtime type-checks and conversions. If the config’s LaunchConfig.strict_config value is < 2, this parameter is ignored. Disable if you experience problems. Default is True.

Return type:

LaunchConfig

See also

Hydra functions:
  • hydra.initialize_config_dir()

  • hydra.compose()

static load_hydra_config(config_name: str = 'conf/launch_config') LaunchConfig[source]
Parameters:

config_name (str)

Return type:

LaunchConfig

__init__(args: LaunchConfig, config: DictConfig | None = None, timeout: float = 10.0, worker_threads: int = 0, *, map_layers=carla.MapLayer.All)[source]
Parameters:
  • args (LaunchConfig) – Configuration for the GameFramework.

  • config (Optional[DictConfig]) – Optional config for the agent (unused in this project)

  • timeout (float) – Timeout for the carla.Client.

  • worker_threads (int) – See carla.Client.

  • map_layers – See carla.Client.load_world()

controller: weakref.ProxyType[RSSKeyboardControl] | RSSKeyboardControl

Parser for keyboard events

traffic_manager: carla.TrafficManager | None = None
classmethod init_pygame(launch_config: LaunchConfig | None = None, recreate: bool = False) tuple[pygame.time.Clock, pygame.Surface | Literal[False] | None][source]
Parameters:
  • launch_config (Optional[LaunchConfig]) – If set, the width and height attributes of this object determine the pygame window size. Otherwise (1280, 720) is used. Defaults to None.

  • recreate (bool) – If True, reinitializes pygame when this function is called again.

Return type:

tuple[pygame.time.Clock, pygame.Surface | Literal[False] | None]

static init_carla(args: LaunchConfig | None = None, timeout: float | None = None, worker_threads: int = 0, *, map_layers: carla.MapLayer = carla.MapLayer.All) carla.WorldSettings[source]

Initializes the carla.Client and connects it to the simulator.

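Parameters:
  • args (Optional[LaunchConfig])

  • timeout (Optional[float])

  • worker_threads (int)

  • map_layers (carla.MapLayer)
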
Returns:

The settings of the loaded world.

Return type:

carla.WorldSettings

Note

This function has to be called before client, world or map can be accessed. Alternatively, the client and CarlaDataProvider need to be set up in a different way beforehand.

See also

static setup_client_map_and_world(map_name: str = 'Town04', ip: str = '127.0.0.1', port: int = 2000, *, timeout: float = 10.0, worker_threads: int = 0, reload_world: bool = False, reset_settings: bool = True, map_layers: carla.MapLayer = carla.MapLayer.All, sync: bool | None = True, fps: int = 20) tuple[carla.Client, carla.World, carla.Map][source]

Loads the carla.Client, the carla.World, and the carla.Map.

This is a subfunction of init_carla() that can be used without a LaunchConfig.

See also

init_carla(), carla.Client.set_timeout(), carla.Client.reload_world()

Parameters:
Return type:

tuple[carla.Client, carla.World, carla.Map]

init_traffic_manager(port: int | None = None) carla.TrafficManager[source]

Returns an instance of the carla.TrafficManager related to the specified port. If it does not exist, it will be created.

See also

carla.Client.get_trafficmanager()

Parameters:

port (Optional[int]) – The port to use. If None, will use the port from CarlaDataProvider.get_traffic_manager_port(), which defaults to 8000.

Return type:

carla.TrafficManager

init_agent_and_interface(ego: carla.Vehicle | None, agent_class: type[LunaticAgent], config: LunaticAgentSettings | None = None, overwrites: dict[str, Any] | None = None) tuple[LunaticAgent, WorldModel, GlobalRoutePlanner, RSSKeyboardControl][source]

Quick setup for the agent and the world model.

Among others this executes:
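from agents.lunatic_agent import LunaticAgent, LunaticAgentSettings

# carla.World.spawn_actor() also needs a transform; here the first spawn point of the map is used
blueprint = world.get_blueprint_library().find("vehicle.audi.tt")
spawn_point = world.get_map().get_spawn_points()[0]
ego = world.spawn_actor(blueprint, spawn_point)
agent, world_model, global_planner, controller = (
    game_framework.init_agent_and_interface(ego, LunaticAgent)
)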
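Parameters:
  • ego (Optional[carla.Vehicle])

  • agent_class (type[LunaticAgent])

  • config (Optional[LunaticAgentSettings])

  • overwrites (Optional[dict[str, Any]])
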
Return type:

tuple[LunaticAgent, WorldModel, GlobalRoutePlanner, RSSKeyboardControl]

make_world_model(config: LunaticAgentSettings, player: carla.Vehicle | None = None) WorldModel[source]

Creates a WorldModel with a backreference to the GameFramework.

Parameters:
  • config (LunaticAgentSettings)

  • player (Optional[carla.Vehicle])

Return type:

WorldModel

make_controller(world_model: WorldModel, controller_class: type[classes.type_protocols.ControllerClassT] = RSSKeyboardControl, **kwargs: Any) classes.type_protocols.ControllerClassT[source]

Creates a keyboard controller and attaches it to the world model.

Parameters:
  • world_model (WorldModel) – The world model to attach the controller to.

  • controller_class (type[classes.type_protocols.ControllerClassT]) – The controller class to instantiate. Defaults to RSSKeyboardControl.

  • **kwargs (Any) – Additional arguments to pass to the controller.

Return type:

classes.type_protocols.ControllerClassT

static hydra_initialized() bool[source]

Checks whether Hydra is initialized. This is normally only done when the @hydra.main() decorator is used.

Return type:

bool

static get_hydra_config(raw: Literal[False] = False) HydraConf[source]
static get_hydra_config(raw: Literal[True]) hydra.core.hydra_config.HydraConfig

Retrieves the Hydra configuration object.

Parameters:

raw – If True, returns the hydra.conf.HydraConf dataclass, otherwise the hydra.core.hydra_config.HydraConfig singleton. Default is False.

Raises:

ValueError – If the HydraConfig was not set up yet and raw=True.

parse_controller_events(final_controls: carla.VehicleControl | None)[source]

Parses the keyboard events with the controller.

Raises:

AttributeError – If neither control nor world_model.controller are set.

Parameters:

final_controls (Optional[carla.VehicleControl])

render_everything()[source]

Updates the rendering and the HUD.

Note

This is the preferred method to update the world and render the camera.

static skip_rest_of_loop(message: str = 'GameFramework.end_loop') NoReturn[source]

Terminates the current iteration and exits the GameFramework by raising a ContinueLoopException.

Note

It is the user’s responsibility to manage the agent and local planner before calling this function, i.e. to ensure that the agent has a carla.VehicleControl set.

Raises:

ContinueLoopException – With the given message.

Parameters:

message (str)

Return type:

NoReturn
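
Using an exception to leave the current iteration can be sketched in plain Python. ContinueLoopSketch is a hypothetical stand-in for ContinueLoopException, and the try/except block plays the handler role that this sketch assumes the GameFramework takes over.

```python
from typing import List, NoReturn


class ContinueLoopSketch(Exception):
    """Hypothetical stand-in for the package's ContinueLoopException."""


def skip_rest_of_loop(message: str = "end_loop") -> NoReturn:
    raise ContinueLoopSketch(message)


log: List[str] = []
for step in range(3):
    try:
        log.append(f"plan {step}")
        if step == 1:
            skip_rest_of_loop("nothing more to do in this step")
        log.append(f"apply {step}")  # skipped for step 1
    except ContinueLoopSketch:
        # The handler swallows the exception and the loop continues
        log.append(f"skipped {step}")
```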

static destroy_actors(actors: Iterable[carla.Actor | CustomSensorInterface])[source]

Destroys the given actors and custom sensors implemented in this package.

Removes destroyed actors from the CarlaDataProvider actor pool.

Parameters:

actors (Iterable[carla.Actor | CustomSensorInterface])

classmethod cleanup(*, disable_sync: bool = True, quit_pygame: bool = True)[source]

Cleans up resources and actors.

Parameters:
  • disable_sync (bool) – If True, will disable synchronous mode. This will prevent the freezing of the Unreal Editor. Default is True.

  • quit_pygame (bool) – If True, will call pygame.quit(). Default is True.

Note

  • When called from an instance with an attached agent, the agent.destroy() method is called.

  • Otherwise will call CarlaDataProvider.cleanup().

exceptions

Shortcut to the exceptions module containing custom exceptions.

property client: carla.Client
static get_blueprint_library() carla.BlueprintLibrary

Access to a cached version of the blueprint library

Raises:

ValueError – If the blueprint library is not set, which happens when the world is not set. The world must be set up beforehand (CarlaDataProvider.set_world()).

Return type:

carla.BlueprintLibrary

property map: carla.Map
property world: carla.World
class classes.worldmodel.WorldModel(*args: Any, **kwargs: Any)[source]

Bases: AccessCarlaMixin, CarlaDataProvider

Class representing the surrounding environment.

This class is the interface between the agent, the carla.World, the HUD, and the KeyboardControl. It handles ticking of the simulator and rendering of the pygame interface.

If LaunchConfig.externalActor is set, it will look for an actor with the role name LaunchConfig.rolename, if such an actor does not yet exist it will wait for its creation until the calling script continues.

Parameters:
controller: RSSKeyboardControl | weakref.ProxyType[RSSKeyboardControl] | None = None

Set when the controller is created. Uses weakref.proxy() as backreference. This is not a weakref object when the with gameframework(agent) statement is used.
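
The behavior of a weakref.proxy back-reference can be shown with the standard library alone. ControllerSketch and WorldModelSketch are hypothetical stand-ins; the sketch only shows that the proxy does not keep its target alive.

```python
import gc
import weakref


class ControllerSketch:
    def __init__(self, name: str) -> None:
        self.name = name


class WorldModelSketch:
    controller = None


controller = ControllerSketch("keyboard")
world_model = WorldModelSketch()
world_model.controller = weakref.proxy(controller)  # no strong reference is stored

name_via_proxy = world_model.controller.name  # attribute access is forwarded

del controller  # drop the last strong reference
gc.collect()    # not needed on CPython, but keeps the sketch deterministic elsewhere

try:
    world_model.controller.name
    proxy_alive = True
except ReferenceError:
    # Accessing a proxy whose target is gone raises ReferenceError
    proxy_alive = False
```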

__init__(config: LunaticAgentSettings, args: LaunchConfig | Mapping[str, Any] | 'os.PathLike[str]' | str = './conf/launch_config.yaml', agent: LunaticAgent | None = None, *, carla_world: carla.World | None = None, player: carla.Vehicle | None = None, map_inst: carla.Map | None = None)[source]

Constructor method

Parameters:
property world: carla.World
world_settings: carla.WorldSettings

Object containing data about the simulation such as synchrony or rendering mode.

property map: carla.Map
hud: HUD

The HUD that is managed.

world_tick_id: int | None

The ID of the callback tick event for the HUD.

sync: bool | None

Set from LaunchConfig.sync

external_actor: bool

Set from LaunchConfig.externalActor

actor_role_name: str | None

Set from LaunchConfig.rolename

player: carla.Vehicle

The linked actor. If external_actor is set this will be the first actor found with that role name.

camera_manager: CameraManager

Manages cameras for the user interface and HUD.

weather: str

Name of currently used weather preset. See also: CarlaDataProvider.find_weather_presets()

property client: carla.Client
static get_blueprint_library() carla.BlueprintLibrary

Access to a cached version of the blueprint library

Raises:

ValueError – If the blueprint library is not set, which happens when the world is not set. The world must be set up beforehand (CarlaDataProvider.set_world()).

Return type:

carla.BlueprintLibrary

actors: List[carla.Actor | CustomSensorInterface]

Actors attached to this instance for the user interface and HUD.

rss_set_road_boundaries_mode(road_boundaries_mode: 'RssRoadBoundariesModeAlias' | carla.RssRoadBoundariesMode | bool | None = None) None[source]

Chooses whether or not to use the RSS road boundaries feature.

Toggles: RssSettings.use_stay_on_road_feature

Parameters:

road_boundaries_mode (Optional[Union['RssRoadBoundariesModeAlias', carla.RssRoadBoundariesMode, bool]]) – If None, uses the value from the config. If True, sets to carla.RssRoadBoundariesMode.On. If False, sets to carla.RssRoadBoundariesMode.Off.

Return type:

None
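
The documented handling of None, True, and False can be sketched as a small resolver. RoadBoundariesModeSketch replaces carla.RssRoadBoundariesMode so the example runs without CARLA, and treating the config value as a bool is an assumption of this sketch.

```python
from enum import Enum
from typing import Optional, Union


class RoadBoundariesModeSketch(Enum):
    """Hypothetical stand-in for carla.RssRoadBoundariesMode."""
    Off = 0
    On = 1


def resolve_mode(
    value: Optional[Union[RoadBoundariesModeSketch, bool]],
    config_default: bool,
) -> RoadBoundariesModeSketch:
    if value is None:
        value = config_default  # None: fall back to the configured value
    if isinstance(value, bool):
        return RoadBoundariesModeSketch.On if value else RoadBoundariesModeSketch.Off
    return value  # already a mode, pass it through


from_config = resolve_mode(None, config_default=True)
forced_off = resolve_mode(False, config_default=True)
passed_through = resolve_mode(RoadBoundariesModeSketch.On, config_default=False)
```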

See also

pause_simulation(pause: bool)[source]

Pauses the simulation by setting the world to synchronous mode.

Attention

Only works reliably in asynchronous mode (sync=False) and might lead to unexpected behavior in synchronous mode.

Parameters:

pause (bool)

restart()[source]

Restarts the world and sets up the HUD sensors. If player is not set or external_actor is set, looks for an actor with the role name, or spawns a new actor.

Note

Called during __init__().

tick_server_world() int | carla.WorldSnapshot | None[source]

When LaunchConfig.handle_ticks is True, uses carla.World.tick() (sync=True) or carla.World.wait_for_tick() (sync=False), depending on LaunchConfig.sync.

Return type:

int | carla.WorldSnapshot | None
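
The three possible return values follow from two flags. In the sketch below, FakeWorld imitates only the two carla.World methods named above, and returning None when ticks are not handled is an assumption inferred from the return type.

```python
from typing import Optional, Union


class FakeWorld:
    """Hypothetical stand-in that mimics tick() and wait_for_tick()."""

    def __init__(self) -> None:
        self.frame = 0

    def tick(self) -> int:
        self.frame += 1
        return self.frame  # carla.World.tick() returns the frame id

    def wait_for_tick(self) -> str:
        return f"snapshot@{self.frame}"  # placeholder for a carla.WorldSnapshot


def tick_server_world(world: FakeWorld, handle_ticks: bool,
                      sync: bool) -> Optional[Union[int, str]]:
    if not handle_ticks:
        return None  # assumption: someone else ticks the world
    if sync:
        return world.tick()       # synchronous mode: this client advances the simulation
    return world.wait_for_tick()  # asynchronous mode: wait for the server's tick


world = FakeWorld()
sync_result = tick_server_world(world, handle_ticks=True, sync=True)
async_result = tick_server_world(world, handle_ticks=True, sync=False)
unhandled = tick_server_world(world, handle_ticks=False, sync=True)
```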

tick(clock: pygame.time.Clock)[source]

Method for every tick

Parameters:

clock (pygame.time.Clock)

next_weather(reverse: bool = False) None[source]

Get next weather setting

Parameters:

reverse (bool)

Return type:

None
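
Stepping through presets in either direction is typically a modular index update. The helper below is a hypothetical sketch of that idea; the preset names are only examples and the actual list comes from CarlaDataProvider.find_weather_presets().

```python
from typing import List

presets: List[str] = ["ClearNoon", "CloudyNoon", "WetNoon"]  # example names only


def next_index(current: int, count: int, reverse: bool = False) -> int:
    """Move one step forward or backward and wrap around at both ends."""
    step = -1 if reverse else 1
    return (current + step) % count


forward = presets[next_index(2, len(presets))]                 # wraps to the first preset
backward = presets[next_index(0, len(presets), reverse=True)]  # wraps to the last preset
```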

next_map_layer(reverse: bool = False) None[source]
Parameters:

reverse (bool)

Return type:

None

load_map_layer(unload: bool = False)[source]
Parameters:

unload (bool)

toggle_recording()[source]

Start recording images from the camera output.

Saved in LaunchConfig.camera.recorder.output_path with the current frame number.

toggle_radar()[source]

Adds or destroys a radar sensor for the user interface

modify_vehicle_physics(actor: carla.Vehicle)[source]

Parameters:

actor (carla.Vehicle)

finalize_render(display: pygame.Surface)[source]

Draws the HUD and saves the image if recording is enabled.

Attention

Assumes that render(..., finalize=False) was called beforehand and that additional content is rendered in between. Use finalize_render() as the very last step in the render process.

Parameters:

display (pygame.Surface)
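The intended call order of render(..., finalize=False) followed by finalize_render() can be sketched without pygame. FakeWorldModel only records the order of the drawing steps, and render_frame and the overlay callback are hypothetical names; the real methods draw onto a pygame.Surface.

```python
from typing import Callable, List


class FakeWorldModel:
    """Stand-in that logs draw steps instead of drawing to a surface."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def render(self, display: object, finalize: bool = True) -> None:
        self.calls.append("camera")        # camera image is drawn first
        if finalize:
            self.finalize_render(display)  # default: the HUD follows directly

    def finalize_render(self, display: object) -> None:
        self.calls.append("hud")           # HUD drawn last, then image saved


def render_frame(
    world_model: FakeWorldModel,
    display: object,
    overlays: List[Callable[[object], None]],
) -> None:
    """Two-phase render: camera, custom overlays, then the HUD on top."""
    world_model.render(display, finalize=False)
    for draw in overlays:
        draw(display)
    world_model.finalize_render(display)


if __name__ == "__main__":
    model = FakeWorldModel()
    render_frame(model, None, [lambda d: model.calls.append("overlay")])
    print(model.calls)
```

Drawing the overlays before the final step keeps them below the HUD and includes them in recorded images.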

render(display: pygame.Surface, finalize: bool = True)[source]

Render the world and draw it to the pygame.Surface. This function is part of GameFramework.render_everything() which is the recommended way to handle the rendering process.

Hint

Recording of the fully rendered output should be done at the end of the render method; however, CameraManager.render() is called first to render the camera.

Call with finalize=False to only render the camera but not the HUD.

Afterwards apply other render features and call finalize_render() to draw the HUD and save the image if recording is enabled.

Note

This renders the CameraManager and the RSS bounding boxes. If finalize is True, it will also render the HUD by calling finalize_render().

Parameters:

display (pygame.Surface)

finalize (bool)

destroy_sensors()[source]

Destroy sensors

destroy(destroy_ego: bool = False)[source]

Destroys all actors

Parameters:

destroy_ego (bool) – If True, also destroys the player. Otherwise, assumes that it is destroyed elsewhere.

rss_check_control(vehicle_control: carla.VehicleControl) carla.VehicleControl | None[source]

Checks the vehicle control against the RSS restrictions and possibly proposes an alternative.

This is called during Phase.RSS_EVALUATION.

Todo

This should be an agent method, but currently the RssSensor is tied to this class and not the agent.

Parameters:

vehicle_control (carla.VehicleControl)

Return type:

carla.VehicleControl | None
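Because the return type is optional, callers need to handle the None case. The sketch below assumes that None means no alternative control was proposed, so the planned control is kept; this interpretation is not confirmed by the documentation above. Control is a stand-in for carla.VehicleControl, and choose_control is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Control:
    """Stand-in for carla.VehicleControl (subset of its fields)."""
    throttle: float = 0.0
    brake: float = 0.0
    steer: float = 0.0


def choose_control(planned: Control, rss_proposal: Optional[Control]) -> Control:
    """Prefer the RSS proposal if there is one, else keep the planned control.

    Assumption: a None proposal means RSS did not restrict the control.
    """
    return planned if rss_proposal is None else rss_proposal


if __name__ == "__main__":
    planned = Control(throttle=0.8)
    restricted = Control(brake=1.0)
    print(choose_control(planned, None))
    print(choose_control(planned, restricted))
```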