
object

ObjectId = NewType('ObjectId', int) module-attribute
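`ObjectId` is a `typing.NewType` over `int`: at runtime it is a plain integer, but type checkers treat it as a distinct type, which keeps object ids from being confused with other ints. For example:

from roc.object import ObjectId

oid = ObjectId(42)           # runtime: just the int 42
assert isinstance(oid, int)  # NewType adds no wrapper class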

CandidateObjects

Source code in roc/object.py, lines 92–117
class CandidateObjects:
    def __init__(self, feature_nodes: Collection[FeatureNode]) -> None:
        # TODO: this currently only uses features, not context, for resolution
        # the other objects in the current context should influence resolution
        distance_idx: dict[NodeId, float] = defaultdict(float)

        # TODO: getting all objects for the set of features is going to be a
        # huge explosion of objects... need to come back to this and make a
        # smarter selection algorithm
        feature_groups = [
            fg for n in feature_nodes for fg in n.predecessors.select(labels={"FeatureGroup"})
        ]
        objs = [obj for fg in feature_groups for obj in fg.predecessors.select(labels={"Object"})]
        for obj in objs:
            assert isinstance(obj, Object)
            distance_idx[obj.id] += Object.distance(obj, feature_nodes)

        self.distance_idx = distance_idx
        self.order: list[NodeId] = sorted(self.distance_idx, key=lambda k: self.distance_idx[k])

    def __getitem__(self, idx: int) -> tuple[Object, float]:
        n = self.order[idx]
        return (Object.get(n), self.distance_idx[n])

    def __len__(self) -> int:
        return len(self.order)
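`CandidateObjects` ranks every object reachable from the given features (via their `FeatureGroup`s) by ascending feature distance, and exposes the ranking through the sequence protocol (`__getitem__`/`__len__`). A minimal usage sketch, assuming `feature_nodes` is a collection of `FeatureNode`s already present in the graph:

from roc.object import CandidateObjects

candidates = CandidateObjects(feature_nodes)

if len(candidates) > 0:
    best_obj, best_dist = candidates[0]  # index 0 is the closest match
    print(f"best candidate at distance {best_dist}: {best_obj}")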

distance_idx = distance_idx instance-attribute

order = sorted(self.distance_idx, key=lambda k: self.distance_idx[k]) instance-attribute

__getitem__(idx)

Source code in roc/object.py, lines 112–114
def __getitem__(self, idx: int) -> tuple[Object, float]:
    n = self.order[idx]
    return (Object.get(n), self.distance_idx[n])

__init__(feature_nodes)

Source code in roc/object.py, lines 93–110
def __init__(self, feature_nodes: Collection[FeatureNode]) -> None:
    # TODO: this currently only uses features, not context, for resolution
    # the other objects in the current context should influence resolution
    distance_idx: dict[NodeId, float] = defaultdict(float)

    # TODO: getting all objects for the set of features is going to be a
    # huge explosion of objects... need to come back to this and make a
    # smarter selection algorithm
    feature_groups = [
        fg for n in feature_nodes for fg in n.predecessors.select(labels={"FeatureGroup"})
    ]
    objs = [obj for fg in feature_groups for obj in fg.predecessors.select(labels={"Object"})]
    for obj in objs:
        assert isinstance(obj, Object)
        distance_idx[obj.id] += Object.distance(obj, feature_nodes)

    self.distance_idx = distance_idx
    self.order: list[NodeId] = sorted(self.distance_idx, key=lambda k: self.distance_idx[k])

__len__()

Source code in roc/object.py, lines 116–117
def __len__(self) -> int:
    return len(self.order)

FeatureGroup

Bases: Node

Source code in roc/object.py, lines 72–89
class FeatureGroup(Node):
    @staticmethod
    def with_features(features: Collection[PerceptionFeature[Any]]) -> FeatureGroup:
        feature_nodes: set[FeatureNode] = {f.to_nodes() for f in features}

        return FeatureGroup.from_nodes(feature_nodes)

    @staticmethod
    def from_nodes(feature_nodes: Collection[FeatureNode]) -> FeatureGroup:
        fg = FeatureGroup()
        for f in feature_nodes:
            Detail.connect(fg, f)

        return fg

    @property
    def feature_nodes(self) -> list[FeatureNode]:
        return [cast(FeatureNode, e.dst) for e in self.src_edges if e.type == "Detail"]
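A `FeatureGroup` bundles a set of `FeatureNode`s behind `Detail` edges; `Object`s then attach to the group rather than to individual features. A construction sketch, assuming `features` is a collection of `PerceptionFeature`s produced by perception:

from roc.object import FeatureGroup

fg = FeatureGroup.with_features(features)  # converts features to nodes, wires Detail edges

# feature_nodes walks the Detail edges back out
for node in fg.feature_nodes:
    print(node)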

feature_nodes property

from_nodes(feature_nodes) staticmethod

Source code in roc/object.py, lines 79–85
@staticmethod
def from_nodes(feature_nodes: Collection[FeatureNode]) -> FeatureGroup:
    fg = FeatureGroup()
    for f in feature_nodes:
        Detail.connect(fg, f)

    return fg

with_features(features) staticmethod

Source code in roc/object.py, lines 73–77
@staticmethod
def with_features(features: Collection[PerceptionFeature[Any]]) -> FeatureGroup:
    feature_nodes: set[FeatureNode] = {f.to_nodes() for f in features}

    return FeatureGroup.from_nodes(feature_nodes)

Features

Bases: Edge

Source code in roc/object.py, lines 21–22
class Features(Edge):
    allowed_connections: EdgeConnectionsList = [("Object", "FeatureGroup")]

allowed_connections = [('Object', 'FeatureGroup')] class-attribute instance-attribute
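`allowed_connections` constrains the edge to run from an `Object` to a `FeatureGroup`, which is exactly how `Object.with_features` wires it:

from roc.object import Features, FeatureGroup, Object

o = Object()
fg = FeatureGroup()
Features.connect(o, fg)  # permitted: ("Object", "FeatureGroup")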

Object

Bases: Node

Source code in roc/object.py, lines 25–69
class Object(Node):
    uuid: ObjectId = Field(default_factory=lambda: ObjectId(uuid4().int))
    annotations: list[str] = Field(default_factory=list)
    resolve_count: int = Field(default=0)

    @property
    def features(self) -> list[FeatureNode]:
        feature_groups = [e.dst for e in self.src_edges if e.type == "Features"]
        feature_nodes: list[FeatureNode] = []
        for fg in feature_groups:
            assert isinstance(fg, FeatureGroup)
            feature_nodes += fg.feature_nodes

        return feature_nodes

    def __str__(self) -> str:
        fhh = FlexiHumanHash(
            "{{adj}}-{{noun}}-named-{{firstname|lower}}-{{lastname|lower}}-{{hex(6)}}"
        )
        h = fhh.hash(self.uuid)
        ret = f"Object({h})"
        for f in self.features:
            ret += f"\n\t{f}"

        return ret

    @staticmethod
    def with_features(fg: FeatureGroup) -> Object:
        o = Object()
        Features.connect(o, fg)

        return o

    @staticmethod
    def distance(obj: Object, features: Collection[FeatureNode]) -> float:
        assert isinstance(obj, Object)
        # TODO: allowed_attrs is physical attributes, not really great but
        # NetHack doesn't give us much feature-space to work with. In the future
        # we may want to come back and use motion or other features for object recognition
        allowed_attrs = {"SingleNode", "ColorNode", "ShapeNode"}  # TODO: line? flood?
        features_strs: set[str] = {str(f) for f in features if f.labels & allowed_attrs}
        obj_features: set[str] = {
            str(f) for f in obj.features if isinstance(f, FeatureNode) and f.labels & allowed_attrs
        }
        return float(len(features_strs ^ obj_features))
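Tying it together: an `Object` is minted around a `FeatureGroup`, and `distance` later measures how well incoming features match it. A hedged sketch, reusing `fg` from the `FeatureGroup` example above:

from roc.object import Object

o = Object.with_features(fg)  # o --Features--> fg
print(o)                      # human-readable hash name plus feature list

# Comparing an object against its own features should yield 0.0
d = Object.distance(o, fg.feature_nodes)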

annotations = Field(default_factory=list) class-attribute instance-attribute

features property

resolve_count = Field(default=0) class-attribute instance-attribute

uuid = Field(default_factory=lambda: ObjectId(uuid4().int)) class-attribute instance-attribute

__str__()

Source code in roc/object.py, lines 40–49
def __str__(self) -> str:
    fhh = FlexiHumanHash(
        "{{adj}}-{{noun}}-named-{{firstname|lower}}-{{lastname|lower}}-{{hex(6)}}"
    )
    h = fhh.hash(self.uuid)
    ret = f"Object({h})"
    for f in self.features:
        ret += f"\n\t{f}"

    return ret

distance(obj, features) staticmethod

Source code in roc/object.py, lines 58–69
@staticmethod
def distance(obj: Object, features: Collection[FeatureNode]) -> float:
    assert isinstance(obj, Object)
    # TODO: allowed_attrs is physical attributes, not really great but
    # NetHack doesn't give us much feature-space to work with. In the future
    # we may want to come back and use motion or other features for object recognition
    allowed_attrs = {"SingleNode", "ColorNode", "ShapeNode"}  # TODO: line? flood?
    features_strs: set[str] = {str(f) for f in features if f.labels & allowed_attrs}
    obj_features: set[str] = {
        str(f) for f in obj.features if isinstance(f, FeatureNode) and f.labels & allowed_attrs
    }
    return float(len(features_strs ^ obj_features))
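The metric is the size of the symmetric difference between the two string-rendered feature sets: every feature present on exactly one side adds 1 to the distance. A self-contained illustration of the same arithmetic with plain strings (values are made up):

incoming = {"ColorNode(red)", "ShapeNode(round)", "SingleNode(@)"}
stored = {"ColorNode(red)", "ShapeNode(square)", "SingleNode(@)"}

# ShapeNode(round) and ShapeNode(square) are each unmatched -> 2.0
distance = float(len(incoming ^ stored))
assert distance == 2.0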

with_features(fg) staticmethod

Source code in roc/object.py, lines 51–56
@staticmethod
def with_features(fg: FeatureGroup) -> Object:
    o = Object()
    Features.connect(o, fg)

    return o

ObjectResolver

Bases: Component

Source code in roc/object.py, lines 120–156
@register_component("resolver", "object", auto=True)
class ObjectResolver(Component):
    bus = EventBus[Object]("object")

    def __init__(self) -> None:
        super().__init__()
        self.att_conn = self.connect_bus(Attention.bus)
        self.att_conn.listen(self.do_object_resolution)
        self.obj_res_conn = self.connect_bus(ObjectResolver.bus)

    def event_filter(self, e: AttentionEvent) -> bool:
        return e.src_id.name == "vision" and e.src_id.type == "attention"

    def do_object_resolution(self, e: AttentionEvent) -> None:
        # TODO: instead of just taking the first focus_point (highest saliency
        # strength) we probably want to adjust the strength for known objects /
        # novel objects
        focus_point = e.data.focus_points.iloc[0]
        x = XLoc(int(focus_point["x"]))
        y = YLoc(int(focus_point["y"]))
        features = e.data.saliency_map.get_val(x, y)
        fg = FeatureGroup.with_features(features)
        # mypy error (known issue): Argument 1 to "with_features" of "FeatureGroup"
        # has incompatible type "list[Feature[Any]]"; expected "FeatureGroup"
        objs = CandidateObjects(fg.feature_nodes)

        o: Object | None = None
        dist = float("inf")  # keep dist bound when there are no candidates
        if len(objs) > 0:
            o, dist = objs[0]
            o.resolve_count += 1

        # TODO: "> 1" as a cutoff for matching is pretty arbitrary
        # should it be a % of features?
        # or should the cutoff be determined by how well the prediction works?
        if o is None or dist > 1:
            o = Object.with_features(fg)

        self.obj_res_conn.send(o)
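The decision in `do_object_resolution` reduces to: take the nearest candidate, and keep it only if its distance is within the (admittedly arbitrary) cutoff of 1; otherwise mint a new `Object`. A distilled helper with the same semantics (the `resolve` name is hypothetical; note that the nearest candidate's `resolve_count` is bumped even when it is then rejected):

from roc.object import CandidateObjects, FeatureGroup, Object

def resolve(candidates: CandidateObjects, fg: FeatureGroup) -> Object:
    o: Object | None = None
    dist = float("inf")
    if len(candidates) > 0:
        o, dist = candidates[0]
        o.resolve_count += 1  # incremented even if rejected below
    if o is None or dist > 1:
        o = Object.with_features(fg)
    return o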

att_conn = self.connect_bus(Attention.bus) instance-attribute

bus = EventBus[Object]('object') class-attribute instance-attribute

obj_res_conn = self.connect_bus(ObjectResolver.bus) instance-attribute

__init__()

Source code in roc/object.py, lines 124–128
def __init__(self) -> None:
    super().__init__()
    self.att_conn = self.connect_bus(Attention.bus)
    self.att_conn.listen(self.do_object_resolution)
    self.obj_res_conn = self.connect_bus(ObjectResolver.bus)

do_object_resolution(e)

Source code in roc/object.py, lines 133–156
def do_object_resolution(self, e: AttentionEvent) -> None:
    # TODO: instead of just taking the first focus_point (highest saliency
    # strength) we probably want to adjust the strength for known objects /
    # novel objects
    focus_point = e.data.focus_points.iloc[0]
    x = XLoc(int(focus_point["x"]))
    y = YLoc(int(focus_point["y"]))
    features = e.data.saliency_map.get_val(x, y)
    fg = FeatureGroup.with_features(features)
    # mypy error (known issue): Argument 1 to "with_features" of "FeatureGroup"
    # has incompatible type "list[Feature[Any]]"; expected "FeatureGroup"
    objs = CandidateObjects(fg.feature_nodes)

    o: Object | None = None
    dist = float("inf")  # keep dist bound when there are no candidates
    if len(objs) > 0:
        o, dist = objs[0]
        o.resolve_count += 1

    # TODO: "> 1" as a cutoff for matching is pretty arbitrary
    # should it be a % of features?
    # or should the cutoff be determined by how well the prediction works?
    if o is None or dist > 1:
        o = Object.with_features(fg)

    self.obj_res_conn.send(o)

event_filter(e)

Source code in roc/object.py, lines 130–131
def event_filter(self, e: AttentionEvent) -> bool:
    return e.src_id.name == "vision" and e.src_id.type == "attention"
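The filter only admits events whose source is the vision attention component; anything else on the bus is dropped before `do_object_resolution` runs. A runnable illustration of the predicate with stand-in objects (the real `AttentionEvent` type is not constructed here):

from types import SimpleNamespace

e = SimpleNamespace(src_id=SimpleNamespace(name="vision", type="attention"))
assert e.src_id.name == "vision" and e.src_id.type == "attention"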