Skip to content

component

This module defines the Component base class, which is instantiated for nearly every part of the system. It implements interfaces for communications, initialization, shutdown, etc.

T = TypeVar('T') module-attribute

WrappedComponentBase = TypeVar('WrappedComponentBase', bound=Component) module-attribute

component_registry = {} module-attribute

component_set = WeakSet() module-attribute

default_components = set() module-attribute

loaded_components = {} module-attribute

Component

Bases: ABC

An abstract component class for building pieces of ROC that will talk to each other.

Source code in roc/component.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class Component(ABC):
    """An abstract component class for building pieces of ROC that will talk to each other."""

    name: str = "<name unassigned>"
    type: str = "<type unassigned>"

    def __init__(self) -> None:
        global component_set
        component_set.add(self)
        self.bus_conns: dict[str, BusConnection[Any]] = {}
        logger.trace(f"++ incrementing component count: {self.name}:{self.type} {self}")
        # traceback.print_stack()

    def __del__(self) -> None:
        global component_set
        component_set.add(self)
        logger.trace(f"-- decrementing component count: {self.name}:{self.type} {self}")

    def connect_bus(self, bus: EventBus[T]) -> BusConnection[T]:
        """Create a new bus connection for the component, storing the result for
        later shutdown.

        Args:
            bus (EventBus[T]): The event bus to attach to

        Raises:
            ValueError: if the bus has already been connected to by this component

        Returns:
            BusConnection[T]: The bus connection for listening or sending events
        """
        if bus.name in self.bus_conns:
            raise ValueError(
                f"Component '{self.name}' attempting duplicate connection to bus '{bus.name}'"
            )

        conn = bus.connect(self)
        self.bus_conns[bus.name] = conn
        return conn

    def event_filter(self, e: Event[Any]) -> bool:
        """A filter for any incoming events. By default it filters out events
        sent by itself, but it is especially useful for creating new filters in
        sub-classes.

        Args:
            e (Event[Any]): The event to be evaluated

        Returns:
            bool: True if the event should be sent, False if it should be dropped
        """
        return e.src_id != self.id

    def shutdown(self) -> None:
        """De-initializes the component, removing any bus connections and any
        other clean-up that needs to be performed
        """
        logger.debug(f"Component {self.name}:{self.type} shutting down.")

        for conn in self.bus_conns:
            for obs in self.bus_conns[conn].attached_bus.subject.observers:
                obs.on_completed()
            self.bus_conns[conn].close()

    @property
    def id(self) -> ComponentId:
        return ComponentId(self.type, self.name)

    @staticmethod
    def init() -> None:
        """Loads all components registered as `auto` and perception components
        in the `perception_components` config field.
        """
        settings = Config.get()
        component_list = default_components
        logger.debug("perception components from settings", settings.perception_components)
        component_list = component_list.union(settings.perception_components, default_components)
        logger.debug(f"Component.init: default components: {component_list}")

        # TODO: shutdown previously loaded components

        for reg_str in component_list:
            logger.trace(f"Loading component: {reg_str} ...")
            (name, type) = reg_str.split(":")
            loaded_components[reg_str] = Component.get(name, type)

    @classmethod
    def get(cls, name: str, type: str, *args: Any, **kwargs: Any) -> Self:
        """Retreives a component with the specified name from the registry and
        creates a new version of it with the specified args. Used by
        `Config.init` and for testing.

        Args:
            name (str): The name of the component to get, as specified during
                its registration
            type (str): The type of the component to get, as specified during
                its registration
            args (Any): Fixed position arguments to pass to the Component
                constructor
            kwargs (Any): Keyword args to pass to the Component constructor

        Returns:
            Self: the component that was created, casted as the calling class.
            (e.g. `Perception.get(...)` will return a Perception component and
            `Action.get(...)` will return an Action component)
        """
        reg_str = _component_registry_key(name, type)
        return cast(Self, component_registry[reg_str](*args, **kwargs))

    @staticmethod
    def get_component_count() -> int:
        """Returns the number of currently created Components. The number goes
        up on __init__ and down on __del__. Primarily used for testing to ensure
        Components are being shutdown appropriately.

        Returns:
            int: The number of currently active Component instances
        """
        # global component_count
        # return component_count
        global component_set
        return len(component_set)

    @staticmethod
    def get_loaded_components() -> list[str]:
        """Returns the names and types of all initiated components.

        Returns:
            list[str]: A list of the names and types of components, as strings.
        """
        global loaded_components
        return [s for s in loaded_components.keys()]

    @staticmethod
    def deregister(name: str, type: str) -> None:
        """Removes a component from the Component registry. Primarlly used for testing.

        Args:
            name (str): The name of the Component to deregister
            type (str): The type of the Component to deregister
        """
        reg_str = _component_registry_key(name, type)
        del component_registry[reg_str]

    @staticmethod
    def reset() -> None:
        """Shuts down all components"""
        # shutdown all components
        global loaded_components
        for name in loaded_components:
            logger.trace(f"Shutting down component: {name}.")
            c = loaded_components[name]
            c.shutdown()

        loaded_components.clear()

        global component_set
        for c in component_set:
            c.shutdown()

