Skip to content

perception

The Perception system breaks down the environment into features that can be re-assembled as concepts.

FeatureNodeType = TypeVar('FeatureNodeType', bound=FeatureNode) module-attribute

FeatureType = TypeVar('FeatureType') module-attribute

PerceptionData = VisionData | Settled | Feature[Any] module-attribute

PerceptionEvent = Event[PerceptionData] module-attribute

cache_registry = defaultdict(WeakValueDictionary) module-attribute

fe_list = [] module-attribute

AreaFeature dataclass

Bases: Feature[FeatureNodeType]

Source code in roc/perception.py
160
161
162
163
164
165
166
167
168
169
170
@dataclass(kw_only=True)
class AreaFeature(Feature[FeatureNodeType]):
    type: int
    points: set[tuple[XLoc, YLoc]]
    size: int

    def get_points(self) -> set[tuple[XLoc, YLoc]]:
        return self.points

    def node_hash(self) -> int:
        return hash((self.type, self.size))

points instance-attribute

size instance-attribute

type instance-attribute

__init__(*, feature_name, origin_id, type, points, size)

get_points()

Source code in roc/perception.py
166
167
def get_points(self) -> set[tuple[XLoc, YLoc]]:
    return self.points

node_hash()

Source code in roc/perception.py
169
170
def node_hash(self) -> int:
    return hash((self.type, self.size))

Detail

Bases: Edge

Source code in roc/perception.py
101
102
class Detail(Edge):
    allowed_connections: EdgeConnectionsList = [("FeatureGroup", "FeatureNode")]

allowed_connections = [('FeatureGroup', 'FeatureNode')] class-attribute instance-attribute

Direction

Bases: str, Enum

Source code in roc/perception.py
87
88
89
90
91
92
93
94
95
96
97
98
class Direction(str, Enum):
    up = "UP"
    down = "DOWN"
    left = "LEFT"
    right = "RIGHT"
    up_right = "UP_RIGHT"
    up_left = "UP_LEFT"
    down_right = "DOWN_RIGHT"
    down_left = "DOWN_LEFT"

    def __str__(self) -> str:
        return self.value

down = 'DOWN' class-attribute instance-attribute

down_left = 'DOWN_LEFT' class-attribute instance-attribute

down_right = 'DOWN_RIGHT' class-attribute instance-attribute

left = 'LEFT' class-attribute instance-attribute

right = 'RIGHT' class-attribute instance-attribute

up = 'UP' class-attribute instance-attribute

up_left = 'UP_LEFT' class-attribute instance-attribute

up_right = 'UP_RIGHT' class-attribute instance-attribute

__str__()

Source code in roc/perception.py
97
98
def __str__(self) -> str:
    return self.value

Feature dataclass

Bases: ABC, Generic[FeatureNodeType]

Source code in roc/perception.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
@dataclass(kw_only=True)
class Feature(ABC, Generic[FeatureNodeType]):
    feature_name: str
    origin_id: tuple[str, str]

    def to_nodes(self) -> FeatureNodeType:
        # check local cache
        cache = cache_registry[self.feature_name]

        h = self.node_hash()
        if h in cache:
            return cache[h]  # type: ignore

        # if cache miss, find node in database
        n = self._dbfetch_nodes()
        if n is None:
            # if node doesn't exist, create it
            n = self._create_nodes()
            # n.labels.add("Feature")
            # n.labels.add(self.feature_name)

        cache[h] = n
        return n

    @abstractmethod
    def get_points(self) -> set[tuple[XLoc, YLoc]]: ...

    @abstractmethod
    def _create_nodes(self) -> FeatureNodeType: ...

    @abstractmethod
    def _dbfetch_nodes(self) -> FeatureNodeType | None: ...

    @abstractmethod
    def node_hash(self) -> int: ...

feature_name instance-attribute

origin_id instance-attribute

__init__(*, feature_name, origin_id)

get_points() abstractmethod

Source code in roc/perception.py
147
148
@abstractmethod
def get_points(self) -> set[tuple[XLoc, YLoc]]: ...

node_hash() abstractmethod

Source code in roc/perception.py
156
157
@abstractmethod
def node_hash(self) -> int: ...

to_nodes()

Source code in roc/perception.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def to_nodes(self) -> FeatureNodeType:
    # check local cache
    cache = cache_registry[self.feature_name]

    h = self.node_hash()
    if h in cache:
        return cache[h]  # type: ignore

    # if cache miss, find node in database
    n = self._dbfetch_nodes()
    if n is None:
        # if node doesn't exist, create it
        n = self._create_nodes()
        # n.labels.add("Feature")
        # n.labels.add(self.feature_name)

    cache[h] = n
    return n

