Skip to content

attention

Aggregates all Perception events and determines which locations / objects should receive focus

AttentionData = VisionAttentionData module-attribute

AttentionEvent = Event[AttentionData] module-attribute

Attention

Bases: Component, ABC

Source code in roc/attention.py
61
62
class Attention(Component, ABC):
    """Abstract base for attention components; publishes on a shared "attention" bus."""

    # single shared EventBus carrying AttentionData for all attention components
    bus = EventBus[AttentionData]("attention")

bus = EventBus[AttentionData]('attention') class-attribute instance-attribute

CrossModalAttention

Bases: Attention

Source code in roc/attention.py
283
284
285
286
class CrossModalAttention(Attention):
    """Placeholder for fusing attention across modalities (not yet implemented)."""

    # TODO: listen for attention events
    # TODO: select and emit a single event
    pass

SaliencyMap

Bases: Grid[list[Feature[Any]]]

Source code in roc/attention.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
class SaliencyMap(Grid[list[Feature[Any]]]):
    """A 2D grid that accumulates perception ``Feature`` objects per cell and
    scores how strongly each location should attract attention.

    Each cell holds a list of features; a cell's strength is one point per
    feature plus a fixed bonus for certain feature kinds.
    """

    # the observed IntGrid this map was built from (None until one is assigned)
    grid: IntGrid | None

    def __new__(cls, grid: IntGrid | None = None) -> Self:
        """Create the map as an object-dtype ndarray view; every cell starts
        as an empty feature list.

        Shape comes from ``grid`` when given, otherwise from the configured
        observation shape.
        """
        settings = Config.get()
        my_shape = grid.shape if grid is not None else settings.observation_shape
        assert my_shape is not None
        obj = np.ndarray(my_shape, dtype=object).view(cls)
        for row, col in np.ndindex(my_shape):
            obj[row, col] = list()
        obj.grid = grid

        return obj

    def __str__(self) -> str:
        """Render as a DebugGrid colored by relative strength (blue weak, red strong)."""
        assert self.grid is not None
        dg = DebugGrid(self.grid)
        max_str = self.get_max_strength()

        # prevent divide by zero
        if max_str == 0:
            max_str = 1

        for p in self.grid.points():
            # strength relative to the strongest cell, in [0, 1]
            rel_strength = self.get_strength(p.x, p.y) / max_str
            color = DebugGrid.blue_to_red_hue(rel_strength)
            dg.set_style(p.x, p.y, back_brightness=1, back_hue=color)
        return str(dg)

    def __array_finalize__(self, obj: npt.NDArray[Any] | None) -> None:
        # numpy hook invoked on view casting / slicing: carry over the backing
        # grid from the source object when it has one
        if obj is None:
            return
        self.grid = getattr(obj, "grid", None)

    def __deepcopy__(self, memodict: object | None = None) -> SaliencyMap:
        """Copy with a deep-copied grid; per-cell lists are copied shallowly,
        so the Feature objects themselves are shared with the original."""
        sm = SaliencyMap(deepcopy(self.grid))
        for row, col in np.ndindex(self.shape):
            sm[row, col] = self[row, col].copy()
        return sm

    def clear(self) -> None:
        """Clears out all values from the SaliencyMap."""
        # empty each cell's list in place (keeps the list objects themselves)
        for row, col in np.ndindex(self.shape):
            self[row, col].clear()

    @property
    def size(self) -> int:
        """Total number of features stored across all cells."""
        return sum(len(val) for val in self)

    def add_val(self, x: int, y: int, val: Feature[Any]) -> None:
        """Register feature ``val`` at cell (x, y)."""
        self.get_val(x, y).append(val)

    def get_max_strength(self) -> int:
        """Return the largest per-cell strength on the map (0 when empty)."""
        # local renamed from `max`, which shadowed the builtin
        strongest = 0
        for y in range(self.height):
            for x in range(self.width):
                curr = self.get_strength(x, y)
                if strongest < curr:
                    strongest = curr

        return strongest

    def get_strength(self, x: int, y: int) -> int:
        """Score cell (x, y): one point per feature plus a per-kind bonus."""
        feature_list = self.get_val(x, y)
        # TODO: not really sure that the strength should depend on the number of features
        # TODO: the bonus table is pretty arbitrary and might be biased by
        # domain knowledge; revisit once object recognition exists
        bonus = {"Single": 10, "Delta": 15, "Motion": 20}
        return len(feature_list) + sum(bonus.get(f.feature_name, 0) for f in feature_list)

    def feature_report(self) -> dict[str, int]:
        """Count distinct Feature instances per feature name."""
        feature_id: dict[str, set[int]] = dict()

        # create a set of unique IDs for every distinct feature; using id()
        # de-duplicates the same Feature object appearing in multiple cells
        for row, col in np.ndindex(self.shape):
            feature_list = self[row, col]
            for f in feature_list:
                feature_name = f.feature_name
                if feature_name not in feature_id:
                    feature_id[feature_name] = set()
                feature_id[feature_name].add(id(f))

        # count all the sets
        return {k: len(ids) for k, ids in feature_id.items()}

    def get_focus(self) -> DataSet[VisionAttentionSchema]:
        """Find local strength peaks and return them as a table sorted by
        descending strength (columns: x, y, strength, label)."""
        max_str = self.get_max_strength()

        # prevent divide by zero
        if max_str == 0:
            max_str = 1

        # normalized strength "image"; row index is x, column index is y
        fkimg = np.array(
            [
                [self.get_strength(x, y) / max_str for y in range(self.height)]
                for x in range(self.width)
            ]
        )

        # find peaks through morphological reconstruction by dilation
        seed = np.copy(fkimg)
        seed[1:-1, 1:-1] = fkimg.min()
        rec = reconstruction(seed, fkimg, method="dilation")
        peaks = fkimg - rec

        # get coordinates of peaks
        nz = peaks.nonzero()
        coords = np.column_stack(nz)

        # label points that are adjacent / diagonal (8-connectivity)
        structure = np.ones((3, 3), dtype=int)
        labeled, _ncomponents = label(peaks, structure)

        # get values for each coordinate (spelling fixed: "indices")
        flat_indices = np.ravel_multi_index(tuple(coords.T), fkimg.shape)
        vals = np.take(fkimg, flat_indices)
        labels = np.take(labeled, flat_indices)

        # create table of peak info, ordered by strength
        df = (
            pd.DataFrame(
                {
                    "x": nz[0],
                    "y": nz[1],
                    "strength": vals,
                    "label": labels,
                }
            )
            .astype({"x": int, "y": int, "strength": float, "label": int})
            .sort_values("strength", ascending=False)
            .reset_index(drop=True)
        )

        return DataSet[VisionAttentionSchema](df)

