Skip to content

event

This module defines all the communications and eventing interfaces for the system.

EventData = TypeVar('EventData') module-attribute

EventFilter = Callable[[Event[EventData]], bool] module-attribute

EventListener = Callable[[Event[EventData]], None] module-attribute

eventbus_names = set() module-attribute

BusConnection

Bases: Generic[EventData]

A connection between an EventBus and a Component, used to send Events

Parameters:

Name Type Description Default
Generic EventData

The data type that will be sent over this connection

required
Source code in roc/event.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class BusConnection(Generic[EventData]):
    """A connection between an EventBus and a Component, used to send Events

    Args:
        Generic (EventData): The data type that will be sent over this connection
    """

    def __init__(self, bus: EventBus[EventData], component: Component):
        logger.debug(f"{component.name}:{component.type} attaching to bus {bus.name}")
        self.attached_bus = bus
        self.attached_component = component
        self.subject: rx.Subject[Event[EventData]] = self.attached_bus.subject
        self.subscribers: list[Disposable] = []

    def send(self, data: EventData) -> None:
        """Send data over the EventBus. Internally, the data is converted to an Event
        with the relevant data (such as the source Component).

        Args:
            data (EventData): The data type of the data to be sent
        """
        e = Event[EventData](data, self.attached_component.id, self.attached_bus)
        logger.trace(f">>> Sending {e}")
        self.attached_bus.subject.on_next(e)

    def listen(
        self,
        listener: EventListener[EventData],
        *,
        filter: EventFilter[EventData] | None = None,
    ) -> None:
        pipe_args: list[Callable[[Any], Observable[Event[EventData]]]] = [
            # op.filter(lambda e: e.src is not self.attached_component),
            # op.do_action(lambda e: print("before filter", e)),
            op.filter(self.attached_component.event_filter),
        ]
        if filter is not None:
            pipe_args.append(op.filter(filter))

        sub = self.subject.pipe(*pipe_args).subscribe(listener)
        self.subscribers.append(sub)

    def close(self) -> None:
        logger.debug(
            f"Closing connection from component {self.attached_component.id}  -> {self.attached_bus.name} bus"
        )

        for sub in self.subscribers:
            sub.dispose()

attached_bus = bus instance-attribute

attached_component = component instance-attribute

subject = self.attached_bus.subject instance-attribute

subscribers = [] instance-attribute

__init__(bus, component)

Source code in roc/event.py
66
67
68
69
70
71
def __init__(self, bus: EventBus[EventData], component: Component):
    logger.debug(f"{component.name}:{component.type} attaching to bus {bus.name}")
    self.attached_bus = bus
    self.attached_component = component
    self.subject: rx.Subject[Event[EventData]] = self.attached_bus.subject
    self.subscribers: list[Disposable] = []

close()

Source code in roc/event.py
101
102
103
104
105
106
107
def close(self) -> None:
    logger.debug(
        f"Closing connection from component {self.attached_component.id}  -> {self.attached_bus.name} bus"
    )

    for sub in self.subscribers:
        sub.dispose()

listen(listener, *, filter=None)

Source code in roc/event.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def listen(
    self,
    listener: EventListener[EventData],
    *,
    filter: EventFilter[EventData] | None = None,
) -> None:
    pipe_args: list[Callable[[Any], Observable[Event[EventData]]]] = [
        # op.filter(lambda e: e.src is not self.attached_component),
        # op.do_action(lambda e: print("before filter", e)),
        op.filter(self.attached_component.event_filter),
    ]
    if filter is not None:
        pipe_args.append(op.filter(filter))

    sub = self.subject.pipe(*pipe_args).subscribe(listener)
    self.subscribers.append(sub)

send(data)

Send data over the EventBus. Internally, the data is converted to an Event with the relevant data (such as the source Component).

Parameters:

Name Type Description Default
data EventData

The data type of the data to be sent

required
Source code in roc/event.py
73
74
75
76
77
78
79
80
81
82
def send(self, data: EventData) -> None:
    """Send data over the EventBus. Internally, the data is converted to an Event
    with the relevant data (such as the source Component).

    Args:
        data (EventData): The data type of the data to be sent
    """
    e = Event[EventData](data, self.attached_component.id, self.attached_bus)
    logger.trace(f">>> Sending {e}")
    self.attached_bus.subject.on_next(e)

Event

Bases: ABC, Generic[EventData]

An abstract event class for sending messages between Components over an EventBus

Parameters:

Name Type Description Default
ABC ABC

Abstract base class

required
Generic EventData

The data to be carried by the event

required
Source code in roc/event.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Event(ABC, Generic[EventData]):
    """An abstract event class for sending messages between Components over an EventBus

    Args:
        ABC (ABC): Abstract base class
        Generic (EventData): The data to be carried by the event
    """

    def __init__(self, data: EventData, src_id: ComponentId, bus: EventBus[EventData]):
        """The initializer for the Event

        Args:
            data (EventData): The data for this event
            src_id (ComponentId): The name and type of the Component sending the event
            bus (EventBus): The EventBus that the event is being sent over
        """
        self.data = data
        self.src_id = src_id
        self.bus = bus

    def __repr__(self) -> str:
        data_str = pretty_repr(
            self.data,
            # max_depth=4, # Maximum depth of nested data structure
            max_length=5,  # Maximum length of containers before abbreviating
            max_string=60,  # Maximum length of string before truncating
            expand_all=False,  # Expand all containers regardless of available width
            max_width=120,
        )
        if "\n" in data_str:
            data_str = "\n" + data_str
        return f"[EVENT: {self.src_id} >>> {self.bus.name}]: {data_str}"

