steady_state

Steady-state detection for energy measurement at thermal equilibrium.

Classes

SteadyStateConfig dataclass

SteadyStateConfig(warmup_samples: int = 5, window_size: int = 5, cv_threshold: float = 0.05, min_steady_samples: int = 3, metric: str = 'throughput')

Configuration for steady-state detection.
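
As a rough illustration of how these fields interact, the sketch below mirrors the documented signature in a standalone dataclass, so it runs without the package installed. The helper `earliest_detection_sample` is hypothetical and not part of the module. Assuming `window_size >= 1` and `min_steady_samples >= 1`, and following the detection rule documented for SteadyStateDetector, it computes the smallest number of samples after which steady state could be declared.

```python
from dataclasses import dataclass


@dataclass
class SteadyStateConfig:
    """Standalone mirror of the documented signature, for illustration only."""

    warmup_samples: int = 5       # leading samples always treated as warmup
    window_size: int = 5          # number of post-warmup values per CV check
    cv_threshold: float = 0.05    # a check counts as stable when CV < this value
    min_steady_samples: int = 3   # consecutive stable checks required
    metric: str = "throughput"    # metric label; defaults to throughput


def earliest_detection_sample(cfg: SteadyStateConfig) -> int:
    """Hypothetical helper: first sample count at which steady state can be declared.

    The first CV check happens once a full post-warmup window exists, and each
    later sample adds exactly one more check.
    """
    return cfg.warmup_samples + cfg.window_size + cfg.min_steady_samples - 1


default_cfg = SteadyStateConfig()
strict_cfg = SteadyStateConfig(warmup_samples=10, window_size=8, cv_threshold=0.02)

print(earliest_detection_sample(default_cfg))  # 5 + 5 + 3 - 1 = 12
print(earliest_detection_sample(strict_cfg))   # 10 + 8 + 3 - 1 = 20
```

A longer warmup or window delays detection but makes it less likely that a brief plateau is mistaken for equilibrium.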

SteadyStateResult dataclass

SteadyStateResult(total_samples: int = 0, warmup_samples: int = 0, steady_state_samples: int = 0, steady_state_reached: bool = False, warmup_throughputs: List[float] = list(), warmup_energies: List[float] = list(), steady_throughputs: List[float] = list(), steady_energies: List[float] = list())

Result of steady-state detection.
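
The sketch below shows one way a caller might consume such a result. It mirrors the documented fields in a standalone dataclass; the `= list()` defaults in the rendered signature are assumed here to correspond to `field(default_factory=list)`. The helper `mean_steady_energy` and the sample values are hypothetical, and how the package itself populates these fields is not shown on this page.

```python
from dataclasses import dataclass, field
from statistics import mean
from typing import List, Optional


@dataclass
class SteadyStateResult:
    """Standalone mirror of the documented fields, for illustration only."""

    total_samples: int = 0
    warmup_samples: int = 0
    steady_state_samples: int = 0
    steady_state_reached: bool = False
    warmup_throughputs: List[float] = field(default_factory=list)
    warmup_energies: List[float] = field(default_factory=list)
    steady_throughputs: List[float] = field(default_factory=list)
    steady_energies: List[float] = field(default_factory=list)


def mean_steady_energy(result: SteadyStateResult) -> Optional[float]:
    """Hypothetical helper: average energy over steady samples, or None."""
    if not result.steady_state_reached or not result.steady_energies:
        return None
    return mean(result.steady_energies)


# Made-up values, only to exercise the helper.
example = SteadyStateResult(
    total_samples=8,
    warmup_samples=5,
    steady_state_samples=3,
    steady_state_reached=True,
    steady_throughputs=[100.0, 101.0, 99.0],
    steady_energies=[2.0, 2.5, 3.0],
)

print(mean_steady_energy(example))              # 2.5
print(mean_steady_energy(SteadyStateResult()))  # None
```

Returning None when steady state was never reached keeps warmup-only runs from being reported as equilibrium measurements.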

SteadyStateDetector

SteadyStateDetector(config: SteadyStateConfig | None = None)

Detect steady state using coefficient of variation over a sliding window.

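The first warmup_samples recordings are always classified as warmup. Once at least window_size post-warmup samples exist, each new sample triggers a check of the coefficient of variation (CV = sample standard deviation / mean) over the last window_size post-warmup throughput values. A check with CV < cv_threshold counts as stable; any other check, including one where the window mean is 0, resets the count. Steady state is declared after min_steady_samples consecutive stable checks.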
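
To make the CV check concrete, here is a minimal sketch of the calculation on two hypothetical windows of throughput readings. The function name `coefficient_of_variation` and the sample values are illustrative and not part of the module. Raising on a zero mean is a choice made for this sketch; the detector instead treats that case as not stable.

```python
import statistics
from typing import Sequence


def coefficient_of_variation(window: Sequence[float]) -> float:
    """Sample standard deviation divided by the mean of the window."""
    mean = statistics.mean(window)
    if mean == 0:
        raise ValueError("CV is undefined when the mean is 0")
    # statistics.stdev needs at least two values; treat a single value as CV 0.
    return statistics.stdev(window) / mean if len(window) > 1 else 0.0


stable = [100.0, 101.0, 99.0, 100.0, 100.0]  # hypothetical throughput readings
noisy = [100.0, 150.0, 60.0, 120.0, 90.0]

cv_threshold = 0.05
print(coefficient_of_variation(stable) < cv_threshold)  # True  (CV is about 0.007)
print(coefficient_of_variation(noisy) < cv_threshold)   # False (CV is about 0.32)
```

Because CV is normalized by the mean, the same threshold can be applied to fast and slow workloads alike.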

Source code in src/openjarvis/telemetry/steady_state.py
def __init__(self, config: SteadyStateConfig | None = None) -> None:
    self._config = config or SteadyStateConfig()
    self._throughputs: List[float] = []
    self._energies: List[float] = []
    self._consecutive_stable: int = 0
    self._steady_state_reached: bool = False

Attributes

result property

Return a snapshot of the detection state.

Functions

record

record(throughput: float, energy: float = 0.0, latency: float = 0.0) -> bool

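Record a sample. Returns True once steady state has been reached, and on every later call until reset. In the source shown here, energy is stored alongside throughput but only throughput drives detection, and latency is accepted but not used.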

Source code in src/openjarvis/telemetry/steady_state.py
def record(
    self,
    throughput: float,
    energy: float = 0.0,
    latency: float = 0.0,
) -> bool:
    """Record a sample.  Returns ``True`` when steady state is reached."""
    self._throughputs.append(throughput)
    self._energies.append(energy)

    cfg = self._config

    # Still in warmup phase
    if len(self._throughputs) <= cfg.warmup_samples:
        return False

    # Already declared steady
    if self._steady_state_reached:
        return True

    # Not enough post-warmup samples for a full window yet
    post_warmup = self._throughputs[cfg.warmup_samples:]
    if len(post_warmup) < cfg.window_size:
        return False

    # Compute CV over the last window_size values
    window = post_warmup[-cfg.window_size:]
    mean = statistics.mean(window)
    if mean == 0:
        self._consecutive_stable = 0
        return False

    cv = statistics.stdev(window) / mean if len(window) > 1 else 0.0

    if cv < cfg.cv_threshold:
        self._consecutive_stable += 1
    else:
        self._consecutive_stable = 0

    if self._consecutive_stable >= cfg.min_steady_samples:
        self._steady_state_reached = True
        return True

    return False
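
To trace how the consecutive-stable counter behaves, the sketch below replays the same checks as the `record` source above over a whole list of throughputs. `first_steady_sample` is a hypothetical standalone helper, not part of the module, and the throughput sequences are made up.

```python
import statistics
from typing import List, Optional


def first_steady_sample(
    throughputs: List[float],
    warmup_samples: int = 5,
    window_size: int = 5,
    cv_threshold: float = 0.05,
    min_steady_samples: int = 3,
) -> Optional[int]:
    """Return the 1-based sample number at which steady state is first declared."""
    consecutive_stable = 0
    for n in range(1, len(throughputs) + 1):
        if n <= warmup_samples:
            continue  # still in warmup phase
        post_warmup = throughputs[warmup_samples:n]
        if len(post_warmup) < window_size:
            continue  # not enough post-warmup samples for a full window
        window = post_warmup[-window_size:]
        mean = statistics.mean(window)
        if mean == 0:
            consecutive_stable = 0
            continue
        cv = statistics.stdev(window) / mean if len(window) > 1 else 0.0
        consecutive_stable = consecutive_stable + 1 if cv < cv_threshold else 0
        if consecutive_stable >= min_steady_samples:
            return n
    return None


ramp = [40.0, 55.0, 70.0, 85.0, 95.0]  # warmup samples, never checked
clean = ramp + [100.0] * 10
spiky = ramp + [100.0] * 5 + [200.0] + [100.0] * 10  # one outlier at sample 11

print(first_steady_sample(clean))                # 12
print(first_steady_sample(spiky))                # 18
print(first_steady_sample(ramp + [100.0] * 4))   # None
```

In the `spiky` run the outlier at sample 11 resets the counter, and every window containing it stays above the threshold. Counting restarts only at sample 16, when the window covers samples 12 to 16, so detection lands at sample 18.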

reset

reset() -> None

Clear all recorded samples and detection state. The next record call starts a new warmup phase.

Source code in src/openjarvis/telemetry/steady_state.py
def reset(self) -> None:
    """Clear all recorded state."""
    self._throughputs.clear()
    self._energies.clear()
    self._consecutive_stable = 0
    self._steady_state_reached = False