Skip to content

executor

executor

AgentExecutor — runs a single agent tick.

Classes

AgentExecutor

AgentExecutor(manager: AgentManager, event_bus: EventBus, system: Any = None, trace_store: Any = None)

Executes a single tick for a managed agent.

The constructor optionally receives a JarvisSystem reference (it defaults to None and can be injected later via set_system), which provides access to the engine, tools, config, memory backends, and all other primitives.

Source code in src/openjarvis/agents/executor.py
def __init__(
    self,
    manager: AgentManager,
    event_bus: EventBus,
    system: Any = None,
    trace_store: Any = None,
) -> None:
    """Store the collaborators used when executing agent ticks.

    Args:
        manager: Agent manager used for tick guards and agent lookups.
        event_bus: Bus on which tick events are published and observed.
        system: Optional system reference; may be supplied later through
            ``set_system``.
        trace_store: Optional trace store; trace recording is only
            enabled when this is truthy.
    """
    # Assigned in signature order; the attributes are independent.
    self._manager = manager
    self._bus = event_bus
    self._system = system
    self._trace_store = trace_store
Functions
set_system
set_system(system: Any) -> None

Deferred system injection — called after JarvisSystem is constructed.

Source code in src/openjarvis/agents/executor.py
def set_system(self, system: Any) -> None:
    """Attach the system reference after construction.

    Replaces whatever value ``__init__`` stored in ``self._system``.

    Args:
        system: The system object to use from now on.
    """
    self._system = system
execute_tick
execute_tick(agent_id: str) -> None

Run one tick for the given agent.

  1. Acquire concurrency guard (start_tick)
  2. Invoke agent with retry logic
  3. Update stats
  4. Release guard (end_tick)
Source code in src/openjarvis/agents/executor.py
def execute_tick(self, agent_id: str) -> None:
    """Run one tick for the given agent.

    1. Acquire concurrency guard (start_tick)
    2. Invoke agent with retry logic
    3. Update stats
    4. Release guard (end_tick)

    The tick is skipped with a warning when ``start_tick`` raises
    ``ValueError``. If the agent cannot be loaded after the guard has been
    acquired, the guard is released again before returning so the agent is
    not left marked as running.

    An ``AgentTickError`` raised by the run is captured and passed to
    ``_finalize_tick``. Any other exception propagates to the caller, but
    only after the ``finally`` clean-up (unsubscribe, finalize, trace
    save) has run.

    Args:
        agent_id: Identifier of the managed agent to tick.
    """
    try:
        self._manager.start_tick(agent_id)
    except ValueError:
        logger.warning("Agent %s already running, skipping tick", agent_id)
        return

    agent = self._manager.get_agent(agent_id)
    if agent is None:
        logger.error("Agent %s not found", agent_id)
        # The guard was acquired by start_tick above; release it here,
        # otherwise every later tick for this id would be skipped as
        # "already running".
        self._manager.end_tick(agent_id)
        return

    self._bus.publish(EventType.AGENT_TICK_START, {
        "agent_id": agent_id,
        "agent_name": agent["name"],
    })

    # Activity tracking: subscribe to tool/inference events
    def _on_activity(event: Any) -> None:
        # Only react to events emitted on behalf of this agent.
        if event.data.get("agent") == agent_id:
            self._manager.update_agent(agent_id, last_activity_at=time.time())

    self._bus.subscribe(EventType.TOOL_CALL_START, _on_activity)
    self._bus.subscribe(EventType.INFERENCE_START, _on_activity)

    # Trace recording: collect tool call steps
    trace_steps: list[dict[str, Any]] = []

    def _on_tool_start(event: Any) -> None:
        if event.data.get("agent") == agent_id:
            trace_steps.append({
                "type": "tool_call",
                "input": {
                    "tool": event.data.get("tool"),
                    "args": event.data.get("args"),
                },
                "start_time": event.timestamp,
            })

    def _on_tool_end(event: Any) -> None:
        if event.data.get("agent") == agent_id and trace_steps:
            # Attach the result to the most recent tool call that has no
            # output yet.
            for step in reversed(trace_steps):
                if step["type"] == "tool_call" and "output" not in step:
                    step["output"] = {
                        # Stored results are capped at 4096 characters.
                        "result": str(event.data.get("result", ""))[:4096],
                    }
                    step["duration"] = event.data.get("duration", 0)
                    break

    if self._trace_store:
        self._bus.subscribe(EventType.TOOL_CALL_START, _on_tool_start)
        self._bus.subscribe(EventType.TOOL_CALL_END, _on_tool_end)

    tick_start = time.time()
    result = None
    error_info = None

    try:
        result = self._run_with_retries(agent)
    except AgentTickError as e:
        error_info = e
    finally:
        # Detach the per-tick handlers first so they cannot fire while
        # the tick is being finalized.
        self._bus.unsubscribe(EventType.TOOL_CALL_START, _on_activity)
        self._bus.unsubscribe(EventType.INFERENCE_START, _on_activity)

        if self._trace_store:
            self._bus.unsubscribe(EventType.TOOL_CALL_START, _on_tool_start)
            self._bus.unsubscribe(EventType.TOOL_CALL_END, _on_tool_end)

        tick_duration = time.time() - tick_start
        self._finalize_tick(agent_id, result, error_info, tick_duration)

        if self._trace_store:
            self._save_trace(
                agent_id, agent, result, error_info,
                tick_start, tick_duration, trace_steps,
            )

Functions