events

Thread-safe pub/sub event bus for inter-primitive telemetry.

Extends IPW's EventRecorder into a full publish/subscribe system so that any primitive can emit events (e.g. INFERENCE_END) and any other primitive can react without direct coupling.

Classes

EventType

Bases: str, Enum

Supported event categories.

Event dataclass

Event(event_type: EventType, timestamp: float, data: Dict[str, Any] = dict())

A single event published on the bus.

EventBus

EventBus(*, record_history: bool = False)

Thread-safe publish/subscribe event bus.

Subscribers are called synchronously in registration order within the publishing thread. An optional record_history flag retains all published events for later inspection (useful in tests/telemetry).
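The synchronous, registration-order dispatch described above can be sketched as follows. This is a stand-in that mirrors the source listed on this page so that it runs without the package installed; it is not the packaged module. The string value of `INFERENCE_END`, the body of the `history` property, and the two lambda subscribers are assumptions made for illustration.

```python
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class EventType(str, Enum):
    # Only the member name appears on this page; the value is assumed.
    INFERENCE_END = "inference_end"


@dataclass
class Event:
    event_type: EventType
    timestamp: float
    data: Dict[str, Any] = field(default_factory=dict)


Subscriber = Callable[[Event], None]


class EventBus:
    """Stand-in that follows the source shown on this page."""

    def __init__(self, *, record_history: bool = False) -> None:
        self._subscribers: Dict[EventType, List[Subscriber]] = {}
        self._lock = threading.Lock()
        self._record_history = record_history
        self._history: List[Event] = []

    @property
    def history(self) -> List[Event]:
        # Assumed body: the page only documents that a copy is returned.
        with self._lock:
            return list(self._history)

    def subscribe(self, event_type: EventType, callback: Subscriber) -> None:
        with self._lock:
            self._subscribers.setdefault(event_type, []).append(callback)

    def publish(
        self, event_type: EventType, data: Optional[Dict[str, Any]] = None
    ) -> Event:
        event = Event(event_type=event_type, timestamp=time.time(), data=data or {})
        with self._lock:
            if self._record_history:
                self._history.append(event)
            listeners = list(self._subscribers.get(event_type, []))
        for callback in listeners:
            callback(event)
        return event


calls: List[str] = []
bus = EventBus(record_history=True)

# Two hypothetical subscribers; they run in the order they were registered.
bus.subscribe(EventType.INFERENCE_END, lambda e: calls.append("first"))
bus.subscribe(
    EventType.INFERENCE_END, lambda e: calls.append(f"second:{e.data['tokens']}")
)

published = bus.publish(EventType.INFERENCE_END, {"tokens": 42})
```

After the `publish` call, `calls` holds the two entries in registration order and `bus.history` contains the single published event. Because `history` returns a copy, mutating the returned list does not affect what the bus has recorded.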

Source code in src/openjarvis/core/events.py
def __init__(self, *, record_history: bool = False) -> None:
    self._subscribers: Dict[EventType, List[Subscriber]] = {}
    self._lock = threading.Lock()
    self._record_history = record_history
    self._history: List[Event] = []
Attributes
history property
history: List[Event]

Return a copy of all recorded events (empty if recording is off).

Functions
subscribe
subscribe(event_type: EventType, callback: Subscriber) -> None

Register callback to be called whenever event_type is published.

Source code in src/openjarvis/core/events.py
def subscribe(self, event_type: EventType, callback: Subscriber) -> None:
    """Register *callback* to be called whenever *event_type* is published."""
    with self._lock:
        self._subscribers.setdefault(event_type, []).append(callback)
unsubscribe
unsubscribe(event_type: EventType, callback: Subscriber) -> None

Remove callback from the listeners for event_type. The call is a no-op if callback is not currently registered.

Source code in src/openjarvis/core/events.py
def unsubscribe(self, event_type: EventType, callback: Subscriber) -> None:
    """Remove *callback* from listeners for *event_type*."""
    with self._lock:
        listeners = self._subscribers.get(event_type, [])
        try:
            listeners.remove(callback)
        except ValueError:
            pass  # Callback already removed — idempotent
publish
publish(event_type: EventType, data: Optional[Dict[str, Any]] = None) -> Event

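Create an event and dispatch it to all subscribers of event_type.

Callbacks run after the internal lock is released, so a callback may call subscribe, unsubscribe, or publish without deadlocking. An exception raised by a callback propagates to the caller, and later subscribers are not invoked for that event.

Returns the published Event instance.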

Source code in src/openjarvis/core/events.py
def publish(
    self,
    event_type: EventType,
    data: Optional[Dict[str, Any]] = None,
) -> Event:
    """Create and dispatch an event to all subscribers.

    Returns the published ``Event`` instance.
    """
    event = Event(event_type=event_type, timestamp=time.time(), data=data or {})

    with self._lock:
        if self._record_history:
            self._history.append(event)
        listeners = list(self._subscribers.get(event_type, []))

    for callback in listeners:
        callback(event)

    return event
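One consequence of copying the listener list under the lock and invoking callbacks outside it is that a handler can unsubscribe itself mid-dispatch. The sketch below illustrates this with a trimmed-down, hypothetical `MiniBus` that keeps only the dispatch logic and uses plain strings as event keys; `one_shot` and `always` are illustrative names, not part of the module.

```python
import threading
from typing import Callable, Dict, List

Callback = Callable[[dict], None]


class MiniBus:
    """Hypothetical stand-in reduced to the dispatch logic shown above."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callback]] = {}
        self._lock = threading.Lock()

    def subscribe(self, event_type: str, callback: Callback) -> None:
        with self._lock:
            self._subscribers.setdefault(event_type, []).append(callback)

    def unsubscribe(self, event_type: str, callback: Callback) -> None:
        with self._lock:
            listeners = self._subscribers.get(event_type, [])
            if callback in listeners:
                listeners.remove(callback)

    def publish(self, event_type: str, data: dict) -> None:
        with self._lock:
            # Snapshot under the lock, then release it before calling out.
            listeners = list(self._subscribers.get(event_type, []))
        for callback in listeners:
            callback(data)


bus = MiniBus()
seen: List[str] = []


def one_shot(data: dict) -> None:
    seen.append("one_shot")
    # Safe here: publish() has already released the non-reentrant lock.
    bus.unsubscribe("inference_end", one_shot)


def always(data: dict) -> None:
    seen.append("always")


bus.subscribe("inference_end", one_shot)
bus.subscribe("inference_end", always)
bus.publish("inference_end", {})  # both handlers run
bus.publish("inference_end", {})  # only `always` is still registered
```

Had the callbacks been invoked while the lock was held, the `unsubscribe` call inside `one_shot` would block forever, since `threading.Lock` is not reentrant. The snapshot also means that a handler removed during a dispatch may still be called once more if it was already in the copied list.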
clear_history
clear_history() -> None

Discard all recorded events.

Source code in src/openjarvis/core/events.py
def clear_history(self) -> None:
    """Discard all recorded events."""
    with self._lock:
        self._history.clear()

Functions

get_event_bus

get_event_bus(*, record_history: bool = False) -> EventBus

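Return the module-level EventBus singleton, creating it if needed. The record_history argument takes effect only on the call that creates the singleton; later calls return the existing instance unchanged.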

Source code in src/openjarvis/core/events.py
def get_event_bus(*, record_history: bool = False) -> EventBus:
    """Return the module-level ``EventBus`` singleton, creating it if needed."""
    global _bus
    with _bus_lock:
        if _bus is None:
            _bus = EventBus(record_history=record_history)
        return _bus

reset_event_bus

reset_event_bus() -> None

Discard the singleton so that the next get_event_bus() call creates a fresh instance (for tests).

Source code in src/openjarvis/core/events.py
def reset_event_bus() -> None:
    """Replace the singleton with a fresh instance (for tests)."""
    global _bus
    with _bus_lock:
        _bus = None
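The interaction between the two module-level functions can be sketched as below. The `EventBus` class here is a hypothetical stand-in that keeps only the constructor flag (exposed as a public `record_history` attribute for inspection, which the real class does not document); `get_event_bus` and `reset_event_bus` follow the source shown above.

```python
import threading
from typing import Optional


class EventBus:
    """Hypothetical stand-in that only remembers the constructor flag."""

    def __init__(self, *, record_history: bool = False) -> None:
        self.record_history = record_history


_bus: Optional[EventBus] = None
_bus_lock = threading.Lock()


def get_event_bus(*, record_history: bool = False) -> EventBus:
    global _bus
    with _bus_lock:
        if _bus is None:
            _bus = EventBus(record_history=record_history)
        return _bus


def reset_event_bus() -> None:
    global _bus
    with _bus_lock:
        _bus = None


first = get_event_bus()
# The singleton already exists, so this flag is silently ignored.
second = get_event_bus(record_history=True)

# After a reset, the next call builds a new bus and honours the flag.
reset_event_bus()
third = get_event_bus(record_history=True)
```

In a test suite, calling `reset_event_bus()` before each test (for example from a setup hook) keeps subscribers and history from leaking between tests, and lets each test choose its own `record_history` setting on the first `get_event_bus` call.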