aggregator

Read-only telemetry aggregation — query stored inference records.

Classes

ModelStats dataclass

ModelStats(model_id: str = '', call_count: int = 0, total_tokens: int = 0, prompt_tokens: int = 0, completion_tokens: int = 0, total_latency: float = 0.0, avg_latency: float = 0.0, total_cost: float = 0.0, avg_ttft: float = 0.0, total_energy_joules: float = 0.0, avg_gpu_utilization_pct: float = 0.0, avg_throughput_tok_per_sec: float = 0.0, avg_tokens_per_joule: float = 0.0, avg_energy_per_output_token_joules: float = 0.0, avg_throughput_per_watt: float = 0.0, total_prefill_energy_joules: float = 0.0, total_decode_energy_joules: float = 0.0, avg_mean_itl_ms: float = 0.0, avg_median_itl_ms: float = 0.0, avg_p95_itl_ms: float = 0.0)

Aggregated statistics for a single model.

EngineStats dataclass

EngineStats(engine: str = '', call_count: int = 0, total_tokens: int = 0, total_latency: float = 0.0, avg_latency: float = 0.0, total_cost: float = 0.0, avg_ttft: float = 0.0, total_energy_joules: float = 0.0, avg_gpu_utilization_pct: float = 0.0, avg_throughput_tok_per_sec: float = 0.0, avg_tokens_per_joule: float = 0.0, avg_energy_per_output_token_joules: float = 0.0, avg_throughput_per_watt: float = 0.0, total_prefill_energy_joules: float = 0.0, total_decode_energy_joules: float = 0.0, avg_mean_itl_ms: float = 0.0, avg_median_itl_ms: float = 0.0, avg_p95_itl_ms: float = 0.0)

Aggregated statistics for a single engine backend.

AggregatedStats dataclass

AggregatedStats(total_calls: int = 0, total_tokens: int = 0, total_cost: float = 0.0, total_latency: float = 0.0, total_energy_joules: float = 0.0, avg_throughput_tok_per_sec: float = 0.0, avg_gpu_utilization_pct: float = 0.0, avg_energy_per_output_token_joules: float = 0.0, avg_throughput_per_watt: float = 0.0, total_prefill_energy_joules: float = 0.0, total_decode_energy_joules: float = 0.0, avg_mean_itl_ms: float = 0.0, avg_median_itl_ms: float = 0.0, avg_p95_itl_ms: float = 0.0, per_model: List[ModelStats] = list(), per_engine: List[EngineStats] = list())

Top-level summary combining per-model and per-engine stats.
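How a top-level summary might be rolled up from per-model entries can be sketched as follows. This is only an illustration under stated assumptions: `ModelStatsSketch`, `AggregatedStatsSketch`, and `summarize` are hypothetical, heavily reduced stand-ins rather than the library's classes, and the aggregator's actual roll-up logic is not shown in this section. The `list()` defaults in the signature above presumably correspond to a `default_factory`, which is what the sketch uses.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ModelStatsSketch:
    # Hypothetical, reduced stand-in for ModelStats (three fields only).
    model_id: str = ""
    call_count: int = 0
    total_tokens: int = 0


@dataclass
class AggregatedStatsSketch:
    # Hypothetical, reduced stand-in for AggregatedStats.
    total_calls: int = 0
    total_tokens: int = 0
    # default_factory gives every instance its own, unshared list.
    per_model: List[ModelStatsSketch] = field(default_factory=list)


def summarize(models: List[ModelStatsSketch]) -> AggregatedStatsSketch:
    """Roll per-model counters up into one summary (assumed logic)."""
    return AggregatedStatsSketch(
        total_calls=sum(m.call_count for m in models),
        total_tokens=sum(m.total_tokens for m in models),
        per_model=list(models),
    )


summary = summarize(
    [
        ModelStatsSketch(model_id="model-a", call_count=3, total_tokens=300),
        ModelStatsSketch(model_id="model-b", call_count=1, total_tokens=50),
    ]
)
print(summary.total_calls, summary.total_tokens)  # prints: 4 350
```

Using a default factory rather than a shared list literal means two empty summaries never alias the same `per_model` list.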

TelemetryAggregator

TelemetryAggregator(db_path: str | Path)

Read-only query layer over the telemetry SQLite database.

Source code in src/openjarvis/telemetry/aggregator.py
def __init__(self, db_path: str | Path) -> None:
    self._db_path = str(db_path)
    self._conn = sqlite3.connect(self._db_path)
    self._conn.row_factory = sqlite3.Row
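Setting `row_factory = sqlite3.Row` is what lets the query methods read result columns by name. A minimal sketch using only the standard library, with an in-memory database and a two-column `telemetry` table assumed purely for illustration (the real schema is not shown in this section):

```python
import sqlite3

# In-memory database standing in for the telemetry file (assumption).
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows become name-addressable

conn.execute("CREATE TABLE telemetry (batch_id TEXT, energy_joules REAL)")
conn.execute("INSERT INTO telemetry VALUES (?, ?)", ("b1", 2.5))

row = conn.execute("SELECT batch_id, energy_joules FROM telemetry").fetchone()

# sqlite3.Row supports access by column name as well as by index.
print(row["batch_id"], row["energy_joules"])
print(row[0], row.keys())

conn.close()
```

Without the row factory, each row would be a plain tuple and expressions such as `r["total_tokens"]` would raise a `TypeError`.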

Functions

per_batch_stats

per_batch_stats(*, since: Optional[float] = None, until: Optional[float] = None, exclude_warmup: bool = False) -> List[Dict[str, Any]]

Aggregate telemetry by batch_id.

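Returns a list of dicts, one per batch, with the keys batch_id, total_requests, total_tokens, total_energy_joules, and energy_per_token_joules. Records with an empty batch_id are skipped, since and until are inclusive timestamp bounds, and exclude_warmup=True drops records flagged is_warmup. Batches are ordered by total_requests, largest first. energy_per_token_joules is total_energy_joules divided by total_tokens, or 0.0 when a batch has no tokens.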

Source code in src/openjarvis/telemetry/aggregator.py
def per_batch_stats(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
    exclude_warmup: bool = False,
) -> List[Dict[str, Any]]:
    """Aggregate telemetry by batch_id.

    Returns list of dicts with batch_id, total_requests, total_tokens,
    total_energy_joules, energy_per_token_joules.
    """
    clauses: list[str] = ["batch_id != ''"]
    params: list[Any] = []
    if since is not None:
        clauses.append("timestamp >= ?")
        params.append(since)
    if until is not None:
        clauses.append("timestamp <= ?")
        params.append(until)
    if exclude_warmup:
        clauses.append("is_warmup = 0")
    where = " WHERE " + " AND ".join(clauses)

    sql = (
        "SELECT batch_id,"
        " COUNT(*) AS total_requests,"
        " SUM(prompt_tokens + completion_tokens) AS total_tokens,"
        " SUM(energy_joules) AS total_energy_joules"
        f" FROM telemetry{where}"
        " GROUP BY batch_id ORDER BY total_requests DESC"
    )
    rows = self._conn.execute(sql, params).fetchall()
    results: List[Dict[str, Any]] = []
    for r in rows:
        total_tokens = r["total_tokens"] or 0
        total_energy = r["total_energy_joules"] or 0.0
        results.append(
            {
                "batch_id": r["batch_id"],
                "total_requests": r["total_requests"],
                "total_tokens": total_tokens,
                "total_energy_joules": total_energy,
                "energy_per_token_joules": (
                    total_energy / total_tokens if total_tokens > 0 else 0.0
                ),
            }
        )
    return results
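
The grouping above can be tried without the library by running the same query against a throwaway database. The sketch below assumes a `telemetry` table containing only the six columns the query touches, with guessed column types and made-up sample rows. It corresponds to calling the method with `exclude_warmup=True` and no time bounds.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
# Assumed minimal schema: only the columns referenced by the query.
conn.execute(
    "CREATE TABLE telemetry ("
    " batch_id TEXT, timestamp REAL, prompt_tokens INTEGER,"
    " completion_tokens INTEGER, energy_joules REAL, is_warmup INTEGER)"
)
sample_rows = [  # made-up records for illustration
    ("b1", 100.0, 10, 20, 3.0, 1),  # warmup request, filtered out
    ("b1", 101.0, 10, 30, 4.0, 0),
    ("b1", 102.0, 20, 40, 6.0, 0),
    ("b2", 103.0, 5, 5, 1.0, 0),
    ("", 104.0, 7, 7, 9.0, 0),  # empty batch_id, filtered out
]
conn.executemany("INSERT INTO telemetry VALUES (?, ?, ?, ?, ?, ?)", sample_rows)

sql = (
    "SELECT batch_id,"
    " COUNT(*) AS total_requests,"
    " SUM(prompt_tokens + completion_tokens) AS total_tokens,"
    " SUM(energy_joules) AS total_energy_joules"
    " FROM telemetry WHERE batch_id != '' AND is_warmup = 0"
    " GROUP BY batch_id ORDER BY total_requests DESC"
)

batches = []
for r in conn.execute(sql).fetchall():
    total_tokens = r["total_tokens"] or 0
    total_energy = r["total_energy_joules"] or 0.0
    batches.append(
        {
            "batch_id": r["batch_id"],
            "total_requests": r["total_requests"],
            "total_tokens": total_tokens,
            "total_energy_joules": total_energy,
            # Guard against division by zero for token-less batches.
            "energy_per_token_joules": (
                total_energy / total_tokens if total_tokens > 0 else 0.0
            ),
        }
    )
conn.close()

for b in batches:
    print(b["batch_id"], b["total_requests"], b["total_tokens"])
```

With these sample rows, `b1` comes first with 2 requests and 100 tokens, followed by `b2` with 1 request and 10 tokens. The warmup row and the row with an empty `batch_id` do not contribute to either batch.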