Skip to content

Index

learning

Learning primitive — router policies, reward functions, and trace-driven learning.

Classes

QueryAnalyzer

Bases: ABC

Query analysis for routing contexts.

Functions
analyze abstractmethod
analyze(query: str, **kwargs: object) -> 'RoutingContext'

Analyze a query and return a RoutingContext.

Source code in src/openjarvis/learning/_stubs.py
@abstractmethod
def analyze(self, query: str, **kwargs: object) -> "RoutingContext":
    """Analyze a query and return a RoutingContext.

    Args:
        query: Raw user query text to analyze.
        **kwargs: Implementation-specific analysis options.

    Returns:
        A ``RoutingContext`` describing the query for routing decisions.
    """

RewardFunction

Bases: ABC

Compute a scalar reward for a routing decision.

Functions
compute abstractmethod
compute(context: 'RoutingContext', model_key: str, response: str, **kwargs: object) -> float

Return reward in [0, 1].

Source code in src/openjarvis/learning/_stubs.py
@abstractmethod
def compute(
    self,
    context: "RoutingContext",
    model_key: str,
    response: str,
    **kwargs: object,
) -> float:
    """Return reward in [0, 1].

    Args:
        context: The routing context the decision was made under.
        model_key: Identifier of the model that produced *response*.
        response: The model's response text being scored.
        **kwargs: Implementation-specific scoring options.

    Returns:
        A scalar reward; implementations are expected to keep it in [0, 1].
    """

RouterPolicy

Bases: ABC

Model selection policy (used by the learning system).

Functions
select_model abstractmethod
select_model(context: 'RoutingContext') -> str

Select the best model key for the given routing context.

Source code in src/openjarvis/learning/_stubs.py
@abstractmethod
def select_model(self, context: "RoutingContext") -> str:
    """Select the best model key for the given routing context.

    Args:
        context: The ``RoutingContext`` describing the query.

    Returns:
        The key of the model the policy chooses for this context.
    """

RoutingContext dataclass

RoutingContext(query: str = '', query_length: int = 0, has_code: bool = False, has_math: bool = False, language: str = 'en', urgency: float = 0.5, metadata: Dict[str, Any] = dict())

Context describing a query for model routing decisions.

AgentConfigEvolver

AgentConfigEvolver(trace_store: TraceStore, *, config_dir: Union[str, Path], min_quality: float = 0.5)

Analyze traces to evolve agent TOML configs with versioning.

PARAMETER DESCRIPTION
trace_store

A :class:TraceStore used to fetch historical traces.

TYPE: TraceStore

config_dir

Directory where agent TOML configs are written.

TYPE: Union[str, Path]

min_quality

Minimum average feedback score for a recommendation to be emitted.

TYPE: float DEFAULT: 0.5

Source code in src/openjarvis/learning/agent_evolver.py
def __init__(
    self,
    trace_store: TraceStore,
    *,
    config_dir: Union[str, Path],
    min_quality: float = 0.5,
) -> None:
    self._store = trace_store
    self._config_dir = Path(config_dir)
    self._history_dir = self._config_dir / ".history"
    self._min_quality = min_quality

    self._config_dir.mkdir(parents=True, exist_ok=True)
    self._history_dir.mkdir(parents=True, exist_ok=True)
Functions
analyze
analyze() -> List[Dict[str, Any]]

Analyze traces, return recommendations per query class.

Returns a list of dicts, each containing:

- query_class: the classified query category
- recommended_tools: list of tool names sorted by frequency
- recommended_agent: the best-performing agent for this class
- recommended_max_turns: suggested max_turns value
- sample_count: number of traces analyzed for this class

Source code in src/openjarvis/learning/agent_evolver.py
def analyze(self) -> List[Dict[str, Any]]:
    """Analyze traces, return recommendations per query class.

    Returns a list of dicts, each containing:
    - ``query_class``: the classified query category
    - ``recommended_tools``: list of tool names sorted by frequency
    - ``recommended_agent``: the best-performing agent for this class
    - ``recommended_max_turns``: suggested max_turns value
    - ``sample_count``: number of traces analyzed for this class
    """
    traces = self._store.list_traces(limit=10_000)
    if not traces:
        return []

    # Bucket traces by their classified query category.
    by_class: Dict[str, List[Trace]] = defaultdict(list)
    for trace in traces:
        by_class[classify_query(trace.query)].append(trace)

    # Classes are processed in sorted order for deterministic output;
    # classes whose analysis yields no recommendation are dropped.
    return [
        rec
        for qclass, class_traces in sorted(by_class.items())
        if (rec := self._analyze_class(qclass, class_traces)) is not None
    ]
write_config
write_config(agent_name: str, *, tools: List[str], max_turns: int = 10, temperature: float = 0.3, system_prompt: str = '') -> Path

Write agent TOML config, archiving previous version first.

Returns the :class:Path to the written config file.

Source code in src/openjarvis/learning/agent_evolver.py
def write_config(
    self,
    agent_name: str,
    *,
    tools: List[str],
    max_turns: int = 10,
    temperature: float = 0.3,
    system_prompt: str = "",
) -> Path:
    """Write agent TOML config, archiving previous version first.

    Args:
        agent_name: Name of the agent; also the config filename stem.
        tools: Tool names the agent may use.
        max_turns: Suggested turn limit for the agent.
        temperature: Sampling temperature for the agent.
        system_prompt: System prompt text for the agent.

    Returns the :class:`Path` to the written config file.
    """
    config_path = self._config_dir / f"{agent_name}.toml"

    # Preserve the previous version in .history/ before it is replaced.
    if config_path.exists():
        self._archive(agent_name, config_path)

    agent_section = {
        "name": agent_name,
        "tools": tools,
        "max_turns": max_turns,
        "temperature": temperature,
        "system_prompt": system_prompt,
    }
    _write_toml(config_path, {"agent": agent_section})
    return config_path
list_versions
list_versions(agent_name: str) -> List[Dict[str, Any]]

List all versions (including current) for agent_name.

Returns a list of dicts with version, path, and modified. Versions are numbered starting from 1 (oldest archived) through to the current (highest version number).

Source code in src/openjarvis/learning/agent_evolver.py
def list_versions(self, agent_name: str) -> List[Dict[str, Any]]:
    """List all versions (including current) for *agent_name*.

    Returns a list of dicts with ``version``, ``path``, and ``modified``.
    Versions are numbered starting from 1 (oldest archived) through to
    the current (highest version number).
    """
    def _version_key(path: Path) -> tuple:
        # Archived files are named "<agent>.v<N>.toml".  Sort numerically
        # by N: a plain lexicographic sort would place "v10" between "v1"
        # and "v2", mis-numbering versions once ten or more archives
        # exist.  Non-numeric tags sort after numeric ones, by name.
        tag = path.name[len(agent_name) + 2 : -len(".toml")]
        return (0, int(tag), "") if tag.isdigit() else (1, 0, path.name)

    versions: List[Dict[str, Any]] = []

    # Collect archived versions from .history/, oldest first.
    pattern = f"{agent_name}.v*.toml"
    archived = sorted(self._history_dir.glob(pattern), key=_version_key)
    for idx, archived_path in enumerate(archived, start=1):
        versions.append({
            "version": idx,
            "path": str(archived_path),
            "modified": archived_path.stat().st_mtime,
        })

    # The live config, when present, is always the highest version number.
    current = self._config_dir / f"{agent_name}.toml"
    if current.exists():
        versions.append({
            "version": len(versions) + 1,
            "path": str(current),
            "modified": current.stat().st_mtime,
        })

    return versions
