def _aggregate_action_energy(
    traces: list[QueryTrace],
) -> dict[str, dict[str, Any]]:
    """Aggregate per-action-type duration and energy across all turns.

    Args:
        traces: List of QueryTrace objects.

    Returns:
        Mapping of action_type -> ``{count, total_duration_s,
        total_gpu_energy_joules, total_cpu_energy_joules}``. Turns without
        an ``action_energy_breakdown`` are skipped; per-action GPU/CPU
        energy readings that are ``None`` are treated as absent (not zero)
        so partial telemetry does not bias the totals.
    """
    # NOTE: "count" is an int while the other fields are floats, hence
    # the dict[str, Any] value type.
    totals: dict[str, dict[str, Any]] = {}
    for trace in traces:
        for turn in trace.turns:
            if not turn.action_energy_breakdown:
                continue
            for action in turn.action_energy_breakdown:
                entry = totals.setdefault(
                    action["action_type"],
                    {
                        "count": 0,
                        "total_duration_s": 0.0,
                        "total_gpu_energy_joules": 0.0,
                        "total_cpu_energy_joules": 0.0,
                    },
                )
                entry["count"] += 1
                entry["total_duration_s"] += action.get("duration_s", 0.0)
                gpu_e = action.get("gpu_energy_joules")
                if gpu_e is not None:
                    entry["total_gpu_energy_joules"] += gpu_e
                cpu_e = action.get("cpu_energy_joules")
                if cpu_e is not None:
                    entry["total_cpu_energy_joules"] += cpu_e
    return totals


def _flat_stats_block(vals: list[float]) -> dict[str, Any]:
    """Return a ``{mean, std, n}`` block for the table_gen flat schema.

    An empty input yields the zero block (``n == 0``); a single value
    yields ``std == 0.0`` since a sample stdev is undefined for n < 2.
    """
    if not vals:
        return {"mean": 0.0, "std": 0.0, "n": 0}
    return {
        "mean": float(statistics.fmean(vals)),
        "std": float(statistics.stdev(vals)) if len(vals) > 1 else 0.0,
        "n": len(vals),
    }


def _table_gen_fields(
    traces: list[QueryTrace],
    config: dict[str, Any],
) -> dict[str, Any]:
    """Build the table_gen-compatible flat fields (spec section 6.3).

    Emits ``framework`` / ``framework_commit`` / ``model`` / ``benchmark``
    / ``n_tasks`` / ``metrics`` so the framework-comparison ``table_gen``
    loader (``_SummarySchema``) can parse the summary file. Purely
    additive to the rich schema.

    Args:
        traces: List of QueryTrace objects.
        config: Run configuration dictionary (tolerates non-dict input).

    Returns:
        Dict of top-level fields to merge into the summary.
    """
    fwk = ""
    fwk_commit = ""
    model: Any = ""
    benchmark: Any = ""
    if isinstance(config, dict):
        # `or ""` normalizes explicit None values in the config.
        fwk = config.get("framework", "") or ""
        fwk_commit = config.get("framework_commit", "") or ""
        model = config.get("model", "")
        benchmark = config.get("benchmark", "")
    if not fwk:
        fwk = "openjarvis"

    # Per-metric value lists: zero/None entries are excluded so the
    # mean/std reflect only queries with real measurements.
    accuracy_vals: list[float] = [
        1.0 if t.is_resolved is True else 0.0
        for t in traces
        if t.is_resolved is not None
    ]
    latency_vals = [
        t.total_wall_clock_s for t in traces if t.total_wall_clock_s > 0
    ]
    energy_vals = [
        t.total_gpu_energy_joules
        for t in traces
        if t.total_gpu_energy_joules is not None and t.total_gpu_energy_joules > 0
    ]
    in_tok_vals = [
        float(t.total_input_tokens) for t in traces if t.total_input_tokens > 0
    ]
    out_tok_vals = [
        float(t.total_output_tokens) for t in traces if t.total_output_tokens > 0
    ]
    cost_vals = [
        t.total_cost_usd
        for t in traces
        if t.total_cost_usd is not None and t.total_cost_usd > 0
    ]
    power_vals = [
        t.avg_gpu_power_watts
        for t in traces
        if t.avg_gpu_power_watts is not None and t.avg_gpu_power_watts > 0
    ]
    return {
        "framework": fwk,
        "framework_commit": fwk_commit,
        "model": model,
        "benchmark": benchmark,
        "n_tasks": len(traces),
        "metrics": {
            "accuracy": _flat_stats_block(accuracy_vals),
            "latency_seconds": _flat_stats_block(latency_vals),
            "energy_joules_per_query": _flat_stats_block(energy_vals),
            "input_tokens_per_query": _flat_stats_block(in_tok_vals),
            "output_tokens_per_query": _flat_stats_block(out_tok_vals),
            "cost_usd_per_query": _flat_stats_block(cost_vals),
            "peak_power_w": _flat_stats_block(power_vals),
        },
    }


