Skip to content

Index

telemetry

Telemetry — SQLite-backed inference recording and instrumented wrappers.

Classes

AggregatedStats dataclass

AggregatedStats(total_calls: int = 0, total_tokens: int = 0, total_cost: float = 0.0, total_latency: float = 0.0, total_energy_joules: float = 0.0, avg_throughput_tok_per_sec: float = 0.0, avg_gpu_utilization_pct: float = 0.0, avg_energy_per_output_token_joules: float = 0.0, avg_throughput_per_watt: float = 0.0, total_prefill_energy_joules: float = 0.0, total_decode_energy_joules: float = 0.0, avg_mean_itl_ms: float = 0.0, avg_median_itl_ms: float = 0.0, avg_p95_itl_ms: float = 0.0, per_model: List[ModelStats] = list(), per_engine: List[EngineStats] = list())

Top-level summary combining per-model and per-engine stats.

EngineStats dataclass

EngineStats(engine: str = '', call_count: int = 0, total_tokens: int = 0, total_latency: float = 0.0, avg_latency: float = 0.0, total_cost: float = 0.0, avg_ttft: float = 0.0, total_energy_joules: float = 0.0, avg_gpu_utilization_pct: float = 0.0, avg_throughput_tok_per_sec: float = 0.0, avg_tokens_per_joule: float = 0.0, avg_energy_per_output_token_joules: float = 0.0, avg_throughput_per_watt: float = 0.0, total_prefill_energy_joules: float = 0.0, total_decode_energy_joules: float = 0.0, avg_mean_itl_ms: float = 0.0, avg_median_itl_ms: float = 0.0, avg_p95_itl_ms: float = 0.0)

Aggregated statistics for a single engine backend.

ModelStats dataclass

ModelStats(model_id: str = '', call_count: int = 0, total_tokens: int = 0, prompt_tokens: int = 0, completion_tokens: int = 0, total_latency: float = 0.0, avg_latency: float = 0.0, total_cost: float = 0.0, avg_ttft: float = 0.0, total_energy_joules: float = 0.0, avg_gpu_utilization_pct: float = 0.0, avg_throughput_tok_per_sec: float = 0.0, avg_tokens_per_joule: float = 0.0, avg_energy_per_output_token_joules: float = 0.0, avg_throughput_per_watt: float = 0.0, total_prefill_energy_joules: float = 0.0, total_decode_energy_joules: float = 0.0, avg_mean_itl_ms: float = 0.0, avg_median_itl_ms: float = 0.0, avg_p95_itl_ms: float = 0.0)

Aggregated statistics for a single model.

TelemetryAggregator

TelemetryAggregator(db_path: str | Path)

Read-only query layer over the telemetry SQLite database.

Source code in src/openjarvis/telemetry/aggregator.py
def __init__(self, db_path: str | Path) -> None:
    """Open a connection to the telemetry database at *db_path*.

    ``sqlite3.Row`` is installed as the row factory so query results can
    be accessed by column name.
    """
    location = str(db_path)
    connection = sqlite3.connect(location)
    connection.row_factory = sqlite3.Row
    self._db_path = location
    self._conn = connection
Functions
per_batch_stats
per_batch_stats(*, since: Optional[float] = None, until: Optional[float] = None, exclude_warmup: bool = False) -> List[Dict[str, Any]]

Aggregate telemetry by batch_id.

Returns list of dicts with batch_id, total_requests, total_tokens, total_energy_joules, energy_per_token_joules.

Source code in src/openjarvis/telemetry/aggregator.py
def per_batch_stats(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
    exclude_warmup: bool = False,
) -> List[Dict[str, Any]]:
    """Aggregate telemetry by batch_id.

    Rows without a batch_id are ignored.  Optional *since*/*until* bound
    the timestamp range (inclusive); *exclude_warmup* drops warmup rows.

    Returns list of dicts with batch_id, total_requests, total_tokens,
    total_energy_joules, energy_per_token_joules, ordered by request
    count descending.
    """
    conditions: list[str] = ["batch_id != ''"]
    bind_values: list[Any] = []
    # Time bounds are optional; only bind parameters that were supplied.
    for clause, value in (("timestamp >= ?", since), ("timestamp <= ?", until)):
        if value is not None:
            conditions.append(clause)
            bind_values.append(value)
    if exclude_warmup:
        conditions.append("is_warmup = 0")

    query = (
        "SELECT batch_id,"
        " COUNT(*) AS total_requests,"
        " SUM(prompt_tokens + completion_tokens) AS total_tokens,"
        " SUM(energy_joules) AS total_energy_joules"
        " FROM telemetry WHERE " + " AND ".join(conditions) +
        " GROUP BY batch_id ORDER BY total_requests DESC"
    )

    stats: List[Dict[str, Any]] = []
    for row in self._conn.execute(query, bind_values):
        # SUM() yields NULL for all-NULL groups; coalesce to zero.
        tokens = row["total_tokens"] or 0
        energy = row["total_energy_joules"] or 0.0
        stats.append(
            {
                "batch_id": row["batch_id"],
                "total_requests": row["total_requests"],
                "total_tokens": tokens,
                "total_energy_joules": energy,
                "energy_per_token_joules": (
                    energy / tokens if tokens > 0 else 0.0
                ),
            }
        )
    return stats

TelemetryStore

TelemetryStore(db_path: str | Path)

Append-only SQLite store for inference telemetry records.

