bridge

Bridge between the internal event bus and the analytics client.

The internal bus (:mod:`openjarvis.core.events`) carries dozens of event types, most of them too granular or too internal to ship as analytics. The bridge:

  • Subscribes to a focused subset of EventTypes.
  • Aggregates high-frequency events (INFERENCE_END, TOOL_CALL_END) via :class:`SessionAggregator` so we only ship one event per chat session, not one per inference.
  • Forwards user-meaningful low-frequency events directly (FEEDBACK_RECEIVED, SECURITY_ALERT).
  • Tracks first uses in-process so that tool_first_used fires once per (anon_id, tool) per process. (First-use tracking across processes is not guaranteed: that would require disk state and isn't worth the complexity for v1.)
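
The aggregation idea in the list above can be illustrated with a minimal sketch. This is not the real SessionAggregator: the class name TinySessionAggregator, its methods, the counter fields, and the "session_summary" event name are all hypothetical, chosen only to show how many high-frequency events might collapse into one shipped event per session.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class SessionTotals:
    """Running counters for one chat session (hypothetical fields)."""

    inference_count: int = 0
    tool_call_count: int = 0


class TinySessionAggregator:
    """Hypothetical stand-in; NOT the real SessionAggregator API."""

    def __init__(self) -> None:
        # session_id -> counters buffered until the session ends
        self._sessions = defaultdict(SessionTotals)
        # Events that would be handed to the analytics client
        self.shipped = []

    def record_inference(self, session_id: str) -> None:
        self._sessions[session_id].inference_count += 1

    def record_tool_call(self, session_id: str) -> None:
        self._sessions[session_id].tool_call_count += 1

    def end_session(self, session_id: str) -> None:
        # Pop the buffer so each session produces at most one summary.
        totals = self._sessions.pop(session_id, None)
        if totals is None:
            return
        self.shipped.append(
            {
                "event": "session_summary",  # hypothetical event name
                "session_id": session_id,
                "inference_count": totals.inference_count,
                "tool_call_count": totals.tool_call_count,
            }
        )


agg = TinySessionAggregator()
for _ in range(3):
    agg.record_inference("s1")
agg.record_tool_call("s1")
agg.record_tool_call("s1")
agg.end_session("s1")
agg.end_session("s1")  # no-op: nothing is buffered for "s1" any more
```

Five bus events go in and a single summary comes out, which is the trade the bridge makes for INFERENCE_END and TOOL_CALL_END.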

Classes

EventBridge

EventBridge(bus: 'EventBus', client: 'AnalyticsClient', aggregator: SessionAggregator | None = None)

Subscribes to the internal bus and routes to the analytics client.

Source code in src/openjarvis/analytics/bridge.py
def __init__(
    self,
    bus: "EventBus",
    client: "AnalyticsClient",
    aggregator: SessionAggregator | None = None,
) -> None:
    self.bus = bus
    self.client = client
    self.aggregator = aggregator or SessionAggregator(client)
    self._first_tool_uses: set[str] = set()
    self._first_chat_emitted = False
    self._lock = threading.Lock()
    self._subscribed = False
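
The constructor sets up a set and a lock for first-use tracking. How the handlers use them is not shown on this page, so the following is only a sketch of one plausible pattern: a check-and-add performed under the lock so that concurrent handlers cannot both report a first use. The FirstUseTracker class and the tuple key are assumptions (the real attribute is typed set[str], so the actual key format differs).

```python
import threading


class FirstUseTracker:
    """Hypothetical helper; not part of the documented EventBridge API."""

    def __init__(self) -> None:
        # Assumed key shape: (anon_id, tool). The real bridge stores strings.
        self._seen = set()
        self._lock = threading.Lock()

    def is_first_use(self, anon_id: str, tool: str) -> bool:
        key = (anon_id, tool)
        # Check and insert atomically so two threads cannot both see "first".
        with self._lock:
            if key in self._seen:
                return False
            self._seen.add(key)
            return True


tracker = FirstUseTracker()
results = [
    tracker.is_first_use("anon-1", "web_search"),
    tracker.is_first_use("anon-1", "web_search"),
    tracker.is_first_use("anon-2", "web_search"),
]
```

Because the set lives in memory, a new process starts with an empty set, which matches the per-process guarantee described in the module overview.
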
Functions
start
start() -> None

Attach subscribers to the bus. Idempotent.

Source code in src/openjarvis/analytics/bridge.py
def start(self) -> None:
    """Attach subscribers to the bus. Idempotent."""
    if self._subscribed:
        return
    self.bus.subscribe(EventType.INFERENCE_END, self._on_inference_end)
    self.bus.subscribe(EventType.TOOL_CALL_END, self._on_tool_end)
    self.bus.subscribe(EventType.SESSION_END, self._on_session_end)
    self.bus.subscribe(EventType.AGENT_TURN_END, self._on_agent_turn_end)
    self.bus.subscribe(EventType.FEEDBACK_RECEIVED, self._on_feedback)
    self.bus.subscribe(EventType.SECURITY_ALERT, self._on_security_alert)
    self._subscribed = True
    logger.debug("Analytics bridge subscribed to internal event bus")
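
The idempotency guard in start() can be exercised in isolation. The sketch below uses stand-ins (StubBus and MiniBridge are hypothetical, and a plain string replaces EventType) to show that a second start() call does not register a duplicate handler.

```python
from collections import defaultdict


class StubBus:
    """Hypothetical in-memory bus; only mimics subscribe()."""

    def __init__(self) -> None:
        self.handlers = defaultdict(list)

    def subscribe(self, event_type, handler) -> None:
        self.handlers[event_type].append(handler)


class MiniBridge:
    """Hypothetical reduction of EventBridge to a single subscription."""

    def __init__(self, bus: StubBus) -> None:
        self.bus = bus
        self._subscribed = False

    def start(self) -> None:
        # Same guard as the documented start(): return early if attached.
        if self._subscribed:
            return
        self.bus.subscribe("INFERENCE_END", self._on_inference_end)
        self._subscribed = True

    def _on_inference_end(self, event) -> None:
        pass


bus = StubBus()
bridge = MiniBridge(bus)
bridge.start()
bridge.start()  # second call returns early
handler_count = len(bus.handlers["INFERENCE_END"])
```

Without the guard, each extra start() would add another handler, and every INFERENCE_END would then be counted more than once.
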
stop
stop() -> None

Detach subscribers and flush buffered sessions.

Source code in src/openjarvis/analytics/bridge.py
def stop(self) -> None:
    """Detach subscribers and flush buffered sessions."""
    if self._subscribed:
        try:
            self.bus.unsubscribe(EventType.INFERENCE_END, self._on_inference_end)
            self.bus.unsubscribe(EventType.TOOL_CALL_END, self._on_tool_end)
            self.bus.unsubscribe(EventType.SESSION_END, self._on_session_end)
            self.bus.unsubscribe(EventType.AGENT_TURN_END, self._on_agent_turn_end)
            self.bus.unsubscribe(EventType.FEEDBACK_RECEIVED, self._on_feedback)
            self.bus.unsubscribe(EventType.SECURITY_ALERT, self._on_security_alert)
        except Exception as exc:
            logger.debug("Analytics bridge unsubscribe error: %s", exc)
        self._subscribed = False
    self.aggregator.shutdown()
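
Since stop() both detaches handlers and flushes the aggregator, callers may want to pair it with start() in a try/finally so buffered sessions are flushed even when the work in between fails. The following is a sketch with hypothetical stand-ins (StubBus, RecordingAggregator, MiniBridge, run_with_bridge); none of these names come from the library.

```python
from collections import defaultdict


class StubBus:
    """Hypothetical in-memory bus with subscribe/unsubscribe."""

    def __init__(self) -> None:
        self.handlers = defaultdict(list)

    def subscribe(self, event_type, handler) -> None:
        self.handlers[event_type].append(handler)

    def unsubscribe(self, event_type, handler) -> None:
        self.handlers[event_type].remove(handler)


class RecordingAggregator:
    """Hypothetical aggregator that only counts shutdown() calls."""

    def __init__(self) -> None:
        self.shutdown_calls = 0

    def shutdown(self) -> None:
        self.shutdown_calls += 1


class MiniBridge:
    """Hypothetical reduction of EventBridge's start/stop pair."""

    def __init__(self, bus, aggregator) -> None:
        self.bus = bus
        self.aggregator = aggregator
        self._subscribed = False

    def start(self) -> None:
        if self._subscribed:
            return
        self.bus.subscribe("SESSION_END", self._on_session_end)
        self._subscribed = True

    def stop(self) -> None:
        if self._subscribed:
            self.bus.unsubscribe("SESSION_END", self._on_session_end)
            self._subscribed = False
        # As in the documented stop(), the flush runs unconditionally.
        self.aggregator.shutdown()

    def _on_session_end(self, event) -> None:
        pass


def run_with_bridge(bridge, work):
    """Run work() with the bridge attached; always stop afterwards."""
    bridge.start()
    try:
        return work()
    finally:
        bridge.stop()


def failing_work():
    raise RuntimeError("simulated failure")


bus = StubBus()
aggregator = RecordingAggregator()
bridge = MiniBridge(bus, aggregator)
try:
    run_with_bridge(bridge, failing_work)
except RuntimeError:
    pass
remaining_handlers = len(bus.handlers["SESSION_END"])
```

After the failure the handler list is empty and the aggregator has been shut down exactly once.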

Functions