Skip to content

flood

MIN_FLOOD_SIZE = 5 module-attribute

CheckMap

Internal utility class for tracking which points in a flood have already been checked

Source code in roc/feature_extractors/flood.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class CheckMap:
    """Bookkeeping grid recording which points of a flood scan were already visited.

    A cell holding 0 has not been visited yet; a cell holding 1 has.
    """

    def __init__(self, width: int, height: int) -> None:
        """Create an all-zero grid with ``height`` rows and ``width`` columns."""
        shape = (height, width)
        a = np.zeros(shape)
        self.grid = a.view(IntGrid)

    def find_first_unused_point(self) -> Point | None:
        """Return the first point yielded by the grid whose value is 0, or None."""
        unused = (pt for pt in self.grid.points() if pt.val == 0)
        return next(unused, None)

    def set(self, x: int, y: int) -> None:
        """Mark the point at (x, y) as visited by storing 1 in its cell."""
        visited_marker = 1
        self.grid.set_val(x, y, visited_marker)

    def checked(self, x: int, y: int) -> bool:
        """Tell whether the cell at (x, y) currently holds the visited marker (1)."""
        cell = self.grid.get_val(x, y)
        return cell == 1

grid = a.view(IntGrid) instance-attribute

__init__(width, height)

Source code in roc/feature_extractors/flood.py
45
46
47
def __init__(self, width: int, height: int) -> None:
    """Create an all-zero grid with ``height`` rows and ``width`` columns."""
    shape = (height, width)
    a = np.zeros(shape)
    self.grid = a.view(IntGrid)

checked(x, y)

Source code in roc/feature_extractors/flood.py
59
60
def checked(self, x: int, y: int) -> bool:
    """Tell whether the cell at (x, y) currently holds the visited marker (1)."""
    cell = self.grid.get_val(x, y)
    return cell == 1

find_first_unused_point()

Source code in roc/feature_extractors/flood.py
49
50
51
52
53
54
def find_first_unused_point(self) -> Point | None:
    """Return the first point yielded by the grid whose value is 0, or None."""
    unused = (pt for pt in self.grid.points() if pt.val == 0)
    return next(unused, None)

set(x, y)

Source code in roc/feature_extractors/flood.py
56
57
def set(self, x: int, y: int) -> None:
    """Mark the point at (x, y) as visited by storing 1 in its cell."""
    visited_marker = 1
    self.grid.set_val(x, y, visited_marker)

Flood

Bases: FeatureExtractor[TypedPointCollection]

A component for creating Flood features -- collections of adjacent points that all have the same value

Source code in roc/feature_extractors/flood.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@register_component("flood", "perception")
class Flood(FeatureExtractor[TypedPointCollection]):
    """A component for creating Flood features -- collections of adjacent points
    that all have the same value
    """

    def event_filter(self, e: PerceptionEvent) -> bool:
        """Accept only events whose payload is a ``VisionData`` instance."""
        return isinstance(e.data, VisionData)

    def get_feature(self, e: PerceptionEvent) -> None:
        """Find the floods in the event's glyph grid and send the large enough ones.

        Two points belong to the same flood when they hold the same value and
        touch horizontally, vertically or diagonally. Every flood with at least
        ``MIN_FLOOD_SIZE`` points is sent over ``self.pb_conn`` as a
        ``FloodFeature``; ``self.settled()`` is called once the grid is exhausted.

        Args:
            e: The perception event; its ``data`` must be ``VisionData``.
        """
        assert isinstance(e.data, VisionData)
        data = IntGrid(e.data.glyphs)

        check_map = CheckMap(data.width, data.height)

        # All eight neighbours are examined. Looking only right/down is not
        # enough: that shortcut is valid for the scan seed, but a point reached
        # while expanding a flood can still have unvisited members above it or
        # to its left (e.g. the second arm of a "U" shaped region).
        neighbour_offsets = (
            (-1, -1),
            (0, -1),
            (1, -1),
            (-1, 0),
            (1, 0),
            (-1, 1),
            (0, 1),
            (1, 1),
        )

        def collect_flood(val: int, start_x: XLoc, start_y: YLoc) -> PointList:
            """Gather every unchecked point connected to the start that holds ``val``."""
            point_list: PointList = []
            # An explicit worklist instead of recursion, so that the size of a
            # flood is not bounded by the interpreter's recursion limit.
            pending = [(start_x, start_y)]

            while pending:
                x, y = pending.pop()

                if x < 0 or y < 0 or x >= data.width or y >= data.height:
                    # out of bounds... move on
                    continue

                if check_map.checked(x, y):
                    # we already checked this point... move on
                    continue

                if data.get_val(x, y) != val:
                    # not the right value, for this flood... move on
                    continue

                check_map.set(x, y)
                point_list.append(Point(x, y, val))

                for dx, dy in neighbour_offsets:
                    pending.append((XLoc(x + dx), YLoc(y + dy)))

            return point_list

        # Compare against None explicitly so the loop does not depend on the
        # truthiness of a Point.
        while (p := check_map.find_first_unused_point()) is not None:
            val = data.get_val(p.x, p.y)
            point_list = collect_flood(val, p.x, p.y)
            if len(point_list) >= MIN_FLOOD_SIZE:
                point_set = {(pt.x, pt.y) for pt in point_list}
                self.pb_conn.send(
                    FloodFeature(
                        origin_id=self.id,
                        points=point_set,
                        type=val,
                        size=len(point_set),
                    )
                )
            # Always mark the seed so the scan is guaranteed to advance.
            check_map.set(p.x, p.y)

        self.settled()