Source code in src/openjarvis/telemetry/store.py
def __init__(self, db_path: str | Path) -> None:
    """Open (or create) the SQLite database at *db_path* and ensure the schema.

    ``check_same_thread=False`` allows the connection to be used from
    threads other than the creator (e.g. event-bus callbacks).
    NOTE(review): concurrent writers rely on SQLite's own locking —
    confirm callers serialize access where needed.
    """
    self._db_path = str(db_path)
    self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
    # Create the telemetry table if missing, then run migrations —
    # presumably adding columns introduced after the table was first
    # created (see _migrate_schema).
    self._conn.execute(_CREATE_TABLE)
    self._conn.commit()
    self._migrate_schema()
Functions
record
record(rec: TelemetryRecord) -> None

Persist a single telemetry record.

Source code in src/openjarvis/telemetry/store.py
def record(self, rec: TelemetryRecord) -> None:
    """Persist a single telemetry record.

    Each call commits immediately, so a crash loses at most the
    in-flight record.
    """
    # The positional order below must match the column order of the
    # _INSERT statement exactly — update both together.
    self._conn.execute(
        _INSERT,
        (
            rec.timestamp,
            rec.model_id,
            rec.engine,
            rec.agent,
            rec.prompt_tokens,
            rec.completion_tokens,
            rec.total_tokens,
            rec.latency_seconds,
            rec.ttft,
            rec.cost_usd,
            rec.energy_joules,
            rec.power_watts,
            rec.gpu_utilization_pct,
            rec.gpu_memory_used_gb,
            rec.gpu_temperature_c,
            rec.throughput_tok_per_sec,
            rec.prefill_latency_seconds,
            rec.decode_latency_seconds,
            rec.energy_method,
            rec.energy_vendor,
            rec.batch_id,
            # Booleans are stored as 0/1 integers for SQLite portability.
            1 if rec.is_warmup else 0,
            rec.cpu_energy_joules,
            rec.gpu_energy_joules,
            rec.dram_energy_joules,
            rec.tokens_per_joule,
            rec.energy_per_output_token_joules,
            rec.throughput_per_watt,
            rec.prefill_energy_joules,
            rec.decode_energy_joules,
            rec.mean_itl_ms,
            rec.median_itl_ms,
            rec.p90_itl_ms,
            rec.p95_itl_ms,
            rec.p99_itl_ms,
            rec.std_itl_ms,
            1 if rec.is_streaming else 0,
            # Arbitrary metadata is serialized to a JSON string column.
            json.dumps(rec.metadata),
        ),
    )
    self._conn.commit()
subscribe_to_bus
subscribe_to_bus(bus: EventBus) -> None

Subscribe to TELEMETRY_RECORD events on bus.

Source code in src/openjarvis/telemetry/store.py
def subscribe_to_bus(self, bus: EventBus) -> None:
    """Subscribe to ``TELEMETRY_RECORD`` events on *bus*.

    Events are routed to ``self._on_event`` — presumably persisting the
    attached record via ``record()``; confirm against the handler.
    """
    bus.subscribe(EventType.TELEMETRY_RECORD, self._on_event)
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/telemetry/store.py
def close(self) -> None:
    """Close the underlying SQLite connection.

    After closing, further ``record()`` calls will raise
    ``sqlite3.ProgrammingError`` on the closed connection.
    """
    self._conn.close()

GpuHardwareSpec dataclass

GpuHardwareSpec(tflops_fp16: float, bandwidth_gb_s: float, tdp_watts: float)

Peak theoretical capabilities for a known GPU model.

GpuMonitor

GpuMonitor(poll_interval_ms: int = 50)

Background GPU poller using pynvml.

Usage::

mon = GpuMonitor(poll_interval_ms=50)
with mon.sample() as result:
    # ... run inference ...
    pass
print(result.energy_joules)
mon.close()
Source code in src/openjarvis/telemetry/gpu_monitor.py
def __init__(self, poll_interval_ms: int = 50) -> None:
    """Initialise NVML and cache one handle per visible GPU.

    Degrades gracefully: when pynvml is missing or NVML fails to
    initialise, the monitor stays uninitialised and ``sample()`` yields
    empty results without starting a polling thread.
    """
    self._poll_interval_s = poll_interval_ms / 1000.0  # stored in seconds
    self._handles: List = []
    self._device_count = 0
    self._initialized = False

    if _PYNVML_AVAILABLE:
        try:
            pynvml.nvmlInit()
            self._device_count = pynvml.nvmlDeviceGetCount()
            self._handles = [
                pynvml.nvmlDeviceGetHandleByIndex(i)
                for i in range(self._device_count)
            ]
            self._initialized = True
        except Exception as exc:
            # Any NVML failure (no driver, no GPU, permissions) is non-fatal.
            logger.debug("GPU monitor initialization failed: %s", exc)
            self._initialized = False
Functions
available staticmethod
available() -> bool

Return True if pynvml is importable and can be initialized.

Source code in src/openjarvis/telemetry/gpu_monitor.py
@staticmethod
def available() -> bool:
    """Return ``True`` if pynvml is importable and can be initialized.

    Performs a full init/shutdown round-trip, so it is safe to call
    before constructing a ``GpuMonitor``.
    """
    if not _PYNVML_AVAILABLE:
        return False
    try:
        pynvml.nvmlInit()
        pynvml.nvmlShutdown()
        return True
    except Exception as exc:
        # A failure in either init or shutdown counts as unavailable.
        logger.debug("GPU monitor availability check failed: %s", exc)
        return False
sample
sample() -> Generator[GpuSample, None, None]

Context manager that polls GPUs during the block, then populates the sample.

If pynvml is unavailable or no devices are found, yields an empty `GpuSample` without starting a background thread.

