aggregator

Per-session aggregator — turns many internal events into one analytics event.

Without this, a single chat (50 inferences, 10 tool calls) would produce ~60 PostHog events. With it, the same chat produces one chat_session_ended event with summary properties, a ~60× reduction that keeps event volume per daily active user (DAU) in the target zone (~40 events/day).

The aggregator buffers per-session counts in memory, emits on explicit session end, and also emits stale sessions on a background flusher thread (so abandoned sessions don't accumulate forever).
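The buffering and idle-flush behaviour described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in aggregator.py: `ToyStats`, `ToyAggregator`, `record_inference`, `flush_idle`, the `last_seen` field, and the `emit` callback are all hypothetical names, and the clock value is passed in explicitly so the example runs deterministically without a real background thread.

```python
import threading
from dataclasses import dataclass
from typing import Callable


@dataclass
class ToyStats:
    """Hypothetical per-session buffer (the real _SessionStats is not shown here)."""
    session_id: str
    inference_count: int = 0
    last_seen: float = 0.0


class ToyAggregator:
    """Illustrative only: buffers counts and flushes sessions that went idle."""

    def __init__(self, emit: Callable[[ToyStats], None], idle_timeout_s: float) -> None:
        self._emit = emit
        self._idle_timeout_s = idle_timeout_s
        self._sessions: dict[str, ToyStats] = {}
        self._lock = threading.Lock()

    def record_inference(self, session_id: str, now: float) -> None:
        # Count one internal event and remember when the session was last active.
        with self._lock:
            stats = self._sessions.setdefault(session_id, ToyStats(session_id))
            stats.inference_count += 1
            stats.last_seen = now

    def flush_idle(self, now: float) -> int:
        """Emit and drop every session idle for at least idle_timeout_s."""
        with self._lock:
            stale_ids = [
                sid for sid, s in self._sessions.items()
                if now - s.last_seen >= self._idle_timeout_s
            ]
            stale = [self._sessions.pop(sid) for sid in stale_ids]
        # Emit outside the lock so a slow client never blocks recording.
        for stats in stale:
            self._emit(stats)
        return len(stale)


emitted: list[ToyStats] = []
agg = ToyAggregator(emit=emitted.append, idle_timeout_s=30.0)
for _ in range(50):
    agg.record_inference("chat-1", now=0.0)
agg.record_inference("chat-2", now=25.0)
flushed = agg.flush_idle(now=40.0)  # chat-1 idle for 40 s, chat-2 for only 15 s
```

In a threaded version, a loop could call `flush_idle(time.monotonic())` once per tick and use `threading.Event.wait(timeout)` as an interruptible sleep, so that setting the event ends the loop promptly.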

Classes

SessionAggregator

SessionAggregator(client: 'AnalyticsClient', *, idle_timeout_s: float = _IDLE_TIMEOUT_S, flusher_tick_s: float = _FLUSHER_TICK_S)

Buffers per-session counts; emits chat_session_ended on close.

Source code in src/openjarvis/analytics/aggregator.py
def __init__(
    self,
    client: "AnalyticsClient",
    *,
    idle_timeout_s: float = _IDLE_TIMEOUT_S,
    flusher_tick_s: float = _FLUSHER_TICK_S,
) -> None:
    self.client = client
    self.idle_timeout_s = idle_timeout_s
    self._sessions: dict[str, _SessionStats] = {}
    self._lock = threading.Lock()
    self._shutdown = threading.Event()
    self._flusher = threading.Thread(
        target=self._flush_idle_loop,
        args=(flusher_tick_s,),
        daemon=True,
        name="analytics-aggregator-flusher",
    )
    self._flusher.start()

Functions

end_session
end_session(session_id: str) -> None

Emit chat_session_ended for one session and drop the buffer.

Source code in src/openjarvis/analytics/aggregator.py
def end_session(self, session_id: str) -> None:
    """Emit ``chat_session_ended`` for one session and drop the buffer."""
    with self._lock:
        stats = self._sessions.pop(session_id, None)
    if stats is None or stats.inference_count == 0:
        # Nothing meaningful happened — don't emit a no-op event.
        return
    self._emit(stats)
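
To illustrate which calls actually emit, the sketch below copies the body of end_session into a stand-in class. `StubStats`, `StubAggregator`, and the recording `_emit` are hypothetical and exist only so the example is self-contained; the real `_SessionStats` and `_emit` are not shown on this page.

```python
import threading
from dataclasses import dataclass


@dataclass
class StubStats:
    """Hypothetical stand-in for _SessionStats with only the field used here."""
    session_id: str
    inference_count: int = 0


class StubAggregator:
    """Stand-in that reuses the end_session logic shown above."""

    def __init__(self) -> None:
        self._sessions: dict[str, StubStats] = {}
        self._lock = threading.Lock()
        self.emitted: list[str] = []

    def _emit(self, stats: StubStats) -> None:
        # Assumption: record the session id instead of calling an analytics client.
        self.emitted.append(stats.session_id)

    def end_session(self, session_id: str) -> None:
        # Pop under the lock, then decide and emit outside it.
        with self._lock:
            stats = self._sessions.pop(session_id, None)
        if stats is None or stats.inference_count == 0:
            return
        self._emit(stats)


agg = StubAggregator()
agg._sessions["active"] = StubStats("active", inference_count=3)
agg._sessions["empty"] = StubStats("empty")

agg.end_session("active")   # emits: the session had inferences
agg.end_session("empty")    # no event: zero inferences, but the buffer is dropped
agg.end_session("unknown")  # no event: nothing was buffered
agg.end_session("active")   # no event: the buffer was already popped
```

Because the buffer is popped before the check, calling end_session twice for the same id is safe and emits at most once.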

shutdown

shutdown() -> None

Flush every buffered session and stop the flusher thread.

Source code in src/openjarvis/analytics/aggregator.py
def shutdown(self) -> None:
    """Flush every buffered session and stop the flusher thread."""
    self._shutdown.set()
    with self._lock:
        stats_list = list(self._sessions.values())
        self._sessions.clear()
    for stats in stats_list:
        try:
            if stats.inference_count > 0:
                self._emit(stats)
        except Exception as exc:
            logger.debug("Aggregator shutdown flush failed: %s", exc)
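
One consequence of placing the try/except inside the loop is that a failed emit for one session does not prevent the remaining sessions from being flushed. The sketch below shows that pattern in isolation; `flush_all`, `flaky_emit`, and the plain dict of counts are hypothetical simplifications, not part of the module.

```python
import logging
from typing import Callable

logger = logging.getLogger("aggregator-sketch")


def flush_all(counts: dict[str, int], emit: Callable[[str, int], None]) -> list[str]:
    """Drain counts, emit each non-empty session, return ids that emitted cleanly."""
    pending = list(counts.items())
    counts.clear()
    delivered: list[str] = []
    for session_id, inference_count in pending:
        try:
            if inference_count > 0:
                emit(session_id, inference_count)
                delivered.append(session_id)
        except Exception as exc:
            # Log and continue so one bad session cannot block the rest.
            logger.debug("flush failed for %s: %s", session_id, exc)
    return delivered


def flaky_emit(session_id: str, inference_count: int) -> None:
    # Simulated failure for a single session.
    if session_id == "b":
        raise RuntimeError("simulated client outage")


buffered = {"a": 2, "b": 5, "c": 0, "d": 1}
delivered = flush_all(buffered, flaky_emit)
```

Here session "b" fails and "c" is skipped for having no inferences, while "a" and "d" are still delivered and the buffer ends up empty.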