runner

DiagnosisRunner: orchestrates phase 1 of the distillation loop.

Builds diagnostic tools, runs the TeacherAgent, parses failure clusters from the teacher's output, and persists artifacts.

See spec §5.

Classes

DiagnosisResult dataclass

DiagnosisResult(diagnosis_md: str, clusters: list[FailureCluster] = field(default_factory=list), cost_usd: float = 0.0, tool_call_records: list[ToolCallRecord] = field(default_factory=list))

The output of a diagnosis run.
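
The list-valued fields default to empty lists. A minimal sketch of how such a dataclass is typically declared follows; it assumes the defaults come from `field(default_factory=list)` (a dataclass rejects a literal list default), and `DiagnosisResultSketch` plus the two type aliases are stand-ins for illustration, not the library's definitions.

```python
from dataclasses import dataclass, field
from typing import Any

# Stand-in aliases: the real FailureCluster and ToolCallRecord types are
# defined elsewhere in the package and are not shown on this page.
FailureCluster = Any
ToolCallRecord = Any


@dataclass
class DiagnosisResultSketch:
    """Illustrative mirror of DiagnosisResult, not the library class."""

    diagnosis_md: str
    clusters: list[FailureCluster] = field(default_factory=list)
    cost_usd: float = 0.0
    tool_call_records: list[ToolCallRecord] = field(default_factory=list)


first = DiagnosisResultSketch(diagnosis_md="# Diagnosis")
second = DiagnosisResultSketch(diagnosis_md="# Another run")
first.clusters.append({"id": "hypothetical-cluster"})
# Each instance owns its own list, so second.clusters is unaffected
# by the append on first.clusters.
```

Only `diagnosis_md` is required; a run that produced no clusters and no tool calls can be represented with the defaults.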

DiagnosisRunner

DiagnosisRunner(*, teacher_engine: Any, teacher_model: str, trace_store: Any, benchmark_samples: list, student_runner: Any, judge: Any, session_dir: Path, session_id: str, config: dict[str, Any], max_turns: int = 30, max_cost_usd: float = 5.0)

Orchestrates phase 1 of the distillation loop.

PARAMETERS

teacher_engine (Any): The CloudEngine (or mock) for teacher inference.

teacher_model (str): Frontier model id (e.g. "claude-opus-4-6").

trace_store (Any): TraceStore for reading student traces.

benchmark_samples (list): List of PersonalBenchmarkSample objects.

student_runner (Any): Callable to re-execute the student on a task.

judge (Any): TraceJudge for comparing outputs.

session_dir (Path): Path where session artifacts are written.

session_id (str): Current session id.

config (dict[str, Any]): Dict with config_path and openjarvis_home.

max_turns (int, default 30): Max teacher tool calls.

max_cost_usd (float, default 5.0): Max teacher API cost.
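
Because every constructor argument is keyword-only, it can help to assemble them in one place. The sketch below builds such a keyword dictionary using `unittest.mock.Mock` objects as stand-ins for the engine, trace store, student runner, and judge. The helper name `build_runner_kwargs`, the `sessions/<id>` directory layout, the `config.toml` file name, and the budget values are all assumptions made for illustration.

```python
from pathlib import Path
from typing import Any
from unittest.mock import Mock


def build_runner_kwargs(home: Path, session_id: str) -> dict[str, Any]:
    """Assemble keyword arguments matching the documented constructor."""
    return {
        "teacher_engine": Mock(name="teacher_engine"),  # stand-in for CloudEngine
        "teacher_model": "claude-opus-4-6",
        "trace_store": Mock(name="trace_store"),
        "benchmark_samples": [],
        "student_runner": Mock(name="student_runner"),
        "judge": Mock(name="judge"),
        # Hypothetical layout: one directory per session under the home dir.
        "session_dir": home / "sessions" / session_id,
        "session_id": session_id,
        "config": {
            "config_path": str(home / "config.toml"),  # assumed file name
            "openjarvis_home": str(home),
        },
        "max_turns": 10,      # tighter than the default of 30
        "max_cost_usd": 1.0,  # tighter than the default of 5.0
    }


kwargs = build_runner_kwargs(Path("/tmp/openjarvis-demo"), "session-001")
# With the package installed, the runner would then be built with:
#   runner = DiagnosisRunner(**kwargs)
```

Passing mocks this way matches the "(or mock)" note on teacher_engine and keeps a dry run from reaching a real teacher API.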

Source code in src/openjarvis/learning/distillation/diagnose/runner.py
def __init__(
    self,
    *,
    teacher_engine: Any,
    teacher_model: str,
    trace_store: Any,
    benchmark_samples: list,
    student_runner: Any,
    judge: Any,
    session_dir: Path,
    session_id: str,
    config: dict[str, Any],
    max_turns: int = 30,
    max_cost_usd: float = 5.0,
) -> None:
    self._teacher_engine = teacher_engine
    self._teacher_model = teacher_model
    self._trace_store = trace_store
    self._benchmark_samples = benchmark_samples
    self._student_runner = student_runner
    self._judge = judge
    self._session_dir = Path(session_dir)
    self._session_id = session_id
    self._config = config
    self._max_turns = max_turns
    self._max_cost_usd = max_cost_usd

Functions

run

run() -> DiagnosisResult

Execute the diagnosis phase.

RETURNS

DiagnosisResult: Contains the diagnosis markdown, parsed clusters, cost, and tool call records.
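
Besides returning the result, run writes `diagnosis.md` and `teacher_traces/diagnose.jsonl` under the session directory, as the source listing for this method shows. The sketch below reads those two artifacts back; `load_session_artifacts` is a hypothetical helper, and the sample record shape is made up, since the output of `to_jsonl_dict()` is not documented here.

```python
import json
import tempfile
from pathlib import Path


def load_session_artifacts(session_dir: Path) -> tuple[str, list[dict]]:
    """Read diagnosis.md and teacher_traces/diagnose.jsonl from a session."""
    diagnosis_md = (session_dir / "diagnosis.md").read_text(encoding="utf-8")
    records = []
    jsonl_path = session_dir / "teacher_traces" / "diagnose.jsonl"
    with jsonl_path.open(encoding="utf-8") as f:
        for line in f:
            if line.strip():  # tolerate blank lines
                records.append(json.loads(line))
    return diagnosis_md, records


# Demo against a temporary directory laid out the way run() writes it.
with tempfile.TemporaryDirectory() as tmp:
    session_dir = Path(tmp)
    (session_dir / "diagnosis.md").write_text("# Diagnosis\n", encoding="utf-8")
    traces_dir = session_dir / "teacher_traces"
    traces_dir.mkdir()
    sample = [{"tool": "hypothetical_tool", "turn": 1}]  # made-up record shape
    with (traces_dir / "diagnose.jsonl").open("w", encoding="utf-8") as f:
        for record in sample:
            f.write(json.dumps(record) + "\n")
    loaded_md, loaded_records = load_session_artifacts(session_dir)
```

One JSON object per line means the trace file can be streamed record by record without loading it whole.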

Source code in src/openjarvis/learning/distillation/diagnose/runner.py
def run(self) -> DiagnosisResult:
    """Execute the diagnosis phase.

    Returns
    -------
    DiagnosisResult
        Contains the diagnosis markdown, parsed clusters, cost, and
        tool call records.
    """
    # Ensure session directory exists
    self._session_dir.mkdir(parents=True, exist_ok=True)

    # Build diagnostic tools
    tools = build_diagnostic_tools(
        trace_store=self._trace_store,
        config=self._config,
        benchmark_samples=self._benchmark_samples,
        student_runner=self._student_runner,
        teacher_engine=self._teacher_engine,
        teacher_model=self._teacher_model,
        judge=self._judge,
        session_id=self._session_id,
    )

    # Build system prompt with budget hints
    system_prompt = _SYSTEM_PROMPT.format(
        max_turns=self._max_turns,
        max_cost_usd=self._max_cost_usd,
    )

    # Run the teacher
    agent = TeacherAgent(
        engine=self._teacher_engine,
        model=self._teacher_model,
        tools=tools,
        max_turns=self._max_turns,
        max_cost_usd=self._max_cost_usd,
    )
    agent_result = agent.run(
        "Analyze the student's recent trace history, identify failure patterns, "
        "and produce a structured diagnosis with failure clusters.",
        system_prompt=system_prompt,
    )

    # Persist diagnosis.md
    diagnosis_path = self._session_dir / "diagnosis.md"
    diagnosis_path.write_text(agent_result.content, encoding="utf-8")

    # Persist teacher traces JSONL
    traces_dir = self._session_dir / "teacher_traces"
    traces_dir.mkdir(parents=True, exist_ok=True)
    jsonl_path = traces_dir / "diagnose.jsonl"
    with jsonl_path.open("w", encoding="utf-8") as f:
        for record in agent_result.tool_call_records:
            f.write(json.dumps(record.to_jsonl_dict()) + "\n")

    # Parse failure clusters from the diagnosis content
    clusters = _parse_clusters(agent_result.content)

    # Fallback: if no clusters parsed, ask the teacher to emit only the JSON
    if not clusters:
        logger.warning(
            "No clusters in primary diagnosis (%d chars, %d tool calls). "
            "Attempting fallback extraction.",
            len(agent_result.content),
            len(agent_result.tool_call_records),
        )
        clusters = self._fallback_extract_clusters(agent_result.content)

    return DiagnosisResult(
        diagnosis_md=agent_result.content,
        clusters=clusters,
        cost_usd=agent_result.total_cost_usd,
        tool_call_records=agent_result.tool_call_records,
    )
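
The parse-then-fallback step at the end of run can be sketched in isolation. The implementation of `_parse_clusters` and `_fallback_extract_clusters` is not shown on this page, so the version below is an assumption: it looks for a fenced JSON block in the diagnosis text and accepts either a bare list or an object with a `clusters` key. The function names and the payload shape are hypothetical.

```python
import json
import re
from typing import Callable

FENCE = "`" * 3  # built dynamically to avoid a literal fence in this example
_JSON_BLOCK = re.compile(FENCE + r"json\s*(.*?)" + FENCE, re.DOTALL)


def parse_clusters_sketch(markdown: str) -> list[dict]:
    """Return clusters from the first fenced JSON block, or [] if none parse."""
    match = _JSON_BLOCK.search(markdown)
    if match is None:
        return []
    try:
        payload = json.loads(match.group(1))
    except json.JSONDecodeError:
        return []
    # Assumed payload shape: either a bare list or {"clusters": [...]}.
    if isinstance(payload, dict):
        payload = payload.get("clusters", [])
    return payload if isinstance(payload, list) else []


def extract_with_fallback(
    markdown: str, fallback: Callable[[str], list[dict]]
) -> list[dict]:
    """Try the primary parser; call the fallback only when it finds nothing."""
    clusters = parse_clusters_sketch(markdown)
    if not clusters:
        clusters = fallback(markdown)
    return clusters


good = "Findings\n" + FENCE + 'json\n{"clusters": [{"id": "c1"}]}\n' + FENCE
primary = extract_with_fallback(good, fallback=lambda text: [])
rescued = extract_with_fallback(
    "no JSON here", fallback=lambda text: [{"id": "fb"}]
)
```

Keeping the fallback behind an emptiness check means the extra teacher call is only paid for when the primary diagnosis yields no clusters, which is the behavior the listing above logs a warning for.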

Functions