Source code in src/openjarvis/telemetry/gpu_monitor.py
@contextmanager
def sample(self) -> Generator[GpuSample, None, None]:
    """Context manager that polls GPUs during the block, then populates the sample.

    If pynvml is unavailable or no devices are found, yields an empty
    :class:`GpuSample` without starting a background thread.
    """
    result = GpuSample()

    # Fallback path: no NVML — still report wall-clock duration.
    if not self._initialized or self._device_count == 0:
        t_start = time.monotonic()
        yield result
        result.duration_seconds = time.monotonic() - t_start
        return

    # Buffers written by the poller thread and read here after join.
    snapshots: List[List[GpuSnapshot]] = []
    timestamps: List[float] = []
    lock = threading.Lock()
    stop_event = threading.Event()

    # Daemon thread so it never blocks interpreter exit.
    thread = threading.Thread(
        target=self._polling_loop,
        args=(snapshots, timestamps, lock, stop_event),
        daemon=True,
    )

    t_start = time.monotonic()
    thread.start()
    try:
        yield result
    finally:
        # Stop polling even if the caller's block raised.
        stop_event.set()
        thread.join(timeout=2.0)
        wall = time.monotonic() - t_start

        # Copy under the lock: join() may have timed out, leaving the
        # poller still appending.
        with lock:
            snap_copy = list(snapshots)
            ts_copy = list(timestamps)

        aggregated = self._aggregate(snap_copy, ts_copy, wall)

        # Copy aggregated values into the yielded result object
        # (mutated in place — the caller already holds `result`).
        result.energy_joules = aggregated.energy_joules
        result.mean_power_watts = aggregated.mean_power_watts
        result.peak_power_watts = aggregated.peak_power_watts
        result.mean_utilization_pct = aggregated.mean_utilization_pct
        result.peak_utilization_pct = aggregated.peak_utilization_pct
        result.mean_memory_used_gb = aggregated.mean_memory_used_gb
        result.peak_memory_used_gb = aggregated.peak_memory_used_gb
        result.mean_temperature_c = aggregated.mean_temperature_c
        result.peak_temperature_c = aggregated.peak_temperature_c
        result.duration_seconds = aggregated.duration_seconds
        result.num_snapshots = aggregated.num_snapshots
close
close() -> None

Shut down pynvml if it was initialized.

Source code in src/openjarvis/telemetry/gpu_monitor.py
def close(self) -> None:
    """Shut down pynvml if it was initialized.

    The flag is cleared even when shutdown fails, so a second call is a
    no-op.
    """
    if self._initialized:
        try:
            pynvml.nvmlShutdown()
        except Exception as exc:
            logger.debug("Failed to shut down GPU monitor: %s", exc)
        self._initialized = False

GpuSample dataclass

GpuSample(energy_joules: float = 0.0, mean_power_watts: float = 0.0, peak_power_watts: float = 0.0, mean_utilization_pct: float = 0.0, peak_utilization_pct: float = 0.0, mean_memory_used_gb: float = 0.0, peak_memory_used_gb: float = 0.0, mean_temperature_c: float = 0.0, peak_temperature_c: float = 0.0, duration_seconds: float = 0.0, num_snapshots: int = 0)

Aggregated GPU metrics over an inference bracket.

GpuSnapshot dataclass

GpuSnapshot(power_watts: float, utilization_pct: float, memory_used_gb: float, temperature_c: float, device_id: int = 0)

A single point-in-time reading from one GPU device.

EfficiencyMetrics dataclass

EfficiencyMetrics(mfu_pct: float = 0.0, mbu_pct: float = 0.0, actual_flops: float = 0.0, peak_flops: float = 0.0, actual_bandwidth_gb_s: float = 0.0, peak_bandwidth_gb_s: float = 0.0, ipj: float = 0.0)

Results of an MFU/MBU efficiency calculation.

VLLMMetrics dataclass

VLLMMetrics(ttft_p50: float = 0.0, ttft_p95: float = 0.0, ttft_p99: float = 0.0, gpu_cache_usage_pct: float = 0.0, e2e_latency_p50: float = 0.0, e2e_latency_p95: float = 0.0, queue_depth: float = 0.0)

Parsed vLLM performance metrics.

VLLMMetricsScraper

VLLMMetricsScraper(host: str = 'http://localhost:8000')

Scrapes vLLM's Prometheus /metrics endpoint.

Source code in src/openjarvis/telemetry/vllm_metrics.py
def __init__(self, host: str = "http://localhost:8000") -> None:
    """Store the vLLM server base URL, stripping any trailing slashes."""
    self._host = host.rstrip("/")
Functions
scrape
scrape() -> VLLMMetrics

Fetch and parse vLLM metrics. Returns zeroed metrics on error.

Source code in src/openjarvis/telemetry/vllm_metrics.py
def scrape(self) -> VLLMMetrics:
    """Fetch and parse vLLM metrics. Returns zeroed metrics on error."""
    try:
        resp = httpx.get(f"{self._host}/metrics", timeout=5.0)
        resp.raise_for_status()
    except (
        httpx.ConnectError, httpx.TimeoutException, httpx.HTTPStatusError,
    ) as exc:
        # Best-effort scrape: an unreachable or unhealthy server yields
        # an all-zero VLLMMetrics rather than propagating the error.
        logger.debug("Failed to fetch vLLM metrics: %s", exc)
        return VLLMMetrics()

    return self._parse(resp.text)

EnergyMonitor

Bases: ABC

Abstract base class for energy measurement backends.

Each vendor implementation probes for hardware support at init, exposes an available() class method, and provides a sample() context manager that measures energy over a code block.

Functions
available abstractmethod staticmethod
available() -> bool

Return True if this monitor can run on the current hardware.

