Skip to content

sequencer

tick = 0 module-attribute

Frame

Bases: Node

Source code in roc/sequencer.py
19
20
class Frame(Node):
    tick: int = Field(default_factory=get_next_tick)

tick = Field(default_factory=get_next_tick) class-attribute instance-attribute

FrameAttributes

Bases: Edge

Source code in roc/sequencer.py
23
24
25
26
27
class FrameAttributes(Edge):
    allowed_connections: EdgeConnectionsList = [
        ("Frame", "FeatureGroup"),
        ("Frame", "TakeAction"),
    ]

allowed_connections = [('Frame', 'FeatureGroup'), ('Frame', 'TakeAction')] class-attribute instance-attribute

NextFrame

Bases: Edge

Source code in roc/sequencer.py
30
31
class NextFrame(Edge):
    allowed_connections: EdgeConnectionsList = [("Frame", "Frame")]

allowed_connections = [('Frame', 'Frame')] class-attribute instance-attribute

Sequencer

Bases: Component

Source code in roc/sequencer.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@register_component("sequencer", "sequencer", auto=True)
class Sequencer(Component):
    def __init__(self) -> None:
        super().__init__()
        self.obj_res_conn = self.connect_bus(ObjectResolver.bus)
        self.obj_res_conn.listen(self.event_collector)
        self.action_conn = self.connect_bus(Action.bus)
        self.action_conn.listen(self.event_collector)
        self.last_frame: Frame | None = None
        # TODO: listen to intrinsics bus

    def event_filter(self, e: Event[Any]) -> bool:
        return isinstance(e, FeatureGroup) or isinstance(e, TakeAction)

    def event_collector(self, e: Event[Any]) -> None:
        this_frame = Frame()
        if self.last_frame is not None:
            NextFrame.connect(self.last_frame, this_frame)

        self.last_frame = this_frame

action_conn = self.connect_bus(Action.bus) instance-attribute

last_frame = None instance-attribute

obj_res_conn = self.connect_bus(ObjectResolver.bus) instance-attribute

__init__()

Source code in roc/sequencer.py
36
37
38
39
40
41
42
def __init__(self) -> None:
    super().__init__()
    self.obj_res_conn = self.connect_bus(ObjectResolver.bus)
    self.obj_res_conn.listen(self.event_collector)
    self.action_conn = self.connect_bus(Action.bus)
    self.action_conn.listen(self.event_collector)
    self.last_frame: Frame | None = None

event_collector(e)

Source code in roc/sequencer.py
48
49
50
51
52
53
def event_collector(self, e: Event[Any]) -> None:
    this_frame = Frame()
    if self.last_frame is not None:
        NextFrame.connect(self.last_frame, this_frame)

    self.last_frame = this_frame

event_filter(e)

Source code in roc/sequencer.py
45
46
def event_filter(self, e: Event[Any]) -> bool:
    return isinstance(e, FeatureGroup) or isinstance(e, TakeAction)

get_next_tick()

Source code in roc/sequencer.py
14
15
16
def get_next_tick() -> int:
    tick += 1
    return tick