grid instance-attribute

size property

__array_finalize__(obj)

Source code in roc/attention.py
94
95
96
97
def __array_finalize__(self, obj: npt.NDArray[Any] | None) -> None:
    # numpy hook: runs on view casting and slicing; carry the backing grid
    # over from the source object when present
    if obj is None:
        return
    self.grid = getattr(obj, "grid", None)

__deepcopy__(memodict=None)

Source code in roc/attention.py
 99
100
101
102
103
def __deepcopy__(self, memodict: object | None = None) -> SaliencyMap:
    """Copy with a deep-copied grid; each cell's list is copied shallowly,
    so the Feature objects themselves are shared with the original."""
    sm = SaliencyMap(deepcopy(self.grid))
    for row, col in np.ndindex(self.shape):
        sm[row, col] = self[row, col].copy()
    return sm

__new__(grid=None)

Source code in roc/attention.py
68
69
70
71
72
73
74
75
76
77
def __new__(cls, grid: IntGrid | None = None) -> Self:
    """Create the map as an object-dtype ndarray view; every cell starts as
    an empty feature list."""
    settings = Config.get()
    # shape from the source grid when given, else the configured observation shape
    my_shape = grid.shape if grid is not None else settings.observation_shape
    assert my_shape is not None
    obj = np.ndarray(my_shape, dtype=object).view(cls)
    for row, col in np.ndindex(my_shape):
        obj[row, col] = list()
    obj.grid = grid

    return obj

__str__()

Source code in roc/attention.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def __str__(self) -> str:
    """Render as a DebugGrid colored by relative strength (blue weak, red strong)."""
    assert self.grid is not None
    dg = DebugGrid(self.grid)
    max_str = self.get_max_strength()

    # prevent divide by zero
    if max_str == 0:
        max_str = 1

    for p in self.grid.points():
        # strength relative to the strongest cell, in [0, 1]
        rel_strength = self.get_strength(p.x, p.y) / max_str
        color = DebugGrid.blue_to_red_hue(rel_strength)
        dg.set_style(p.x, p.y, back_brightness=1, back_hue=color)
    return str(dg)

add_val(x, y, val)

Source code in roc/attention.py
118
119
120
def add_val(self, x: int, y: int, val: Feature[Any]) -> None:
    """Append feature ``val`` to the list stored at cell (x, y)."""
    feature_list = self.get_val(x, y)
    feature_list.append(val)

clear()

Clears out all values from the SaliencyMap.

Source code in roc/attention.py
105
106
107
108
def clear(self) -> None:
    """Clears out all values from the SaliencyMap."""
    # empty each cell's feature list in place (keeps the list objects)
    for row, col in np.ndindex(self.shape):
        self[row, col].clear()

feature_report()