Source code in src/openjarvis/telemetry/energy_monitor.py
@staticmethod
@abstractmethod
def available() -> bool:
    """Return ``True`` if this monitor can run on the current hardware.

    The factory (``create_energy_monitor``) calls this on each candidate
    class to pick a backend.
    """
vendor abstractmethod
vendor() -> EnergyVendor

Return the vendor enum for this monitor.

Source code in src/openjarvis/telemetry/energy_monitor.py
@abstractmethod
def vendor(self) -> EnergyVendor:
    """Return the vendor enum for this monitor (e.g. NVIDIA, AMD)."""
energy_method abstractmethod
energy_method() -> str

Return the measurement method: 'hw_counter', 'polling', 'rapl', or 'zeus'.

Source code in src/openjarvis/telemetry/energy_monitor.py
@abstractmethod
def energy_method(self) -> str:
    """Return the measurement method: 'hw_counter', 'polling', 'rapl', or 'zeus'."""
sample abstractmethod
sample() -> Generator[EnergySample, None, None]

Context manager that measures energy during the enclosed block.

Yields an EnergySample that is populated when the block exits.

Source code in src/openjarvis/telemetry/energy_monitor.py
@abstractmethod
@contextmanager
def sample(self) -> Generator[EnergySample, None, None]:
    """Context manager that measures energy during the enclosed block.

    Yields an ``EnergySample`` that is populated when the block exits.
    """
    # Default stub keeps the ABC body a valid generator; concrete
    # monitors are expected to override this.
    yield EnergySample()  # pragma: no cover
snapshot
snapshot() -> EnergySample

Return an instantaneous energy reading without start/stop bracket.

Subclasses should override to provide actual readings. Default returns an empty sample.

Source code in src/openjarvis/telemetry/energy_monitor.py
def snapshot(self) -> EnergySample:
    """Return an instantaneous energy reading without start/stop bracket.

    Subclasses should override to provide actual readings. Default returns
    an empty sample.  Used by ``TelemetrySession``'s background sampling
    loop at a fixed interval.
    """
    return EnergySample()
close abstractmethod
close() -> None

Release any resources (handles, threads, etc.).

Source code in src/openjarvis/telemetry/energy_monitor.py
@abstractmethod
def close(self) -> None:
    """Release any resources (handles, threads, etc.)."""

EnergySample dataclass

EnergySample(energy_joules: float = 0.0, mean_power_watts: float = 0.0, peak_power_watts: float = 0.0, duration_seconds: float = 0.0, num_snapshots: int = 0, mean_utilization_pct: float = 0.0, peak_utilization_pct: float = 0.0, mean_memory_used_gb: float = 0.0, peak_memory_used_gb: float = 0.0, mean_temperature_c: float = 0.0, peak_temperature_c: float = 0.0, vendor: str = '', device_name: str = '', device_count: int = 0, energy_method: str = '', cpu_energy_joules: float = 0.0, gpu_energy_joules: float = 0.0, dram_energy_joules: float = 0.0, ane_energy_joules: float = 0.0)

Aggregated energy metrics over an inference bracket.

Superset of GpuSample — adds vendor, device info, energy method, and per-component breakdown (CPU, GPU, DRAM, ANE).

EnergyVendor

Bases: str, Enum

Supported energy measurement vendors.

BatchMetrics dataclass

BatchMetrics(batch_id: str = '', total_requests: int = 0, total_tokens: int = 0, total_energy_joules: float = 0.0, energy_per_token_joules: float = 0.0, energy_per_request_joules: float = 0.0, mean_power_watts: float = 0.0, mean_throughput_tok_per_sec: float = 0.0, prefill_energy_joules: float = 0.0, decode_energy_joules: float = 0.0, per_request_energy: List[float] = list())

Aggregated metrics for a batch of inference requests.

EnergyBatch

EnergyBatch(energy_monitor: Optional[Any] = None, batch_id: Optional[str] = None)

Group inference requests into a batch and compute per-token energy.

Works with or without an EnergyMonitor. When no monitor is provided, request counts are still tracked but energy values stay at zero.

Source code in src/openjarvis/telemetry/batch.py
def __init__(
    self,
    energy_monitor: Optional[Any] = None,
    batch_id: Optional[str] = None,
) -> None:
    """Create a batch context.

    A random UUID string is assigned when *batch_id* is missing or
    falsy; aggregated results appear on ``self.metrics`` after sampling.
    """
    self._monitor = energy_monitor
    self.metrics = None
    self.batch_id = batch_id if batch_id else str(uuid.uuid4())
Functions
sample
sample() -> Generator[_BatchContext, None, None]

Wrap an energy monitor sample and provide a context for recording requests.

Yields a _BatchContext whose record_request() method should be called once per inference request inside the block.

