Skip to content

Index

traces

Trace system — full interaction-level recording and analysis.

The trace system captures the complete sequence of steps an agent takes to handle a query. Unlike telemetry (which records per-inference metrics), traces record the decision-making process: which model was selected, what memory was retrieved, which tools were called, and the final response.

Traces are the primary input to the learning system.

Classes

TraceAnalyzer

TraceAnalyzer(store: TraceStore)

Read-only query layer over a TraceStore.

Computes aggregated statistics from stored traces, providing the inputs that the learning system needs to update routing policies.

Source code in src/openjarvis/traces/analyzer.py
def __init__(self, store: TraceStore) -> None:
    """Bind the analyzer to *store*; all queries read through it."""
    self._store = store
Functions
summary
summary(*, since: Optional[float] = None, until: Optional[float] = None) -> TraceSummary

Compute an overall summary of all traces in the time range.

Source code in src/openjarvis/traces/analyzer.py
def summary(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> TraceSummary:
    """Aggregate all traces in the time range into one TraceSummary.

    Covers trace-level averages (latency, tokens, success rate),
    per-step-type distributions and duration/token/energy statistics.
    Returns an empty TraceSummary when no traces match.
    """
    traces = self._store.list_traces(since=since, until=until, limit=10_000)
    if not traces:
        return TraceSummary()

    total_steps = sum(len(trace.steps) for trace in traces)
    evaluated = [trace for trace in traces if trace.outcome is not None]
    successes = [trace for trace in evaluated if trace.outcome == "success"]

    step_dist: Dict[str, int] = {}
    step_data: Dict[str, Dict[str, list]] = {}
    total_energy = 0.0
    generate_energy = 0.0

    # Single pass over every step: count per type, accumulate energy,
    # and collect raw samples for the per-type statistics below.
    for trace in traces:
        for step in trace.steps:
            kind = _step_type_str(step)
            step_dist[kind] = step_dist.get(kind, 0) + 1

            joules = step.metadata.get("energy_joules", 0.0)
            total_energy += joules
            if kind == "generate":
                generate_energy += joules

            bucket = step_data.setdefault(
                kind,
                {
                    "durations": [], "energies": [],
                    "input_tokens": [], "output_tokens": [],
                },
            )
            bucket["durations"].append(step.duration_seconds)
            bucket["energies"].append(joules)
            bucket["input_tokens"].append(step.output.get("prompt_tokens", 0))
            bucket["output_tokens"].append(step.output.get("completion_tokens", 0))

    sts_map: Dict[str, StepTypeStats] = {}
    for kind, bucket in step_data.items():
        durations = bucket["durations"]
        in_tok = [float(v) for v in bucket["input_tokens"]]
        out_tok = [float(v) for v in bucket["output_tokens"]]
        # stdev needs at least two samples; the min/median/max guards are
        # defensive (every bucket has >= 1 entry by construction).
        sts_map[kind] = StepTypeStats(
            count=len(durations),
            avg_duration=_avg(durations),
            median_duration=stats_mod.median(durations) if durations else 0.0,
            min_duration=min(durations) if durations else 0.0,
            max_duration=max(durations) if durations else 0.0,
            std_duration=stats_mod.stdev(durations) if len(durations) > 1 else 0.0,
            total_energy=sum(bucket["energies"]),
            avg_input_tokens=_avg(in_tok),
            median_input_tokens=stats_mod.median(in_tok) if in_tok else 0.0,
            min_input_tokens=min(in_tok) if in_tok else 0.0,
            max_input_tokens=max(in_tok) if in_tok else 0.0,
            std_input_tokens=stats_mod.stdev(in_tok) if len(in_tok) > 1 else 0.0,
            avg_output_tokens=_avg(out_tok),
            median_output_tokens=stats_mod.median(out_tok) if out_tok else 0.0,
            min_output_tokens=min(out_tok) if out_tok else 0.0,
            max_output_tokens=max(out_tok) if out_tok else 0.0,
            std_output_tokens=stats_mod.stdev(out_tok) if len(out_tok) > 1 else 0.0,
        )

    return TraceSummary(
        total_traces=len(traces),
        total_steps=total_steps,
        avg_steps_per_trace=total_steps / len(traces) if traces else 0.0,
        avg_latency=_avg([trace.total_latency_seconds for trace in traces]),
        avg_tokens=_avg([float(trace.total_tokens) for trace in traces]),
        success_rate=len(successes) / len(evaluated) if evaluated else 0.0,
        step_type_distribution=step_dist,
        total_energy_joules=total_energy,
        total_generate_energy_joules=generate_energy,
        step_type_stats=sts_map,
    )
per_route_stats
per_route_stats(*, since: Optional[float] = None, until: Optional[float] = None) -> List[RouteStats]

Compute stats grouped by (model, agent) routing decisions.

Source code in src/openjarvis/traces/analyzer.py
def per_route_stats(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> List[RouteStats]:
    """Compute stats grouped by (model, agent) routing decisions.

    Returns one RouteStats per distinct route, sorted by (model, agent).
    """
    traces = self._store.list_traces(since=since, until=until, limit=10_000)

    # Bucket traces by the routing decision that handled them.
    groups: Dict[tuple, list] = {}
    for trace in traces:
        groups.setdefault((trace.model, trace.agent), []).append(trace)

    stats: List[RouteStats] = []
    for route in sorted(groups):
        model, agent = route
        group = groups[route]
        # success_rate only counts traces that were actually evaluated.
        evaluated = [t for t in group if t.outcome is not None]
        successes = [t for t in evaluated if t.outcome == "success"]
        feedbacks = [t.feedback for t in group if t.feedback is not None]
        stats.append(
            RouteStats(
                model=model,
                agent=agent,
                count=len(group),
                avg_latency=_avg([t.total_latency_seconds for t in group]),
                avg_tokens=_avg([float(t.total_tokens) for t in group]),
                success_rate=len(successes) / len(evaluated) if evaluated else 0.0,
                avg_feedback=_avg(feedbacks) if feedbacks else None,
            )
        )
    return stats
per_tool_stats
per_tool_stats(*, since: Optional[float] = None, until: Optional[float] = None) -> List[ToolStats]

Compute stats grouped by tool name.

Source code in src/openjarvis/traces/analyzer.py
def per_tool_stats(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> List[ToolStats]:
    """Compute stats grouped by tool name.

    Only ``tool_call`` steps are considered; results are sorted by name.
    """
    traces = self._store.list_traces(since=since, until=until, limit=10_000)

    by_tool: Dict[str, Dict[str, Any]] = {}
    for trace in traces:
        for step in trace.steps:
            if _step_type_str(step) != "tool_call":
                continue
            tool = step.input.get("tool", "unknown")
            entry = by_tool.setdefault(
                tool, {"count": 0, "latencies": [], "successes": 0}
            )
            entry["count"] += 1
            entry["latencies"].append(step.duration_seconds)
            # A step reports success via a truthy "success" output field.
            if step.output.get("success"):
                entry["successes"] += 1

    results: List[ToolStats] = []
    for tool in sorted(by_tool):
        entry = by_tool[tool]
        calls = entry["count"]
        results.append(
            ToolStats(
                tool_name=tool,
                call_count=calls,
                avg_latency=_avg(entry["latencies"]),
                success_rate=entry["successes"] / calls if calls else 0.0,
            )
        )
    return results
traces_for_query_type
traces_for_query_type(*, has_code: bool = False, min_length: Optional[int] = None, max_length: Optional[int] = None, since: Optional[float] = None, until: Optional[float] = None) -> List[Trace]

Retrieve traces matching query characteristics.

Useful for the learning system to find traces similar to a new query and learn which routing decisions worked best.

Source code in src/openjarvis/traces/analyzer.py
def traces_for_query_type(
    self,
    *,
    has_code: bool = False,
    min_length: Optional[int] = None,
    max_length: Optional[int] = None,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> List[Trace]:
    """Retrieve traces matching query characteristics.

    Useful for the learning system to find traces similar to a new
    query and learn which routing decisions worked best.

    Note: ``has_code=True`` keeps only code-like queries; ``False``
    applies no code filter at all.
    """
    traces = self._store.list_traces(since=since, until=until, limit=10_000)

    def matches(trace) -> bool:
        # Each predicate is skipped when its filter is disabled.
        if has_code and not _looks_like_code(trace.query):
            return False
        length = len(trace.query)
        if min_length is not None and length < min_length:
            return False
        if max_length is not None and length > max_length:
            return False
        return True

    return [trace for trace in traces if matches(trace)]
export_traces
export_traces(*, since: Optional[float] = None, until: Optional[float] = None, limit: int = 1000) -> List[Dict[str, Any]]

Export traces as plain dicts (for JSON serialization).

Source code in src/openjarvis/traces/analyzer.py
def export_traces(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
    limit: int = 1000,
) -> List[Dict[str, Any]]:
    """Export traces as plain dicts (for JSON serialization)."""
    # Delegate filtering to the store, then flatten each Trace to a dict.
    rows = self._store.list_traces(since=since, until=until, limit=limit)
    return list(map(_trace_to_dict, rows))

TraceCollector

TraceCollector(agent: BaseAgent, *, store: Optional[TraceStore] = None, bus: Optional[EventBus] = None)

Wraps a BaseAgent and records a Trace for every run().

The collector subscribes to the EventBus to capture inference, tool, and memory events emitted during agent execution, converting them into TraceStep objects. When the agent finishes, the complete Trace is persisted to the TraceStore and published on the bus.

Usage::

agent = OrchestratorAgent(engine, model, tools=tools, bus=bus)
collector = TraceCollector(agent, store=trace_store, bus=bus)
result = collector.run("What is 2+2?")
# Trace is automatically saved to trace_store
Source code in src/openjarvis/traces/collector.py
def __init__(
    self,
    agent: BaseAgent,
    *,
    store: Optional[TraceStore] = None,
    bus: Optional[EventBus] = None,
) -> None:
    """Wrap *agent*; optionally persist traces to *store* and publish on *bus*."""
    self._agent = agent
    self._store = store  # None -> completed traces are not persisted
    self._bus = bus  # None -> no TRACE_COMPLETE event is published after run()
    # Per-run scratch state; reset at the start of every run().
    self._current_steps: list[TraceStep] = []
    self._current_model: str = ""
    self._current_engine: str = ""
Attributes
last_trace property
last_trace: Optional[Trace]

Return the trace from the most recent run(), if available.

Functions
run
run(input: str, context: Optional[AgentContext] = None, **kwargs: Any) -> AgentResult

Execute the wrapped agent and record a trace.

Source code in src/openjarvis/traces/collector.py
def run(
    self,
    input: str,
    context: Optional[AgentContext] = None,
    **kwargs: Any,
) -> AgentResult:
    """Execute the wrapped agent and record a trace."""
    # Fresh per-run state: steps and routing info are captured via events.
    self._current_steps = []
    self._current_model = ""
    self._current_engine = ""

    # Listen for inference/tool/memory events while the agent runs;
    # always detach, even if the agent raises.
    unsubs = self._subscribe()
    run_start = time.time()
    try:
        result = self._agent.run(input, context=context, **kwargs)
    finally:
        self._unsubscribe(unsubs)
    run_end = time.time()

    # Close the step list with a final RESPOND step holding the answer.
    final_step = TraceStep(
        step_type=StepType.RESPOND,
        timestamp=run_end,
        duration_seconds=0.0,
        output={"content": result.content, "turns": result.turns},
    )
    self._current_steps.append(final_step)

    # Assemble the trace from everything collected during this run.
    trace = Trace(
        query=input,
        agent=getattr(self._agent, "agent_id", "unknown"),
        model=self._current_model,
        engine=self._current_engine,
        steps=list(self._current_steps),
        result=result.content,
        started_at=run_start,
        ended_at=run_end,
    )
    # Totals are derived from the recorded steps themselves.
    trace.total_latency_seconds += sum(
        step.duration_seconds for step in trace.steps
    )
    trace.total_tokens += sum(
        step.output.get("tokens", 0) for step in trace.steps
    )

    if self._store is not None:
        self._store.save(trace)
    if self._bus is not None:
        self._bus.publish(EventType.TRACE_COMPLETE, {"trace": trace})

    return result

TraceStore

TraceStore(db_path: str | Path)

Append-only SQLite store for interaction traces.

Source code in src/openjarvis/traces/store.py
def __init__(self, db_path: str | Path) -> None:
    """Open (creating if needed) the SQLite database at *db_path*."""
    self._db_path = str(db_path)
    self._conn = sqlite3.connect(self._db_path)
    # WAL lets readers proceed while a writer appends traces.
    self._conn.execute("PRAGMA journal_mode=WAL")
    # Ensure the schema exists: trace table, step table, FTS index,
    # and the statement that keeps the FTS index in sync on insert.
    self._conn.execute(_CREATE_TRACES)
    self._conn.execute(_CREATE_STEPS)
    self._conn.execute(_CREATE_FTS)
    self._conn.execute(_FTS_SYNC_INSERT)
    self._conn.commit()
Functions
save
save(trace: Trace) -> None

Persist a complete trace with all its steps.

Source code in src/openjarvis/traces/store.py
def save(self, trace: Trace) -> None:
    """Persist a complete trace with all its steps.

    The trace row and all step rows go into one implicit transaction,
    committed at the end, so a trace is stored all-or-nothing.
    """
    trace_row = (
        trace.trace_id,
        trace.query,
        trace.agent,
        trace.model,
        trace.engine,
        trace.result,
        trace.outcome,
        trace.feedback,
        trace.started_at,
        trace.ended_at,
        trace.total_tokens,
        trace.total_latency_seconds,
        json.dumps(trace.metadata),
    )
    self._conn.execute(_INSERT_TRACE, trace_row)

    step_rows = []
    for idx, step in enumerate(trace.steps):
        # Steps may carry either a StepType enum or a raw string.
        kind = step.step_type
        if isinstance(kind, StepType):
            kind = kind.value
        step_rows.append(
            (
                trace.trace_id,
                idx,
                kind,
                step.timestamp,
                step.duration_seconds,
                json.dumps(step.input),
                json.dumps(step.output),
                json.dumps(step.metadata),
            )
        )
    self._conn.executemany(_INSERT_STEP, step_rows)
    self._conn.commit()
get
get(trace_id: str) -> Optional[Trace]

Retrieve a trace by id, or None if not found.

Source code in src/openjarvis/traces/store.py
def get(self, trace_id: str) -> Optional[Trace]:
    """Retrieve a trace by id, or ``None`` if not found."""
    cursor = self._conn.execute(
        "SELECT * FROM traces WHERE trace_id = ?", (trace_id,)
    )
    row = cursor.fetchone()
    return None if row is None else self._row_to_trace(row)
list_traces
list_traces(*, agent: Optional[str] = None, model: Optional[str] = None, outcome: Optional[str] = None, since: Optional[float] = None, until: Optional[float] = None, limit: int = 100) -> List[Trace]

Query traces with optional filters.

Source code in src/openjarvis/traces/store.py
def list_traces(
    self,
    *,
    agent: Optional[str] = None,
    model: Optional[str] = None,
    outcome: Optional[str] = None,
    since: Optional[float] = None,
    until: Optional[float] = None,
    limit: int = 100,
) -> List[Trace]:
    """Query traces with optional filters.

    Results are ordered newest-first by ``started_at`` and capped
    at *limit* rows.
    """
    # Declarative filter table: a clause is active iff its value is set.
    filters = [
        ("agent = ?", agent),
        ("model = ?", model),
        ("outcome = ?", outcome),
        ("started_at >= ?", since),
        ("started_at <= ?", until),
    ]
    clauses = [clause for clause, value in filters if value is not None]
    params: List[Any] = [value for _, value in filters if value is not None]
    where = " AND ".join(clauses) if clauses else "1=1"
    sql = f"SELECT * FROM traces WHERE {where} ORDER BY started_at DESC LIMIT ?"
    params.append(limit)
    rows = self._conn.execute(sql, params).fetchall()
    return [self._row_to_trace(row) for row in rows]
count
count() -> int

Return the total number of stored traces.

Source code in src/openjarvis/traces/store.py
def count(self) -> int:
    """Return the total number of stored traces."""
    # COUNT(*) always yields one row; the fallback is purely defensive.
    row = self._conn.execute("SELECT COUNT(*) FROM traces").fetchone()
    return 0 if row is None else row[0]
search
search(query: str, *, agent: str | None = None, limit: int = 20) -> list[dict[str, Any]]

Full-text search across traces. Optionally filter by agent.

Source code in src/openjarvis/traces/store.py
def search(
    self,
    query: str,
    *,
    agent: str | None = None,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Full-text search across traces. Optionally filter by agent."""
    sql = (
        "SELECT t.trace_id, t.query, t.result, t.agent, t.model, t.outcome,"
        " t.started_at "
        "FROM traces_fts f JOIN traces t ON f.rowid = t.rowid "
        "WHERE traces_fts MATCH ?"
    )
    params: list[Any] = [query]
    if agent:
        sql += " AND t.agent = ?"
        params.append(agent)
    sql += " ORDER BY rank LIMIT ?"
    params.append(limit)
    rows = self._conn.execute(sql, params).fetchall()
    return [
        {
            "trace_id": r[0], "query": r[1], "result": r[2],
            "agent": r[3], "model": r[4], "outcome": r[5], "started_at": r[6],
        }
        for r in rows
    ]
subscribe_to_bus
subscribe_to_bus(bus: EventBus) -> None

Subscribe to TRACE_COMPLETE events on bus.

Source code in src/openjarvis/traces/store.py
def subscribe_to_bus(self, bus: EventBus) -> None:
    """Subscribe to ``TRACE_COMPLETE`` events on *bus*.

    Each event is handed to ``self._on_event`` (defined elsewhere in
    this class), letting the store persist traces published by others.
    """
    bus.subscribe(EventType.TRACE_COMPLETE, self._on_event)
update_feedback
update_feedback(trace_id: str, score: float) -> bool

Update the feedback score for a trace.

Returns True if the trace was found and updated, False otherwise.

Source code in src/openjarvis/traces/store.py
def update_feedback(self, trace_id: str, score: float) -> bool:
    """Update the feedback score for a trace.

    Returns True if the trace was found and updated, False otherwise.
    """
    cursor = self._conn.execute(
        "UPDATE traces SET feedback = ? WHERE trace_id = ?",
        (score, trace_id),
    )
    self._conn.commit()
    # rowcount is 0 when no row matched trace_id.
    updated = cursor.rowcount
    return updated > 0
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/traces/store.py
def close(self) -> None:
    """Close the underlying SQLite connection.

    After this, any further use of the store raises
    ``sqlite3.ProgrammingError``.
    """
    self._conn.close()