Skip to content

Index

analytics

External anonymous usage analytics.

Sends anonymized events to PostHog so the OpenJarvis team can measure setup success, retention, feature usage, and churn — without ever collecting chat content, prompts, file paths, emails, IPs, or hardware identifiers.

Distinct from :mod:openjarvis.telemetry, which stores local FLOPs and energy metrics in a SQLite DB and never leaves the machine.

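Disable: set [analytics] enabled = false in ~/.openjarvis/config.toml, or set the DO_NOT_TRACK=1 or OPENJARVIS_NO_ANALYTICS=1 environment variable (either one takes precedence over the config file).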

Classes

SessionAggregator

SessionAggregator(client: 'AnalyticsClient', *, idle_timeout_s: float = _IDLE_TIMEOUT_S, flusher_tick_s: float = _FLUSHER_TICK_S)

Buffers per-session counts; emits chat_session_ended on close.

Source code in src/openjarvis/analytics/aggregator.py
def __init__(
    self,
    client: "AnalyticsClient",
    *,
    idle_timeout_s: float = _IDLE_TIMEOUT_S,
    flusher_tick_s: float = _FLUSHER_TICK_S,
) -> None:
    """Create empty session state and launch the background flusher.

    Args:
        client: Analytics client kept on the instance.
        idle_timeout_s: Idle threshold in seconds, kept on the instance.
        flusher_tick_s: Passed as the sole argument to
            ``_flush_idle_loop`` — presumably its polling interval;
            confirm against that method.
    """
    # Synchronisation primitives and the (initially empty) session table.
    self._lock = threading.Lock()
    self._shutdown = threading.Event()
    self._sessions: dict[str, _SessionStats] = {}
    self.client = client
    self.idle_timeout_s = idle_timeout_s
    # Daemon thread so it never keeps the interpreter alive. It is started
    # last, once every attribute above has been assigned.
    flusher = threading.Thread(
        name="analytics-aggregator-flusher",
        target=self._flush_idle_loop,
        args=(flusher_tick_s,),
        daemon=True,
    )
    self._flusher = flusher
    flusher.start()
Functions
end_session
end_session(session_id: str) -> None

Emit chat_session_ended for one session and drop the buffer.

Source code in src/openjarvis/analytics/aggregator.py
def end_session(self, session_id: str) -> None:
    """Close one session: discard its buffer and report it if non-empty.

    Args:
        session_id: Key of the session to close. Unknown ids are ignored.
    """
    # Hold the lock only while removing the entry from the table; the
    # emit itself happens outside the critical section.
    with self._lock:
        finished = self._sessions.pop(session_id, None)
    # A missing session, or one with zero inferences, carries no
    # information, so no ``chat_session_ended`` event is sent for it.
    if finished is not None and finished.inference_count != 0:
        self._emit(finished)
shutdown
shutdown() -> None

Flush every buffered session and stop the flusher thread.

Source code in src/openjarvis/analytics/aggregator.py
def shutdown(self) -> None:
    """Signal the flusher to stop and emit every session still buffered.

    Sets the shutdown event, empties the session table under the lock,
    then emits each drained session that recorded at least one
    inference. A failure while emitting one session is logged at debug
    level and does not prevent the remaining sessions from being
    processed.
    """
    self._shutdown.set()
    # Snapshot and clear atomically so emitting can run without the lock.
    with self._lock:
        pending = [*self._sessions.values()]
        self._sessions.clear()
    for session in pending:
        try:
            if not session.inference_count > 0:
                continue
            self._emit(session)
        except Exception as exc:
            logger.debug("Aggregator shutdown flush failed: %s", exc)

EventBridge

EventBridge(bus: 'EventBus', client: 'AnalyticsClient', aggregator: SessionAggregator | None = None)

Subscribes to the internal bus and routes to the analytics client.

Source code in src/openjarvis/analytics/bridge.py
def __init__(
    self,
    bus: "EventBus",
    client: "AnalyticsClient",
    aggregator: SessionAggregator | None = None,
) -> None:
    """Store collaborators and initialise the bridge's bookkeeping.

    Nothing is subscribed here; ``_subscribed`` starts out ``False``.

    Args:
        bus: Internal event bus kept on the instance.
        client: Analytics client kept on the instance.
        aggregator: Session aggregator to use. Only ``None`` triggers
            creation of a fresh ``SessionAggregator(client)``; any
            supplied object is kept as-is, even if it evaluates as
            falsy.
    """
    self.bus = bus
    self.client = client
    # Explicit ``is None`` test: a truthiness check would silently replace
    # a caller-supplied aggregator that happens to be falsy and construct
    # a second aggregator in its place.
    if aggregator is None:
        aggregator = SessionAggregator(client)
    self.aggregator = aggregator
    # NOTE(review): the two fields below look like "already emitted"
    # markers for first-use events — confirm against the handlers.
    self._first_tool_uses: set[str] = set()
    self._first_chat_emitted = False
    self._lock = threading.Lock()
    self._subscribed = False
Functions
start
start() -> None

Attach subscribers to the bus. Idempotent.

Source code in src/openjarvis/analytics/bridge.py
def start(self) -> None:
    """Subscribe the bridge's handlers to the bus; repeated calls are no-ops."""
    if not self._subscribed:
        # (event type, handler) pairs, registered in this order.
        routes = (
            (EventType.INFERENCE_END, self._on_inference_end),
            (EventType.TOOL_CALL_END, self._on_tool_end),
            (EventType.SESSION_END, self._on_session_end),
            (EventType.AGENT_TURN_END, self._on_agent_turn_end),
            (EventType.FEEDBACK_RECEIVED, self._on_feedback),
            (EventType.SECURITY_ALERT, self._on_security_alert),
        )
        for event_type, handler in routes:
            self.bus.subscribe(event_type, handler)
        self._subscribed = True
        logger.debug("Analytics bridge subscribed to internal event bus")
stop
stop() -> None

Detach subscribers and flush buffered sessions.

Source code in src/openjarvis/analytics/bridge.py
def stop(self) -> None:
    """Detach subscribers and flush buffered sessions.

    Each handler is unsubscribed independently, so one failing
    ``unsubscribe`` call cannot leave the remaining handlers attached
    to the bus. Failures are logged at debug level and otherwise
    ignored. The aggregator is shut down on every call, whether or not
    the bridge was subscribed.
    """
    if self._subscribed:
        subscriptions = (
            (EventType.INFERENCE_END, self._on_inference_end),
            (EventType.TOOL_CALL_END, self._on_tool_end),
            (EventType.SESSION_END, self._on_session_end),
            (EventType.AGENT_TURN_END, self._on_agent_turn_end),
            (EventType.FEEDBACK_RECEIVED, self._on_feedback),
            (EventType.SECURITY_ALERT, self._on_security_alert),
        )
        for event_type, handler in subscriptions:
            # Per-handler isolation: keep detaching the rest even when
            # one unsubscribe raises.
            try:
                self.bus.unsubscribe(event_type, handler)
            except Exception as exc:
                logger.debug("Analytics bridge unsubscribe error: %s", exc)
        self._subscribed = False
    self.aggregator.shutdown()

AnalyticsClient

AnalyticsClient(config: AnalyticsConfig, anon_id: str | None = None)

Send anonymized usage events to PostHog.

Construct once at server / CLI startup, share for the process lifetime, call :meth:shutdown on exit to flush pending events.

Source code in src/openjarvis/analytics/client.py
def __init__(self, config: AnalyticsConfig, anon_id: str | None = None) -> None:
    """Record configuration, resolve the anonymous id, and set up the SDK.

    Args:
        config: Analytics configuration; also supplies ``anon_id_path``.
        anon_id: Identifier to use. When empty or ``None`` the id is
            obtained from ``get_or_create_anon_id(config.anon_id_path)``.
    """
    self.config = config
    if anon_id:
        self.anon_id = anon_id
    else:
        self.anon_id = get_or_create_anon_id(config.anon_id_path)
    self._lock = threading.Lock()
    # Stays ``None`` unless analytics is enabled.
    self._posthog: Any = None
    self._enabled = is_analytics_enabled(config)
    if not self._enabled:
        return
    self._init_sdk()
Functions
capture
capture(event_name: str, properties: dict[str, Any] | None = None) -> None

Send one event. Unknown events are silently dropped.

Runs through redaction → event-spec validation → SDK capture. Failures at any stage are swallowed; analytics is best-effort.

Source code in src/openjarvis/analytics/client.py
def capture(
    self,
    event_name: str,
    properties: dict[str, Any] | None = None,
) -> None:
    """Send one event. Unknown events are silently dropped.

    Runs through redaction → event-spec validation → SDK capture.
    Failures at any stage are swallowed; analytics is best-effort.
    """
    if not self.enabled:
        return
    try:
        raw = properties or {}
        cleaned = redact(raw)
        validated = validate_event(event_name, cleaned)
        if validated is None:
            logger.debug("Dropped unknown analytics event: %s", event_name)
            return
        self._posthog.capture(
            distinct_id=self.anon_id,
            event=event_name,
            properties=validated,
        )
    except Exception as exc:
        logger.debug("Analytics capture failed for %s: %s", event_name, exc)
flush
flush() -> None

Force-flush queued events. Safe to call repeatedly.

Source code in src/openjarvis/analytics/client.py
def flush(self) -> None:
    """Force-flush queued events. Safe to call repeatedly.

    Does nothing when no SDK instance is attached. A failure inside the
    SDK is logged at debug level and suppressed — analytics is
    best-effort and must not raise into the caller.
    """
    if self._posthog is None:
        return
    try:
        self._posthog.flush()
    except Exception as exc:
        # Log instead of swallowing silently, matching ``capture``.
        logger.debug("Analytics flush failed: %s", exc)
shutdown
shutdown() -> None

Flush and close the SDK. Call once on process exit.

Source code in src/openjarvis/analytics/client.py
def shutdown(self) -> None:
    """Flush and close the SDK. Call once on process exit.

    Runs under the client lock. Flushing and closing are attempted
    independently, so a failing flush no longer prevents the SDK's own
    ``shutdown`` from being called. Failures are logged at debug level
    and suppressed. Afterwards the SDK reference is dropped and the
    client is marked disabled; further calls return immediately.
    """
    with self._lock:
        if self._posthog is None:
            return
        try:
            self._posthog.flush()
        except Exception as exc:
            logger.debug("Analytics flush during shutdown failed: %s", exc)
        try:
            self._posthog.shutdown()
        except Exception as exc:
            logger.debug("Analytics SDK shutdown failed: %s", exc)
        self._posthog = None
        self._enabled = False

Functions

get_or_create_anon_id

get_or_create_anon_id(path: Path | str) -> str

Return the persisted anon ID, generating one on first call.

Idempotent across processes — if the file already exists with a non-empty value, return it; otherwise generate a fresh UUID v4 and write atomically (rename-after-write so a crashed write leaves no half-file).

Source code in src/openjarvis/analytics/identity.py
def get_or_create_anon_id(path: Path | str) -> str:
    """Return the persisted anon ID, generating one on first call.

    Idempotent across processes — if the file already exists with a
    non-empty value, return it; otherwise generate a fresh UUID v4 and
    write atomically (rename-after-write so a crashed write leaves no
    half-file).
    """
    p = Path(path)
    if p.exists():
        existing = p.read_text(encoding="utf-8").strip()
        if existing:
            return existing
    new_id = str(uuid.uuid4())
    p.parent.mkdir(parents=True, exist_ok=True)
    tmp = p.with_suffix(p.suffix + ".tmp")
    tmp.write_text(new_id + "\n", encoding="utf-8")
    tmp.replace(p)
    return new_id

is_analytics_enabled

is_analytics_enabled(cfg: AnalyticsConfig) -> bool

Return True if analytics is enabled.

Disabled in three cases (any one is sufficient):

  1. Running under pytest. The PostHog SDK registers an atexit hook that synchronously joins its consumer thread; if the host is unreachable (CI runners can't reach the analytics endpoint), each queued batch retries for timeout * max_retries seconds and the interpreter never exits. Detect pytest via PYTEST_CURRENT_TEST (set per test) and "pytest" in sys.modules (covers the collection phase before the first test runs).
  2. An opt-out env var is set: DO_NOT_TRACK=1 (W3C convention) or OPENJARVIS_NO_ANALYTICS=1 (project-specific). Both take precedence over the config so users can opt out without editing ~/.openjarvis/config.toml.
  3. The [analytics] enabled = false config-file setting.
Source code in src/openjarvis/analytics/identity.py
def is_analytics_enabled(cfg: AnalyticsConfig) -> bool:
    """Decide whether analytics events may be sent.

    Analytics is off as soon as any one of these holds, checked in
    order:

    1. The process runs under pytest. The PostHog SDK installs an
       ``atexit`` hook that synchronously joins its consumer thread;
       with an unreachable host (CI runners can't reach the analytics
       endpoint) every queued batch retries for
       ``timeout * max_retries`` seconds and the interpreter never
       exits. Pytest is recognised by ``PYTEST_CURRENT_TEST`` (set per
       test) or by ``"pytest"`` being present in ``sys.modules`` (which
       also covers collection, before the first test runs).
    2. An opt-out environment variable is set: ``DO_NOT_TRACK=1`` (W3C
       convention) or ``OPENJARVIS_NO_ANALYTICS=1`` (project-specific).
       These win over the config file, so opting out needs no edit to
       ``~/.openjarvis/config.toml``.
    3. The config file says ``[analytics] enabled = false``.

    Otherwise the result is ``cfg.enabled``.
    """
    under_pytest = (
        bool(os.environ.get("PYTEST_CURRENT_TEST")) or "pytest" in sys.modules
    )
    # ``or`` short-circuits, so the env opt-out helper is only consulted
    # when the pytest check did not already decide the outcome.
    if under_pytest or _env_opt_out():
        return False
    return cfg.enabled

reset_anon_id

reset_anon_id(path: Path | str) -> str

Delete the persisted ID and generate a fresh one (privacy reset).

Source code in src/openjarvis/analytics/identity.py
def reset_anon_id(path: Path | str) -> str:
    """Delete the persisted ID and generate a fresh one (privacy reset).

    Args:
        path: Location of the ID file; it does not have to exist.

    Returns:
        The newly generated ID, as returned by ``get_or_create_anon_id``.
    """
    p = Path(path)
    # ``missing_ok`` avoids the check-then-delete race: another process
    # may remove the file between an ``exists()`` test and ``unlink()``.
    p.unlink(missing_ok=True)
    return get_or_create_anon_id(p)

hash_id

hash_id(s: str) -> str

Return a 16-char sha256 prefix of s.

Used for model / tool / connector names that aren't on the public allowlist — we still want to see "uses-a-custom-model-X" cohorting without ever learning which model.

Source code in src/openjarvis/analytics/redaction.py
def hash_id(s: str) -> str:
    """Shorten ``s`` to the first 16 hex characters of its SHA-256 digest.

    Intended for model / tool / connector names that are not on the
    public allowlist: equal names map to equal tokens, so cohorts such
    as "uses-a-custom-model-X" stay visible without the name itself
    being sent.

    Args:
        s: Text to hash; encoded as UTF-8 before hashing.

    Returns:
        A 16-character lowercase hex string, or ``""`` when ``s`` is
        empty.
    """
    if s:
        digest = hashlib.sha256(s.encode("utf-8"))
        return digest.hexdigest()[:16]
    return ""

redact

redact(properties: dict[str, Any]) -> dict[str, Any]

Return a copy of properties with PII-bearing string values dropped.

Non-string scalar values pass through unchanged; composite values (list, dict, set, tuple) are always dropped. Empty strings and strings exceeding MAX_STR_LEN are dropped. Strings matching any PII pattern are dropped. The event-spec validator (in :mod:events) runs after this and provides a second layer of structural enforcement.

Source code in src/openjarvis/analytics/redaction.py
def redact(properties: dict[str, Any]) -> dict[str, Any]:
    """Build a filtered copy of ``properties`` that is safe to send.

    Filtering rules, applied per value:

    * ``list``, ``dict``, ``set`` and ``tuple`` values are always
      dropped — composite values are never sent, which keeps the
      surface tiny.
    * Strings are dropped when empty, when longer than
      ``MAX_STR_LEN``, or when ``looks_like_pii`` flags them.
    * Every other value is copied through unchanged.

    The input mapping is not modified. The event-spec validator (in
    :mod:`events`) runs after this and provides a second layer of
    structural enforcement.
    """
    kept: dict[str, Any] = {}
    for name, item in properties.items():
        if isinstance(item, (list, dict, set, tuple)):
            continue
        if isinstance(item, str) and (
            not item  # empty string is uninformative
            or len(item) > MAX_STR_LEN
            or looks_like_pii(item)
        ):
            continue
        kept[name] = item
    return kept