agentic_runner

AgenticRunner — multi-turn agent execution with energy telemetry correlation.

Orchestrates agentic workloads where a single query may involve multiple LLM turns and tool calls, capturing per-turn traces with energy attribution.

Classes

AgenticRunner

AgenticRunner(agent: Any, dataset: Any, telemetry_session: Any = None, config: Optional[dict[str, Any]] = None, event_recorder: Optional[EventRecorder] = None, run_dir: Optional[Path] = None, concurrency: int = 1, agent_factory: Optional[Callable[[], Any]] = None, query_timeout: Optional[float] = None)

Orchestrate multi-turn agent runs with energy telemetry correlation.

Designed for agentic workloads where a single query may involve multiple LLM turns and tool calls. Captures per-turn TurnTrace objects with energy attribution and builds QueryTrace aggregates.

Source code in src/openjarvis/evals/core/agentic_runner.py
def __init__(
    self,
    agent: Any,
    dataset: Any,
    telemetry_session: Any = None,
    config: Optional[dict[str, Any]] = None,
    event_recorder: Optional[EventRecorder] = None,
    run_dir: Optional[Path] = None,
    concurrency: int = 1,
    agent_factory: Optional[Callable[[], Any]] = None,
    query_timeout: Optional[float] = None,
) -> None:
    self._agent = agent
    self._dataset = dataset
    self._telemetry = telemetry_session
    self._config = config or {}
    self._event_recorder = (
        event_recorder if event_recorder is not None else EventRecorder()
    )
    self._run_dir = run_dir
    self._traces: list[QueryTrace] = []
    self._concurrency = max(1, concurrency)
    self._agent_factory = agent_factory
    self._query_timeout = query_timeout
    self._results_lock = threading.Lock()
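
The constructor normalizes two of its arguments before storing them: a missing config becomes an empty dict, and concurrency is clamped to at least 1. The sketch below mirrors only those two expressions in a standalone helper. `normalize_runner_args` is a hypothetical name used for illustration and is not part of openjarvis.

```python
from typing import Any, Optional


def normalize_runner_args(
    config: Optional[dict[str, Any]] = None,
    concurrency: int = 1,
) -> tuple[dict[str, Any], int]:
    """Hypothetical helper mirroring two lines of AgenticRunner.__init__."""
    # `config or {}` replaces None (and any other falsy value) with a new dict.
    normalized_config = config or {}
    # `max(1, concurrency)` means zero or negative values fall back to 1.
    normalized_concurrency = max(1, concurrency)
    return normalized_config, normalized_concurrency


if __name__ == "__main__":
    print(normalize_runner_args())
    print(normalize_runner_args({"model": "demo"}, concurrency=0))
    print(normalize_runner_args(concurrency=4))
```

One consequence of `config or {}`: passing an empty dict stores a new empty dict rather than the caller's object, so later mutations of the caller's dict would not be visible through the stored config.
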
Attributes
traces property
traces: list[QueryTrace]

Return collected traces.

Functions
run async
run(max_queries: Optional[int] = None) -> list[QueryTrace]

Run the agent over the dataset, collecting traces and telemetry.

Args:
    max_queries: Maximum number of queries to process. None means all.

Returns:
    List of QueryTrace objects with energy-correlated telemetry.
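
Because run is declared async, calling it returns a coroutine that has to be awaited or handed to an event loop. The sketch below shows that calling pattern with a stand-in class. `DemoRunner` and its string results are hypothetical placeholders, not the real AgenticRunner or QueryTrace.

```python
import asyncio
from typing import Optional


class DemoRunner:
    """Hypothetical stand-in exposing the same async run() signature."""

    def __init__(self, queries: list[str]) -> None:
        self._queries = queries

    async def run(self, max_queries: Optional[int] = None) -> list[str]:
        total = max_queries if max_queries is not None else len(self._queries)
        results = []
        for query in self._queries[:total]:
            await asyncio.sleep(0)  # yield to the event loop, as real I/O would
            results.append(f"trace for {query}")
        return results


async def main() -> list[str]:
    runner = DemoRunner(["q1", "q2", "q3"])
    # Inside a coroutine, await the call directly.
    return await runner.run(max_queries=2)


if __name__ == "__main__":
    # From synchronous code, asyncio.run drives the coroutine to completion.
    print(asyncio.run(main()))
```
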

Source code in src/openjarvis/evals/core/agentic_runner.py
async def run(self, max_queries: Optional[int] = None) -> list[QueryTrace]:
    """Run the agent over the dataset, collecting traces and telemetry.

    Args:
        max_queries: Maximum number of queries to process. None means all.

    Returns:
        List of ``QueryTrace`` objects with energy-correlated telemetry.
    """
    records = list(self._dataset.iter_records())
    total = max_queries if max_queries is not None else len(records)
    records = records[:total]
    model = self._config.get("model", "unknown")

    work_items = list(enumerate(records))

    if self._concurrency <= 1:
        return await self._run_sequential(work_items, model)
    return await self._run_concurrent(work_items, model)
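
The first lines of run materialize the dataset, truncate it with a slice, and pair each record with its index. The following is a minimal sketch of that selection step, assuming only that the dataset exposes iter_records(). `ListDataset` and `select_work_items` are hypothetical names, not part of openjarvis.

```python
from typing import Any, Iterator, Optional


class ListDataset:
    """Hypothetical dataset: anything with iter_records() fits the pattern."""

    def __init__(self, records: list[Any]) -> None:
        self._records = records

    def iter_records(self) -> Iterator[Any]:
        return iter(self._records)


def select_work_items(
    dataset: Any, max_queries: Optional[int] = None
) -> list[tuple[int, Any]]:
    """Mirror the record selection at the top of run()."""
    records = list(dataset.iter_records())
    total = max_queries if max_queries is not None else len(records)
    records = records[:total]  # plain slice: no error if total > len(records)
    return list(enumerate(records))


if __name__ == "__main__":
    ds = ListDataset(["a", "b", "c"])
    print(select_work_items(ds))
    print(select_work_items(ds, max_queries=2))
    print(select_work_items(ds, max_queries=0))
    print(select_work_items(ds, max_queries=-1))
```

Two edge cases follow from plain slice semantics: max_queries=0 selects nothing, and a negative value such as -1 drops records from the end instead of raising, so callers may want to validate the argument first. The whole dataset is also read into memory before truncation.
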