def run_orchestrator(
    agent: Any,
    problem: str,
    *,
    cfg: Dict[str, Any],
    handbook: Optional[StageSkillHandbook],
    strategy: str,
) -> Tuple[str, Dict[str, Any]]:
    """Run the eval orchestrator on one problem. Returns (answer, metadata).

    Each round rebuilds a context string from the accumulated search
    documents, code runs and answer attempts, asks the orchestrator model
    for one step, and executes the tool calls that step returned. The loop
    stops when the orchestrator answers directly inside ``<answer>`` tags
    (with no tool calls), when an answer tool call has been executed, or
    after ``max_rounds`` rounds. The final round always forces a single
    ``answer`` tool call.

    Args:
        agent: Object exposing ``_local_model``, ``_local_endpoint``,
            ``_cloud_model``, ``_cloud_endpoint`` and
            ``record_trace_event``; it is also passed through to the step
            and tool helpers.
        problem: The problem text given to the orchestrator and tools.
        cfg: Run options. Keys read here (defaults in parentheses):
            ``max_rounds`` (6), ``context_char_cap`` (24000),
            ``retriever_url``, ``code_timeout_s`` (60),
            ``answer_max_tokens`` (40000), ``web_search_max_uses`` (5),
            ``orchestrator_endpoint`` / ``orchestrator_model`` (aliases
            ``router_endpoint`` / ``router_model``),
            ``orchestrator_max_tokens`` (4096) and ``model_pool``.
        handbook: Skill handbook. Skill-aware prompting and routing are
            used only when this is not ``None`` and ``strategy`` is not
            ``"none"``.
        strategy: Routing strategy name; ``"none"`` disables skill routing.

    Returns:
        A ``(final_pred, meta)`` tuple. ``final_pred`` is the answer string
        (empty if no round ran). ``meta`` holds ``tokens_local``,
        ``tokens_cloud``, ``cost_usd``, ``turns``, ``tool_calls``,
        ``web_search_uses`` and a ``traces`` dict with the route log.
    """
    max_rounds = int(cfg.get("max_rounds", 6))
    char_cap = int(cfg.get("context_char_cap", 24000))
    retriever_url = cfg.get("retriever_url")
    code_timeout = int(cfg.get("code_timeout_s", 60))
    answer_max_tokens = int(cfg.get("answer_max_tokens", 40000))
    ws_max_uses = int(cfg.get("web_search_max_uses", 5))

    # The orchestrator model: a fixed model per run (the original's
    # MODEL_NAME). Defaults to the cell's cloud model when that endpoint
    # supports tool calls, else Opus. ``router_model`` / ``router_endpoint``
    # are accepted as back-compat aliases (pre-restructure cfg key names).
    raw_endpoint = (
        cfg.get("orchestrator_endpoint")
        or cfg.get("router_endpoint")
        or agent._cloud_endpoint
    )
    # A missing endpoint (None / empty) must reach the Opus fallback below
    # instead of raising AttributeError on ``.lower()``.
    orch_endpoint = (raw_endpoint or "").lower()
    orch_model = (
        cfg.get("orchestrator_model")
        or cfg.get("router_model")
        or agent._cloud_model
    )
    if orch_endpoint not in ("anthropic", "openai"):
        orch_endpoint, orch_model = "anthropic", "claude-opus-4-7"
    orch_max_tokens = int(cfg.get("orchestrator_max_tokens", 4096))

    pool = build_pool(
        local_model=agent._local_model,
        local_endpoint=agent._local_endpoint,
        cloud_model=agent._cloud_model,
        cloud_endpoint=agent._cloud_endpoint,
        overrides=cfg.get("model_pool"),
    )

    # Accumulated context, rebuilt into a prompt string every round.
    doc_list: List[Tuple[str, str]] = []      # (query, joined results)
    code_list: List[Tuple[str, str]] = []     # (generated code, exec result)
    attempt_list: List[Tuple[str, str]] = []  # (alias, answer / raw text)
    route_log: List[Dict[str, Any]] = []

    tokens_local = 0
    tokens_cloud = 0
    cost_usd = 0.0
    tool_calls_n = 0
    web_uses = 0
    final_pred = ""
    used_rounds = 0

    # Skill-aware prompting and routing share one on/off condition.
    use_skills = handbook is not None and strategy != "none"

    def _route(stage: str, tool_alias: Optional[str], orch_text: str) -> str:
        """Resolve the worker alias for ``stage`` via the routing strategy."""
        if use_skills:
            sa = parse_skill_analysis(orch_text)
            rr = get_routing_strategy(strategy, handbook).select_model(
                stage, sa, tool_call_model=tool_alias,
            )
            return rr.model_alias
        # No skill routing: honour the orchestrator's pick, else the
        # stage default.
        return tool_alias or _STAGE_DEFAULT_ALIAS[stage]

    for step in range(max_rounds):
        used_rounds = step + 1
        is_last = step == max_rounds - 1
        context_str = _build_context(
            doc_list, code_list, attempt_list, char_cap=char_cap,
        )
        if use_skills:
            user = build_skill_orchestrator_prompt(
                problem=problem,
                context_str=context_str,
                strategy=strategy,
                handbook=handbook,
            )
        else:
            user = (
                f"Problem: {problem}\n\n{context_str}\n\n"
                "Choose an appropriate tool."
            )

        text, tcalls, p, c, ocost = _orchestrate_step(
            agent, user=user, model=orch_model,
            endpoint=orch_endpoint, max_tokens=orch_max_tokens,
        )
        # Orchestrator usage is always booked as cloud tokens.
        tokens_cloud += p + c
        cost_usd += ocost

        # The orchestrator may answer directly in <answer> tags.
        if not tcalls and "<answer>" in text and "</answer>" in text:
            final_pred = (
                text.split("<answer>")[-1].split("</answer>")[0].strip()
            )
            break

        # Last round: force the answer tool (eval_frames.py:1373-1380).
        if is_last:
            ans_alias = None
            for tc in tcalls:
                if tc["name"] == "answer":
                    # Keep the model the orchestrator asked for, if any
                    # (the last ``answer`` call wins).
                    ans_alias = (tc.get("input") or {}).get("model")
            tcalls = [
                {"name": "answer", "input": {"model": ans_alias or "answer-1"}}
            ]
        elif not tcalls:
            # No tool, no answer — record the text and continue.
            if text.strip():
                attempt_list.append(("orchestrator", text.strip()[:2000]))
            continue

        finish = False
        for tc in tcalls:
            tool = tc["name"]
            tool_alias = (tc.get("input") or {}).get("model")
            # Unrecognised tool names are treated as the "answer" stage,
            # matching the catch-all ``else`` branch below.
            stage = _TOOL_STAGE.get(tool, "answer")
            chosen_alias = _route(stage, tool_alias, text)
            # Fall back to the stage default spec when the routed alias is
            # not available in the pool.
            spec: ModelSpec = pool.get(chosen_alias) or pool[
                _STAGE_DEFAULT_ALIAS[stage]
            ]
            route_log.append({
                "step": step,
                "tool": tool,
                "orchestrator_alias": tool_alias,
                "routed_alias": chosen_alias,
                "routed_model": spec.model,
                "is_local": spec.is_local,
            })
            tool_calls_n += 1

            if tool == "search":
                res = run_search(
                    agent, spec, context_str=context_str, problem=problem,
                    retriever_url=retriever_url,
                    web_search_max_uses=ws_max_uses,
                )
                docs = res["search_results_data"]
                # Drop empty hits and cap the stored text at the context
                # budget.
                joined = "\n---\n".join(d for d in docs if d)[:char_cap]
                doc_list.append((res["query"], joined or "(no results)"))
                web_uses += res.get("web_search_uses", 0)
            elif tool in ("enhance_reasoning", "code"):
                res = run_code(
                    agent, spec, context_str=context_str, problem=problem,
                    bash_timeout_s=code_timeout,
                )
                code_list.append((res["generated_code"], res["exec_result"]))
            else:  # answer
                res = run_answer(
                    agent, spec, context_str=context_str, problem=problem,
                    max_tokens=answer_max_tokens,
                )
                final_pred = res["pred"]
                attempt_list.append((res["alias"], final_pred))
                finish = True

            # Book this tool call's usage against the side that served it.
            spent = res["tokens_in"] + res["tokens_out"]
            if res["is_local"]:
                tokens_local += spent
            else:
                tokens_cloud += spent
            cost_usd += res["cost_usd"]

        # Every tool call of the step has run; stop once an answer exists.
        if finish:
            break

    agent.record_trace_event({
        "kind": "skillorchestra_route_log",
        "strategy": strategy,
        "rounds_used": used_rounds,
        "routes": route_log,
    })
    meta = {
        "tokens_local": tokens_local,
        "tokens_cloud": tokens_cloud,
        "cost_usd": cost_usd,
        "turns": used_rounds,
        "tool_calls": tool_calls_n,
        "web_search_uses": web_uses,
        "traces": {
            "strategy": strategy,
            "handbook_loaded": handbook is not None,
            "rounds_used": used_rounds,
            "routes": route_log,
        },
    }
    return final_pred, meta