bus_conns = {} instance-attribute

id property

name = '<name unassigned>' class-attribute instance-attribute

type = '<type unassigned>' class-attribute instance-attribute

__del__()

Source code in roc/component.py
48
49
50
51
def __del__(self) -> None:
    global component_set
    component_set.add(self)
    logger.trace(f"-- decrementing component count: {self.name}:{self.type} {self}")

__init__()

Source code in roc/component.py
41
42
43
44
45
def __init__(self) -> None:
    global component_set
    component_set.add(self)
    self.bus_conns: dict[str, BusConnection[Any]] = {}
    logger.trace(f"++ incrementing component count: {self.name}:{self.type} {self}")

connect_bus(bus)

Create a new bus connection for the component, storing the result for later shutdown.

Parameters:

Name Type Description Default
bus EventBus[T]

The event bus to attach to

required

Raises:

Type Description
ValueError

if the bus has already been connected to by this component

Returns:

Type Description
BusConnection[T]

BusConnection[T]: The bus connection for listening or sending events

Source code in roc/component.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def connect_bus(self, bus: EventBus[T]) -> BusConnection[T]:
    """Create a new bus connection for the component, storing the result for
    later shutdown.

    Args:
        bus (EventBus[T]): The event bus to attach to

    Raises:
        ValueError: if the bus has already been connected to by this component

    Returns:
        BusConnection[T]: The bus connection for listening or sending events
    """
    if bus.name in self.bus_conns:
        raise ValueError(
            f"Component '{self.name}' attempting duplicate connection to bus '{bus.name}'"
        )

    conn = bus.connect(self)
    self.bus_conns[bus.name] = conn
    return conn

deregister(name, type) staticmethod

Removes a component from the Component registry. Primarlly used for testing.

Parameters:

Name Type Description Default
name str

The name of the Component to deregister

required
type str

The type of the Component to deregister

required
Source code in roc/component.py
168
169
170
171
172
173
174
175
176
177
@staticmethod
def deregister(name: str, type: str) -> None:
    """Removes a component from the Component registry. Primarlly used for testing.

    Args:
        name (str): The name of the Component to deregister
        type (str): The type of the Component to deregister
    """
    reg_str = _component_registry_key(name, type)
    del component_registry[reg_str]

event_filter(e)

A filter for any incoming events. By default it filters out events sent by itself, but it is especially useful for creating new filters in sub-classes.

Parameters:

Name Type Description Default
e Event[Any]

The event to be evaluated

required

Returns:

Name Type Description
bool bool

True if the event should be sent, False if it should be dropped

Source code in roc/component.py
75
76
77
78
79
80
81
82
83
84
85
86
def event_filter(self, e: Event[Any]) -> bool:
    """A filter for any incoming events. By default it filters out events
    sent by itself, but it is especially useful for creating new filters in
    sub-classes.

    Args:
        e (Event[Any]): The event to be evaluated

    Returns:
        bool: True if the event should be sent, False if it should be dropped
    """
    return e.src_id != self.id

get(name, type, *args, **kwargs) classmethod

Retreives a component with the specified name from the registry and creates a new version of it with the specified args. Used by Config.init and for testing.

Parameters:

Name Type Description Default
name str

The name of the component to get, as specified during its registration

required
type str

The type of the component to get, as specified during its registration

required
args Any

Fixed position arguments to pass to the Component constructor

()
kwargs Any

Keyword args to pass to the Component constructor

{}

Returns:

Name Type Description
Self Self

the component that was created, casted as the calling class.

Self

(e.g. Perception.get(...) will return a Perception component and

Self

Action.get(...) will return an Action component)

Source code in roc/component.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@classmethod
def get(cls, name: str, type: str, *args: Any, **kwargs: Any) -> Self:
    """Retreives a component with the specified name from the registry and
    creates a new version of it with the specified args. Used by
    `Config.init` and for testing.

    Args:
        name (str): The name of the component to get, as specified during
            its registration
        type (str): The type of the component to get, as specified during
            its registration
        args (Any): Fixed position arguments to pass to the Component
            constructor
        kwargs (Any): Keyword args to pass to the Component constructor

    Returns:
        Self: the component that was created, casted as the calling class.
        (e.g. `Perception.get(...)` will return a Perception component and
        `Action.get(...)` will return an Action component)
    """
    reg_str = _component_registry_key(name, type)
    return cast(Self, component_registry[reg_str](*args, **kwargs))

get_component_count() staticmethod

