Skip to content

collector

collector

TraceCollector — wraps any BaseAgent to record interaction traces.

Classes

TraceCollector

TraceCollector(agent: BaseAgent, *, store: Optional[TraceStore] = None, bus: Optional[EventBus] = None)

Wraps a BaseAgent and records a Trace for every run().

The collector subscribes to the EventBus to capture inference, tool, and memory events emitted during agent execution, converting them into TraceStep objects. When the agent finishes, the complete Trace is persisted to the TraceStore and published on the bus.

The collector also captures the full model response content, tool call arguments and results, and the complete conversation message history.

Usage::

agent = OrchestratorAgent(engine, model, tools=tools, bus=bus)
collector = TraceCollector(agent, store=trace_store, bus=bus)
result = collector.run("What is 2+2?")
trace = collector.last_trace  # Rich trace with steps + messages
Source code in src/openjarvis/traces/collector.py
def __init__(
    self,
    agent: BaseAgent,
    *,
    store: Optional[TraceStore] = None,
    bus: Optional[EventBus] = None,
) -> None:
    """Wrap *agent* and set up empty per-run trace state.

    Args:
        agent: The agent that ``run()`` delegates to.
        store: Optional trace store; when provided, ``run()`` saves each
            finished trace to it.
        bus: Optional event bus; when provided, ``run()`` publishes each
            finished trace on it.
    """
    # Collaborators handed in by the caller.
    self._agent, self._store, self._bus = agent, store, bus

    # Scratch state for the run in progress; run() resets all three on entry.
    self._current_model: str = ""
    self._current_engine: str = ""
    self._current_steps: list[TraceStep] = []

    # Nothing has been recorded until the first run() completes.
    self._last_trace: Optional[Trace] = None
Attributes
last_trace property
last_trace: Optional[Trace]

Return the trace from the most recent run().

Functions
run
run(input: str, context: Optional[AgentContext] = None, **kwargs: Any) -> AgentResult

Execute the wrapped agent and record a trace.

Source code in src/openjarvis/traces/collector.py
def run(
    self,
    input: str,
    context: Optional[AgentContext] = None,
    **kwargs: Any,
) -> AgentResult:
    """Run the wrapped agent on *input* and capture the run as a trace.

    Args:
        input: The query forwarded to the wrapped agent.
        context: Optional agent context, forwarded unchanged.
        **kwargs: Extra keyword arguments forwarded to the agent's ``run()``.

    Returns:
        The wrapped agent's result, unmodified.

    Raises:
        Whatever the wrapped agent raises. Event subscriptions are released
        first, and no trace is built, saved, or published in that case.
    """
    # Start clean so nothing from an earlier run leaks into this trace.
    self._current_steps, self._current_model, self._current_engine = [], "", ""

    # Subscribe for trace collection; the handles are released in ``finally``.
    subscriptions = self._subscribe()

    t_start = time.time()
    try:
        result = self._agent.run(input, context=context, **kwargs)
    finally:
        # Detach even when the agent raises.
        self._unsubscribe(subscriptions)
    t_end = time.time()

    # Close the step list with a zero-duration RESPOND step.
    final_step = TraceStep(
        step_type=StepType.RESPOND,
        timestamp=t_end,
        duration_seconds=0.0,
        output={"content": result.content, "turns": result.turns},
    )
    self._current_steps.append(final_step)

    # Message history as found in the result metadata (empty when absent).
    history: List[Dict[str, Any]] = result.metadata.get("messages", [])

    # The trace gets its own copy of the step list.
    trace = Trace(
        query=input,
        agent=getattr(self._agent, "agent_id", "unknown"),
        model=self._current_model,
        engine=self._current_engine,
        steps=list(self._current_steps),
        result=result.content,
        messages=history,
        started_at=t_start,
        ended_at=t_end,
    )

    # Accumulate latency and token totals step by step.
    for recorded in trace.steps:
        trace.total_latency_seconds += recorded.duration_seconds
        trace.total_tokens += recorded.output.get("tokens", 0)

    self._last_trace = trace

    # Persist first, then announce; both sinks are optional.
    sink = self._store
    if sink is not None:
        sink.save(trace)

    channel = self._bus
    if channel is not None:
        channel.publish(EventType.TRACE_COMPLETE, {"trace": trace})

    return result

Functions

record_response_trace

record_response_trace(store: Optional[TraceStore], *, query: str, result: str, model: str = '', engine: str = '', agent: str = 'server', started_at: float, ended_at: float) -> Optional[Trace]

Persist a minimal single-step Trace for a non-agent response.

The streaming SSE and WebSocket chat paths stream straight from the engine, bypassing the agent (and therefore TraceCollector). They call this so those interactions still land in traces.db — otherwise streamed chats, which are the desktop GUI's main path, would never produce traces.

Best-effort: returns the saved Trace or None (when store is None or persistence raised), and never propagates an exception into the caller's response path.

Source code in src/openjarvis/traces/collector.py
def record_response_trace(
    store: Optional[TraceStore],
    *,
    query: str,
    result: str,
    model: str = "",
    engine: str = "",
    agent: str = "server",
    started_at: float,
    ended_at: float,
) -> Optional[Trace]:
    """Save a one-step ``Trace`` for a response that bypassed the agent.

    Streaming SSE and WebSocket chat paths talk to the engine directly, so
    ``TraceCollector`` never sees them. Calling this helper is how those
    interactions still end up in ``traces.db``; without it, streamed chats
    (the desktop GUI's main path) would leave no trace at all.

    Args:
        store: Destination trace store; ``None`` turns the call into a no-op.
        query: The user query that produced the response.
        result: The response text.
        model: Model identifier recorded on the trace.
        engine: Engine identifier recorded on the trace.
        agent: Agent label recorded on the trace.
        started_at: Start timestamp of the interaction.
        ended_at: End timestamp of the interaction.

    Returns:
        The saved ``Trace``, or ``None`` when *store* is ``None`` or when
        building/saving the trace raised. This is best-effort: an
        ``Exception`` is logged at debug level and never reaches the caller's
        response path.
    """
    if store is None:
        return None

    try:
        # Clamp so a start time later than the end time yields zero duration.
        elapsed = max(0.0, ended_at - started_at)
        respond_step = TraceStep(
            step_type=StepType.RESPOND,
            timestamp=ended_at,
            duration_seconds=elapsed,
            output={"content": result},
        )
        trace = Trace(
            query=query,
            agent=agent,
            model=model,
            engine=engine,
            result=result,
            started_at=started_at,
            ended_at=ended_at,
            steps=[respond_step],
        )
        trace.total_latency_seconds = elapsed
        store.save(trace)
    except Exception:
        # Imported lazily: only needed on the failure path.
        import logging

        log = logging.getLogger("openjarvis.traces")
        log.debug("record_response_trace failed", exc_info=True)
        return None
    return trace