FeatureExtractor

Bases: Perception, Generic[FeatureType], ABC

Source code in roc/perception.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
class FeatureExtractor(Perception, Generic[FeatureType], ABC):
    def __init__(self) -> None:
        super().__init__()
        fe_list.append(ref(self))

    def do_perception(self, e: PerceptionEvent) -> None:
        f = self.get_feature(e)
        if f is None:
            return

        self.pb_conn.send(f)

    def settled(self) -> None:
        self.pb_conn.send(Settled())

    @abstractmethod
    def get_feature(self, e: PerceptionEvent) -> Feature[Any] | None: ...

    @classmethod
    def list(cls) -> list[str]:
        ret: list[str] = []
        for fe_ref in fe_list:
            fe = fe_ref()
            if fe is None:
                continue
            ret.append(str(fe.id))

        return ret

__init__()

Source code in roc/perception.py
214
215
216
def __init__(self) -> None:
    super().__init__()
    fe_list.append(ref(self))

do_perception(e)

Source code in roc/perception.py
218
219
220
221
222
223
def do_perception(self, e: PerceptionEvent) -> None:
    f = self.get_feature(e)
    if f is None:
        return

    self.pb_conn.send(f)

get_feature(e) abstractmethod

Source code in roc/perception.py
228
229
@abstractmethod
def get_feature(self, e: PerceptionEvent) -> Feature[Any] | None: ...

list() classmethod

Source code in roc/perception.py
231
232
233
234
235
236
237
238
239
240
@classmethod
def list(cls) -> list[str]:
    ret: list[str] = []
    for fe_ref in fe_list:
        fe = fe_ref()
        if fe is None:
            continue
        ret.append(str(fe.id))

    return ret

settled()

Source code in roc/perception.py
225
226
def settled(self) -> None:
    self.pb_conn.send(Settled())

FeatureNode

Bases: Node

Source code in roc/perception.py
105
106
107
108
109
110
111
112
113
114
115
116
class FeatureNode(Node):
    def __hash__(self) -> int:
        # XXX: this is dangerous because ID changes when a node is saved
        # should be okay for this use case though
        return self.id

    def __str__(self) -> str:
        return f"""{self.__class__.__name__}({",".join(self.attr_strs)})"""

    @property
    @abstractmethod
    def attr_strs(self) -> list[str]: ...

attr_strs abstractmethod property

__hash__()

Source code in roc/perception.py
106
107
108
109
def __hash__(self) -> int:
    # XXX: this is dangerous because ID changes when a node is saved
    # should be okay for this use case though
    return self.id

__str__()

Source code in roc/perception.py
111
112
def __str__(self) -> str:
    return f"""{self.__class__.__name__}({",".join(self.attr_strs)})"""

HashingNoneFeature

Bases: Exception

Source code in roc/perception.py
243
244
class HashingNoneFeature(Exception):
    pass

Perception

Bases: Component, ABC

The abstract class for Perception components. Handles perception bus connections and corresponding clean-up.

Source code in roc/perception.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class Perception(Component, ABC):
    """The abstract class for Perception components. Handles perception bus
    connections and corresponding clean-up.
    """

    bus = EventBus[PerceptionData]("perception")

    def __init__(self) -> None:
        super().__init__()
        self.pb_conn = self.connect_bus(Perception.bus)
        self.pb_conn.listen(self.do_perception)

    @abstractmethod
    def do_perception(self, e: PerceptionEvent) -> None: ...

    @classmethod
    def init(cls) -> None:
        global perception_bus
        cls.bus = EventBus[PerceptionData]("perception")

bus = EventBus[PerceptionData]('perception') class-attribute instance-attribute

pb_conn = self.connect_bus(Perception.bus) instance-attribute

__init__()

Source code in roc/perception.py
196
197
198
199
def __init__(self) -> None:
    super().__init__()
    self.pb_conn = self.connect_bus(Perception.bus)
    self.pb_conn.listen(self.do_perception)

do_perception(e) abstractmethod

Source code in roc/perception.py
201
202
@abstractmethod
def do_perception(self, e: PerceptionEvent) -> None: ...

init() classmethod

Source code in roc/perception.py
204
205
206
207
@classmethod
def init(cls) -> None:
    global perception_bus
    cls.bus = EventBus[PerceptionData]("perception")

PointFeature dataclass

Bases: Feature[FeatureNodeType]