Source code in src/openjarvis/telemetry/batch.py
@contextmanager
def sample(self) -> Generator[_BatchContext, None, None]:
    """Wrap an energy monitor sample and provide a context for recording requests.

    Yields a ``_BatchContext`` whose ``record_request()`` method should be
    called once per inference request inside the block.  On exit the
    aggregated ``BatchMetrics`` is stored on ``self.metrics``.
    """
    ctx = _BatchContext()

    if self._monitor is not None:
        # Measured path: total energy and mean power come from the
        # monitor's bracket around the caller's block.
        with self._monitor.sample() as energy_sample:
            start = time.monotonic()
            yield ctx
            elapsed = time.monotonic() - start
        total_energy = energy_sample.energy_joules
        mean_power = energy_sample.mean_power_watts
    else:
        # No monitor: fall back to energy the caller reported through
        # record_request(); mean power is unknown and stays 0.
        start = time.monotonic()
        yield ctx
        elapsed = time.monotonic() - start
        total_energy = ctx._total_energy
        mean_power = 0.0

    total_tokens = ctx._total_tokens
    total_requests = ctx._total_requests
    per_request_energy = list(ctx._per_request_energy)

    # Guard all ratios against empty batches.
    energy_per_token = (
        total_energy / total_tokens if total_tokens > 0 else 0.0
    )
    energy_per_request = (
        total_energy / total_requests if total_requests > 0 else 0.0
    )
    mean_throughput = (
        total_tokens / elapsed if elapsed > 0 else 0.0
    )

    self.metrics = BatchMetrics(
        batch_id=self.batch_id,
        total_requests=total_requests,
        total_tokens=total_tokens,
        total_energy_joules=total_energy,
        energy_per_token_joules=energy_per_token,
        energy_per_request_joules=energy_per_request,
        mean_power_watts=mean_power,
        mean_throughput_tok_per_sec=mean_throughput,
        per_request_energy=per_request_energy,
    )

SteadyStateConfig dataclass

SteadyStateConfig(warmup_samples: int = 5, window_size: int = 5, cv_threshold: float = 0.05, min_steady_samples: int = 3, metric: str = 'throughput')

Configuration for steady-state detection.

SteadyStateDetector

SteadyStateDetector(config: SteadyStateConfig | None = None)

Detect steady state using coefficient of variation over a sliding window.

The first warmup_samples recordings are always classified as warmup. After warmup, the CV (stdev / mean) of the last window_size values is checked. When CV < cv_threshold for min_steady_samples consecutive checks, steady state is declared.

Source code in src/openjarvis/telemetry/steady_state.py
def __init__(self, config: SteadyStateConfig | None = None) -> None:
    """Initialise detector state; defaults to a fresh ``SteadyStateConfig``."""
    self._config = config or SteadyStateConfig()
    self._throughputs: List[float] = []  # every recorded throughput sample
    self._energies: List[float] = []  # parallel energy samples
    self._consecutive_stable: int = 0  # length of current CV-below-threshold streak
    self._steady_state_reached: bool = False  # latches True once declared
Attributes
result property

Return a snapshot of the detection state.

Functions
record
record(throughput: float, energy: float = 0.0, latency: float = 0.0) -> bool

Record a sample. Returns True when steady state is reached.

Source code in src/openjarvis/telemetry/steady_state.py
def record(
    self,
    throughput: float,
    energy: float = 0.0,
    latency: float = 0.0,
) -> bool:
    """Record a sample.  Returns ``True`` when steady state is reached.

    The first ``warmup_samples`` recordings always return ``False``.
    Afterwards, the coefficient of variation (stdev / mean) of the last
    ``window_size`` throughputs is compared against ``cv_threshold``;
    once it stays below for ``min_steady_samples`` consecutive calls,
    steady state latches on.  *latency* is accepted but currently unused.
    """
    self._throughputs.append(throughput)
    self._energies.append(energy)

    cfg = self._config

    # Warmup phase: never declare steady state yet.
    if len(self._throughputs) <= cfg.warmup_samples:
        return False

    # Once declared, steady state is sticky.
    if self._steady_state_reached:
        return True

    samples_after_warmup = self._throughputs[cfg.warmup_samples:]
    if len(samples_after_warmup) < cfg.window_size:
        # Not enough post-warmup samples to fill a window.
        return False

    window = samples_after_warmup[-cfg.window_size:]
    window_mean = statistics.mean(window)
    if window_mean == 0:
        # CV is undefined at zero mean; reset the stability streak.
        self._consecutive_stable = 0
        return False

    if len(window) > 1:
        cv = statistics.stdev(window) / window_mean
    else:
        cv = 0.0

    self._consecutive_stable = (
        self._consecutive_stable + 1 if cv < cfg.cv_threshold else 0
    )

    if self._consecutive_stable >= cfg.min_steady_samples:
        self._steady_state_reached = True
        return True

    return False
reset
reset() -> None

Clear all recorded state.

Source code in src/openjarvis/telemetry/steady_state.py
def reset(self) -> None:
    """Discard all samples and return the detector to its initial state.

    The sample buffers are cleared in place, so any external references
    to them remain valid.
    """
    for series in (self._throughputs, self._energies):
        series.clear()
    self._consecutive_stable = 0
    self._steady_state_reached = False

SteadyStateResult dataclass

SteadyStateResult(total_samples: int = 0, warmup_samples: int = 0, steady_state_samples: int = 0, steady_state_reached: bool = False, warmup_throughputs: List[float] = list(), warmup_energies: List[float] = list(), steady_throughputs: List[float] = list(), steady_energies: List[float] = list())

Result of steady-state detection.

TelemetrySample dataclass

TelemetrySample(timestamp_ns: int, gpu_power_w: float = 0.0, cpu_power_w: float = 0.0, gpu_energy_j: float = 0.0, cpu_energy_j: float = 0.0, gpu_util_pct: float = 0.0, gpu_temp_c: float = 0.0, gpu_mem_gb: float = 0.0)

Single telemetry sample.

TelemetrySession

TelemetrySession(monitor: Optional[EnergyMonitor] = None, interval_ms: int = 100, buffer_size: int = 100000)

Background-sampling telemetry session.

Spawns a daemon thread that calls monitor.snapshot() at the configured interval. Stores samples in a ring buffer (Rust-backed if available, else pure-Python fallback).

