Skip to content

operative

operative

OperativeAgent — persistent, scheduled agent for autonomous operation.

Extends ToolUsingAgent with built-in session persistence and state recall. Designed for Operators: autonomous agents that run on a schedule with automatic state management between ticks.

Classes

OperativeAgent

OperativeAgent(engine: InferenceEngine, model: str, *, tools: Optional[List[BaseTool]] = None, bus: Optional[EventBus] = None, max_turns: Optional[int] = None, temperature: Optional[float] = None, max_tokens: Optional[int] = None, system_prompt: Optional[str] = None, operator_id: Optional[str] = None, session_store: Optional[Any] = None, memory_backend: Optional[Any] = None, interactive: bool = False, confirm_callback=None, **kwargs: Any)

Bases: ToolUsingAgent

Persistent autonomous agent with built-in state management.

The Operative agent extends the standard tool-calling loop with:

  1. Session loading — restores conversation history from previous ticks.
  2. State recall — retrieves previous state JSON from memory backend.
  3. System prompt — injects the operator's protocol instructions.
  4. Tool loop — standard function-calling loop (same as Orchestrator).
  5. Session save — persists the tick's prompt and response.
  6. State persistence — auto-persists state if the agent has not done so explicitly via the memory_store tool.
Source code in src/openjarvis/agents/operative.py
def __init__(
    self,
    engine: InferenceEngine,
    model: str,
    *,
    tools: Optional[List[BaseTool]] = None,
    bus: Optional[EventBus] = None,
    max_turns: Optional[int] = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    system_prompt: Optional[str] = None,
    operator_id: Optional[str] = None,
    session_store: Optional[Any] = None,
    memory_backend: Optional[Any] = None,
    interactive: bool = False,
    confirm_callback=None,
    **kwargs: Any,
) -> None:
    """Set up the agent and remember its operator-specific settings.

    Args:
        engine: Passed positionally to the base constructor.
        model: Passed positionally to the base constructor.
        tools: Forwarded to the base constructor.
        bus: Forwarded to the base constructor.
        max_turns: Forwarded to the base constructor.
        temperature: Forwarded to the base constructor.
        max_tokens: Forwarded to the base constructor.
        system_prompt: Kept on the instance; a falsy value is stored as
            the empty string.
        operator_id: Kept on the instance as given.
        session_store: Kept on the instance as given.
        memory_backend: Kept on the instance as given.
        interactive: Forwarded to the base constructor.
        confirm_callback: Forwarded to the base constructor.
        **kwargs: Accepted but not used by this constructor.
    """
    # Everything the base class consumes, gathered in one place.
    base_options: dict[str, Any] = {
        "tools": tools,
        "bus": bus,
        "max_turns": max_turns,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "interactive": interactive,
        "confirm_callback": confirm_callback,
    }
    super().__init__(engine, model, **base_options)

    # Operator-specific settings live only on this subclass.
    self._memory_backend = memory_backend
    self._session_store = session_store
    self._operator_id = operator_id
    self._system_prompt = system_prompt if system_prompt else ""
Functions
run
run(input: str, context: Optional[AgentContext] = None, **kwargs: Any) -> AgentResult

Execute a single operator tick.

