Skip to content

collector

collector

TraceCollector — wraps any BaseAgent to record interaction traces.

Classes

TraceCollector

TraceCollector(agent: BaseAgent, *, store: Optional[TraceStore] = None, bus: Optional[EventBus] = None)

Wraps a BaseAgent and records a :class:Trace for every run().

The collector subscribes to the EventBus to capture inference, tool, and memory events emitted during agent execution, converting them into TraceStep objects. When the agent finishes, the complete Trace is persisted to the TraceStore and published on the bus.

Usage::

agent = OrchestratorAgent(engine, model, tools=tools, bus=bus)
collector = TraceCollector(agent, store=trace_store, bus=bus)
result = collector.run("What is 2+2?")
# Trace is automatically saved to trace_store
Source code in src/openjarvis/traces/collector.py
def __init__(
    self,
    agent: BaseAgent,
    *,
    store: Optional[TraceStore] = None,
    bus: Optional[EventBus] = None,
) -> None:
    self._agent = agent
    self._store = store
    self._bus = bus
    self._current_steps: list[TraceStep] = []
    self._current_model: str = ""
    self._current_engine: str = ""
Attributes
last_trace property
last_trace: Optional[Trace]

Return the trace from the most recent run(), if available.

Functions
run
run(input: str, context: Optional[AgentContext] = None, **kwargs: Any) -> AgentResult

Execute the wrapped agent and record a trace.

Source code in src/openjarvis/traces/collector.py
def run(
    self,
    input: str,
    context: Optional[AgentContext] = None,
    **kwargs: Any,
) -> AgentResult:
    """Execute the wrapped agent and record a trace."""
    self._current_steps = []
    self._current_model = ""
    self._current_engine = ""

    # Subscribe to events for trace collection
    unsubs = self._subscribe()

    started_at = time.time()
    try:
        result = self._agent.run(input, context=context, **kwargs)
    finally:
        self._unsubscribe(unsubs)

    ended_at = time.time()

    # Add final respond step
    self._current_steps.append(
        TraceStep(
            step_type=StepType.RESPOND,
            timestamp=ended_at,
            duration_seconds=0.0,
            output={"content": result.content, "turns": result.turns},
        )
    )

    # Build and persist the trace
    trace = Trace(
        query=input,
        agent=getattr(self._agent, "agent_id", "unknown"),
        model=self._current_model,
        engine=self._current_engine,
        steps=list(self._current_steps),
        result=result.content,
        started_at=started_at,
        ended_at=ended_at,
    )
    # Recompute totals from steps
    for step in trace.steps:
        trace.total_latency_seconds += step.duration_seconds
        trace.total_tokens += step.output.get("tokens", 0)

    if self._store is not None:
        self._store.save(trace)

    if self._bus is not None:
        self._bus.publish(EventType.TRACE_COMPLETE, {"trace": trace})

    return result