Source code in src/openjarvis/telemetry/session.py
def __init__(
    self,
    monitor: Optional[EnergyMonitor] = None,
    interval_ms: int = 100,
    buffer_size: int = 100_000,
) -> None:
    """Set up the session; nothing is sampled until ``start()`` is called.

    With ``monitor=None`` the session is inert — ``start()`` returns
    immediately without spawning a thread.
    """
    self._monitor = monitor
    self._interval_ms = interval_ms
    # NOTE(review): always uses the pure-Python ring buffer here, while
    # the class docstring mentions a Rust-backed buffer — confirm where
    # that selection is supposed to happen.
    self._buffer = _PythonRingBuffer(buffer_size)
    self._thread: Optional[threading.Thread] = None
    self._stop_event = threading.Event()
Functions
start
start() -> None

Start background sampling thread.

Source code in src/openjarvis/telemetry/session.py
def start(self) -> None:
    """Launch the background sampling thread.

    No-op when there is no monitor to poll, or when a sampler thread is
    already running.
    """
    if self._monitor is None:
        return
    current = self._thread
    if current is not None and current.is_alive():
        return
    self._stop_event.clear()
    worker = threading.Thread(target=self._sample_loop, daemon=True)
    self._thread = worker
    worker.start()
stop
stop() -> None

Stop sampling thread.

Source code in src/openjarvis/telemetry/session.py
def stop(self) -> None:
    """Signal the sampling thread to stop and wait briefly for it to exit."""
    self._stop_event.set()
    worker = self._thread
    if worker is None:
        return
    worker.join(timeout=2.0)
    self._thread = None

Functions

instrumented_generate

