collector
TraceCollector — wraps any BaseAgent to record interaction traces.
Classes
TraceCollector
TraceCollector(agent: BaseAgent, *, store: Optional[TraceStore] = None, bus: Optional[EventBus] = None)
Wraps a BaseAgent and records a Trace for every run().
The collector subscribes to the EventBus to capture inference, tool,
and memory events emitted during agent execution, converting them into
TraceStep objects. When the agent finishes, the complete Trace
is persisted to the TraceStore and published on the bus.
Each trace also captures the full model response content, tool call arguments and results, and the complete conversation message history.
Usage::

    agent = OrchestratorAgent(engine, model, tools=tools, bus=bus)
    collector = TraceCollector(agent, store=trace_store, bus=bus)
    result = collector.run("What is 2+2?")
    trace = collector.last_trace  # Rich trace with steps + messages
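The subscribe, record, persist, publish flow described above can be illustrated with a small self-contained sketch. This is not the contents of collector.py: `MiniBus`, `MiniTrace`, `EchoAgent`, and `MiniCollector` are hypothetical stand-ins, the event names are assumed, and a plain list plays the role of the trace store.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

# Hypothetical stand-ins for illustration only; not the openjarvis classes.
Handler = Callable[[str, Dict[str, Any]], None]


class MiniBus:
    """Tiny synchronous publish/subscribe bus."""

    def __init__(self) -> None:
        self.handlers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self.handlers.append(handler)

    def unsubscribe(self, handler: Handler) -> None:
        self.handlers.remove(handler)

    def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        # Iterate over a copy so handlers may unsubscribe while being called.
        for handler in list(self.handlers):
            handler(event_type, payload)


@dataclass
class MiniTrace:
    query: str
    result: str = ""
    steps: List[Dict[str, Any]] = field(default_factory=list)
    started_at: float = 0.0
    ended_at: float = 0.0


class EchoAgent:
    """Fake agent that emits one inference event and one tool event."""

    def __init__(self, bus: MiniBus) -> None:
        self.bus = bus

    def run(self, query: str) -> str:
        self.bus.publish("inference", {"content": "thinking about " + query})
        self.bus.publish("tool", {"name": "calculator", "result": "4"})
        return "4"


class MiniCollector:
    """Wraps an agent and records one trace per run()."""

    RECORDED = {"inference", "tool", "memory"}  # assumed event names

    def __init__(self, agent: EchoAgent, store: List[MiniTrace], bus: MiniBus) -> None:
        self.agent, self.store, self.bus = agent, store, bus
        self.last_trace: Optional[MiniTrace] = None

    def run(self, query: str) -> str:
        trace = MiniTrace(query=query, started_at=time.time())

        def on_event(event_type: str, payload: Dict[str, Any]) -> None:
            # Convert each relevant bus event into a step on the trace.
            if event_type in self.RECORDED:
                trace.steps.append({"type": event_type, **payload})

        self.bus.subscribe(on_event)
        try:
            trace.result = self.agent.run(query)
        finally:
            # Always detach, even if the agent raised.
            self.bus.unsubscribe(on_event)
        trace.ended_at = time.time()
        self.store.append(trace)  # persist
        self.last_trace = trace
        self.bus.publish("trace_complete", {"trace": trace})  # announce
        return trace.result


bus = MiniBus()
store: List[MiniTrace] = []
collector = MiniCollector(EchoAgent(bus), store, bus)
answer = collector.run("What is 2+2?")
step_types = [step["type"] for step in collector.last_trace.steps]
```

Subscribing only for the duration of `run()` and detaching in a `finally` block keeps events from one run out of the next run's trace. Whether the real collector scopes its subscription this way is not stated here.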
Source code in src/openjarvis/traces/collector.py
Functions
run
run(input: str, context: Optional[AgentContext] = None, **kwargs: Any) -> AgentResult
Execute the wrapped agent and record a trace.
Functions
record_response_trace
record_response_trace(store: Optional[TraceStore], *, query: str, result: str, model: str = '', engine: str = '', agent: str = 'server', started_at: float, ended_at: float) -> Optional[Trace]
Persist a minimal single-step Trace for a non-agent response.
The streaming SSE and WebSocket chat paths stream straight from the
engine, bypassing the agent (and therefore TraceCollector). They call
this function so those interactions still land in traces.db; otherwise
streamed chats, which are the desktop GUI's main path, would never
produce traces.
Best-effort: returns the saved Trace, or None when store is None or
persistence raised. It never propagates an exception into the caller's
response path.
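The best-effort contract described above can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: `record_best_effort`, `ListStore`, and `BrokenStore` are hypothetical names, the trace is a plain dict, and the store is assumed to expose a `save()` method.

```python
import logging
import time
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class ListStore:
    """Hypothetical in-memory store; save() is an assumed method name."""

    def __init__(self) -> None:
        self.saved: List[Dict[str, Any]] = []

    def save(self, trace: Dict[str, Any]) -> None:
        self.saved.append(trace)


class BrokenStore:
    """Hypothetical store whose persistence always fails."""

    def save(self, trace: Dict[str, Any]) -> None:
        raise OSError("disk full")


def record_best_effort(
    store: Optional[Any],
    *,
    query: str,
    result: str,
    started_at: float,
    ended_at: float,
) -> Optional[Dict[str, Any]]:
    """Save a single-step trace; return it, or None on any failure."""
    if store is None:
        return None  # tracing disabled: nothing to do
    try:
        trace = {
            "query": query,
            "result": result,
            "started_at": started_at,
            "ended_at": ended_at,
            # One step only: the direct engine response.
            "steps": [{"type": "generate", "output": result}],
        }
        store.save(trace)
        return trace
    except Exception:
        # Swallow the error so the response path is never interrupted.
        logger.warning("trace persistence failed", exc_info=True)
        return None


t0 = time.time()
good_store = ListStore()
timing = {"started_at": t0, "ended_at": t0 + 0.5}
saved = record_best_effort(good_store, query="hi", result="hello", **timing)
skipped = record_best_effort(None, query="hi", result="hello", **timing)
failed = record_best_effort(BrokenStore(), query="hi", result="hello", **timing)
```

Catching the broad `Exception` is a deliberate choice in this sketch: a tracing failure is logged and reported as None so that it cannot break a chat response that is already streaming.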