Returns the number of currently created Components. The number goes up on init and down on del. Primarily used for testing to ensure Components are being shutdown appropriately.

Returns:

Name Type Description
int int

The number of currently active Component instances

Source code in roc/component.py
144
145
146
147
148
149
150
151
152
153
154
155
156
@staticmethod
def get_component_count() -> int:
    """Returns the number of currently created Components. The number goes
    up on __init__ and down on __del__. Primarily used for testing to ensure
    Components are being shutdown appropriately.

    Returns:
        int: The number of currently active Component instances
    """
    # global component_count
    # return component_count
    global component_set
    return len(component_set)

get_loaded_components() staticmethod

Returns the names and types of all initiated components.

Returns:

Type Description
list[str]

list[str]: A list of the names and types of components, as strings.

Source code in roc/component.py
158
159
160
161
162
163
164
165
166
@staticmethod
def get_loaded_components() -> list[str]:
    """Returns the names and types of all initiated components.

    Returns:
        list[str]: A list of the names and types of components, as strings.
    """
    global loaded_components
    return [s for s in loaded_components.keys()]

init() staticmethod

Loads all components registered as auto and perception components in the perception_components config field.

Source code in roc/component.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@staticmethod
def init() -> None:
    """Loads all components registered as `auto` and perception components
    in the `perception_components` config field.
    """
    settings = Config.get()
    component_list = default_components
    logger.debug("perception components from settings", settings.perception_components)
    component_list = component_list.union(settings.perception_components, default_components)
    logger.debug(f"Component.init: default components: {component_list}")

    # TODO: shutdown previously loaded components

    for reg_str in component_list:
        logger.trace(f"Loading component: {reg_str} ...")
        (name, type) = reg_str.split(":")
        loaded_components[reg_str] = Component.get(name, type)

reset() staticmethod

Shuts down all components

Source code in roc/component.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
@staticmethod
def reset() -> None:
    """Shuts down all components"""
    # shutdown all components
    global loaded_components
    for name in loaded_components:
        logger.trace(f"Shutting down component: {name}.")
        c = loaded_components[name]
        c.shutdown()

    loaded_components.clear()

    global component_set
    for c in component_set:
        c.shutdown()

shutdown()

De-initializes the component, removing any bus connections and any other clean-up that needs to be performed

Source code in roc/component.py
88
89
90
91
92
93
94
95
96
97
def shutdown(self) -> None:
    """De-initializes the component, removing any bus connections and any
    other clean-up that needs to be performed
    """
    logger.debug(f"Component {self.name}:{self.type} shutting down.")

    for conn in self.bus_conns:
        for obs in self.bus_conns[conn].attached_bus.subject.observers:
            obs.on_completed()
        self.bus_conns[conn].close()

ComponentId

Bases: NamedTuple

Source code in roc/component.py
27
28
29
30
31
32
class ComponentId(NamedTuple):
    type: str
    name: str

    def __str__(self) -> str:
        return f"{self.name}:{self.type}"

name instance-attribute

type instance-attribute

__str__()

Source code in roc/component.py
31
32
def __str__(self) -> str:
    return f"{self.name}:{self.type}"

register_component

A decorator to register a new component.

Source code in roc/component.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
class register_component:
    """A decorator to register a new component."""

    def __init__(self, name: str, type: str, *, auto: bool = False) -> None:
        self.name = name
        self.type = type
        self.auto = auto

    def __call__(self, cls: type[Component]) -> type[Component]:  # noqa: D102
        global register_component
        global component_registry

        logger.trace(f"Registering component: {self.name}:{self.type} (auto={self.auto})")

        reg_str = _component_registry_key(self.name, self.type)
        if reg_str in component_registry:
            raise ValueError(f"Registering duplicate component name: '{self.name}'")

        if self.auto:
            global default_components
            default_components.add(reg_str)

        component_registry[reg_str] = cls
        cls.name = self.name
        cls.type = self.type

        return cls

auto = auto instance-attribute

name = name instance-attribute

type = type instance-attribute

__call__(cls)

Source code in roc/component.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def __call__(self, cls: type[Component]) -> type[Component]:  # noqa: D102
    global register_component
    global component_registry

    logger.trace(f"Registering component: {self.name}:{self.type} (auto={self.auto})")

    reg_str = _component_registry_key(self.name, self.type)
    if reg_str in component_registry:
        raise ValueError(f"Registering duplicate component name: '{self.name}'")

    if self.auto:
        global default_components
        default_components.add(reg_str)

    component_registry[reg_str] = cls
    cls.name = self.name
    cls.type = self.type

    return cls

__init__(name, type, *, auto=False)

Source code in roc/component.py
208
209
210
211
def __init__(self, name: str, type: str, *, auto: bool = False) -> None:
    self.name = name
    self.type = type
    self.auto = auto