event_filter(e)

Source code in roc/feature_extractors/flood.py
69
70
def event_filter(self, e: PerceptionEvent) -> bool:
    """Accept only events whose payload is a ``VisionData`` instance."""
    payload = e.data
    return isinstance(payload, VisionData)

get_feature(e)

Source code in roc/feature_extractors/flood.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get_feature(self, e: PerceptionEvent) -> None:
    """Find the floods in the event's glyph grid and send the large enough ones.

    Two points belong to the same flood when they hold the same value and
    touch horizontally, vertically or diagonally. Every flood with at least
    ``MIN_FLOOD_SIZE`` points is sent over ``self.pb_conn`` as a
    ``FloodFeature``; ``self.settled()`` is called once the grid is exhausted.

    Args:
        e: The perception event; its ``data`` must be ``VisionData``.
    """
    assert isinstance(e.data, VisionData)
    data = IntGrid(e.data.glyphs)

    check_map = CheckMap(data.width, data.height)

    # All eight neighbours are examined. Looking only right/down is not
    # enough: that shortcut is valid for the scan seed, but a point reached
    # while expanding a flood can still have unvisited members above it or
    # to its left (e.g. the second arm of a "U" shaped region).
    neighbour_offsets = (
        (-1, -1),
        (0, -1),
        (1, -1),
        (-1, 0),
        (1, 0),
        (-1, 1),
        (0, 1),
        (1, 1),
    )

    def collect_flood(val: int, start_x: XLoc, start_y: YLoc) -> PointList:
        """Gather every unchecked point connected to the start that holds ``val``."""
        point_list: PointList = []
        # An explicit worklist instead of recursion, so that the size of a
        # flood is not bounded by the interpreter's recursion limit.
        pending = [(start_x, start_y)]

        while pending:
            x, y = pending.pop()

            if x < 0 or y < 0 or x >= data.width or y >= data.height:
                # out of bounds... move on
                continue

            if check_map.checked(x, y):
                # we already checked this point... move on
                continue

            if data.get_val(x, y) != val:
                # not the right value, for this flood... move on
                continue

            check_map.set(x, y)
            point_list.append(Point(x, y, val))

            for dx, dy in neighbour_offsets:
                pending.append((XLoc(x + dx), YLoc(y + dy)))

        return point_list

    # Compare against None explicitly so the loop does not depend on the
    # truthiness of a Point.
    while (p := check_map.find_first_unused_point()) is not None:
        val = data.get_val(p.x, p.y)
        point_list = collect_flood(val, p.x, p.y)
        if len(point_list) >= MIN_FLOOD_SIZE:
            point_set = {(pt.x, pt.y) for pt in point_list}
            self.pb_conn.send(
                FloodFeature(
                    origin_id=self.id,
                    points=point_set,
                    type=val,
                    size=len(point_set),
                )
            )
        # Always mark the seed so the scan is guaranteed to advance.
        check_map.set(p.x, p.y)

    self.settled()

FloodFeature dataclass

Bases: AreaFeature[FloodNode]

A collection of points representing similar values that are all adjacent to each other

Source code in roc/feature_extractors/flood.py
27
28
29
30
31
32
33
34
35
36
37
38
39
@dataclass(kw_only=True)
class FloodFeature(AreaFeature[FloodNode]):
    """A collection of points representing similar values that are all adjacent to each other"""

    feature_name: str = "Flood"

    def _create_nodes(self) -> FloodNode:
        """Build a new ``FloodNode`` carrying this feature's type and size."""
        node = FloodNode(type=self.type, size=self.size)
        return node

    def _dbfetch_nodes(self) -> FloodNode | None:
        """Query for a ``FloodNode`` whose type and size equal this feature's.

        Returns whatever ``FloodNode.find_one`` produces for that query.
        """
        query = "src.type = $type AND src.size = $size"
        bindings = {"type": self.type, "size": self.size}
        return FloodNode.find_one(query, params=bindings)

feature_name = 'Flood' class-attribute instance-attribute

__init__(*, feature_name='Flood')

FloodNode

Bases: FeatureNode

Source code in roc/feature_extractors/flood.py
18
19
20
21
22
23
24
class FloodNode(FeatureNode):
    """Feature node described by two integer attributes: ``type`` and ``size``."""

    type: int
    size: int

    @property
    def attr_strs(self) -> list[str]:
        """Return the type and the size, in that order, converted to strings."""
        return [str(attr) for attr in (self.type, self.size)]

attr_strs property

size instance-attribute

type instance-attribute