Skip to content

store

store

SQLite-backed storage for optimization runs and trials.

Classes

OptimizationStore

OptimizationStore(db_path: Union[str, Path])

SQLite-backed storage for optimization runs and trials.

Source code in src/openjarvis/learning/optimize/store.py
def __init__(self, db_path: Union[str, Path]) -> None:
    """Open the SQLite database at *db_path* and prepare its schema.

    Connects, switches the journal mode to WAL, executes the module's
    ``_CREATE_RUNS`` and ``_CREATE_TRIALS`` statements, commits, and then
    calls ``self._migrate()``.

    Args:
        db_path: Database location; converted with ``str()`` before being
            handed to ``sqlite3.connect``.
    """
    self._db_path = str(db_path)
    connection = sqlite3.connect(self._db_path)
    self._conn = connection
    setup_statements = (
        "PRAGMA journal_mode=WAL",
        _CREATE_RUNS,
        _CREATE_TRIALS,
    )
    for statement in setup_statements:
        connection.execute(statement)
    connection.commit()
    # Runs after the commit, so it sees both tables created above.
    self._migrate()
Functions
save_run
save_run(run: OptimizationRun) -> None

Persist an optimization run (insert or update).

Source code in src/openjarvis/learning/optimize/store.py
def save_run(self, run: OptimizationRun) -> None:
    """Persist an optimization run (insert or update).

    Executes the module's ``_INSERT_RUN`` statement with the run's core
    fields, then sets ``pareto_frontier_ids`` and ``benchmarks`` on the
    same row with a follow-up ``UPDATE``. Both statements run inside one
    transaction: it is committed when both succeed and rolled back if
    either raises, so a failure never leaves a partially written run
    pending on the connection.

    Args:
        run: The run to store. ``run.best_trial`` may be falsy, in which
            case ``best_trial_id`` is stored as ``None``.

    Raises:
        TypeError: If ``run.benchmarks`` or a frontier trial id is not
            JSON serializable (raised before anything is written).
        sqlite3.Error: If either statement fails; the transaction is
            rolled back before the error propagates.
    """
    now = time.time()

    # Serialize everything up front so a serialization error cannot
    # strike between the INSERT and the UPDATE.
    search_space_json = self._search_space_to_json(run.search_space)
    best_trial_id = run.best_trial.trial_id if run.best_trial else None
    pareto_ids = json.dumps([t.trial_id for t in run.pareto_frontier])
    benchmarks_json = json.dumps(run.benchmarks)

    # The connection context manager commits on success and rolls back
    # on any exception.
    with self._conn:
        self._conn.execute(
            _INSERT_RUN,
            (
                run.run_id,
                search_space_json,
                run.status,
                run.optimizer_model,
                run.benchmark,
                best_trial_id,
                run.best_recipe_path,
                now,  # created_at
                now,  # updated_at
            ),
        )
        # pareto_frontier_ids and benchmarks are not parameters of
        # _INSERT_RUN, so they are written with a separate UPDATE.
        self._conn.execute(
            "UPDATE optimization_runs SET pareto_frontier_ids = ?, "
            "benchmarks = ? WHERE run_id = ?",
            (pareto_ids, benchmarks_json, run.run_id),
        )
get_run
get_run(run_id: str) -> Optional[OptimizationRun]

Retrieve an optimization run by id, or None.

Source code in src/openjarvis/learning/optimize/store.py
def get_run(self, run_id: str) -> Optional[OptimizationRun]:
    """Look up a single optimization run by its ``run_id``.

    Args:
        run_id: Identifier matched against the ``run_id`` column of
            ``optimization_runs``.

    Returns:
        The result of ``self._row_to_run`` applied to the first matching
        row, or ``None`` when no row matches.
    """
    cursor = self._conn.execute(
        "SELECT * FROM optimization_runs WHERE run_id = ?",
        (run_id,),
    )
    record = cursor.fetchone()
    return self._row_to_run(record) if record is not None else None
list_runs
list_runs(limit: int = 50) -> List[Dict[str, Any]]

Return summary dicts of recent optimization runs.

Source code in src/openjarvis/learning/optimize/store.py
def list_runs(self, limit: int = 50) -> List[Dict[str, Any]]:
    """Summarize the most recently created optimization runs.

    Args:
        limit: Maximum number of rows requested from SQLite.

    Returns:
        One dict per run, ordered by ``created_at`` descending, with the
        keys ``run_id``, ``status``, ``optimizer_model``, ``benchmark``,
        ``best_trial_id``, ``best_recipe_path``, ``created_at`` and
        ``updated_at``.
    """
    # Output key -> position in the ``SELECT *`` row. These positions
    # depend on the column order of the optimization_runs table.
    column_positions = (
        ("run_id", 1),
        ("status", 3),
        ("optimizer_model", 4),
        ("benchmark", 5),
        ("best_trial_id", 6),
        ("best_recipe_path", 7),
        ("created_at", 8),
        ("updated_at", 9),
    )
    cursor = self._conn.execute(
        "SELECT * FROM optimization_runs ORDER BY created_at DESC LIMIT ?",
        (limit,),
    )
    return [
        {key: record[position] for key, position in column_positions}
        for record in cursor.fetchall()
    ]