rollback
rollback(agent_name: str, version: int) -> None

Rollback to a specific version.

Raises :class:ValueError if the requested version does not exist.

Source code in src/openjarvis/learning/agent_evolver.py
def rollback(self, agent_name: str, version: int) -> None:
    """Rollback to a specific version.

    Raises :class:`ValueError` if the requested version does not exist.
    """
    versions = self.list_versions(agent_name)
    target = next(
        (v for v in versions if v["version"] == version), None
    )

    if target is None:
        available = [v["version"] for v in versions]
        raise ValueError(
            f"Version {version} not found for agent '{agent_name}'. "
            f"Available versions: {available}"
        )

    target_path = Path(target["path"])
    config_path = self._config_dir / f"{agent_name}.toml"

    # Rolling back to the live config is a no-op.
    if target_path == config_path:
        return

    # Keep a copy of the current config before it is replaced.
    if config_path.exists():
        self._archive(agent_name, config_path)

    # Promote the archived version to be the live config.
    shutil.copy2(str(target_path), str(config_path))

HeuristicRewardFunction

HeuristicRewardFunction(*, weight_latency: float = 0.4, weight_cost: float = 0.3, weight_efficiency: float = 0.3, max_latency: float = 30.0, max_cost: float = 0.01)

Bases: RewardFunction

Computes a scalar reward based on latency, cost, and token efficiency.

Each component is normalised to [0, 1] and combined via a weighted sum.

Source code in src/openjarvis/learning/heuristic_reward.py
def __init__(
    self,
    *,
    weight_latency: float = 0.4,
    weight_cost: float = 0.3,
    weight_efficiency: float = 0.3,
    max_latency: float = 30.0,
    max_cost: float = 0.01,
) -> None:
    self.weight_latency = weight_latency
    self.weight_cost = weight_cost
    self.weight_efficiency = weight_efficiency
    self.max_latency = max_latency
    self.max_cost = max_cost

LearningOrchestrator

LearningOrchestrator(*, trace_store: Any, config_dir: Union[str, Path], eval_fn: Optional[Callable[[], float]] = None, min_improvement: float = 0.02, min_sft_pairs: int = 10, min_quality: float = 0.7, lora_config: Optional[Any] = None, model_name: Optional[str] = None)

Orchestrate a single trace->learn->eval cycle.

PARAMETER DESCRIPTION
trace_store

Object with list_traces(limit=...) returning List[Trace] (typically a :class:TraceStore).

TYPE: Any

config_dir

Directory where agent TOML configs are written / evolved.

TYPE: Union[str, Path]

eval_fn

Optional callable returning a float score (higher = better). Called before and after learning to gate acceptance.

TYPE: Optional[Callable[[], float]] DEFAULT: None

min_improvement

Minimum improvement in eval score required to accept the update.

TYPE: float DEFAULT: 0.02

min_sft_pairs

Minimum number of SFT pairs required to trigger LoRA training.

TYPE: int DEFAULT: 10

min_quality

Minimum feedback quality threshold for :class:TrainingDataMiner.

TYPE: float DEFAULT: 0.7

lora_config

Optional :class:LoRATrainingConfig. When provided (and enough SFT pairs exist and torch is available), LoRA training runs.

TYPE: Optional[Any] DEFAULT: None

model_name

Model name for LoRA training (passed to :class:LoRATrainer).

TYPE: Optional[str] DEFAULT: None

Source code in src/openjarvis/learning/learning_orchestrator.py
def __init__(
    self,
    *,
    trace_store: Any,
    config_dir: Union[str, Path],
    eval_fn: Optional[Callable[[], float]] = None,
    min_improvement: float = 0.02,
    min_sft_pairs: int = 10,
    min_quality: float = 0.7,
    lora_config: Optional[Any] = None,
    model_name: Optional[str] = None,
) -> None:
    """Wire up the trace miner and config evolver for a learning cycle.

    Args:
        trace_store: Object with ``list_traces(limit=...)``.
        config_dir: Directory where agent TOML configs are evolved.
        eval_fn: Optional scorer called before/after learning.
        min_improvement: Eval-score gain required to accept an update.
        min_sft_pairs: SFT pair count needed to trigger LoRA training.
        min_quality: Feedback-quality floor passed to the data miner.
        lora_config: Optional LoRA training configuration.
        model_name: Model name forwarded to LoRA training.
    """
    # Imported at call time rather than module import time —
    # NOTE(review): presumably to break an import cycle; preserved as-is.
    from openjarvis.learning.agent_evolver import AgentConfigEvolver
    from openjarvis.learning.training.data import TrainingDataMiner

    for attr, value in (
        ("_trace_store", trace_store),
        ("_config_dir", Path(config_dir)),
        ("_eval_fn", eval_fn),
        ("_min_improvement", min_improvement),
        ("_min_sft_pairs", min_sft_pairs),
        ("_lora_config", lora_config),
        ("_model_name", model_name),
    ):
        setattr(self, attr, value)

    # Collaborators that do the actual mining / config evolution.
    self._miner = TrainingDataMiner(trace_store, min_quality=min_quality)
    self._evolver = AgentConfigEvolver(
        trace_store, config_dir=self._config_dir
    )
Functions
run
run(*, agent_id: str | None = None) -> Dict[str, Any]

Execute one learning cycle.

PARAMETER DESCRIPTION
agent_id

When provided, only traces from this agent are considered.

TYPE: str | None DEFAULT: None

Returns

Steps
  1. Mine traces: extract sft_pairs, routing_pairs, agent_pairs
  2. If no data: return skipped
  3. Run baseline eval (if eval_fn provided)
  4. Update routing recommendations
  5. Evolve agent configs
  6. Run LoRA training (if lora_config provided AND enough pairs AND torch available)
  7. Run post-learning eval (if eval_fn provided)
  8. Accept/reject based on improvement threshold