instrumented_generate(engine: InferenceEngine, messages: Sequence[Message], *, model: str, bus: EventBus, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> Dict[str, Any]

Call engine.generate() and publish telemetry events on bus.

Returns the raw result dict from the engine.

Source code in src/openjarvis/telemetry/wrapper.py
def instrumented_generate(
    engine: InferenceEngine,
    messages: Sequence[Message],
    *,
    model: str,
    bus: EventBus,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Call ``engine.generate()`` and publish telemetry events on *bus*.

    Publishes ``INFERENCE_START`` before the call, then ``INFERENCE_END``
    and a ``TELEMETRY_RECORD`` event afterwards.

    Returns the raw result dict from the engine.
    """
    engine_id = engine.engine_id
    bus.publish(EventType.INFERENCE_START, {"model": model, "engine": engine_id})

    started_at = time.time()
    result = engine.generate(
        messages, model=model, temperature=temperature, max_tokens=max_tokens, **kwargs
    )
    elapsed = time.time() - started_at

    # Token counts come from the engine's usage block when present.
    token_usage = result.get("usage", {})
    record = TelemetryRecord(
        timestamp=started_at,
        model_id=model,
        engine=engine_id,
        prompt_tokens=token_usage.get("prompt_tokens", 0),
        completion_tokens=token_usage.get("completion_tokens", 0),
        total_tokens=token_usage.get("total_tokens", 0),
        latency_seconds=elapsed,
        cost_usd=result.get("cost_usd", 0.0),
    )

    bus.publish(
        EventType.INFERENCE_END,
        {"model": model, "engine": engine_id, "latency": elapsed},
    )
    bus.publish(EventType.TELEMETRY_RECORD, {"record": record})

    return result

compute_efficiency

compute_efficiency(param_count_b: float, active_params_b: float | None, gpu_peak_tflops: float, gpu_peak_bandwidth_gb_s: float, tokens_per_sec: float, num_gpus: int = 1, energy_joules: float = 0.0, accuracy: float = 0.0, bytes_per_param: float = 2.0) -> EfficiencyMetrics

Compute MFU, MBU, and derived efficiency metrics.

Args: param_count_b: Total parameter count in billions. active_params_b: Active parameters per token in billions (None for dense). gpu_peak_tflops: Peak theoretical TFLOPS per GPU (e.g. 312 for A100 SXM FP16). gpu_peak_bandwidth_gb_s: Peak memory bandwidth per GPU in GB/s (e.g. 2039 for A100 SXM). tokens_per_sec: Measured generation throughput (tokens/second). num_gpus: Number of GPUs used for inference. energy_joules: Total energy consumed in joules (for IPJ calculation). accuracy: Accuracy score in [0, 1] (for IPJ calculation). bytes_per_param: Bytes per parameter (default 2.0 for FP16).

Returns: `EfficiencyMetrics` with all computed values.

Source code in src/openjarvis/telemetry/efficiency.py
def compute_efficiency(
    param_count_b: float,
    active_params_b: float | None,
    gpu_peak_tflops: float,
    gpu_peak_bandwidth_gb_s: float,
    tokens_per_sec: float,
    num_gpus: int = 1,
    energy_joules: float = 0.0,
    accuracy: float = 0.0,
    bytes_per_param: float = 2.0,
) -> EfficiencyMetrics:
    """Compute MFU, MBU, and derived efficiency metrics.

    Args:
        param_count_b: Total parameter count in billions.
        active_params_b: Active parameters per token in billions (*None* for dense).
        gpu_peak_tflops: Peak theoretical TFLOPS per GPU (e.g. 312 for A100 SXM FP16).
        gpu_peak_bandwidth_gb_s: Peak memory bandwidth per GPU in GB/s
            (e.g. 2039 for A100 SXM).
        tokens_per_sec: Measured generation throughput (tokens/second).
        num_gpus: Number of GPUs used for inference.
        energy_joules: Total energy consumed in joules (for IPJ calculation).
        accuracy: Accuracy score in [0, 1] (for IPJ calculation).
        bytes_per_param: Bytes per parameter (default 2.0 for FP16).

    Returns:
        :class:`EfficiencyMetrics` with all computed values.
    """
    per_token_flops = estimate_model_flops_per_token(param_count_b, active_params_b)
    per_token_bytes = estimate_model_bytes_per_token(param_count_b, bytes_per_param)

    # Achieved rates implied by the measured throughput.
    achieved_flops = per_token_flops * tokens_per_sec
    achieved_bw_gb_s = (per_token_bytes * tokens_per_sec) / 1e9

    # Theoretical ceilings across the whole GPU allocation.
    theoretical_flops = gpu_peak_tflops * 1e12 * num_gpus
    theoretical_bw_gb_s = gpu_peak_bandwidth_gb_s * num_gpus

    # Utilization percentages, guarded against zero peaks.
    mfu = (achieved_flops / theoretical_flops * 100.0) if theoretical_flops > 0 else 0.0
    mbu = (
        (achieved_bw_gb_s / theoretical_bw_gb_s * 100.0)
        if theoretical_bw_gb_s > 0
        else 0.0
    )

    # Intelligence Per Joule: accuracy delivered per unit of energy.
    ipj = (accuracy / energy_joules) if energy_joules > 0 else 0.0

    return EfficiencyMetrics(
        mfu_pct=mfu,
        mbu_pct=mbu,
        actual_flops=achieved_flops,
        peak_flops=theoretical_flops,
        actual_bandwidth_gb_s=achieved_bw_gb_s,
        peak_bandwidth_gb_s=theoretical_bw_gb_s,
        ipj=ipj,
    )

create_energy_monitor

create_energy_monitor(poll_interval_ms: int = 50, prefer_vendor: Optional[str] = None) -> Optional[EnergyMonitor]

Factory — auto-detect and return the best available EnergyMonitor.

Detection order: NVIDIA > AMD > Apple > CPU RAPL. If prefer_vendor is set, try that vendor first.

Returns None if no energy monitoring is available.

Source code in src/openjarvis/telemetry/energy_monitor.py
def create_energy_monitor(
    poll_interval_ms: int = 50,
    prefer_vendor: Optional[str] = None,
) -> Optional[EnergyMonitor]:
    """Factory — auto-detect and return the best available EnergyMonitor.

    Detection order: NVIDIA > AMD > Apple > CPU RAPL.
    If *prefer_vendor* is set, try that vendor first; an unrecognised
    vendor name is ignored and the default order applies.

    Returns ``None`` if no energy monitoring is available.
    """
    # Build ordered candidate list — imports are defensive because vendor
    # packages (amdsmi, pynvml) may be installed but non-functional on the
    # current platform (e.g. amdsmi on macOS).
    vendor_map: dict[str, type[EnergyMonitor]] = {}
    default_order: list[type[EnergyMonitor]] = []

    try:
        from openjarvis.telemetry.energy_nvidia import NvidiaEnergyMonitor
        vendor_map["nvidia"] = NvidiaEnergyMonitor
        default_order.append(NvidiaEnergyMonitor)
    except Exception as exc:
        logger.debug("Failed to load NVIDIA energy monitor: %s", exc)

    try:
        from openjarvis.telemetry.energy_amd import AmdEnergyMonitor
        vendor_map["amd"] = AmdEnergyMonitor
        default_order.append(AmdEnergyMonitor)
    except Exception as exc:
        logger.debug("Failed to load AMD energy monitor: %s", exc)

    try:
        from openjarvis.telemetry.energy_apple import AppleEnergyMonitor
        vendor_map["apple"] = AppleEnergyMonitor
        default_order.append(AppleEnergyMonitor)
    except Exception as exc:
        logger.debug("Failed to load Apple energy monitor: %s", exc)

    try:
        from openjarvis.telemetry.energy_rapl import RaplEnergyMonitor
        vendor_map["cpu_rapl"] = RaplEnergyMonitor
        default_order.append(RaplEnergyMonitor)
    except Exception as exc:
        logger.debug("Failed to load RAPL energy monitor: %s", exc)

    # Move the preferred vendor to the front; the others stay as fallbacks.
    if prefer_vendor and prefer_vendor.lower() in vendor_map:
        preferred_cls = vendor_map[prefer_vendor.lower()]
        candidates = [preferred_cls] + [
            c for c in default_order if c is not preferred_cls
        ]
    else:
        candidates = default_order

    # First candidate whose availability probe passes wins; probe or
    # construction errors just skip to the next candidate.
    for cls in candidates:
        try:
            if cls.available():
                return cls(poll_interval_ms=poll_interval_ms)
        except Exception as exc:
            logger.debug("Energy monitor candidate failed: %s", exc)
            continue

    return None

compute_phase_metrics

compute_phase_metrics(session: TelemetrySession, start_ns: int, end_ns: int, tokens: int) -> dict

Compute energy/power metrics for a phase window.

Source code in src/openjarvis/telemetry/phase_metrics.py
def compute_phase_metrics(
    session: TelemetrySession,
    start_ns: int,
    end_ns: int,
    tokens: int,
) -> dict:
    """Compute energy and power metrics over the window [start_ns, end_ns].

    Queries the session for GPU/CPU energy deltas and average power over
    the window and derives the window duration plus GPU energy per token.
    """
    gpu_energy_j, cpu_energy_j = session.energy_delta(start_ns, end_ns)
    gpu_power_w, cpu_power_w = session.avg_power(start_ns, end_ns)
    window_s = (end_ns - start_ns) / 1e9
    # Per-token energy is GPU-only; guard against zero-token windows.
    per_token_j = 0.0
    if tokens > 0:
        per_token_j = gpu_energy_j / tokens
    return {
        "energy_j": gpu_energy_j,
        "cpu_energy_j": cpu_energy_j,
        "mean_power_w": gpu_power_w,
        "cpu_mean_power_w": cpu_power_w,
        "duration_s": window_s,
        "energy_per_token_j": per_token_j,
        "tokens": tokens,
    }

split_at_ttft

split_at_ttft(session: TelemetrySession, start_ns: int, ttft_ns: int, end_ns: int, input_tokens: int, output_tokens: int) -> tuple[dict, dict]

Split energy at TTFT boundary into prefill and decode phases.

Source code in src/openjarvis/telemetry/phase_metrics.py
def split_at_ttft(
    session: TelemetrySession,
    start_ns: int,
    ttft_ns: int,
    end_ns: int,
    input_tokens: int,
    output_tokens: int,
) -> tuple[dict, dict]:
    """Split energy at the TTFT boundary into prefill and decode phases.

    The prefill window covers [start_ns, ttft_ns] and is attributed to the
    input tokens; the decode window covers [ttft_ns, end_ns] and is
    attributed to the output tokens.
    """
    prefill_metrics = compute_phase_metrics(
        session, start_ns, ttft_ns, input_tokens
    )
    decode_metrics = compute_phase_metrics(
        session, ttft_ns, end_ns, output_tokens
    )
    return (prefill_metrics, decode_metrics)

compute_itl_stats

compute_itl_stats(token_timestamps: list[float]) -> dict

Compute ITL statistics from token arrival timestamps (in ms).

Returns dict with p50_ms, p90_ms, p95_ms, p99_ms, mean_ms, min_ms, max_ms.

Source code in src/openjarvis/telemetry/itl.py
def compute_itl_stats(token_timestamps: list[float]) -> dict:
    """Compute ITL statistics from token arrival timestamps (in ms).

    Returns dict with p50_ms, p90_ms, p95_ms, p99_ms, mean_ms, min_ms, max_ms.
    """
    # Fewer than two arrivals means there are no gaps to measure.
    if len(token_timestamps) < 2:
        return dict.fromkeys(
            ("p50_ms", "p90_ms", "p95_ms", "p99_ms", "mean_ms", "min_ms", "max_ms"),
            0,
        )

    # Gaps between consecutive arrivals, sorted for percentile lookup.
    gaps = sorted(
        later - earlier
        for earlier, later in zip(token_timestamps, token_timestamps[1:])
    )

    def pct(sorted_vals: list[float], q: float) -> float:
        # Linear interpolation between the two closest ranks.
        rank = (len(sorted_vals) - 1) * q
        lo = int(rank)
        hi = lo + 1
        if hi >= len(sorted_vals):
            return sorted_vals[-1]
        return sorted_vals[lo] + (rank - lo) * (sorted_vals[hi] - sorted_vals[lo])

    return {
        "p50_ms": pct(gaps, 0.50),
        "p90_ms": pct(gaps, 0.90),
        "p95_ms": pct(gaps, 0.95),
        "p99_ms": pct(gaps, 0.99),
        "mean_ms": statistics.mean(gaps),
        # gaps is sorted, so the endpoints are the min and max ITL.
        "min_ms": gaps[0],
        "max_ms": gaps[-1],
    }

compute_mfu

compute_mfu(flops: float, duration_s: float, gpu_name: str, num_gpus: int = 1) -> float

Compute Model FLOPs Utilization.

MFU = actual_tflops / (peak_tflops * num_gpus), returned as a percentage.

Source code in src/openjarvis/telemetry/flops.py
def compute_mfu(
    flops: float, duration_s: float, gpu_name: str, num_gpus: int = 1
) -> float:
    """Compute Model FLOPs Utilization as a percentage.

    MFU = actual_tflops / (peak_tflops * num_gpus), scaled to percent.
    Peak TFLOPS is resolved by exact GPU name first, then by the first
    table key that appears as a substring of the GPU name.
    """
    peak_tflops = GPU_PEAK_TFLOPS_BF16.get(gpu_name, 0.0)
    if peak_tflops == 0.0:
        # Fall back to the first case-insensitive substring match.
        name_lc = gpu_name.lower()
        peak_tflops = next(
            (v for k, v in GPU_PEAK_TFLOPS_BF16.items() if k.lower() in name_lc),
            0.0,
        )
    # Unknown GPU or degenerate duration: report 0 instead of dividing by zero.
    if peak_tflops <= 0 or duration_s <= 0:
        return 0.0
    achieved_tflops = flops / (duration_s * 1e12)
    return (achieved_tflops / (peak_tflops * num_gpus)) * 100.0

estimate_flops

estimate_flops(model: str, input_tokens: int, output_tokens: int) -> tuple[float, float]

Estimate FLOPs for an inference pass.

Uses the 2 * P * T approximation where P = params, T = total tokens. Returns (total_flops, flops_per_token).

Source code in src/openjarvis/telemetry/flops.py
def estimate_flops(
    model: str, input_tokens: int, output_tokens: int
) -> tuple[float, float]:
    """Estimate FLOPs for an inference pass.

    Uses the 2 * P * T approximation where P = params, T = total tokens.
    Returns (total_flops, flops_per_token).
    """
    # Exact model-id lookup first; otherwise fall back to the first table
    # entry whose base name (before any ":" tag) is a prefix of the model id.
    size_b = MODEL_PARAMS_B.get(model, 0.0)
    if size_b == 0.0:
        for candidate, billions in MODEL_PARAMS_B.items():
            if model.startswith(candidate.split(":")[0]):
                size_b = billions
                break

    token_count = input_tokens + output_tokens
    param_count = size_b * 1e9
    # Unknown models resolve to 0 params, so both estimates come out 0.0.
    per_token = 2.0 * param_count if token_count > 0 else 0.0
    return (2.0 * param_count * token_count, per_token)