Skip to content

motion

Converts Delta events into Motion events that signify an object moving in a direction

DeltaList = list[DeltaFeature] module-attribute

Motion

Bases: FeatureExtractor[MotionFeature]

Component that consumes Delta events and produces Motion events

Source code in roc/feature_extractors/motion.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@register_component("motion", "perception")
class Motion(FeatureExtractor[MotionFeature]):
    """Component that consumes Delta events and produces Motion events"""

    def __init__(self) -> None:
        super().__init__()
        self.delta_list: DeltaList = []

    def event_filter(self, e: PerceptionEvent) -> bool:
        # only listen to delta:perception events
        if e.src_id.name == "delta" and e.src_id.type == "perception":
            return True
        return False

    def get_feature(self, e: PerceptionEvent) -> None:
        if isinstance(e.data, Settled):
            self.delta_list.clear()
            self.settled()
            return None

        assert isinstance(e.data, DeltaFeature)
        d1 = e.data

        for d2 in self.delta_list:
            if Point.isadjacent(x1=d1.point[0], y1=d1.point[1], x2=d2.point[0], y2=d2.point[1]):
                if d2.old_val == d1.new_val:
                    emit_motion(self, d2, d1)
                if d1.old_val == d2.new_val:
                    emit_motion(self, d1, d2)

        self.delta_list.append(d1)

delta_list = [] instance-attribute

__init__()

Source code in roc/feature_extractors/motion.py
 98
 99
100
def __init__(self) -> None:
    super().__init__()
    self.delta_list: DeltaList = []

event_filter(e)

Source code in roc/feature_extractors/motion.py
102
103
104
105
106
def event_filter(self, e: PerceptionEvent) -> bool:
    # only listen to delta:perception events
    if e.src_id.name == "delta" and e.src_id.type == "perception":
        return True
    return False

get_feature(e)

Source code in roc/feature_extractors/motion.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def get_feature(self, e: PerceptionEvent) -> None:
    if isinstance(e.data, Settled):
        self.delta_list.clear()
        self.settled()
        return None

    assert isinstance(e.data, DeltaFeature)
    d1 = e.data

    for d2 in self.delta_list:
        if Point.isadjacent(x1=d1.point[0], y1=d1.point[1], x2=d2.point[0], y2=d2.point[1]):
            if d2.old_val == d1.new_val:
                emit_motion(self, d2, d1)
            if d1.old_val == d2.new_val:
                emit_motion(self, d1, d2)

    self.delta_list.append(d1)

MotionFeature dataclass

Bases: Feature[MotionNode]

A vector describing a motion, including the start point, end point, direction and value of the thing moving

Source code in roc/feature_extractors/motion.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@dataclass(kw_only=True)
class MotionFeature(Feature[MotionNode]):
    """A vector describing a motion, including the start point, end point,
    direction and value of the thing moving
    """

    feature_name: str = "Motion"
    start_point: tuple[XLoc, YLoc]
    end_point: tuple[XLoc, YLoc]
    type: int
    direction: Direction

    def __str__(self) -> str:
        return f"{self.type} '{chr(self.type)}' {self.direction}: ({self.start_point[0]}, {self.start_point[1]}) -> ({self.end_point[0]}, {self.end_point[1]})"

    def get_points(self) -> set[tuple[XLoc, YLoc]]:
        return {self.end_point}

    def node_hash(self) -> int:
        return hash((self.type, self.direction))

    def _create_nodes(self) -> MotionNode:
        return MotionNode(type=self.type, direction=self.direction)

    def _dbfetch_nodes(self) -> MotionNode | None:
        return MotionNode.find_one(
            "src.type = $type AND src.direction = $direction",
            params={"type": self.type, "direction": self.direction},
            # params_to_str=False,
        )

direction instance-attribute

end_point instance-attribute

feature_name = 'Motion' class-attribute instance-attribute

start_point instance-attribute

type instance-attribute

__init__(*, feature_name='Motion', start_point, end_point, type, direction)

__str__()

Source code in roc/feature_extractors/motion.py
41
42
def __str__(self) -> str:
    return f"{self.type} '{chr(self.type)}' {self.direction}: ({self.start_point[0]}, {self.start_point[1]}) -> ({self.end_point[0]}, {self.end_point[1]})"

get_points()

Source code in roc/feature_extractors/motion.py
44
45
def get_points(self) -> set[tuple[XLoc, YLoc]]:
    return {self.end_point}

node_hash()

Source code in roc/feature_extractors/motion.py
47
48
def node_hash(self) -> int:
    return hash((self.type, self.direction))

MotionNode

Bases: FeatureNode

Source code in roc/feature_extractors/motion.py
20
21
22
23
24
25
26
class MotionNode(FeatureNode):
    type: int
    direction: Direction

    @property
    def attr_strs(self) -> list[str]:
        return [str(self.type), str(self.direction)]

attr_strs property

direction instance-attribute

type instance-attribute

adjacent_direction(d1, d2)

Helper function to convert two positions into a direction such as 'UP' or 'DOWN_LEFT'

Source code in roc/feature_extractors/motion.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def adjacent_direction(d1: DeltaFeature, d2: DeltaFeature) -> Direction:
    """Helper function to convert two positions into a direction such as 'UP' or
    'DOWN_LEFT'
    """
    lr_str = ""
    if d1.point[0] < d2.point[0]:
        lr_str = "RIGHT"
    elif d1.point[0] > d2.point[0]:
        lr_str = "LEFT"

    ud_str = ""
    # XXX: top left is 0,0
    if d1.point[1] > d2.point[1]:
        ud_str = "UP"
    if d1.point[1] < d2.point[1]:
        ud_str = "DOWN"

    join_str = ""
    if len(lr_str) and len(ud_str):
        join_str = "_"

    return Direction(f"{ud_str}{join_str}{lr_str}")

emit_motion(mc, old_delta, new_delta)

Source code in roc/feature_extractors/motion.py
151
152
153
154
155
156
157
158
159
160
def emit_motion(mc: Motion, old_delta: DeltaFeature, new_delta: DeltaFeature) -> None:
    mc.pb_conn.send(
        MotionFeature(
            origin_id=mc.id,
            start_point=old_delta.point,
            end_point=new_delta.point,
            type=new_delta.new_val,
            direction=adjacent_direction(old_delta, new_delta),
        )
    )