bus = bus instance-attribute

data = data instance-attribute

src_id = src_id instance-attribute

__init__(data, src_id, bus)

The initializer for the Event

Parameters:

Name Type Description Default
data EventData

The data for this event

required
src_id ComponentId

The name and type of the Component sending the event

required
bus EventBus

The EventBus that the event is being sent over

required
Source code in roc/event.py
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, data: EventData, src_id: ComponentId, bus: EventBus[EventData]):
    """The initializer for the Event

    Args:
        data (EventData): The data for this event
        src_id (ComponentId): The name and type of the Component sending the event
        bus (EventBus): The EventBus that the event is being sent over
    """
    self.data = data
    self.src_id = src_id
    self.bus = bus

__repr__()

Source code in roc/event.py
41
42
43
44
45
46
47
48
49
50
51
52
def __repr__(self) -> str:
    data_str = pretty_repr(
        self.data,
        # max_depth=4, # Maximum depth of nested data structure
        max_length=5,  # Maximum length of containers before abbreviating
        max_string=60,  # Maximum length of string before truncating
        expand_all=False,  # Expand all containers regardless of available width
        max_width=120,
    )
    if "\n" in data_str:
        data_str = "\n" + data_str
    return f"[EVENT: {self.src_id} >>> {self.bus.name}]: {data_str}"

EventBus

Bases: Generic[EventData]

A communication channel for sending events between Components

Parameters:

Name Type Description Default
Generic EventData

The data type that is allowed to be sent over the bus

required
Source code in roc/event.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class EventBus(Generic[EventData]):
    """A communication channel for sending events between Components

    Args:
        Generic (EventData): The data type that is allowed to be sent over the bus
    """

    name: str
    """The name of the bus. Used to ensure uniqueness."""
    subject: rx.Subject[Event[EventData]]
    """The RxPy Subject that the bus uses to communicate."""

    def __init__(self, name: str, cache_depth: int = 0) -> None:
        if name in eventbus_names:
            raise Exception(f"Duplicate EventBus name: {name}")
        self.name = name
        eventbus_names.add(name)
        self.subject = rx.Subject[Event[EventData]]()
        self.cache_depth = cache_depth
        self.cache: deque[Event[EventData]] | None = None

        if cache_depth > 0:
            self.cache = deque(maxlen=cache_depth)
            self.subject.subscribe(lambda e: self.cache.append(e))  # type: ignore

    def connect(self, component: Component) -> BusConnection[EventData]:
        """Creates a connection between an EventBus and a Component for sending Events

        Args:
            component (Component): The Component to connect to the bus

        Returns:
            BusConnection[EventData]: A new connection that can be used to send data
        """
        return BusConnection[EventData](self, component)

    @staticmethod
    def clear_names() -> None:
        """Clears all EventBusses that have been registered, mostly used for testing."""
        eventbus_names.clear()

cache = None instance-attribute

cache_depth = cache_depth instance-attribute

name = name instance-attribute

The name of the bus. Used to ensure uniqueness.

subject = rx.Subject[Event[EventData]]() instance-attribute

The RxPy Subject that the bus uses to communicate.

__init__(name, cache_depth=0)

Source code in roc/event.py
127
128
129
130
131
132
133
134
135
136
137
138
def __init__(self, name: str, cache_depth: int = 0) -> None:
    if name in eventbus_names:
        raise Exception(f"Duplicate EventBus name: {name}")
    self.name = name
    eventbus_names.add(name)
    self.subject = rx.Subject[Event[EventData]]()
    self.cache_depth = cache_depth
    self.cache: deque[Event[EventData]] | None = None

    if cache_depth > 0:
        self.cache = deque(maxlen=cache_depth)
        self.subject.subscribe(lambda e: self.cache.append(e))  # type: ignore

clear_names() staticmethod

Clears all EventBusses that have been registered, mostly used for testing.

Source code in roc/event.py
151
152
153
154
@staticmethod
def clear_names() -> None:
    """Clears all EventBusses that have been registered, mostly used for testing."""
    eventbus_names.clear()

connect(component)

Creates a connection between an EventBus and a Component for sending Events

Parameters:

Name Type Description Default
component Component

The Component to connect to the bus

required

Returns:

Type Description
BusConnection[EventData]

BusConnection[EventData]: A new connection that can be used to send data

Source code in roc/event.py
140
141
142
143
144
145
146
147
148
149
def connect(self, component: Component) -> BusConnection[EventData]:
    """Creates a connection between an EventBus and a Component for sending Events

    Args:
        component (Component): The Component to connect to the bus

    Returns:
        BusConnection[EventData]: A new connection that can be used to send data
    """
    return BusConnection[EventData](self, component)