Skip to content

executor

executor

AgentExecutor — runs a single agent tick.

Classes

AgentExecutor

AgentExecutor(manager: AgentManager, event_bus: EventBus, system: Any = None, trace_store: Any = None)

Executes a single tick for a managed agent.

The constructor optionally receives a JarvisSystem reference (it defaults to None and can be injected later via set_system), which gives access to the engine, tools, config, memory backends, and all other primitives.

Source code in src/openjarvis/agents/executor.py
def __init__(
    self,
    manager: AgentManager,
    event_bus: EventBus,
    system: Any = None,
    trace_store: Any = None,
) -> None:
    """Store the collaborators used while executing agent ticks.

    Args:
        manager: Agent manager; kept as ``_manager``.
        event_bus: Event bus; kept as ``_bus``.
        system: Optional system reference; kept as ``_system``. May be left
            as ``None`` and supplied later through ``set_system``.
        trace_store: Optional trace store; kept as ``_trace_store``.
    """
    # Required collaborators first, optional ones after.
    self._manager = manager
    self._bus = event_bus
    self._system = system
    self._trace_store = trace_store
Functions
set_system
set_system(system: Any) -> None

Deferred system injection — called after JarvisSystem is constructed.

Source code in src/openjarvis/agents/executor.py
def set_system(self, system: Any) -> None:
    """Attach the system reference after construction.

    Overwrites whatever ``_system`` currently holds (including the ``None``
    placeholder left by the constructor) with ``system``.

    Args:
        system: The system object to store on this executor.
    """
    self._system = system
run_ephemeral
run_ephemeral(agent_type: str, system_prompt: str, input_text: str, tools: list[str] | None = None) -> Any

Run a one-shot agent turn with no lifecycle tracking.

Source code in src/openjarvis/agents/executor.py
def run_ephemeral(
    self,
    agent_type: str,
    system_prompt: str,
    input_text: str,
    tools: list[str] | None = None,
) -> Any:
    """Run a single agent turn without any lifecycle bookkeeping.

    The agent class is looked up in ``AgentRegistry`` under ``agent_type``,
    instantiated with this executor's event bus and the manager's
    ``_engine`` attribute (``None`` when the manager has no such attribute),
    and run once on ``input_text``.

    Args:
        agent_type: Registry key of the agent class to instantiate.
        system_prompt: System prompt handed to the agent constructor.
        input_text: Input passed to the agent's ``run`` method.
        tools: Accepted for interface compatibility; this method does not
            currently read it.

    Returns:
        Whatever the agent's ``run`` call returns.
    """
    # Imported lazily inside the method, as in the original.
    from openjarvis.core.registry import AgentRegistry

    registered_cls = AgentRegistry.get(agent_type)
    engine = getattr(self._manager, "_engine", None)
    one_shot = registered_cls(
        engine=engine,
        system_prompt=system_prompt,
        bus=self._bus,
    )
    outcome = one_shot.run(input_text)
    return outcome
execute_tick
execute_tick(agent_id: str) -> None

Run one tick for the given agent.

  1. Acquire concurrency guard (start_tick)
  2. Invoke agent with retry logic
  3. Update stats
  4. Release guard (end_tick)
Source code in src/openjarvis/agents/executor.py
def execute_tick(self, agent_id: str) -> None:
    """Run one tick for the given agent.

    1. Acquire concurrency guard (start_tick)
    2. Invoke agent with retry logic
    3. Update stats
    4. Release guard (end_tick)

    Once the agent record has been fetched, announcing the tick, attaching
    the event listeners and running the agent all happen inside a single
    ``try``/``finally``. A failure at any of those steps therefore still
    detaches every listener that was attached and still reaches
    ``_finalize_tick`` (and ``_save_trace`` when a trace store is set).

    Args:
        agent_id: Identifier of the managed agent to tick.

    Returns:
        None. Returns early, without running the agent, when the
        ``start_tick`` / ``_set_activity`` step raises ``ValueError`` or when
        the manager returns no record for ``agent_id``.

    Raises:
        Exception: Anything other than ``AgentTickError`` raised while
            announcing, subscribing or running propagates to the caller
            after cleanup has run.
    """
    try:
        self._manager.start_tick(agent_id)
        self._set_activity(agent_id, "Preparing tick...")
    except ValueError:
        logger.warning("Agent %s already running, skipping tick", agent_id)
        return

    agent = self._manager.get_agent(agent_id)
    if agent is None:
        # NOTE(review): start_tick succeeded above but nothing on this path
        # releases the guard; confirm whether start_tick can succeed for an
        # unknown agent and, if so, release it here.
        logger.error("Agent %s not found", agent_id)
        return

    # Activity tracking: refresh last_activity_at on tool/inference events
    def _on_activity(event: Any) -> None:
        if event.data.get("agent") == agent_id:
            self._manager.update_agent(agent_id, last_activity_at=time.time())

    # Trace recording: collect tool call steps
    trace_steps: list[dict[str, Any]] = []

    def _on_tool_start(event: Any) -> None:
        if event.data.get("agent") == agent_id:
            trace_steps.append(
                {
                    "type": "tool_call",
                    "input": {
                        "tool": event.data.get("tool"),
                        "args": event.data.get("args"),
                    },
                    "start_time": event.timestamp,
                }
            )

    def _on_tool_end(event: Any) -> None:
        if event.data.get("agent") == agent_id and trace_steps:
            # Close the most recent tool call that has no output yet.
            for step in reversed(trace_steps):
                if step["type"] == "tool_call" and "output" not in step:
                    step["output"] = {
                        "result": str(event.data.get("result", ""))[:4096],
                    }
                    step["duration"] = event.data.get("duration", 0)
                    break

    # Listeners to attach for the duration of the tick, in attach order.
    wanted: list[tuple[Any, Any]] = [
        (EventType.TOOL_CALL_START, _on_activity),
        (EventType.INFERENCE_START, _on_activity),
    ]
    if self._trace_store:
        wanted.append((EventType.TOOL_CALL_START, _on_tool_start))
        wanted.append((EventType.TOOL_CALL_END, _on_tool_end))

    # Only listeners that were actually attached are recorded here, so the
    # cleanup never tries to detach a handler the bus has not seen.
    attached: list[tuple[Any, Any]] = []

    # Initialised before the try so the finally block can always compute a
    # duration, even if setup fails before the agent starts running.
    tick_start = time.time()
    result = None
    error_info = None

    try:
        self._bus.publish(
            EventType.AGENT_TICK_START,
            {
                "agent_id": agent_id,
                "agent_name": agent["name"],
            },
        )

        for event_type, handler in wanted:
            self._bus.subscribe(event_type, handler)
            attached.append((event_type, handler))

        # Restart the clock so the duration covers only the agent run.
        tick_start = time.time()
        result = self._run_with_retries(agent)
    except AgentTickError as e:
        error_info = e
    finally:
        try:
            for event_type, handler in attached:
                self._bus.unsubscribe(event_type, handler)
        finally:
            # Runs even if detaching a listener failed. Per the step list in
            # the docstring the guard is released at the end of the tick,
            # presumably inside _finalize_tick, so it must be reached.
            tick_duration = time.time() - tick_start
            self._finalize_tick(agent_id, result, error_info, tick_duration)

            if self._trace_store:
                self._save_trace(
                    agent_id,
                    agent,
                    result,
                    error_info,
                    tick_start,
                    tick_duration,
                    trace_steps,
                )

Functions