save_trial
save_trial(run_id: str, trial: TrialResult) -> None

Persist a single trial result.

Source code in src/openjarvis/learning/optimize/store.py
def save_trial(self, run_id: str, trial: TrialResult) -> None:
    """Persist a single trial result.

    Executes the module's ``_INSERT_TRIAL`` statement with the trial's
    scalar metrics, then stores the JSON-encoded ``sample_scores``,
    ``structured_feedback`` and ``per_benchmark`` columns with a follow-up
    ``UPDATE``. All JSON is produced before the first write and both
    statements share one transaction, which is committed on success and
    rolled back if either statement raises.

    Args:
        run_id: Identifier of the run the trial belongs to.
        trial: The trial to store. A falsy ``trial.structured_feedback``
            is stored as the JSON text ``"{}"``.

    Raises:
        TypeError: If any serialized value is not JSON serializable
            (raised before anything is written).
        sqlite3.Error: If either statement fails; the transaction is
            rolled back before the error propagates.
    """
    now = time.time()

    # Attribute names copied verbatim into each JSON object; the order
    # here is the key order of the stored JSON.
    sample_fields = (
        "record_id",
        "is_correct",
        "score",
        "latency_seconds",
        "prompt_tokens",
        "completion_tokens",
        "cost_usd",
        "error",
        "ttft",
        "energy_joules",
        "power_watts",
        "gpu_utilization_pct",
        "throughput_tok_per_sec",
        "mfu_pct",
        "mbu_pct",
        "ipw",
        "ipj",
        "energy_per_output_token_joules",
        "throughput_per_watt",
        "mean_itl_ms",
    )
    feedback_fields = (
        "summary_text",
        "failure_patterns",
        "primitive_ratings",
        "suggested_changes",
        "target_primitive",
    )
    benchmark_fields = (
        "benchmark",
        "accuracy",
        "mean_latency_seconds",
        "total_cost_usd",
        "total_energy_joules",
        "total_tokens",
        "samples_evaluated",
        "errors",
        "weight",
    )

    # Serialize everything up front so a serialization error cannot
    # strike between the INSERT and the UPDATE.
    scores_json = json.dumps([
        {name: getattr(s, name) for name in sample_fields}
        for s in trial.sample_scores
    ])
    fb = trial.structured_feedback
    fb_json = json.dumps(
        {name: getattr(fb, name) for name in feedback_fields}
    ) if fb else "{}"
    pb_json = json.dumps([
        {name: getattr(b, name) for name in benchmark_fields}
        for b in trial.per_benchmark
    ])
    params_json = json.dumps(trial.config.params)
    failure_modes_json = json.dumps(trial.failure_modes)

    # The connection context manager commits on success and rolls back
    # on any exception.
    with self._conn:
        self._conn.execute(
            _INSERT_TRIAL,
            (
                trial.trial_id,
                run_id,
                params_json,
                trial.config.reasoning,
                trial.accuracy,
                trial.mean_latency_seconds,
                trial.total_cost_usd,
                trial.total_energy_joules,
                trial.total_tokens,
                trial.samples_evaluated,
                trial.analysis,
                failure_modes_json,
                now,
            ),
        )
        # These three columns are not parameters of _INSERT_TRIAL, so
        # they are written with a separate UPDATE.
        self._conn.execute(
            "UPDATE trial_results SET sample_scores = ?, "
            "structured_feedback = ?, per_benchmark = ? "
            "WHERE trial_id = ? AND run_id = ?",
            (scores_json, fb_json, pb_json, trial.trial_id, run_id),
        )
get_trials
get_trials(run_id: str) -> List[TrialResult]

Retrieve all trial results for a given run.

Source code in src/openjarvis/learning/optimize/store.py
def get_trials(self, run_id: str) -> List[TrialResult]:
    """Load every stored trial that belongs to *run_id*.

    Args:
        run_id: Identifier matched against the ``run_id`` column of
            ``trial_results``.

    Returns:
        The matching rows, ordered by ``id``, each converted with
        ``self._row_to_trial``; an empty list when nothing matches.
    """
    query = "SELECT * FROM trial_results WHERE run_id = ? ORDER BY id"
    cursor = self._conn.execute(query, (run_id,))
    return [self._row_to_trial(record) for record in cursor.fetchall()]
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/learning/optimize/store.py
def close(self) -> None:
    """Release the store's SQLite connection by calling its ``close()``."""
    connection = self._conn
    connection.close()