Source code in roc/attention.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def feature_report(self) -> dict[str, int]:
    """Return a count of distinct Feature instances per feature name."""
    feature_id: dict[str, set[int]] = dict()

    # create a set of unique IDs for every distinct feature; using id()
    # de-duplicates the same Feature object appearing in multiple cells
    for row, col in np.ndindex(self.shape):
        feature_list = self[row, col]
        for f in feature_list:
            feature_name = f.feature_name
            if feature_name not in feature_id:
                feature_id[feature_name] = set()
            feature_id[feature_name].add(id(f))

    # count all the sets
    ret = {k: len(feature_id[k]) for k in feature_id}
    return ret

get_focus()

Source code in roc/attention.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def get_focus(self) -> DataSet[VisionAttentionSchema]:
    """Find local strength peaks and return them as a table sorted by
    descending strength (columns: x, y, strength, label)."""
    max_str = self.get_max_strength()

    # prevent divide by zero
    if max_str == 0:
        max_str = 1

    # normalized strength "image"; row index is x, column index is y
    fkimg = np.array(
        [
            [self.get_strength(x, y) / max_str for y in range(self.height)]
            for x in range(self.width)
        ]
    )

    # find peaks through morphological reconstruction by dilation
    seed = np.copy(fkimg)
    seed[1:-1, 1:-1] = fkimg.min()
    rec = reconstruction(seed, fkimg, method="dilation")
    peaks = fkimg - rec

    # get coordinates of peaks
    nz = peaks.nonzero()
    coords = np.column_stack(nz)

    # label points that are adjacent / diagonal (8-connectivity)
    structure = np.ones((3, 3), dtype=int)
    labeled, _ncomponents = label(peaks, structure)

    # get values for each coordinate (spelling fixed: "indices")
    flat_indices = np.ravel_multi_index(tuple(coords.T), fkimg.shape)
    vals = np.take(fkimg, flat_indices)
    labels = np.take(labeled, flat_indices)

    # create table of peak info, ordered by strength
    df = (
        pd.DataFrame(
            {
                "x": nz[0],
                "y": nz[1],
                "strength": vals,
                "label": labels,
            }
        )
        .astype({"x": int, "y": int, "strength": float, "label": int})
        .sort_values("strength", ascending=False)
        .reset_index(drop=True)
    )

    return DataSet[VisionAttentionSchema](df)

get_max_strength()

Source code in roc/attention.py
122
123
124
125
126
127
128
129
130
def get_max_strength(self) -> int:
    """Return the largest per-cell strength anywhere on the map (0 when empty)."""
    # local renamed from `max`, which shadowed the builtin
    strongest = 0
    for y in range(self.height):
        for x in range(self.width):
            curr = self.get_strength(x, y)
            if strongest < curr:
                strongest = curr

    return strongest

get_strength(x, y)

Source code in roc/attention.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def get_strength(self, x: int, y: int) -> int:
    """Salience score for cell (x, y): one point per feature present, plus a
    fixed bonus for certain feature kinds."""
    # TODO: not really sure that the strength should depend on the number of features
    # TODO: the per-kind bonuses are arbitrary and may be biased by domain
    # knowledge; likely to change once object recognition exists
    features = self.get_val(x, y)
    kind_bonus = {"Single": 10, "Delta": 15, "Motion": 20}
    return len(features) + sum(kind_bonus.get(f.feature_name, 0) for f in features)

VisionAttention

Bases: Attention

Source code in roc/attention.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@register_component("vision", "attention", auto=True)
class VisionAttention(Attention):
    """Accumulates vision Features into a SaliencyMap and, once every feature
    extractor has settled, emits the focus points on the attention bus."""

    saliency_map: SaliencyMap

    def __init__(self) -> None:
        super().__init__()
        self.pb_conn = self.connect_bus(Perception.bus)
        self.pb_conn.listen(self.do_attention)
        self.att_conn = self.connect_bus(Attention.bus)
        self.saliency_map = SaliencyMap()
        # src_ids of feature extractors that have reported Settled this frame
        self.settled: set[str] = set()

    def event_filter(self, e: PerceptionEvent) -> bool:
        """Only Feature, Settled and VisionData events are processed."""
        return isinstance(e.data, (Feature, Settled, VisionData))

    def do_attention(self, e: PerceptionEvent) -> None:
        """Handle one perception event: VisionData sizes the map, Settled may
        trigger an attention emission, Features accumulate into the map."""
        # create right-sized SaliencyMap based on VisionData
        if isinstance(e.data, VisionData):
            self.saliency_map.grid = IntGrid(e.data.chars)
            return

        # check to see if all feature extractors have settled
        if isinstance(e.data, Settled):
            self.settled.add(str(e.src_id))

            unsettled = set(FeatureExtractor.list()) - self.settled
            if len(unsettled) == 0:
                assert self.saliency_map is not None
                # compute the focus table once and reuse it (it was previously
                # computed twice: once unused and again inside send())
                focus = self.saliency_map.get_focus()

                self.att_conn.send(
                    VisionAttentionData(
                        focus_points=focus,
                        saliency_map=self.saliency_map,
                    )
                )

                # reset for the next frame
                self.settled.clear()
                self.saliency_map = SaliencyMap()

            return

        # register each location in the saliency map
        assert isinstance(e.data, Feature)
        f = e.data

        # create saliency map
        for p in f.get_points():
            self.saliency_map.add_val(p[0], p[1], f)