def export_summary_json(
    traces: list[QueryTrace],
    config: dict[str, Any],
    path: Path,
    *,
    bench_energy: Optional[dict[str, Any]] = None,
) -> Path:
    """Export aggregate summary as JSON.

    Args:
        traces: List of QueryTrace objects.
        config: Run configuration dictionary.
        path: Output file path.
        bench_energy: Optional benchmark-level aggregate telemetry dict.

    Returns:
        The path to the written file.
    """
    total_queries = len(traces)
    completed = sum(1 for t in traces if t.completed)
    total_turns = sum(t.num_turns for t in traces)
    total_tool_calls = sum(t.total_tool_calls for t in traces)
    total_input_tokens = sum(t.total_input_tokens for t in traces)
    total_output_tokens = sum(t.total_output_tokens for t in traces)
    total_wall_clock_s = sum(t.total_wall_clock_s for t in traces)

    # Energy totals stay None (not 0) when no query reported telemetry,
    # so "no data" is distinguishable from "measured zero" in the JSON.
    gpu_energy_values = [
        t.total_gpu_energy_joules
        for t in traces
        if t.total_gpu_energy_joules is not None
    ]
    total_gpu_energy = sum(gpu_energy_values) if gpu_energy_values else None
    # Per-trace CPU energy is summed over turns; traces whose turns all
    # lack CPU readings are excluded entirely.
    per_trace_cpu = [
        [
            turn.cpu_energy_joules
            for turn in trace.turns
            if turn.cpu_energy_joules is not None
        ]
        for trace in traces
    ]
    cpu_energy_values = [sum(vals) for vals in per_trace_cpu if vals]
    total_cpu_energy = sum(cpu_energy_values) if cpu_energy_values else None

    # `is True` / `is False` deliberately exclude None (= not yet judged).
    resolved = sum(1 for t in traces if t.is_resolved is True)
    unresolved = sum(1 for t in traces if t.is_resolved is False)
    cost_values = [t.total_cost_usd for t in traces if t.total_cost_usd is not None]
    total_cost = sum(cost_values) if cost_values else None

    avg_turns = total_turns / total_queries if total_queries > 0 else 0
    avg_wall_clock = total_wall_clock_s / total_queries if total_queries > 0 else 0
    # NOTE(review): averaged over ALL queries, including those without
    # GPU telemetry — presumably intentional; confirm with consumers.
    avg_gpu_energy = (
        total_gpu_energy / total_queries
        if total_gpu_energy is not None and total_queries > 0
        else None
    )

    # Distribution statistics; _agg_stats receives the raw per-trace
    # values (including None for missing telemetry) and is expected to
    # handle them.
    stats = {
        "wall_clock_s": _agg_stats([t.total_wall_clock_s for t in traces]),
        "gpu_energy_joules": _agg_stats(
            [t.total_gpu_energy_joules for t in traces],
        ),
        "cpu_energy_joules": _agg_stats(
            [t.total_cpu_energy_joules for t in traces],
        ),
        "gpu_power_watts": _agg_stats(
            [t.avg_gpu_power_watts for t in traces],
        ),
        "cpu_power_watts": _agg_stats(
            [t.avg_cpu_power_watts for t in traces],
        ),
        "input_tokens": _agg_stats(
            [float(t.total_input_tokens) for t in traces],
        ),
        "output_tokens": _agg_stats(
            [float(t.total_output_tokens) for t in traces],
        ),
        "total_tokens": _agg_stats(
            [float(t.total_tokens) for t in traces],
        ),
        "throughput_tokens_per_sec": _agg_stats(
            [t.throughput_tokens_per_sec for t in traces],
        ),
        "energy_per_token_joules": _agg_stats(
            [t.energy_per_token_joules for t in traces],
        ),
        "cost_usd": _agg_stats([t.total_cost_usd for t in traces]),
        "turns": _agg_stats([float(t.num_turns) for t in traces]),
        "tool_calls": _agg_stats(
            [float(t.total_tool_calls) for t in traces],
        ),
        "mbu_avg_pct": _agg_stats(
            [t.query_mbu_avg_pct for t in traces],
        ),
    }

    # Accuracy over judged queries only; None when nothing was judged.
    accuracy = (
        resolved / (resolved + unresolved) if (resolved + unresolved) > 0 else None
    )
    efficiency = _compute_efficiency(traces, total_gpu_energy, total_cpu_energy)
    normalized = _compute_normalized(traces)
    action_totals = _aggregate_action_energy(traces)

    summary: dict[str, Any] = {
        "generated_at": time.time(),
        "config": config,
        "hardware_info": _hardware_info_dict(),
        "totals": {
            "queries": total_queries,
            "completed": completed,
            "resolved": resolved,
            "unresolved": unresolved,
            "accuracy": accuracy,
            "turns": total_turns,
            "tool_calls": total_tool_calls,
            "input_tokens": total_input_tokens,
            "output_tokens": total_output_tokens,
            "total_tokens": total_input_tokens + total_output_tokens,
            "wall_clock_s": total_wall_clock_s,
            "gpu_energy_joules": total_gpu_energy,
            "cpu_energy_joules": total_cpu_energy,
            "cost_usd": total_cost,
        },
        "averages": {
            "turns_per_query": avg_turns,
            "wall_clock_per_query_s": avg_wall_clock,
            "gpu_energy_per_query_joules": avg_gpu_energy,
        },
        "statistics": stats,
        "efficiency": efficiency,
    }
    if action_totals:
        summary["action_energy_summary"] = action_totals
    if normalized is not None:
        summary["normalized_statistics"] = normalized["normalized_statistics"]
        summary["normalized_efficiency"] = normalized["normalized_efficiency"]
    if bench_energy is not None:
        summary["bench_telemetry"] = bench_energy

    # Spec section 6.3: additive table_gen-compatible flat fields; the
    # rich schema above is preserved untouched.
    summary.update(_table_gen_fields(traces, config))

    path.parent.mkdir(parents=True, exist_ok=True)
    # default=str stringifies any non-JSON-native value (e.g. Path).
    path.write_text(json.dumps(summary, indent=2, default=str))
    return path