# runner

EvalRunner — parallel execution of evaluation samples.

Supports two modes:

- Parallel mode (default): samples are processed concurrently via a ThreadPoolExecutor.
- Episode mode (`episode_mode=True`): samples are processed sequentially within episodes, with in-context example injection from prior successful completions. This mode is required for lifelong-learning benchmarks such as LifelongAgentBench.

When a dataset provides create_task_env() returning a TaskEnvironment, samples are evaluated via multi-turn interactive loops instead of single-shot generation — matching benchmarks that require agent-environment interaction.

Classes

EvalRunner

EvalRunner(config: RunConfig, dataset: DatasetProvider, backend: InferenceBackend, scorer: Scorer, trackers: Optional[List[ResultTracker]] = None)

Runs an evaluation benchmark with parallel sample execution.

Source code in src/openjarvis/evals/core/runner.py
def __init__(
    self,
    config: RunConfig,
    dataset: DatasetProvider,
    backend: InferenceBackend,
    scorer: Scorer,
    trackers: Optional[List[ResultTracker]] = None,
) -> None:
    """Store collaborators and initialise mutable run state.

    Args:
        config: Run configuration controlling the evaluation.
        dataset: Provider of evaluation samples.
        backend: Inference backend used to generate completions.
        scorer: Scorer applied to each sample's output.
        trackers: Optional result trackers to notify during the run;
            ``None`` is treated as an empty list.
    """
    # Mutable state accumulated over the course of a run.
    self._results: List[EvalResult] = []
    self._output_file: Optional[Any] = None
    # Collaborators supplied by the caller.
    self._config = config
    self._dataset = dataset
    self._backend = backend
    self._scorer = scorer
    self._trackers: List[ResultTracker] = trackers if trackers else []
Attributes
results property
results: List[EvalResult]

Return a copy of collected evaluation results.

Functions
run
run(progress_callback: Optional[Callable[[int, int], None]] = None) -> RunSummary

Execute the evaluation and return a summary.

Args:
    progress_callback: Optional `(completed, total)` callback invoked after each sample completes; useful for driving progress bars.

Source code in src/openjarvis/evals/core/runner.py
def run(
    self,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> RunSummary:
    """Execute the evaluation and return a summary.

    Loads the dataset, optionally runs discarded warmup samples, then
    evaluates every record either sequentially (episode mode) or in
    parallel via a thread pool, streaming results to a JSONL file and
    notifying trackers along the way.

    Args:
        progress_callback: Optional ``(completed, total)`` callback invoked
            after each sample completes, useful for driving progress bars.

    Returns:
        The computed ``RunSummary``, with ``_output_path`` and
        ``_traces_dir`` attached as private attributes for callers
        (e.g. the CLI).
    """
    cfg = self._config
    started_at = time.time()

    self._dataset.load(
        max_samples=cfg.max_samples,
        split=cfg.dataset_split,
        seed=cfg.seed,
    )

    # Auto-enable episode_mode when the dataset has iter_episodes()
    # (i.e. it is a lifelong/sequential benchmark like LifelongAgentBench).
    # This is enforced at the runner level so it applies regardless of
    # how the runner is invoked (CLI, SDK, tests, etc.).
    if not cfg.episode_mode and hasattr(self._dataset, "iter_episodes"):
        LOGGER.info(
            "%s requires sequential episode processing — "
            "auto-enabling episode_mode.",
            cfg.benchmark,
        )
        # dataclasses.replace produces an updated copy rather than
        # mutating cfg in place; the copy becomes the config of record.
        cfg = dataclasses.replace(cfg, episode_mode=True)
        self._config = cfg

    records = list(self._dataset.iter_records())
    LOGGER.info(
        "Running %s: %d samples, backend=%s, model=%s, workers=%d, "
        "episode_mode=%s",
        cfg.benchmark, len(records), cfg.backend, cfg.model,
        cfg.max_workers, cfg.episode_mode,
    )

    # --- Warmup phase (discard results) ---
    # Warmup samples run sequentially before the output file is opened,
    # so their results are neither collected nor flushed.
    # NOTE(review): warmup records are NOT removed from `records`, so
    # they are evaluated again in the main run below — confirm this
    # double-processing is intentional (e.g. cache/connection priming).
    warmup_count = cfg.warmup_samples
    if warmup_count > 0 and records:
        warmup_records = records[:warmup_count]
        for rec in warmup_records:
            self._process_one(rec)
        LOGGER.info("Warmup complete: %d samples discarded", len(warmup_records))

    # Open output file for incremental JSONL writing
    # NOTE(review): opened without an explicit encoding — relies on the
    # locale default; consider encoding="utf-8".
    output_path = self._resolve_output_path()
    if output_path:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        self._output_file = open(output_path, "w")

    # Notify trackers of run start. Tracker failures are logged and
    # swallowed so a misbehaving tracker cannot abort the run.
    for tracker in self._trackers:
        try:
            tracker.on_run_start(cfg)
        except Exception as exc:
            LOGGER.warning(
                "Tracker %s.on_run_start failed: %s",
                type(tracker).__name__, exc,
            )

    total = len(records)
    try:
        if cfg.episode_mode:
            # Sequential, episode-aware processing (see _run_episode_mode).
            self._run_episode_mode(records, progress_callback, total)
        else:
            # Parallel mode: one task per record; results are collected
            # in completion order, not submission order.
            with ThreadPoolExecutor(max_workers=cfg.max_workers) as pool:
                futures = {
                    pool.submit(self._process_one, r): r for r in records
                }
                for future in as_completed(futures):
                    result = future.result()
                    self._results.append(result)
                    self._flush_result(result)  # incremental JSONL write
                    if progress_callback is not None:
                        progress_callback(len(self._results), total)
    finally:
        # Always close the JSONL output, even if a worker raised.
        if self._output_file:
            self._output_file.close()
            self._output_file = None

    ended_at = time.time()
    summary = self._compute_summary(records, started_at, ended_at)

    # Notify trackers of summary and run end; as above, tracker errors
    # are logged but never propagated.
    for tracker in self._trackers:
        try:
            tracker.on_summary(summary)
        except Exception as exc:
            LOGGER.warning(
                "Tracker %s.on_summary failed: %s",
                type(tracker).__name__, exc,
            )
        try:
            tracker.on_run_end()
        except Exception as exc:
            LOGGER.warning(
                "Tracker %s.on_run_end failed: %s",
                type(tracker).__name__, exc,
            )

    # Write summary JSON alongside JSONL
    traces_dir: Optional[Path] = None
    if output_path:
        summary_path = output_path.with_suffix(".summary.json")
        with open(summary_path, "w") as f:
            json.dump(_summary_to_dict(summary), f, indent=2)
        LOGGER.info("Results written to %s", output_path)
        LOGGER.info("Summary written to %s", summary_path)

        # Write per-trace data
        traces_dir = self._write_traces(output_path)

    # Attach paths to summary for callers (e.g. CLI display). These are
    # set dynamically on the summary object, hence the type-ignores.
    summary._output_path = output_path  # type: ignore[attr-defined]
    summary._traces_dir = traces_dir  # type: ignore[attr-defined]

    return summary

Functions