Skip to content

runner

runner

EvalRunner — parallel execution of evaluation samples.

Supports two modes:

- Parallel mode (default): samples are processed concurrently via ThreadPoolExecutor.
- Episode mode (episode_mode=True): samples are processed sequentially within episodes, with in-context example injection from prior successful completions. Required for lifelong-learning benchmarks like LifelongAgentBench.

When a dataset provides create_task_env() returning a TaskEnvironment, samples are evaluated via multi-turn interactive loops instead of single-shot generation — matching benchmarks that require agent-environment interaction.

Classes

EvalRunner

EvalRunner(config: RunConfig, dataset: DatasetProvider, backend: InferenceBackend, scorer: Scorer, trackers: Optional[List[ResultTracker]] = None)

Runs an evaluation benchmark, executing samples in parallel by default and sequentially in episode mode or when the dataset provides task environments.

Source code in src/openjarvis/evals/core/runner.py
def __init__(
    self,
    config: RunConfig,
    dataset: DatasetProvider,
    backend: InferenceBackend,
    scorer: Scorer,
    trackers: Optional[List[ResultTracker]] = None,
) -> None:
    """Wire up the collaborators for one evaluation run.

    Args:
        config: Run configuration stored for later use by the runner.
        dataset: Provider of the evaluation records.
        backend: Inference backend stored on the runner.
        scorer: Scorer stored on the runner.
        trackers: Optional result trackers. ``None`` or an empty list
            yields a fresh empty list; a non-empty list is stored as-is
            (not copied).
    """
    # Mutable run state: nothing collected yet, no output file open.
    self._output_file: Optional[Any] = None
    self._results: List[EvalResult] = []

    # A falsy ``trackers`` (None or empty) is replaced by a new list;
    # otherwise the caller's list object is reused.
    self._trackers: List[ResultTracker] = trackers if trackers else []

    # Collaborators supplied by the caller.
    self._scorer = scorer
    self._backend = backend
    self._dataset = dataset
    self._config = config
Attributes
results property
results: List[EvalResult]

Return a copy of collected evaluation results.

Functions
run
run(progress_callback: Optional[Callable[[int, int], None]] = None) -> RunSummary

Execute the evaluation and return a summary.

Args: progress_callback: Optional (completed, total) callback invoked after each sample completes, useful for driving progress bars.