Source code in src/openjarvis/learning/learning_orchestrator.py
def run(self, *, agent_id: str | None = None) -> Dict[str, Any]:
    """Execute one learning cycle.

    Parameters
    ----------
    agent_id:
        When provided, only traces from this agent are considered.

    Returns a dict with at least ``timestamp`` and ``status`` keys.

    Steps
    -----
    1. Mine traces: extract sft_pairs, routing_pairs, agent_pairs
    2. If no data: return skipped
    3. Run baseline eval (if eval_fn provided)
    4. Update routing recommendations
    5. Evolve agent configs
    6. Run LoRA training (if lora_config provided AND enough pairs
       AND torch available)
    7. Run post-learning eval (if eval_fn provided)
    8. Accept/reject based on improvement threshold
    """
    report: Dict[str, Any] = {"timestamp": time.time()}

    # Step 1: mine training data from stored traces.
    sft_pairs = self._miner.extract_sft_pairs(agent=agent_id)
    routing_pairs = self._miner.extract_routing_pairs(agent=agent_id)
    agent_pairs = self._miner.extract_agent_config_pairs(agent=agent_id)
    report["sft_pairs"] = len(sft_pairs)
    report["routing_classes"] = len(routing_pairs)
    report["agent_classes"] = len(agent_pairs)

    # Step 2: bail out early when there is nothing to learn from.
    if not (sft_pairs or routing_pairs or agent_pairs):
        report["status"] = "skipped"
        report["reason"] = "no training data available"
        return report

    # Step 3: baseline evaluation (optional acceptance gate).
    score_before: Optional[float] = None
    if self._eval_fn is not None:
        score_before = self._eval_fn()
        report["baseline_score"] = score_before

    # Step 4: routing recommendations.
    report["routing_updated"] = bool(routing_pairs)

    # Step 5: evolve agent configs from the evolver's recommendations.
    recommendations = self._evolver.analyze()
    report["agent_configs_evolved"] = bool(recommendations)
    for rec in recommendations:
        self._evolver.write_config(
            rec.get("recommended_agent", "default"),
            tools=rec.get("recommended_tools", []),
            max_turns=rec.get("recommended_max_turns", 10),
        )

    # Step 6: optional LoRA training, gated on config and data volume.
    report["lora_training"] = None
    enough_pairs = len(sft_pairs) >= self._min_sft_pairs
    if self._lora_config is not None and enough_pairs:
        report["lora_training"] = self._try_lora_training(sft_pairs)

    # Step 7: post-learning evaluation.
    score_after: Optional[float] = None
    if self._eval_fn is not None:
        score_after = self._eval_fn()
        report["post_score"] = score_after

    # Step 8: accept or reject based on measured improvement.
    if score_before is None or score_after is None:
        # No eval gate — always accept.
        report["accepted"] = True
        report["status"] = "completed"
        return report

    improvement = score_after - score_before
    report["improvement"] = improvement
    if improvement >= self._min_improvement:
        report["accepted"] = True
        report["status"] = "completed"
    else:
        report["accepted"] = False
        report["status"] = "rejected"
        report["reason"] = (
            f"eval improvement {improvement:.4f} below "
            f"threshold {self._min_improvement}"
        )
    return report

LLMOptimizer

LLMOptimizer(search_space: SearchSpace, optimizer_model: str = 'claude-sonnet-4-6', optimizer_backend: Optional[InferenceBackend] = None)

Uses a cloud LLM to propose optimal OpenJarvis configs.

Inspired by DSPy's GEPA: uses textual feedback from execution traces rather than just scalar rewards.

Source code in src/openjarvis/learning/optimize/llm_optimizer.py
def __init__(
    self,
    search_space: SearchSpace,
    optimizer_model: str = "claude-sonnet-4-6",
    optimizer_backend: Optional[InferenceBackend] = None,
) -> None:
    self.search_space = search_space
    self.optimizer_model = optimizer_model
    self.optimizer_backend = optimizer_backend
Functions
propose_initial
propose_initial() -> TrialConfig

Propose a reasonable starting config from the search space.

Source code in src/openjarvis/learning/optimize/llm_optimizer.py
def propose_initial(self) -> TrialConfig:
    """Propose a reasonable starting config from the search space."""
    backend = self.optimizer_backend
    if backend is None:
        raise ValueError(
            "optimizer_backend is required to propose configurations"
        )

    # One LLM round-trip: build the prompt, generate, parse the reply.
    reply = backend.generate(
        self._build_initial_prompt(),
        model=self.optimizer_model,
        system="You are an expert AI systems optimizer.",
        temperature=0.7,
        max_tokens=2048,
    )
    return self._parse_config_response(reply)
propose_next
propose_next(history: List[TrialResult], traces: Optional[List[Trace]] = None, frontier_ids: Optional[set] = None) -> TrialConfig

Ask the LLM to propose the next config to evaluate.

Source code in src/openjarvis/learning/optimize/llm_optimizer.py
def propose_next(
    self,
    history: List[TrialResult],
    traces: Optional[List[Trace]] = None,
    frontier_ids: Optional[set] = None,
) -> TrialConfig:
    """Ask the LLM to propose the next config to evaluate."""
    backend = self.optimizer_backend
    if backend is None:
        raise ValueError(
            "optimizer_backend is required to propose configurations"
        )

    # Prompt includes trial history, optional traces, and the frontier.
    reply = backend.generate(
        self._build_propose_prompt(history, traces, frontier_ids=frontier_ids),
        model=self.optimizer_model,
        system="You are an expert AI systems optimizer.",
        temperature=0.7,
        max_tokens=2048,
    )
    return self._parse_config_response(reply)
analyze_trial
analyze_trial(trial: TrialConfig, summary: RunSummary, traces: Optional[List[Trace]] = None, sample_scores: Optional[List[SampleScore]] = None, per_benchmark: Optional[List[BenchmarkScore]] = None) -> TrialFeedback

Ask the LLM to analyze a completed trial. Returns structured feedback.

Source code in src/openjarvis/learning/optimize/llm_optimizer.py
def analyze_trial(
    self,
    trial: TrialConfig,
    summary: RunSummary,
    traces: Optional[List[Trace]] = None,
    sample_scores: Optional[List[SampleScore]] = None,
    per_benchmark: Optional[List[BenchmarkScore]] = None,
) -> TrialFeedback:
    """Ask the LLM to analyze a completed trial. Returns structured feedback."""
    backend = self.optimizer_backend
    if backend is None:
        raise ValueError(
            "optimizer_backend is required to analyze trials"
        )

    # Analysis uses a lower temperature (0.3) than proposals (0.7) so
    # the feedback is more consistent run-to-run.
    reply = backend.generate(
        self._build_analyze_prompt(
            trial, summary, traces, sample_scores, per_benchmark,
        ),
        model=self.optimizer_model,
        system="You are an expert AI systems analyst.",
        temperature=0.3,
        max_tokens=2048,
    )
    return self._parse_feedback_response(reply)
propose_targeted
propose_targeted(history: List[TrialResult], base_config: TrialConfig, target_primitive: str, frontier_ids: Optional[set] = None) -> TrialConfig

Propose a config that only changes one primitive.

