def execute_tick(self, agent_id: str) -> None:
"""Run one tick for the given agent.
1. Acquire concurrency guard (start_tick)
2. Invoke agent with retry logic
3. Update stats
4. Release guard (end_tick)
"""
try:
self._manager.start_tick(agent_id)
except ValueError:
logger.warning("Agent %s already running, skipping tick", agent_id)
return
agent = self._manager.get_agent(agent_id)
if agent is None:
logger.error("Agent %s not found", agent_id)
return
self._bus.publish(EventType.AGENT_TICK_START, {
"agent_id": agent_id,
"agent_name": agent["name"],
})
# Activity tracking: subscribe to tool/inference events
def _on_activity(event: Any) -> None:
if event.data.get("agent") == agent_id:
self._manager.update_agent(agent_id, last_activity_at=time.time())
self._bus.subscribe(EventType.TOOL_CALL_START, _on_activity)
self._bus.subscribe(EventType.INFERENCE_START, _on_activity)
# Trace recording: collect tool call steps
trace_steps: list[dict[str, Any]] = []
def _on_tool_start(event: Any) -> None:
if event.data.get("agent") == agent_id:
trace_steps.append({
"type": "tool_call",
"input": {
"tool": event.data.get("tool"),
"args": event.data.get("args"),
},
"start_time": event.timestamp,
})
def _on_tool_end(event: Any) -> None:
if event.data.get("agent") == agent_id and trace_steps:
for step in reversed(trace_steps):
if step["type"] == "tool_call" and "output" not in step:
step["output"] = {
"result": str(event.data.get("result", ""))[:4096],
}
step["duration"] = event.data.get("duration", 0)
break
if self._trace_store:
self._bus.subscribe(EventType.TOOL_CALL_START, _on_tool_start)
self._bus.subscribe(EventType.TOOL_CALL_END, _on_tool_end)
tick_start = time.time()
result = None
error_info = None
try:
result = self._run_with_retries(agent)
except AgentTickError as e:
error_info = e
finally:
self._bus.unsubscribe(EventType.TOOL_CALL_START, _on_activity)
self._bus.unsubscribe(EventType.INFERENCE_START, _on_activity)
if self._trace_store:
self._bus.unsubscribe(EventType.TOOL_CALL_START, _on_tool_start)
self._bus.unsubscribe(EventType.TOOL_CALL_END, _on_tool_end)
tick_duration = time.time() - tick_start
self._finalize_tick(agent_id, result, error_info, tick_duration)
if self._trace_store:
self._save_trace(
agent_id, agent, result, error_info,
tick_start, tick_duration, trace_steps,
)