Source code in roc/perception.py
173
174
175
176
177
178
179
180
181
182
@dataclass(kw_only=True)
class PointFeature(Feature[FeatureNodeType]):
    type: int
    point: tuple[XLoc, YLoc]

    def get_points(self) -> set[tuple[XLoc, YLoc]]:
        return {self.point}

    def node_hash(self) -> int:
        return self.type

point instance-attribute

type instance-attribute

__init__(*, feature_name, origin_id, type, point)

get_points()

Source code in roc/perception.py
178
179
def get_points(self) -> set[tuple[XLoc, YLoc]]:
    return {self.point}

node_hash()

Source code in roc/perception.py
181
182
def node_hash(self) -> int:
    return self.type

Settled

Source code in roc/perception.py
83
84
class Settled:
    pass

VisionData

Vision data received from the environment.

Source code in roc/perception.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class VisionData:
    """Vision data received from the environment."""

    def __init__(
        self,
        glyphs: npt.NDArray[Any],
        chars: npt.NDArray[Any],
        colors: npt.NDArray[Any],
    ) -> None:
        self.glyphs = glyphs
        self.chars = chars
        self.colors = colors

    @staticmethod
    def from_dict(d: dict[str, Any]) -> VisionData:
        """Creates VisionData from an arbitrary dictionary

        Args:
            d (dict[str, Any]): The dictionary to create VisionData from. Must
            have 'chars', 'glyphs', and 'colors' members.

        Returns:
            VisionData: The newly created vision data.
        """

        def to_numpy(d: dict[str, Any], k: str) -> np.ndarray[Any, Any]:
            if not k in d:
                raise Exception(f"Expected '{k}' to exist in dict for VisionData.from_dict()")

            v = d[k]
            if not isinstance(v, np.ndarray):
                return np.array(v)
            return v

        glyphs = to_numpy(d, "glyphs")
        chars = to_numpy(d, "chars")
        colors = to_numpy(d, "colors")
        return VisionData(glyphs, chars, colors)

    @staticmethod
    def for_test(test_data: list[list[int]]) -> VisionData:
        """Creates VisionData for a test case, using a static 2D list of values
        to create all aspects of the VisionData

        Args:
            test_data (list[list[int]]): The test data to convert into VisionData

        Returns:
            VisionData: The created VisionData
        """
        a = np.array(test_data)
        return VisionData(a.copy(), a.copy(), a.copy())

chars = chars instance-attribute

colors = colors instance-attribute

glyphs = glyphs instance-attribute

__init__(glyphs, chars, colors)

Source code in roc/perception.py
28
29
30
31
32
33
34
35
36
def __init__(
    self,
    glyphs: npt.NDArray[Any],
    chars: npt.NDArray[Any],
    colors: npt.NDArray[Any],
) -> None:
    self.glyphs = glyphs
    self.chars = chars
    self.colors = colors

for_test(test_data) staticmethod

Creates VisionData for a test case, using a static 2D list of values to create all aspects of the VisionData

Parameters:

Name Type Description Default
test_data list[list[int]]

The test data to convert into VisionData

required

Returns:

Name Type Description
VisionData VisionData

The created VisionData

Source code in roc/perception.py
64
65
66
67
68
69
70
71
72
73
74
75
76
@staticmethod
def for_test(test_data: list[list[int]]) -> VisionData:
    """Creates VisionData for a test case, using a static 2D list of values
    to create all aspects of the VisionData

    Args:
        test_data (list[list[int]]): The test data to convert into VisionData

    Returns:
        VisionData: The created VisionData
    """
    a = np.array(test_data)
    return VisionData(a.copy(), a.copy(), a.copy())

from_dict(d) staticmethod

Creates VisionData from an arbitrary dictionary

Parameters:

Name Type Description Default
d dict[str, Any]

The dictionary to create VisionData from. Must

required

Returns:

Name Type Description
VisionData VisionData

The newly created vision data.

Source code in roc/perception.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@staticmethod
def from_dict(d: dict[str, Any]) -> VisionData:
    """Creates VisionData from an arbitrary dictionary

    Args:
        d (dict[str, Any]): The dictionary to create VisionData from. Must
        have 'chars', 'glyphs', and 'colors' members.

    Returns:
        VisionData: The newly created vision data.
    """

    def to_numpy(d: dict[str, Any], k: str) -> np.ndarray[Any, Any]:
        if not k in d:
            raise Exception(f"Expected '{k}' to exist in dict for VisionData.from_dict()")

        v = d[k]
        if not isinstance(v, np.ndarray):
            return np.array(v)
        return v

    glyphs = to_numpy(d, "glyphs")
    chars = to_numpy(d, "chars")
    colors = to_numpy(d, "colors")
    return VisionData(glyphs, chars, colors)