Source code in src/openjarvis/learning/optimize/llm_optimizer.py
def propose_targeted(
    self,
    history: List[TrialResult],
    base_config: TrialConfig,
    target_primitive: str,
    frontier_ids: Optional[set] = None,
) -> TrialConfig:
    """Propose a config that only changes one primitive.

    Args:
        history: All completed trials so far.
        base_config: Config whose non-target parameters are preserved.
        target_primitive: Name of the primitive the proposal may change;
            parameter keys under either the given name or its singular
            form (trailing "s" removed) are accepted.
        frontier_ids: Optional trial ids on the current Pareto frontier,
            forwarded to prompt construction.

    Returns:
        The proposed ``TrialConfig``, with every parameter outside the
        target primitive reset to its value from *base_config*.
    """
    if self.optimizer_backend is None:
        raise ValueError(
            "optimizer_backend is required to propose configurations"
        )

    prompt = self._build_targeted_prompt(
        history, base_config, target_primitive, frontier_ids,
    )
    response = self.optimizer_backend.generate(
        prompt,
        model=self.optimizer_model,
        system="You are an expert AI systems optimizer.",
        temperature=0.7,
        max_tokens=2048,
    )
    proposed = self._parse_config_response(response)

    # Enforce the constraint: keep every non-target param from base_config
    # and accept only keys under the target primitive, matching both the
    # plural and singular prefix ("tools." / "tool.").
    # Fix: the previous code singularized with str.rstrip("s"), which
    # strips ALL trailing "s" characters (e.g. "class" -> "cla."), not
    # just one suffix; removesuffix removes at most one.
    prefixes = (
        target_primitive + ".",
        target_primitive.removesuffix("s") + ".",
    )
    merged_params = dict(base_config.params)
    for key, value in proposed.params.items():
        if key.startswith(prefixes):
            merged_params[key] = value
    proposed.params = merged_params
    return proposed
propose_merge
propose_merge(candidates: List[TrialResult], history: List[TrialResult], frontier_ids: Optional[set] = None) -> TrialConfig

Combine best aspects of frontier members into one config.

Source code in src/openjarvis/learning/optimize/llm_optimizer.py
def propose_merge(
    self,
    candidates: List[TrialResult],
    history: List[TrialResult],
    frontier_ids: Optional[set] = None,
) -> TrialConfig:
    """Combine best aspects of frontier members into one config."""
    backend = self.optimizer_backend
    if backend is None:
        raise ValueError(
            "optimizer_backend is required to propose configurations"
        )

    merge_prompt = self._build_merge_prompt(candidates, history, frontier_ids)
    reply = backend.generate(
        merge_prompt,
        model=self.optimizer_model,
        system="You are an expert AI systems optimizer.",
        temperature=0.7,
        max_tokens=2048,
    )
    return self._parse_config_response(reply)

OptimizationEngine

OptimizationEngine(search_space: SearchSpace, llm_optimizer: LLMOptimizer, trial_runner: TrialRunner, store: Optional[OptimizationStore] = None, max_trials: int = 20, early_stop_patience: int = 5)

Orchestrates the optimize loop: propose -> evaluate -> analyze -> repeat.

Source code in src/openjarvis/learning/optimize/optimizer.py
def __init__(
    self,
    search_space: SearchSpace,
    llm_optimizer: LLMOptimizer,
    trial_runner: TrialRunner,
    store: Optional[OptimizationStore] = None,
    max_trials: int = 20,
    early_stop_patience: int = 5,
) -> None:
    self.search_space = search_space
    self.llm_optimizer = llm_optimizer
    self.trial_runner = trial_runner
    self.store = store
    self.max_trials = max_trials
    self.early_stop_patience = early_stop_patience
Functions
run
run(progress_callback: Optional[Callable[[int, int], None]] = None) -> OptimizationRun

Execute the full optimization loop.

  1. Generate a run_id via uuid.
  2. llm_optimizer.propose_initial() -> first config.
  3. Loop up to max_trials:
     a. trial_runner.run_trial(config) -> TrialResult
     b. llm_optimizer.analyze_trial(config, summary, traces)
     c. Update TrialResult with analysis text
     d. Append to history
     e. If store, store.save_trial(result)
     f. Update best_trial if accuracy improved
     g. Check early stopping (no improvement for patience trials)
     h. If not stopped, llm_optimizer.propose_next(history)
  4. Set run status to "completed".
  5. If store, store.save_run(optimization_run).
  6. Return the :class:OptimizationRun.

Args:
    progress_callback: Optional (trial_num, max_trials) -> None callback, called after each trial completes.