Source code in src/openjarvis/evals/core/runner.py
def run(
    self,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> RunSummary:
    """Execute the evaluation and return a summary.

    Loads the dataset, optionally processes warm-up samples whose results
    are discarded, then processes every record in one of three ways:
    episode mode, sequentially (when the dataset provides task
    environments), or concurrently on a thread pool.  When an output path
    is configured, a JSONL file is opened for the duration of the run and
    a ``.summary.json`` file plus trace data are written next to it.
    Tracker callbacks are best-effort: their exceptions are logged as
    warnings and never abort the run.

    Note:
        Warm-up records are not removed from the record list, so they are
        processed again during the main pass.

    Args:
        progress_callback: Optional ``(completed, total)`` callback invoked
            after each sample completes, useful for driving progress bars.

    Returns:
        The computed run summary.  ``_output_path`` and ``_traces_dir`` are
        attached to it as ad-hoc attributes for callers (e.g. CLI display).
    """
    cfg = self._config
    started_at = time.time()

    self._dataset.load(
        max_samples=cfg.max_samples,
        split=cfg.dataset_split,
        seed=cfg.seed,
    )

    # Auto-enable episode_mode when the dataset *overrides*
    # iter_episodes() (i.e. it is a lifelong/sequential benchmark like
    # LifelongAgentBench).  The base DatasetProvider always defines a
    # default iter_episodes() that wraps each record in its own episode,
    # so hasattr() is always True — we must check for a real override.
    from openjarvis.evals.core.dataset import DatasetProvider as _DP

    try:
        _overrides_episodes = (
            type(self._dataset).iter_episodes is not _DP.iter_episodes
        )
    except AttributeError:
        _overrides_episodes = False
    if not cfg.episode_mode and _overrides_episodes:
        LOGGER.info(
            "%s requires sequential episode processing — "
            "auto-enabling episode_mode.",
            cfg.benchmark,
        )
        # Replace rather than mutate, so the caller's config object is
        # left untouched.
        cfg = dataclasses.replace(cfg, episode_mode=True)
        self._config = cfg

    # Detect if dataset provides task environments (e.g. PinchBench)
    try:
        self._has_task_env = (
            type(self._dataset).create_task_env is not _DP.create_task_env
        )
    except AttributeError:
        self._has_task_env = False

    records = list(self._dataset.iter_records())
    LOGGER.info(
        "Running %s: %d samples, backend=%s, model=%s, workers=%d, episode_mode=%s",
        cfg.benchmark,
        len(records),
        cfg.backend,
        cfg.model,
        cfg.max_workers,
        cfg.episode_mode,
    )

    # --- Warmup phase (discard results) ---
    warmup_count = cfg.warmup_samples
    if warmup_count > 0 and records:
        warmup_records = records[:warmup_count]
        for rec in warmup_records:
            self._process_one(rec)
        LOGGER.info("Warmup complete: %d samples discarded", len(warmup_records))

    output_path = self._resolve_output_path()
    total = len(records)

    # The output file is opened *inside* the try block so that the handle
    # is released even if tracker start-up is interrupted (for example by
    # KeyboardInterrupt, which the ``except Exception`` below lets through).
    try:
        # Open output file for incremental JSONL writing.  The encoding is
        # pinned to UTF-8 so the file does not depend on the host locale.
        if output_path:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            self._output_file = open(output_path, "w", encoding="utf-8")

        # Notify trackers of run start
        for tracker in self._trackers:
            try:
                tracker.on_run_start(cfg)
            except Exception as exc:
                LOGGER.warning(
                    "Tracker %s.on_run_start failed: %s",
                    type(tracker).__name__,
                    exc,
                )

        if cfg.episode_mode:
            self._run_episode_mode(records, progress_callback, total)
        elif self._has_task_env:
            # Task environments (PinchBench etc.) change CWD —
            # must process sequentially for thread safety.
            for record in records:
                result = self._process_one(record)
                self._results.append(result)
                self._flush_result(result)
                if progress_callback is not None:
                    progress_callback(len(self._results), total)
        else:
            with ThreadPoolExecutor(max_workers=cfg.max_workers) as pool:
                # Only the futures are needed; results are consumed in
                # completion order, not submission order.
                futures = [pool.submit(self._process_one, r) for r in records]
                for future in as_completed(futures):
                    # Re-raises any exception raised by the worker.
                    result = future.result()
                    self._results.append(result)
                    self._flush_result(result)
                    if progress_callback is not None:
                        progress_callback(len(self._results), total)
    finally:
        if self._output_file:
            self._output_file.close()
            self._output_file = None

    ended_at = time.time()
    summary = self._compute_summary(records, started_at, ended_at)

    # Notify trackers of summary and run end.  The two calls are guarded
    # separately so a failing on_summary does not skip on_run_end.
    for tracker in self._trackers:
        try:
            tracker.on_summary(summary)
        except Exception as exc:
            LOGGER.warning(
                "Tracker %s.on_summary failed: %s",
                type(tracker).__name__,
                exc,
            )
        try:
            tracker.on_run_end()
        except Exception as exc:
            LOGGER.warning(
                "Tracker %s.on_run_end failed: %s",
                type(tracker).__name__,
                exc,
            )

    # Write summary JSON alongside JSONL
    traces_dir: Optional[Path] = None
    if output_path:
        summary_path = output_path.with_suffix(".summary.json")
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(_summary_to_dict(summary), f, indent=2)
        LOGGER.info("Results written to %s", output_path)
        LOGGER.info("Summary written to %s", summary_path)

        # Write per-trace data
        traces_dir = self._write_traces(output_path)

    # Attach paths to summary for callers (e.g. CLI display)
    summary._output_path = output_path  # type: ignore[attr-defined]
    summary._traces_dir = traces_dir  # type: ignore[attr-defined]

    return summary

Functions