Skip to content

store

store

SQLite-backed telemetry storage.

Classes

TelemetryStore

TelemetryStore(db_path: str | Path)

Append-only SQLite store for inference telemetry records.

Source code in src/openjarvis/telemetry/store.py
def __init__(self, db_path: str | Path) -> None:
    self._db_path = str(db_path)
    self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
    self._conn.execute(_CREATE_TABLE)
    self._conn.execute(_CREATE_MINING_STATS_TABLE)
    self._conn.commit()
    self._migrate_schema()
Functions
record
record(rec: TelemetryRecord) -> None

Persist a single telemetry record.

Source code in src/openjarvis/telemetry/store.py
def record(self, rec: TelemetryRecord) -> None:
    """Persist a single telemetry record."""
    self._conn.execute(
        _INSERT,
        (
            rec.timestamp,
            rec.model_id,
            rec.engine,
            rec.agent,
            rec.prompt_tokens,
            rec.prompt_tokens_evaluated,
            rec.completion_tokens,
            rec.total_tokens,
            rec.latency_seconds,
            rec.ttft,
            rec.cost_usd,
            rec.energy_joules,
            rec.power_watts,
            rec.gpu_utilization_pct,
            rec.gpu_memory_used_gb,
            rec.gpu_temperature_c,
            rec.throughput_tok_per_sec,
            rec.prefill_latency_seconds,
            rec.decode_latency_seconds,
            rec.energy_method,
            rec.energy_vendor,
            rec.batch_id,
            1 if rec.is_warmup else 0,
            rec.cpu_energy_joules,
            rec.gpu_energy_joules,
            rec.dram_energy_joules,
            rec.tokens_per_joule,
            rec.energy_per_output_token_joules,
            rec.throughput_per_watt,
            rec.prefill_energy_joules,
            rec.decode_energy_joules,
            rec.mean_itl_ms,
            rec.median_itl_ms,
            rec.p90_itl_ms,
            rec.p95_itl_ms,
            rec.p99_itl_ms,
            rec.std_itl_ms,
            1 if rec.is_streaming else 0,
            rec.mining_session_id,
            json.dumps(rec.metadata),
        ),
    )
    self._conn.commit()
record_mining_stats
record_mining_stats(stats: Any) -> None

Persist one mining stats snapshot.

stats is duck-typed to keep telemetry usable without importing the optional mining package at module import time.

Source code in src/openjarvis/telemetry/store.py
    def record_mining_stats(self, stats: Any) -> None:
        """Persist one mining stats snapshot.

        ``stats`` is duck-typed to keep telemetry usable without importing the
        optional mining package at module import time.
        """
        self._conn.execute(
            """\
INSERT INTO mining_stats (
    recorded_at, provider_id, shares_submitted, shares_accepted, blocks_found,
    hashrate, uptime_seconds, last_share_at, last_error, payout_target, fees_owed
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                time.time(),
                stats.provider_id,
                stats.shares_submitted,
                stats.shares_accepted,
                stats.blocks_found,
                stats.hashrate,
                stats.uptime_seconds,
                stats.last_share_at,
                stats.last_error,
                stats.payout_target,
                stats.fees_owed,
            ),
        )
        self._conn.commit()
list_recent
list_recent(limit: int = 50) -> list[dict[str, Any]]

Return recent telemetry rows as dictionaries.

Source code in src/openjarvis/telemetry/store.py
def list_recent(self, limit: int = 50) -> list[dict[str, Any]]:
    """Return recent telemetry rows as dictionaries."""
    return self._select_dicts(
        "SELECT * FROM telemetry ORDER BY timestamp DESC LIMIT ?",
        (limit,),
    )
list_recent_mining_stats
list_recent_mining_stats(limit: int = 50) -> list[dict[str, Any]]

Return recent mining stats snapshots as dictionaries.

Source code in src/openjarvis/telemetry/store.py
def list_recent_mining_stats(self, limit: int = 50) -> list[dict[str, Any]]:
    """Return recent mining stats snapshots as dictionaries."""
    return self._select_dicts(
        "SELECT * FROM mining_stats ORDER BY recorded_at DESC LIMIT ?",
        (limit,),
    )
subscribe_to_bus
subscribe_to_bus(bus: EventBus) -> None

Subscribe to TELEMETRY_RECORD events on bus.

Source code in src/openjarvis/telemetry/store.py
def subscribe_to_bus(self, bus: EventBus) -> None:
    """Subscribe to ``TELEMETRY_RECORD`` events on *bus*."""
    bus.subscribe(EventType.TELEMETRY_RECORD, self._on_event)
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/telemetry/store.py
def close(self) -> None:
    """Close the underlying SQLite connection."""
    self._conn.close()