att_conn = self.connect_bus(Attention.bus) instance-attribute

pb_conn = self.connect_bus(Perception.bus) instance-attribute

saliency_map = SaliencyMap() instance-attribute

settled = set() instance-attribute

__init__()

Source code in roc/attention.py
227
228
229
230
231
232
233
def __init__(self) -> None:
    """Wire up perception/attention bus connections and start with an empty map."""
    super().__init__()
    self.pb_conn = self.connect_bus(Perception.bus)
    self.pb_conn.listen(self.do_attention)
    self.att_conn = self.connect_bus(Attention.bus)
    self.saliency_map = SaliencyMap()
    # src_ids of feature extractors that have reported Settled
    self.settled: set[str] = set()

do_attention(e)

Source code in roc/attention.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def do_attention(self, e: PerceptionEvent) -> None:
    """Handle one perception event: VisionData sizes the map, Settled may
    trigger an attention emission, Features accumulate into the map."""
    # create right-sized SaliencyMap based on VisionData
    if isinstance(e.data, VisionData):
        self.saliency_map.grid = IntGrid(e.data.chars)
        return

    # check to see if all feature extractors have settled
    if isinstance(e.data, Settled):
        self.settled.add(str(e.src_id))

        unsettled = set(FeatureExtractor.list()) - self.settled
        if len(unsettled) == 0:
            assert self.saliency_map is not None
            # compute the focus table once and reuse it (it was previously
            # computed twice: once unused and again inside send())
            focus = self.saliency_map.get_focus()

            self.att_conn.send(
                VisionAttentionData(
                    focus_points=focus,
                    saliency_map=self.saliency_map,
                )
            )

            # reset
            self.settled.clear()
            self.saliency_map = SaliencyMap()

        return

    # register each location in the saliency map
    assert isinstance(e.data, Feature)
    f = e.data

    # create saliency map
    for p in f.get_points():
        self.saliency_map.add_val(p[0], p[1], f)

event_filter(e)

Source code in roc/attention.py
235
236
237
238
239
240
241
def event_filter(self, e: PerceptionEvent) -> bool:
    allow = (
        isinstance(e.data, Feature)
        or isinstance(e.data, Settled)
        or isinstance(e.data, VisionData)
    )
    return allow

VisionAttentionData dataclass

Source code in roc/attention.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@dataclass
class VisionAttentionData:
    """The result of vision attention: a table of focus points plus the
    saliency map they were derived from."""

    focus_points: DataSet[VisionAttentionSchema]
    saliency_map: SaliencyMap

    def __str__(self) -> str:
        """Render the saliency grid with each focus point highlighted,
        followed by the focus-point table."""
        assert self.saliency_map.grid is not None
        dg = DebugGrid(self.saliency_map.grid)

        # mark every focus point; brightness tracks the point's strength
        for _, fp in self.focus_points.iterrows():
            dg.set_style(
                int(fp["x"]),
                int(fp["y"]),
                back_brightness=fp["strength"],
                back_hue=1,
            )

        return f"{str(dg)}\n\nFocus Points:\n{self.focus_points}"

focus_points instance-attribute

saliency_map instance-attribute

__init__(focus_points, saliency_map)

__str__()

Source code in roc/attention.py
45
46
47
48
49
50
51
52
53
54
def __str__(self) -> str:
    """Render the saliency grid with each focus point highlighted, followed
    by the focus-point table."""
    assert self.saliency_map.grid is not None
    dg = DebugGrid(self.saliency_map.grid)

    for idx, row in self.focus_points.iterrows():
        x = int(row["x"])
        y = int(row["y"])
        # brightness tracks the point's normalized strength
        dg.set_style(x, y, back_brightness=row["strength"], back_hue=1)

    return f"{str(dg)}\n\nFocus Points:\n{self.focus_points}"

VisionAttentionSchema

Source code in roc/attention.py
33
34
35
36
37
class VisionAttentionSchema:
    """Row schema for the focus-point table produced by SaliencyMap.get_focus."""

    x: int  # first grid coordinate of the peak
    y: int  # second grid coordinate of the peak
    strength: float  # strength normalized by the map's maximum (0..1)
    label: int  # connected-component label grouping adjacent/diagonal peaks

label instance-attribute

strength instance-attribute

x instance-attribute

y instance-attribute