External anonymous usage analytics.
Sends anonymized events to PostHog so the OpenJarvis team can measure
setup success, retention, feature usage, and churn — without ever
collecting chat content, prompts, file paths, emails, IPs, or hardware
identifiers.
Distinct from the `openjarvis.telemetry` module, which stores local FLOPs and
energy metrics in a SQLite DB and never leaves the machine.
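Disable: set `[analytics] enabled = false` in `~/.openjarvis/config.toml`, or set the `DO_NOT_TRACK=1` or `OPENJARVIS_NO_ANALYTICS=1` environment variable (both take precedence over the config file).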
Classes
SessionAggregator
SessionAggregator(client: 'AnalyticsClient', *, idle_timeout_s: float = _IDLE_TIMEOUT_S, flusher_tick_s: float = _FLUSHER_TICK_S)
Buffers per-session counts; emits chat_session_ended on close.
Source code in src/openjarvis/analytics/aggregator.py
def __init__(
    self,
    client: "AnalyticsClient",
    *,
    idle_timeout_s: float = _IDLE_TIMEOUT_S,
    flusher_tick_s: float = _FLUSHER_TICK_S,
) -> None:
    """Create the empty session buffers and launch the flusher thread.

    ``client`` and ``idle_timeout_s`` are stored on the instance as-is.
    ``flusher_tick_s`` is handed to ``_flush_idle_loop`` as its only
    argument when the background thread starts.
    """
    self.client = client
    self.idle_timeout_s = idle_timeout_s
    # Synchronisation primitives come first; the buffers they guard follow.
    self._lock = threading.Lock()
    self._shutdown = threading.Event()
    self._sessions: dict[str, _SessionStats] = {}
    # Daemon thread, so it can never keep the interpreter alive at exit.
    flusher = threading.Thread(
        name="analytics-aggregator-flusher",
        target=self._flush_idle_loop,
        args=(flusher_tick_s,),
        daemon=True,
    )
    self._flusher = flusher
    flusher.start()
Functions
end_session
end_session(session_id: str) -> None
Emit chat_session_ended for one session and drop the buffer.
Source code in src/openjarvis/analytics/aggregator.py
def end_session(self, session_id: str) -> None:
    """Close one session: remove its buffer and emit ``chat_session_ended``.

    Unknown session ids, and sessions that recorded no inference at all,
    are discarded without emitting anything.
    """
    with self._lock:
        finished = self._sessions.pop(session_id, None)
    # Only a session that actually ran an inference is worth reporting.
    has_activity = finished is not None and finished.inference_count != 0
    if has_activity:
        self._emit(finished)
shutdown
Flush every buffered session and stop the flusher thread.
Source code in src/openjarvis/analytics/aggregator.py
def shutdown(self) -> None:
    """Signal the flusher thread to stop and emit every buffered session.

    Sessions without any recorded inference are dropped silently. A
    failure while emitting one session is logged at debug level and does
    not prevent the remaining sessions from being emitted.
    """
    # Raise the stop flag before draining the buffers.
    self._shutdown.set()
    with self._lock:
        pending = [*self._sessions.values()]
        self._sessions.clear()
    # Emission happens on the drained copy, outside the lock.
    for session in pending:
        try:
            if session.inference_count > 0:
                self._emit(session)
        except Exception as exc:
            logger.debug("Aggregator shutdown flush failed: %s", exc)
EventBridge
EventBridge(bus: 'EventBus', client: 'AnalyticsClient', aggregator: SessionAggregator | None = None)
Subscribes to the internal bus and routes to the analytics client.
Source code in src/openjarvis/analytics/bridge.py
def __init__(
    self,
    bus: "EventBus",
    client: "AnalyticsClient",
    aggregator: SessionAggregator | None = None,
) -> None:
    """Bind the bridge to a bus and an analytics client; nothing is subscribed yet.

    When no ``aggregator`` is supplied, a fresh ``SessionAggregator`` is
    built around ``client``.
    """
    self.bus = bus
    self.client = client
    if not aggregator:
        aggregator = SessionAggregator(client)
    self.aggregator = aggregator
    self._lock = threading.Lock()
    # Bookkeeping starts empty: not subscribed, nothing emitted so far.
    self._subscribed = False
    self._first_chat_emitted = False
    self._first_tool_uses: set[str] = set()
Functions
start
Attach subscribers to the bus. Idempotent.
Source code in src/openjarvis/analytics/bridge.py
def start(self) -> None:
    """Subscribe the bridge's handlers to the bus; repeated calls are no-ops."""
    if self._subscribed:
        return
    # (event type, handler) pairs, registered in this order.
    subscriptions = (
        (EventType.INFERENCE_END, self._on_inference_end),
        (EventType.TOOL_CALL_END, self._on_tool_end),
        (EventType.SESSION_END, self._on_session_end),
        (EventType.AGENT_TURN_END, self._on_agent_turn_end),
        (EventType.FEEDBACK_RECEIVED, self._on_feedback),
        (EventType.SECURITY_ALERT, self._on_security_alert),
    )
    for event_type, handler in subscriptions:
        self.bus.subscribe(event_type, handler)
    self._subscribed = True
    logger.debug("Analytics bridge subscribed to internal event bus")
stop
Detach subscribers and flush buffered sessions.
Source code in src/openjarvis/analytics/bridge.py
def stop(self) -> None:
    """Detach subscribers and flush buffered sessions.

    Unsubscription is best-effort and attempted handler by handler, so a
    failure for one event type does not leave the remaining handlers
    attached; each failure is logged at debug level. The aggregator is
    shut down on every call, whether or not the bridge was subscribed.
    """
    if self._subscribed:
        subscriptions = (
            (EventType.INFERENCE_END, self._on_inference_end),
            (EventType.TOOL_CALL_END, self._on_tool_end),
            (EventType.SESSION_END, self._on_session_end),
            (EventType.AGENT_TURN_END, self._on_agent_turn_end),
            (EventType.FEEDBACK_RECEIVED, self._on_feedback),
            (EventType.SECURITY_ALERT, self._on_security_alert),
        )
        for event_type, handler in subscriptions:
            # One try per handler: a single bad unsubscribe must not skip the rest.
            try:
                self.bus.unsubscribe(event_type, handler)
            except Exception as exc:
                logger.debug("Analytics bridge unsubscribe error: %s", exc)
        self._subscribed = False
    self.aggregator.shutdown()
AnalyticsClient
Send anonymized usage events to PostHog.
Construct once at server / CLI startup, share for the process
lifetime, and call `shutdown()` on exit to flush pending events.
Source code in src/openjarvis/analytics/client.py
def __init__(self, config: AnalyticsConfig, anon_id: str | None = None) -> None:
    """Store the config, resolve the anonymous id, and set up the SDK if enabled.

    When ``anon_id`` is omitted (or empty) it is obtained from
    ``get_or_create_anon_id(config.anon_id_path)``. Whether analytics is
    enabled is decided once here by ``is_analytics_enabled(config)``.
    """
    if not anon_id:
        anon_id = get_or_create_anon_id(config.anon_id_path)
    self.config = config
    self.anon_id = anon_id
    self._lock = threading.Lock()
    self._posthog: Any = None
    self._enabled = is_analytics_enabled(config)
    # The SDK is only initialised when analytics is enabled.
    if self._enabled:
        self._init_sdk()
Functions
capture
capture(event_name: str, properties: dict[str, Any] | None = None) -> None
Send one event. Unknown events are silently dropped.
Runs through redaction → event-spec validation → SDK capture.
Failures at any stage are swallowed; analytics is best-effort.
Source code in src/openjarvis/analytics/client.py
| def capture(
self,
event_name: str,
properties: dict[str, Any] | None = None,
) -> None:
"""Send one event. Unknown events are silently dropped.
Runs through redaction → event-spec validation → SDK capture.
Failures at any stage are swallowed; analytics is best-effort.
"""
if not self.enabled:
return
try:
raw = properties or {}
cleaned = redact(raw)
validated = validate_event(event_name, cleaned)
if validated is None:
logger.debug("Dropped unknown analytics event: %s", event_name)
return
self._posthog.capture(
distinct_id=self.anon_id,
event=event_name,
properties=validated,
)
except Exception as exc:
logger.debug("Analytics capture failed for %s: %s", event_name, exc)
|
flush
Force-flush queued events. Safe to call repeatedly.
Source code in src/openjarvis/analytics/client.py
def flush(self) -> None:
    """Force-flush queued events. Safe to call repeatedly.

    Does nothing when no SDK client is attached. Flush errors are logged
    at debug level instead of being raised: analytics is best-effort.
    """
    # Snapshot the handle so the None check and the call see the same object.
    posthog = self._posthog
    if posthog is None:
        return
    try:
        posthog.flush()
    except Exception as exc:
        logger.debug("Analytics flush failed: %s", exc)
shutdown
Flush and close the SDK. Call once on process exit.
Source code in src/openjarvis/analytics/client.py
def shutdown(self) -> None:
    """Flush and close the SDK. Call once on process exit.

    Calling it again after the SDK has been released is a no-op. The
    flush and the SDK shutdown are attempted independently, so a failing
    flush does not prevent the SDK from being closed; either failure is
    logged at debug level and never raised. Afterwards the client holds
    no SDK handle and is marked disabled.
    """
    with self._lock:
        posthog = self._posthog
        if posthog is None:
            return
        try:
            posthog.flush()
        except Exception as exc:
            logger.debug("Analytics flush on shutdown failed: %s", exc)
        try:
            posthog.shutdown()
        except Exception as exc:
            logger.debug("Analytics SDK shutdown failed: %s", exc)
        self._posthog = None
        self._enabled = False
Functions
get_or_create_anon_id
get_or_create_anon_id(path: Path | str) -> str
Return the persisted anon ID, generating one on first call.
Idempotent across processes — if the file already exists with a
non-empty value, return it; otherwise generate a fresh UUID v4 and
write atomically (rename-after-write so a crashed write leaves no
half-file).
Source code in src/openjarvis/analytics/identity.py
| def get_or_create_anon_id(path: Path | str) -> str:
"""Return the persisted anon ID, generating one on first call.
Idempotent across processes — if the file already exists with a
non-empty value, return it; otherwise generate a fresh UUID v4 and
write atomically (rename-after-write so a crashed write leaves no
half-file).
"""
p = Path(path)
if p.exists():
existing = p.read_text(encoding="utf-8").strip()
if existing:
return existing
new_id = str(uuid.uuid4())
p.parent.mkdir(parents=True, exist_ok=True)
tmp = p.with_suffix(p.suffix + ".tmp")
tmp.write_text(new_id + "\n", encoding="utf-8")
tmp.replace(p)
return new_id
|
is_analytics_enabled
Return True if analytics is enabled.
Disabled in three cases (any one is sufficient):
- Running under pytest. The PostHog SDK registers an `atexit`
hook that synchronously joins its consumer thread; if the host
is unreachable (CI runners can't reach the analytics endpoint),
each queued batch retries for timeout * max_retries seconds
and the interpreter never exits. Detect pytest via
PYTEST_CURRENT_TEST (set per test) and "pytest" in
sys.modules (covers the collection phase before the first
test runs).
- An opt-out env var is set:
DO_NOT_TRACK=1 (W3C convention)
or OPENJARVIS_NO_ANALYTICS=1 (project-specific). Both take
precedence over the config so users can opt out without
editing ~/.openjarvis/config.toml.
- The `[analytics] enabled = false` config-file setting.
Source code in src/openjarvis/analytics/identity.py
def is_analytics_enabled(cfg: AnalyticsConfig) -> bool:
    """Decide whether analytics may run; any single veto below disables it.

    Checked in this order:

    1. Pytest is active. The PostHog SDK registers an ``atexit`` hook
       that synchronously joins its consumer thread; when the analytics
       host is unreachable (as on CI runners), each queued batch retries
       for ``timeout * max_retries`` seconds and the interpreter never
       exits. Pytest is detected through ``PYTEST_CURRENT_TEST`` (set
       per test) or ``"pytest" in sys.modules`` (which also covers the
       collection phase before the first test runs).
    2. An opt-out environment variable is set: ``DO_NOT_TRACK=1`` (W3C
       convention) or ``OPENJARVIS_NO_ANALYTICS=1`` (project-specific).
       Both outrank the config file, so users can opt out without
       editing ``~/.openjarvis/config.toml``.
    3. The config file says ``[analytics] enabled = false``.

    Otherwise the result is ``cfg.enabled``.
    """
    under_pytest = (
        bool(os.environ.get("PYTEST_CURRENT_TEST")) or "pytest" in sys.modules
    )
    # The opt-out helper is consulted only when pytest is not active.
    if under_pytest or _env_opt_out():
        return False
    return cfg.enabled
reset_anon_id
reset_anon_id(path: Path | str) -> str
Delete the persisted ID and generate a fresh one (privacy reset).
Source code in src/openjarvis/analytics/identity.py
def reset_anon_id(path: Path | str) -> str:
    """Delete the persisted ID and generate a fresh one (privacy reset).

    The old file is removed with ``missing_ok=True``, so an ID file that
    is already absent, or that disappears concurrently, is not an error.
    The new ID comes from ``get_or_create_anon_id`` and is returned.
    """
    p = Path(path)
    # No exists() probe: unlink tolerates a missing file by itself.
    p.unlink(missing_ok=True)
    return get_or_create_anon_id(p)
hash_id
Return a 16-char sha256 prefix of s.
Used for model / tool / connector names that aren't on the public
allowlist — we still want to see "uses-a-custom-model-X" cohorting
without ever learning which model.
Source code in src/openjarvis/analytics/redaction.py
def hash_id(s: str) -> str:
    """Hash ``s`` down to the first 16 hex characters of its SHA-256 digest.

    Meant for model / tool / connector names that are not on the public
    allowlist: values can still be grouped into "uses-a-custom-model-X"
    cohorts without the actual name being sent. An empty input yields an
    empty string rather than the hash of nothing.
    """
    if s:
        digest = hashlib.sha256(s.encode("utf-8"))
        return digest.hexdigest()[:16]
    return ""
redact
redact(properties: dict[str, Any]) -> dict[str, Any]
Return a copy of properties with PII-bearing string values dropped.
Scalar non-string values pass through unchanged; composite values
(lists, dicts, sets, tuples) are dropped. Empty strings, strings exceeding
MAX_STR_LEN, and strings matching any PII pattern are also dropped.
The event-spec validator (in the `events` module) runs after
this and provides a second layer of structural enforcement.
Source code in src/openjarvis/analytics/redaction.py
def redact(properties: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``properties`` with PII-bearing or bulky values dropped.

    Rules, applied per value:

    * Strings are dropped when empty, when longer than ``MAX_STR_LEN``,
      or when ``looks_like_pii`` flags them.
    * Composite values (``list``, ``dict``, ``set``, ``frozenset``,
      ``tuple``) are always dropped.
    * Every other value passes through unchanged.

    The input mapping is not modified. The event-spec validator (in
    :mod:`events`) runs after this and provides a second layer of
    structural enforcement.
    """
    out: dict[str, Any] = {}
    for key, value in properties.items():
        if isinstance(value, str):
            # Empty strings are uninformative; oversized or PII-looking ones are unsafe.
            if not value or len(value) > MAX_STR_LEN or looks_like_pii(value):
                continue
        elif isinstance(value, (list, dict, set, frozenset, tuple)):
            # Composite values are never sent — keeps the surface tiny.
            continue
        out[key] = value
    return out