Source code in src/openjarvis/learning/optimize/optimizer.py
def run(
    self,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> OptimizationRun:
    """Execute the full optimization loop.

    1. Generate a run_id via uuid.
    2. ``llm_optimizer.propose_initial()`` -> first config.
    3. Loop up to ``max_trials``:
       a. ``trial_runner.run_trial(config)`` -> TrialResult
       b. ``llm_optimizer.analyze_trial(config, summary, traces)``
       c. Update TrialResult with analysis text
       d. Append to history
       e. If store, ``store.save_trial(result)``
       f. Update best_trial if accuracy improved
       g. Check early stopping (no improvement for *patience* trials)
       h. If not stopped, ``llm_optimizer.propose_next(history)``
    4. Set run status to ``"completed"``.
    5. If store, ``store.save_run(optimization_run)``.
    6. Return the :class:`OptimizationRun`.

    Args:
        progress_callback: Optional ``(trial_num, max_trials) -> None``
            called after each trial completes.
    """
    run_id = uuid.uuid4().hex[:16]
    # Detect benchmark name(s) from the trial runner
    # NOTE(review): imported locally, presumably to avoid a circular
    # import with the trial_runner module — confirm.
    from openjarvis.learning.optimize.trial_runner import MultiBenchTrialRunner

    benchmark_name = getattr(self.trial_runner, "benchmark", "")
    benchmark_names: List[str] = []
    if isinstance(self.trial_runner, MultiBenchTrialRunner):
        # Composite runner: record each benchmark and join names with "+".
        benchmark_names = [
            s.benchmark for s in self.trial_runner.benchmark_specs
        ]
        benchmark_name = "+".join(benchmark_names)

    # The run starts as "running"; the status flips to "completed" below.
    optimization_run = OptimizationRun(
        run_id=run_id,
        search_space=self.search_space,
        status="running",
        optimizer_model=self.llm_optimizer.optimizer_model,
        benchmark=benchmark_name,
        benchmarks=benchmark_names,
    )

    history: List[TrialResult] = []
    # -1.0 guarantees the first trial always becomes the initial best.
    best_accuracy = -1.0
    trials_without_improvement = 0

    # First config
    config = self.llm_optimizer.propose_initial()

    for trial_num in range(1, self.max_trials + 1):
        LOGGER.info(
            "Trial %d/%d (id=%s)",
            trial_num,
            self.max_trials,
            config.trial_id,
        )

        # Evaluate
        result = self.trial_runner.run_trial(config)

        # Analyze — returns TrialFeedback
        if result.summary is not None:
            feedback = self.llm_optimizer.analyze_trial(
                config,
                result.summary,
                sample_scores=result.sample_scores or None,
                per_benchmark=result.per_benchmark or None,
            )
            result.structured_feedback = feedback
            result.analysis = feedback.summary_text
        elif result.per_benchmark:
            # Multi-benchmark composite: build a synthetic summary
            # for analysis from per_benchmark data
            from openjarvis.evals.core.types import RunSummary as _RS

            synth = _RS(
                benchmark="multi",
                category="multi",
                backend="jarvis-agent",
                model=result.config.params.get("intelligence.model", ""),
                accuracy=result.accuracy,
                mean_latency_seconds=result.mean_latency_seconds,
                total_cost_usd=result.total_cost_usd,
                total_energy_joules=result.total_energy_joules,
                total_samples=result.samples_evaluated,
                scored_samples=result.samples_evaluated,
                # Approximate: reconstructed from the aggregate accuracy.
                correct=int(
                    result.accuracy * result.samples_evaluated
                ),
                errors=0,
                total_input_tokens=0,
                total_output_tokens=result.total_tokens,
            )
            feedback = self.llm_optimizer.analyze_trial(
                config,
                synth,
                per_benchmark=result.per_benchmark,
            )
            result.structured_feedback = feedback
            result.analysis = feedback.summary_text
        else:
            # No summary and no per-benchmark data: nothing to analyze.
            result.analysis = ""

        # Record
        history.append(result)
        optimization_run.trials.append(result)

        # Recompute Pareto frontier
        optimization_run.pareto_frontier = compute_pareto_frontier(
            history, optimization_run.objectives,
        )
        frontier_ids = {t.trial_id for t in optimization_run.pareto_frontier}

        # Persist trial
        if self.store is not None:
            self.store.save_trial(run_id, result)

        # Track best (by accuracy only; other objectives live in the frontier)
        if result.accuracy > best_accuracy:
            best_accuracy = result.accuracy
            optimization_run.best_trial = result
            trials_without_improvement = 0
        else:
            trials_without_improvement += 1

        # Progress callback
        if progress_callback is not None:
            progress_callback(trial_num, self.max_trials)

        # Early stopping
        if trials_without_improvement >= self.early_stop_patience:
            LOGGER.info(
                "Early stopping after %d trials without improvement.",
                self.early_stop_patience,
            )
            break

        # Propose next (unless this was the last trial)
        if trial_num < self.max_trials:
            # Decide proposal strategy
            target_primitive = ""
            if result.structured_feedback:
                target_primitive = result.structured_feedback.target_primitive

            if (
                trial_num % 5 == 0
                and len(optimization_run.pareto_frontier) >= 2
            ):
                # Merge frontier members periodically
                candidates = optimization_run.pareto_frontier[:3]
                config = self.llm_optimizer.propose_merge(
                    candidates, history, frontier_ids=frontier_ids,
                )
            elif target_primitive and trial_num > 2:
                # Targeted mutation on the suggested primitive
                config = self.llm_optimizer.propose_targeted(
                    history,
                    result.config,
                    target_primitive,
                    frontier_ids=frontier_ids,
                )
            else:
                config = self.llm_optimizer.propose_next(
                    history, frontier_ids=frontier_ids,
                )

    optimization_run.status = "completed"

    if self.store is not None:
        self.store.save_run(optimization_run)

    return optimization_run
export_best_recipe
export_best_recipe(run: OptimizationRun, path: Path) -> Path

Export the best trial's config as a TOML recipe file.

Args:
    run: A completed :class:OptimizationRun.
    path: Destination path for the TOML file.

Returns: The path written to.

Raises: ValueError: If there is no best trial in the run.

Source code in src/openjarvis/learning/optimize/optimizer.py
def export_best_recipe(
    self, run: OptimizationRun, path: Path
) -> Path:
    """Export the best trial's config as a TOML recipe file.

    Args:
        run: A completed :class:`OptimizationRun`.
        path: Destination path for the TOML file.

    Returns:
        The *path* written to.

    Raises:
        ValueError: If there is no best trial in the run.
    """
    best = run.best_trial
    if best is None:
        raise ValueError("No best trial to export.")

    recipe_data = self._trial_to_recipe_dict(best)
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    if tomli_w is None:
        # tomli_w is unavailable: fall back to the manual TOML writer.
        self._write_toml_fallback(recipe_data, destination)
    else:
        with open(destination, "wb") as fh:
            tomli_w.dump(recipe_data, fh)

    run.best_recipe_path = str(destination)
    return destination

OptimizationStore

OptimizationStore(db_path: Union[str, Path])

SQLite-backed storage for optimization runs and trials.

Source code in src/openjarvis/learning/optimize/store.py
def __init__(self, db_path: Union[str, Path]) -> None:
    """Open (or create) the SQLite database and ensure the schema exists."""
    self._db_path = str(db_path)
    self._conn = sqlite3.connect(self._db_path)
    # WAL lets readers proceed while a write is in progress.
    self._conn.execute("PRAGMA journal_mode=WAL")
    for ddl in (_CREATE_RUNS, _CREATE_TRIALS):
        self._conn.execute(ddl)
    self._conn.commit()
    # Apply any schema migrations for columns added after initial release.
    self._migrate()
Functions
save_run
save_run(run: OptimizationRun) -> None

Persist an optimization run (insert or update).

Source code in src/openjarvis/learning/optimize/store.py
def save_run(self, run: OptimizationRun) -> None:
    """Persist an optimization run (insert or update)."""
    now = time.time()
    self._conn.execute(
        _INSERT_RUN,
        (
            run.run_id,
            self._search_space_to_json(run.search_space),
            run.status,
            run.optimizer_model,
            run.benchmark,
            run.best_trial.trial_id if run.best_trial else None,
            run.best_recipe_path,
            now,
            now,
        ),
    )
    # pareto_frontier_ids / benchmarks are written with a dedicated
    # UPDATE rather than through _INSERT_RUN.
    # NOTE(review): presumably these columns were added by _migrate()
    # after the original schema — confirm.
    self._conn.execute(
        "UPDATE optimization_runs SET pareto_frontier_ids = ?, "
        "benchmarks = ? WHERE run_id = ?",
        (
            json.dumps([t.trial_id for t in run.pareto_frontier]),
            json.dumps(run.benchmarks),
            run.run_id,
        ),
    )
    self._conn.commit()
get_run
get_run(run_id: str) -> Optional[OptimizationRun]

Retrieve an optimization run by id, or None.

Source code in src/openjarvis/learning/optimize/store.py
def get_run(self, run_id: str) -> Optional[OptimizationRun]:
    """Retrieve an optimization run by id, or ``None`` when absent."""
    cursor = self._conn.execute(
        "SELECT * FROM optimization_runs WHERE run_id = ?",
        (run_id,),
    )
    record = cursor.fetchone()
    return None if record is None else self._row_to_run(record)
list_runs
list_runs(limit: int = 50) -> List[Dict[str, Any]]

Return summary dicts of recent optimization runs.

Source code in src/openjarvis/learning/optimize/store.py
def list_runs(self, limit: int = 50) -> List[Dict[str, Any]]:
    """Return summary dicts of recent optimization runs, newest first."""
    rows = self._conn.execute(
        "SELECT * FROM optimization_runs ORDER BY created_at DESC LIMIT ?",
        (limit,),
    ).fetchall()
    # Column positions follow the CREATE TABLE statement; row[0] (likely a
    # surrogate id) and row[2] (search space JSON) are intentionally omitted
    # from the summary.
    return [
        {
            "run_id": row[1],
            "status": row[3],
            "optimizer_model": row[4],
            "benchmark": row[5],
            "best_trial_id": row[6],
            "best_recipe_path": row[7],
            "created_at": row[8],
            "updated_at": row[9],
        }
        for row in rows
    ]
save_trial
save_trial(run_id: str, trial: TrialResult) -> None

Persist a single trial result.

Source code in src/openjarvis/learning/optimize/store.py
def save_trial(self, run_id: str, trial: TrialResult) -> None:
    """Persist a single trial result.

    Writes the core columns via ``_INSERT_TRIAL`` first, then fills the
    JSON-serialized columns (``sample_scores``, ``structured_feedback``,
    ``per_benchmark``) with a follow-up UPDATE — presumably because those
    columns were added by a later schema migration (confirm against
    ``_migrate``).

    Parameters
    ----------
    run_id:
        Identifier of the optimization run this trial belongs to.
    trial:
        The trial result to store.
    """
    now = time.time()
    # Serialize sample_scores
    # (one JSON object per evaluated sample, including latency, cost and
    # energy/throughput metrics; absent values serialize as JSON null)
    scores_json = json.dumps([
        {
            "record_id": s.record_id,
            "is_correct": s.is_correct,
            "score": s.score,
            "latency_seconds": s.latency_seconds,
            "prompt_tokens": s.prompt_tokens,
            "completion_tokens": s.completion_tokens,
            "cost_usd": s.cost_usd,
            "error": s.error,
            "ttft": s.ttft,
            "energy_joules": s.energy_joules,
            "power_watts": s.power_watts,
            "gpu_utilization_pct": s.gpu_utilization_pct,
            "throughput_tok_per_sec": s.throughput_tok_per_sec,
            "mfu_pct": s.mfu_pct,
            "mbu_pct": s.mbu_pct,
            "ipw": s.ipw,
            "ipj": s.ipj,
            "energy_per_output_token_joules": s.energy_per_output_token_joules,
            "throughput_per_watt": s.throughput_per_watt,
            "mean_itl_ms": s.mean_itl_ms,
        }
        for s in trial.sample_scores
    ])
    # Serialize structured_feedback
    # (falls back to an empty JSON object when the trial carries none)
    fb = trial.structured_feedback
    fb_json = json.dumps({
        "summary_text": fb.summary_text,
        "failure_patterns": fb.failure_patterns,
        "primitive_ratings": fb.primitive_ratings,
        "suggested_changes": fb.suggested_changes,
        "target_primitive": fb.target_primitive,
    }) if fb else "{}"

    # Core row: scalar metrics plus JSON-encoded params and failure modes.
    self._conn.execute(
        _INSERT_TRIAL,
        (
            trial.trial_id,
            run_id,
            json.dumps(trial.config.params),
            trial.config.reasoning,
            trial.accuracy,
            trial.mean_latency_seconds,
            trial.total_cost_usd,
            trial.total_energy_joules,
            trial.total_tokens,
            trial.samples_evaluated,
            trial.analysis,
            json.dumps(trial.failure_modes),
            now,
        ),
    )
    # Serialize per_benchmark
    # (per-benchmark breakdown of the aggregate metrics above)
    pb_json = json.dumps([
        {
            "benchmark": b.benchmark,
            "accuracy": b.accuracy,
            "mean_latency_seconds": b.mean_latency_seconds,
            "total_cost_usd": b.total_cost_usd,
            "total_energy_joules": b.total_energy_joules,
            "total_tokens": b.total_tokens,
            "samples_evaluated": b.samples_evaluated,
            "errors": b.errors,
            "weight": b.weight,
        }
        for b in trial.per_benchmark
    ])

    # Update new columns separately
    self._conn.execute(
        "UPDATE trial_results SET sample_scores = ?, "
        "structured_feedback = ?, per_benchmark = ? "
        "WHERE trial_id = ? AND run_id = ?",
        (scores_json, fb_json, pb_json, trial.trial_id, run_id),
    )
    self._conn.commit()
get_trials
get_trials(run_id: str) -> List[TrialResult]

Retrieve all trial results for a given run.

Source code in src/openjarvis/learning/optimize/store.py
def get_trials(self, run_id: str) -> List[TrialResult]:
    """Retrieve all trial results for a given run, in insertion order."""
    cursor = self._conn.execute(
        "SELECT * FROM trial_results WHERE run_id = ? ORDER BY id",
        (run_id,),
    )
    return [self._row_to_trial(record) for record in cursor.fetchall()]
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/learning/optimize/store.py
def close(self) -> None:
    """Release the SQLite connection held by this store."""
    connection = self._conn
    connection.close()

HeuristicRouter

HeuristicRouter(available_models: List[str] | None = None, *, default_model: str = '', fallback_model: str = '')

Bases: RouterPolicy

Rule-based model router.

Rules (applied in order):

1. Code detected → prefer model with "code"/"coder" in name
2. Math detected → prefer larger model
3. Short query (<50 chars, no code/math) → prefer smaller/faster model
4. Long/complex query (>500 chars OR reasoning keywords) → prefer larger model
5. High urgency (>0.8) → override to smaller model
6. Default fallback → default_model → fallback_model → first available

Source code in src/openjarvis/learning/router.py
def __init__(
    self,
    available_models: List[str] | None = None,
    *,
    default_model: str = "",
    fallback_model: str = "",
) -> None:
    """Record the candidate model list and the configured fallbacks."""
    # An omitted or empty model list becomes a fresh empty list.
    if available_models:
        self._available = available_models
    else:
        self._available = []
    self._default = default_model
    self._fallback = fallback_model

TrainingDataMiner

TrainingDataMiner(trace_store: Any, *, min_quality: float = 0.7, min_samples_per_class: int = 1)

Extract supervised training pairs from stored traces.

PARAMETER DESCRIPTION
trace_store

Any object with a list_traces(limit=...) method returning List[Trace] (typically a :class:TraceStore).

TYPE: Any

min_quality

Minimum feedback score for a trace to be included.

TYPE: float DEFAULT: 0.7

min_samples_per_class

Minimum number of samples a query class must have to appear in routing/agent-config results.

TYPE: int DEFAULT: 1

Source code in src/openjarvis/learning/training/data.py
def __init__(
    self,
    trace_store: Any,
    *,
    min_quality: float = 0.7,
    min_samples_per_class: int = 1,
) -> None:
    """Keep the trace source and the filtering thresholds for later queries."""
    self._min_samples_per_class = min_samples_per_class
    self._min_quality = min_quality
    self._store = trace_store
Functions
extract_sft_pairs
extract_sft_pairs(*, agent: str | None = None) -> List[Dict[str, Any]]

Return SFT training pairs from high-quality traces.

Each entry is a dict with keys: input, output, query_class, model, feedback.

Duplicate (input, output) pairs are collapsed; the first occurrence is kept.

Source code in src/openjarvis/learning/training/data.py
def extract_sft_pairs(self, *, agent: str | None = None) -> List[Dict[str, Any]]:
    """Return SFT training pairs from high-quality traces.

    Each entry is a dict with keys: ``input``, ``output``,
    ``query_class``, ``model``, ``feedback``.

    Duplicate ``(input, output)`` pairs are collapsed; the first
    occurrence is kept.
    """
    collected: List[Dict[str, Any]] = []
    emitted: set[tuple[str, str]] = set()

    for trace in self._quality_traces(agent=agent):
        dedup_key = (trace.query, trace.result)
        if dedup_key in emitted:
            continue  # keep only the first occurrence of this pair
        emitted.add(dedup_key)
        collected.append(
            {
                "input": trace.query,
                "output": trace.result,
                "query_class": classify_query(trace.query),
                "model": trace.model,
                "feedback": trace.feedback,
            }
        )

    return collected
extract_routing_pairs
extract_routing_pairs(*, agent: str | None = None) -> Dict[str, Dict[str, Any]]

Return per-query-class routing recommendations.

Returns a dict mapping query class to:

  • best_model — model with highest average feedback for the class.
  • avg_feedback — average feedback across all models for the class.
  • sample_count — total number of qualifying traces in the class.
  • all_models — dict of {model: {"avg_feedback": float, "count": int}}.
Source code in src/openjarvis/learning/training/data.py
def extract_routing_pairs(
    self, *, agent: str | None = None
) -> Dict[str, Dict[str, Any]]:
    """Return per-query-class routing recommendations.

    Returns a dict mapping query class to:

    * ``best_model`` — model with highest average feedback for the class.
    * ``avg_feedback`` — average feedback across all models for the class.
    * ``sample_count`` — total number of qualifying traces in the class.
    * ``all_models`` — dict of ``{model: {"avg_feedback": float, "count": int}}``.
    """
    # Bucket feedback scores by (query_class, model).
    buckets: Dict[str, Dict[str, List[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    for trace in self._quality_traces(agent=agent):
        query_class = classify_query(trace.query)
        buckets[query_class][trace.model].append(trace.feedback)  # type: ignore[arg-type]

    recommendations: Dict[str, Dict[str, Any]] = {}
    for query_class, per_model in buckets.items():
        sample_count = sum(len(scores) for scores in per_model.values())
        if sample_count < self._min_samples_per_class:
            continue  # too few observations to recommend anything

        model_stats: Dict[str, Dict[str, Any]] = {}
        top_model = ""
        top_avg = -1.0
        for model_name, scores in per_model.items():
            mean_score = sum(scores) / len(scores)
            model_stats[model_name] = {
                "avg_feedback": mean_score,
                "count": len(scores),
            }
            if mean_score > top_avg:
                top_avg = mean_score
                top_model = model_name

        flat_scores = [s for scores in per_model.values() for s in scores]
        class_avg = sum(flat_scores) / len(flat_scores) if flat_scores else 0.0

        recommendations[query_class] = {
            "best_model": top_model,
            "avg_feedback": class_avg,
            "sample_count": sample_count,
            "all_models": model_stats,
        }

    return recommendations
extract_agent_config_pairs
extract_agent_config_pairs(*, agent: str | None = None) -> Dict[str, Dict[str, Any]]

Return per-query-class agent and tool recommendations.

Returns a dict mapping query class to:

  • best_agent — agent with the highest average feedback.
  • best_tools — most frequently used tools by the best agent.
  • avg_feedback — average feedback across all agents for the class.
  • sample_count — total number of qualifying traces in the class.
Source code in src/openjarvis/learning/training/data.py
def extract_agent_config_pairs(
    self, *, agent: str | None = None
) -> Dict[str, Dict[str, Any]]:
    """Return per-query-class agent and tool recommendations.

    Returns a dict mapping query class to:

    * ``best_agent`` — agent with the highest average feedback.
    * ``best_tools`` — most frequently used tools by the best agent.
    * ``avg_feedback`` — average feedback across all agents for the class.
    * ``sample_count`` — total number of qualifying traces in the class.

    Parameters
    ----------
    agent:
        Optional agent-name filter forwarded to the trace query.
    """
    traces = self._quality_traces(agent=agent)

    # Accumulate per (query_class, agent) feedback and tools
    class_agent_scores: Dict[str, Dict[str, List[float]]] = defaultdict(
        lambda: defaultdict(list)
    )
    class_agent_tools: Dict[str, Dict[str, List[List[str]]]] = defaultdict(
        lambda: defaultdict(list)
    )

    for t in traces:
        qc = classify_query(t.query)
        class_agent_scores[qc][t.agent].append(t.feedback)  # type: ignore[arg-type]
        tools = self._tools_from_trace(t)
        class_agent_tools[qc][t.agent].append(tools)

    result: Dict[str, Dict[str, Any]] = {}
    for qc, agent_scores in class_agent_scores.items():
        total_count = sum(len(scores) for scores in agent_scores.values())
        if total_count < self._min_samples_per_class:
            continue

        best_agent = ""
        best_avg = -1.0
        # Loop variable renamed from ``agent`` so it no longer shadows the
        # keyword parameter of the same name.
        for agent_name, scores in agent_scores.items():
            avg = sum(scores) / len(scores)
            if avg > best_avg:
                best_avg = avg
                best_agent = agent_name

        # Collect tool frequency for best agent
        tool_freq: Dict[str, int] = defaultdict(int)
        for tool_list in class_agent_tools[qc].get(best_agent, []):
            for tool in tool_list:
                tool_freq[tool] += 1

        # Rank tools by descending usage; explicit key avoids a type: ignore.
        best_tools = sorted(tool_freq, key=lambda name: tool_freq[name], reverse=True)

        total_scores = [s for scores in agent_scores.values() for s in scores]
        overall_avg = sum(total_scores) / len(total_scores) if total_scores else 0.0

        result[qc] = {
            "best_agent": best_agent,
            "best_tools": best_tools,
            "avg_feedback": overall_avg,
            "sample_count": total_count,
        }

    return result

LoRATrainer

LoRATrainer(config: LoRATrainingConfig, *, model_name: str = 'Qwen/Qwen3-0.6B', device: Optional[str] = None)

Fine-tune a local causal LM with LoRA (or QLoRA) adapters.

PARAMETER DESCRIPTION
config

LoRA training configuration.

TYPE: LoRATrainingConfig

model_name

HuggingFace model identifier or local path.

TYPE: str DEFAULT: 'Qwen/Qwen3-0.6B'

device

PyTorch device string. None auto-detects (cuda > mps > cpu).

TYPE: Optional[str] DEFAULT: None

RAISES DESCRIPTION
ImportError

If torch is not installed.

Source code in src/openjarvis/learning/training/lora.py
def __init__(
    self,
    config: LoRATrainingConfig,
    *,
    model_name: str = "Qwen/Qwen3-0.6B",
    device: Optional[str] = None,
) -> None:
    """Validate the torch dependency and record the training configuration.

    Raises
    ------
    ImportError
        If ``torch`` is not installed.
    """
    if not HAS_TORCH:
        raise ImportError(
            "torch is required for LoRATrainer. "
            "Install with: pip install torch transformers peft"
        )

    # Model and tokenizer are created lazily (by prepare_dataset/train),
    # not here.
    self.tokenizer: Any = None
    self.model: Any = None
    self.config = config
    self.model_name = model_name
    self.device = _select_device(device)
Functions
prepare_dataset
prepare_dataset(pairs: List[Dict[str, Any]]) -> List[Dict[str, Any]]

Convert SFT pairs to tokenized examples.

Each returned dict contains input_ids, attention_mask, and text (the raw formatted string before tokenization).

PARAMETER DESCRIPTION
pairs

List of dicts with at least input and output keys, as produced by :class:TrainingDataMiner.extract_sft_pairs.

TYPE: List[Dict[str, Any]]

Source code in src/openjarvis/learning/training/lora.py
def prepare_dataset(
    self, pairs: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Convert SFT pairs to tokenized examples.

    Each returned dict contains ``input_ids``, ``attention_mask``,
    and ``text`` (the raw formatted string before tokenization).

    Parameters
    ----------
    pairs:
        List of dicts with at least ``input`` and ``output`` keys,
        as produced by :class:`TrainingDataMiner.extract_sft_pairs`.
    """
    self._ensure_tokenizer()

    examples: List[Dict[str, Any]] = []
    for pair in pairs:
        formatted = self._format_pair(pair)
        # Every example is padded/truncated to the same fixed length so
        # batches can be stacked without a collate function.
        encoded = self.tokenizer(
            formatted,
            truncation=True,
            max_length=self.config.max_seq_length,
            padding="max_length",
            return_tensors="pt",
        )
        examples.append(
            {
                "input_ids": encoded["input_ids"].squeeze(0),
                "attention_mask": encoded["attention_mask"].squeeze(0),
                "text": formatted,
            }
        )

    return examples
train
train(pairs: List[Dict[str, Any]]) -> Dict[str, Any]

Run LoRA fine-tuning on the given SFT pairs.

PARAMETER DESCRIPTION
pairs

List of dicts with at least input and output keys.

TYPE: List[Dict[str, Any]]

RETURNS DESCRIPTION
dict

Training summary with keys: status, epochs, total_steps, avg_loss, adapter_path, training_samples.

Source code in src/openjarvis/learning/training/lora.py
def train(self, pairs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Run LoRA fine-tuning on the given SFT pairs.

    Parameters
    ----------
    pairs:
        List of dicts with at least ``input`` and ``output`` keys.

    Returns
    -------
    dict
        Training summary with keys: ``status``, ``epochs``,
        ``total_steps``, ``avg_loss``, ``adapter_path``,
        ``training_samples``.
    """
    # No data: report a skip instead of raising.
    if not pairs:
        return {"status": "skipped", "reason": "no training data"}

    dataset = self.prepare_dataset(pairs)
    self._load_model()
    self._apply_lora()

    optimizer = torch.optim.AdamW(
        self.model.parameters(),
        lr=self.config.learning_rate,
        weight_decay=self.config.weight_decay,
    )

    total_steps = 0
    cumulative_loss = 0.0
    num_loss_steps = 0

    self.model.train()

    for epoch in range(self.config.num_epochs):
        epoch_loss = self._train_epoch(dataset, optimizer)
        # Ceiling division: optimizer steps taken this epoch (at least 1).
        steps_in_epoch = max(
            1, (len(dataset) + self.config.batch_size - 1) // self.config.batch_size
        )
        total_steps += steps_in_epoch
        # Weight by steps so avg_loss is a per-step mean across all epochs
        # (assumes _train_epoch returns the epoch's mean per-step loss —
        # TODO confirm).
        cumulative_loss += epoch_loss * steps_in_epoch
        num_loss_steps += steps_in_epoch

        logger.info(
            "Epoch %d/%d  loss=%.4f",
            epoch + 1,
            self.config.num_epochs,
            epoch_loss,
        )

        # Periodic checkpoint per configuration.
        if (epoch + 1) % self.config.save_every_n_epochs == 0:
            self._save_adapter(epoch + 1)

    avg_loss = cumulative_loss / num_loss_steps if num_loss_steps else 0.0
    # The final adapter is always written to <output_dir>/final.
    adapter_path = str(Path(self.config.output_dir) / "final")
    self._save_adapter_to(adapter_path)

    return {
        "status": "completed",
        "epochs": self.config.num_epochs,
        "total_steps": total_steps,
        "avg_loss": avg_loss,
        "adapter_path": adapter_path,
        "training_samples": len(pairs),
    }

LoRATrainingConfig dataclass

LoRATrainingConfig(lora_rank: int = 16, lora_alpha: int = 32, lora_dropout: float = 0.05, target_modules: List[str] = (lambda: ['q_proj', 'v_proj'])(), num_epochs: int = 3, batch_size: int = 4, learning_rate: float = 2e-05, weight_decay: float = 0.01, warmup_ratio: float = 0.1, max_grad_norm: float = 1.0, max_seq_length: int = 2048, use_4bit: bool = False, output_dir: str = 'checkpoints/lora', save_every_n_epochs: int = 1, gradient_checkpointing: bool = True)

Configuration for LoRA / QLoRA fine-tuning.

Functions

build_routing_context

build_routing_context(query: str, *, urgency: float = 0.5) -> RoutingContext

Populate a RoutingContext from a raw query string.

Source code in src/openjarvis/learning/router.py
def build_routing_context(query: str, *, urgency: float = 0.5) -> RoutingContext:
    """Populate a ``RoutingContext`` from a raw query string."""
    # A regex match object is always truthy, so this mirrors bool(search(...)).
    code_detected = _CODE_PATTERNS.search(query) is not None
    math_detected = _MATH_PATTERNS.search(query) is not None
    return RoutingContext(
        query=query,
        query_length=len(query),
        has_code=code_detected,
        has_math=math_detected,
        urgency=urgency,
    )

ensure_registered

ensure_registered() -> None

Ensure all learning policies are registered in RouterPolicyRegistry.

Imported lazily to avoid circular imports with the intelligence primitive.

Source code in src/openjarvis/learning/__init__.py
def ensure_registered() -> None:
    """Ensure all learning policies are registered in RouterPolicyRegistry.

    Imported lazily to avoid circular imports with the intelligence primitive.
    """
    # Mandatory: the heuristic policy must always register.
    from openjarvis.learning.heuristic_policy import (
        ensure_registered as _reg_heuristic,
    )

    _reg_heuristic()

    # Optional policies that expose their own ensure_registered(); an
    # ImportError from either the import or the call is silently skipped.
    for _policy in ("grpo_policy", "bandit_router"):
        try:
            _mod = __import__(
                "openjarvis.learning." + _policy,
                fromlist=["ensure_registered"],
            )
            _mod.ensure_registered()
        except ImportError:
            pass

    # Mandatory: the trace policy must always register.
    from openjarvis.learning.trace_policy import (
        ensure_registered as _reg_trace,
    )

    _reg_trace()

    # Modules that register themselves as a side effect of import; each is
    # optional. The last entry covers orchestrator-native SFT & GRPO training.
    for _name in (
        "openjarvis.learning.sft_policy",
        "openjarvis.learning.agent_advisor",
        "openjarvis.learning.icl_updater",
        "openjarvis.learning.orchestrator",
    ):
        try:
            __import__(_name)
        except ImportError:
            pass