Source code in src/openjarvis/agents/operative.py
def run(
    self,
    input: str,
    context: Optional[AgentContext] = None,
    **kwargs: Any,
) -> AgentResult:
    """Execute a single operator tick.

    The tick emits a turn-start event, assembles the system prompt from
    the configured prompt plus any recalled previous state, loads the
    session history, and then runs the function-calling loop for at most
    ``self._max_turns`` generations. Afterwards the prompt/response pair
    is saved to the session and, unless a *successful* ``memory_store``
    call already wrote the ``operator:<operator_id>:state`` key, the final
    content is handed to ``_auto_persist_state``.

    Args:
        input: The prompt for this tick.
        context: Optional agent context, forwarded to
            ``_build_operative_messages``.
        **kwargs: Accepted but not used by this method.

    Returns:
        An ``AgentResult`` carrying the final content, every tool result
        produced during the tick, the number of turns used, and the
        accumulated token usage as metadata. When the turn budget runs
        out, metadata additionally contains ``max_turns_exceeded=True``
        and empty content is replaced by a fixed fallback message.
    """
    self._emit_turn_start(input)

    # 1. Build system prompt with state context
    sys_parts: list[str] = []
    if self._system_prompt:
        sys_parts.append(self._system_prompt)

    # 2. State recall from memory backend
    previous_state = self._recall_state()
    if previous_state:
        sys_parts.append(f"\n## Previous State\n{previous_state}")

    system_prompt = "\n\n".join(sys_parts) if sys_parts else None

    # 3. Load session history
    session_messages = self._load_session()

    # 4. Build messages
    messages = self._build_operative_messages(
        input,
        context,
        system_prompt=system_prompt,
        session_messages=session_messages,
    )

    # 5. Run function-calling tool loop
    openai_tools = self._executor.get_openai_tools() if self._tools else []
    all_tool_results: list[ToolResult] = []
    turns = 0
    content = ""
    state_stored_by_tool = False
    total_usage: dict[str, int] = {
        "prompt_tokens": 0,
        "completion_tokens": 0,
        "total_tokens": 0,
    }
    # Loop-invariant: the memory key that counts as "operator state".
    # Without an operator id there is nothing to track.
    state_key = (
        f"operator:{self._operator_id}:state" if self._operator_id else None
    )

    for _turn in range(self._max_turns):
        turns += 1

        if self._loop_guard:
            messages = self._loop_guard.compress_context(messages)

        gen_kwargs: dict[str, Any] = {}
        if openai_tools:
            gen_kwargs["tools"] = openai_tools

        result = self._generate(messages, **gen_kwargs)
        # Tolerate a missing/None usage block or None counters.
        usage = result.get("usage") or {}
        for k in total_usage:
            total_usage[k] += usage.get(k) or 0
        content = result.get("content", "")
        raw_tool_calls = result.get("tool_calls", [])

        if not raw_tool_calls:
            content = self._check_continuation(result, messages)
            break

        tool_calls = [
            ToolCall(
                id=tc.get("id", f"call_{i}"),
                name=tc.get("name", ""),
                arguments=tc.get("arguments", "{}"),
            )
            for i, tc in enumerate(raw_tool_calls)
        ]

        messages.append(
            Message(
                role=Role.ASSISTANT,
                content=content,
                tool_calls=tool_calls,
            )
        )

        for tc in tool_calls:
            # Loop guard check
            if self._loop_guard:
                verdict = self._loop_guard.check_call(tc.name, tc.arguments)
                if verdict.blocked:
                    tool_result = ToolResult(
                        tool_name=tc.name,
                        content=f"Loop guard: {verdict.reason}",
                        success=False,
                    )
                    all_tool_results.append(tool_result)
                    messages.append(
                        Message(
                            role=Role.TOOL,
                            content=tool_result.content,
                            tool_call_id=tc.id,
                            name=tc.name,
                        )
                    )
                    continue

            tool_result = self._executor.execute(tc)
            all_tool_results.append(tool_result)

            # Track if agent stored state via memory_store. Only a call
            # that actually succeeded counts; a failed store must still
            # fall through to the auto-persist step below.
            if (
                state_key is not None
                and tc.name == "memory_store"
                and tool_result.success
            ):
                try:
                    args = json.loads(tc.arguments)
                except (json.JSONDecodeError, TypeError):
                    # Either malformed JSON or arguments that were already
                    # decoded; the isinstance check below sorts them out.
                    args = tc.arguments
                # Valid JSON need not be an object (e.g. "[]"), so guard
                # before calling .get to avoid an AttributeError.
                if isinstance(args, dict) and args.get("key", "") == state_key:
                    state_stored_by_tool = True

            messages.append(
                Message(
                    role=Role.TOOL,
                    content=tool_result.content,
                    tool_call_id=tc.id,
                    name=tc.name,
                )
            )
    else:
        # Max turns exceeded
        self._save_session(input, content)
        meta = dict(total_usage)
        meta["max_turns_exceeded"] = True
        # Close the turn opened by _emit_turn_start on this exit path too.
        self._emit_turn_end(turns=turns, content_length=len(content or ""))
        return AgentResult(
            content=content or "Maximum turns reached without a final answer.",
            tool_results=all_tool_results,
            turns=turns,
            metadata=meta,
        )

    # 6. Save session
    self._save_session(input, content)

    # 7. Auto-persist state if agent didn't do it explicitly
    if not state_stored_by_tool:
        self._auto_persist_state(content)

    self._emit_turn_end(turns=turns, content_length=len(content))
    return AgentResult(
        content=content,
        tool_results=all_tool_results,
        turns=turns,
        metadata=total_usage,
    )