Wraps a BaseAgent and records a :class:Trace for every run().
The collector subscribes to the EventBus to capture inference, tool,
and memory events emitted during agent execution, converting them into
TraceStep objects. When the agent finishes, the complete Trace
is persisted to the TraceStore and published on the bus.
Enhanced to capture full model response content, tool call arguments and
results, and the complete conversation message history.
Usage::
agent = OrchestratorAgent(engine, model, tools=tools, bus=bus)
collector = TraceCollector(agent, store=trace_store, bus=bus)
result = collector.run("What is 2+2?")
trace = collector.last_trace # Rich trace with steps + messages
Source code in src/openjarvis/traces/collector.py
| def __init__(
self,
agent: BaseAgent,
*,
store: Optional[TraceStore] = None,
bus: Optional[EventBus] = None,
) -> None:
self._agent = agent
self._store = store
self._bus = bus
self._current_steps: list[TraceStep] = []
self._current_model: str = ""
self._current_engine: str = ""
self._last_trace: Optional[Trace] = None
|
Attributes
last_trace
property
last_trace: Optional[Trace]
Return the trace from the most recent run().
Functions
run
Execute the wrapped agent and record a trace.
Source code in src/openjarvis/traces/collector.py
| def run(
self,
input: str,
context: Optional[AgentContext] = None,
**kwargs: Any,
) -> AgentResult:
"""Execute the wrapped agent and record a trace."""
self._current_steps = []
self._current_model = ""
self._current_engine = ""
# Subscribe to events for trace collection
unsubs = self._subscribe()
started_at = time.time()
try:
result = self._agent.run(input, context=context, **kwargs)
finally:
self._unsubscribe(unsubs)
ended_at = time.time()
# Add final respond step
self._current_steps.append(
TraceStep(
step_type=StepType.RESPOND,
timestamp=ended_at,
duration_seconds=0.0,
output={"content": result.content, "turns": result.turns},
)
)
# Extract messages from agent result metadata
messages: List[Dict[str, Any]] = result.metadata.get("messages", [])
# Build and persist the trace
trace = Trace(
query=input,
agent=getattr(self._agent, "agent_id", "unknown"),
model=self._current_model,
engine=self._current_engine,
steps=list(self._current_steps),
result=result.content,
messages=messages,
started_at=started_at,
ended_at=ended_at,
)
# Recompute totals from steps
for step in trace.steps:
trace.total_latency_seconds += step.duration_seconds
trace.total_tokens += step.output.get("tokens", 0)
self._last_trace = trace
if self._store is not None:
self._store.save(trace)
if self._bus is not None:
self._bus.publish(EventType.TRACE_COMPLETE, {"trace": trace})
return result
|