def run(
    self,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> RunSummary:
    """Execute the evaluation and return a summary.

    Drives the full run lifecycle: load the dataset, optionally warm up,
    open the JSONL output file, notify trackers, process every record
    (sequentially or via a thread pool depending on configuration), then
    compute the summary, notify trackers again, and write summary/trace
    files next to the JSONL output.

    Args:
        progress_callback: Optional ``(completed, total)`` callback invoked
            after each sample completes, useful for driving progress bars.

    Returns:
        The ``RunSummary`` produced by ``self._compute_summary``. When an
        output path is configured, the private attributes ``_output_path``
        and ``_traces_dir`` are attached to it for callers (e.g. the CLI).
    """
    cfg = self._config
    started_at = time.time()
    # Load (or reload) the dataset according to the run configuration.
    self._dataset.load(
        max_samples=cfg.max_samples,
        split=cfg.dataset_split,
        seed=cfg.seed,
    )
    # Auto-enable episode_mode when the dataset *overrides*
    # iter_episodes() (i.e. it is a lifelong/sequential benchmark like
    # LifelongAgentBench). The base DatasetProvider always defines a
    # default iter_episodes() that wraps each record in its own episode,
    # so hasattr() is always True — we must check for a real override.
    from openjarvis.evals.core.dataset import DatasetProvider as _DP
    try:
        # Identity comparison of the unbound functions detects a subclass
        # override without invoking anything.
        _overrides_episodes = (
            type(self._dataset).iter_episodes is not _DP.iter_episodes
        )
    except AttributeError:
        _overrides_episodes = False
    if not cfg.episode_mode and _overrides_episodes:
        LOGGER.info(
            "%s requires sequential episode processing — "
            "auto-enabling episode_mode.",
            cfg.benchmark,
        )
        # Replace rather than mutate the config dataclass, and persist the
        # updated copy so later phases (and callers) see episode_mode=True.
        cfg = dataclasses.replace(cfg, episode_mode=True)
        self._config = cfg
    # Detect if dataset provides task environments (e.g. PinchBench) —
    # same override-detection trick as above.
    try:
        self._has_task_env = (
            type(self._dataset).create_task_env is not _DP.create_task_env
        )
    except AttributeError:
        self._has_task_env = False
    records = list(self._dataset.iter_records())
    LOGGER.info(
        "Running %s: %d samples, backend=%s, model=%s, workers=%d, episode_mode=%s",
        cfg.benchmark,
        len(records),
        cfg.backend,
        cfg.model,
        cfg.max_workers,
        cfg.episode_mode,
    )
    # --- Warmup phase (discard results) ---
    # NOTE(review): warmup runs on a prefix of `records`, but the measured
    # loop below still processes the full `records` list, so warmup samples
    # appear to be processed twice — confirm this is intended.
    warmup_count = cfg.warmup_samples
    if warmup_count > 0 and records:
        warmup_records = records[:warmup_count]
        for rec in warmup_records:
            # Results are intentionally dropped; warmup only primes caches
            # and connections. Exceptions here propagate before trackers
            # are notified of run start.
            self._process_one(rec)
        LOGGER.info("Warmup complete: %d samples discarded", len(warmup_records))
    # Open output file for incremental JSONL writing
    output_path = self._resolve_output_path()
    if output_path:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # NOTE(review): opened without an explicit encoding — relies on the
        # platform default; consider encoding="utf-8". Closed in the
        # finally block below.
        self._output_file = open(output_path, "w")
    # Notify trackers of run start. Tracker failures are logged but never
    # abort the run (best-effort observability).
    for tracker in self._trackers:
        try:
            tracker.on_run_start(cfg)
        except Exception as exc:
            LOGGER.warning(
                "Tracker %s.on_run_start failed: %s",
                type(tracker).__name__,
                exc,
            )
    total = len(records)
    try:
        if cfg.episode_mode:
            # Sequential/lifelong benchmarks: delegate to the episode loop.
            self._run_episode_mode(records, progress_callback, total)
        elif self._has_task_env:
            # Task environments (PinchBench etc.) change CWD —
            # must process sequentially for thread safety.
            for record in records:
                result = self._process_one(record)
                self._results.append(result)
                # Incremental JSONL write so partial results survive crashes.
                self._flush_result(result)
                if progress_callback is not None:
                    progress_callback(len(self._results), total)
        else:
            # Default path: fan out across a thread pool and drain results
            # in completion order. Appends/flushes happen only on this
            # (main) thread via as_completed, so no extra locking is needed.
            with ThreadPoolExecutor(max_workers=cfg.max_workers) as pool:
                futures = {pool.submit(self._process_one, r): r for r in records}
                for future in as_completed(futures):
                    # result() re-raises any worker exception, which then
                    # propagates through the finally block below.
                    result = future.result()
                    self._results.append(result)
                    self._flush_result(result)
                    if progress_callback is not None:
                        progress_callback(len(self._results), total)
    finally:
        # Always close the JSONL handle, even on worker failure.
        if self._output_file:
            self._output_file.close()
            self._output_file = None
    ended_at = time.time()
    summary = self._compute_summary(records, started_at, ended_at)
    # Notify trackers of summary and run end — again best-effort: each
    # callback is isolated so one failing tracker cannot block the others.
    for tracker in self._trackers:
        try:
            tracker.on_summary(summary)
        except Exception as exc:
            LOGGER.warning(
                "Tracker %s.on_summary failed: %s",
                type(tracker).__name__,
                exc,
            )
        try:
            tracker.on_run_end()
        except Exception as exc:
            LOGGER.warning(
                "Tracker %s.on_run_end failed: %s",
                type(tracker).__name__,
                exc,
            )
    # Write summary JSON alongside JSONL
    traces_dir: Optional[Path] = None
    if output_path:
        # e.g. results.jsonl -> results.summary.json
        summary_path = output_path.with_suffix(".summary.json")
        # NOTE(review): also opened without an explicit encoding — see above.
        with open(summary_path, "w") as f:
            json.dump(_summary_to_dict(summary), f, indent=2)
        LOGGER.info("Results written to %s", output_path)
        LOGGER.info("Summary written to %s", summary_path)
        # Write per-trace data
        traces_dir = self._write_traces(output_path)
    # Attach paths to summary for callers (e.g. CLI display)
    summary._output_path = output_path  # type: ignore[attr-defined]
    summary._traces_dir = traces_dir  # type: ignore[attr-defined]
    return summary