Thread-safe pub/sub event bus for inter-primitive telemetry.
Extends IPW's EventRecorder into a full publish/subscribe system so that
any primitive can emit events (e.g. INFERENCE_END) and any other primitive can
react without direct coupling.
Classes
EventType
Bases: str, Enum
Supported event categories.
Event
dataclass
Event(event_type: EventType, timestamp: float, data: Dict[str, Any] = dict())
A single event published on the bus.
EventBus
EventBus(*, record_history: bool = False)
Thread-safe publish/subscribe event bus.
Subscribers are called synchronously in registration order within the
publishing thread. An optional record_history flag retains all
published events for later inspection (useful in tests/telemetry).
Source code in src/openjarvis/core/events.py
| def __init__(self, *, record_history: bool = False) -> None:
self._subscribers: Dict[EventType, List[Subscriber]] = {}
self._lock = threading.Lock()
self._record_history = record_history
self._history: List[Event] = []
|
Attributes
history
property
Return a copy of all recorded events (empty if recording is off).
Functions
subscribe
subscribe(event_type: EventType, callback: Subscriber) -> None
Register callback to be called whenever event_type is published.
Source code in src/openjarvis/core/events.py
def subscribe(self, event_type: EventType, callback: Subscriber) -> None:
    """Add *callback* to the listeners invoked when *event_type* is published.

    Callbacks are appended, so the per-type list keeps registration order.
    Registering the same callback twice stores it twice.
    """
    with self._lock:
        registered = self._subscribers.get(event_type)
        if registered is None:
            # First listener for this event type: start its list.
            self._subscribers[event_type] = [callback]
        else:
            registered.append(callback)
|
unsubscribe
unsubscribe(event_type: EventType, callback: Subscriber) -> None
Remove callback from listeners for event_type.
Source code in src/openjarvis/core/events.py
def unsubscribe(self, event_type: EventType, callback: Subscriber) -> None:
    """Drop *callback* from the listeners registered for *event_type*.

    Only the first matching registration is removed. Asking to remove a
    callback that is not registered (or an event type with no listeners)
    is a silent no-op, so the call is safe to repeat.
    """
    with self._lock:
        registered = self._subscribers.get(event_type)
        # The membership test and the removal happen under the same lock,
        # so the callback cannot disappear between the two steps.
        if registered and callback in registered:
            registered.remove(callback)
|
publish
publish(event_type: EventType, data: Optional[Dict[str, Any]] = None) -> Event
Create and dispatch an event to all subscribers.
Returns the published Event instance.
Source code in src/openjarvis/core/events.py
def publish(
    self,
    event_type: EventType,
    data: Optional[Dict[str, Any]] = None,
) -> Event:
    """Create an event and dispatch it to every subscriber of *event_type*.

    Args:
        event_type: Category used to select the subscribers to notify.
        data: Payload attached to the event. ``None`` means "no payload"
            and yields a fresh empty dict. Any dict that is passed in,
            including an empty one, is attached as-is.

    Returns:
        The published ``Event`` instance, timestamped with ``time.time()``.

    Callbacks run one after another in the calling thread. An exception
    raised by a callback is not caught here, so it propagates to the caller
    and the remaining callbacks are not invoked.
    """
    # Test for None explicitly: ``data or {}`` would silently swap a
    # caller-supplied empty dict for a different object.
    payload = {} if data is None else data
    event = Event(event_type=event_type, timestamp=time.time(), data=payload)
    with self._lock:
        if self._record_history:
            self._history.append(event)
        # Snapshot the listeners so dispatch can happen after the lock is
        # released and is unaffected by concurrent (un)subscribe calls.
        listeners = list(self._subscribers.get(event_type, ()))
    for callback in listeners:
        callback(event)
    return event
|
clear_history
Discard all recorded events.
Source code in src/openjarvis/core/events.py
def clear_history(self) -> None:
    """Empty the list of recorded events in place."""
    with self._lock:
        # Slice deletion mutates the existing list rather than rebinding it.
        del self._history[:]
|
Functions
get_event_bus
get_event_bus(*, record_history: bool = False) -> EventBus
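Return the module-level EventBus singleton, creating it if needed. record_history is only applied when the singleton is created; if a bus already exists it is returned unchanged.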
Source code in src/openjarvis/core/events.py
def get_event_bus(*, record_history: bool = False) -> EventBus:
    """Return the module-level ``EventBus`` singleton, building it on first use.

    Args:
        record_history: Passed to ``EventBus`` only when the singleton is
            created by this call. If a bus already exists it is returned
            unchanged and this argument is ignored.
    """
    global _bus
    with _bus_lock:
        bus = _bus
        if bus is None:
            bus = EventBus(record_history=record_history)
            _bus = bus
    return bus
|
reset_event_bus
reset_event_bus() -> None
Discard the current singleton so that the next get_event_bus() call creates a fresh instance (for tests).
Source code in src/openjarvis/core/events.py
def reset_event_bus() -> None:
    """Drop the module-level singleton (intended for tests).

    No new bus is built here: the reference is cleared, and the next
    ``get_event_bus()`` call creates a fresh instance.
    """
    global _bus
    with _bus_lock:
        _bus = None
|