Skip to content

monitor_operative

monitor_operative

MonitorOperativeAgent -- long-horizon agent with configurable strategies.

Extends ToolUsingAgent (not OperativeAgent) with four configurable strategy axes for long-horizon benchmark evaluation:

  1. memory_extraction -- how findings are persisted to memory
  2. observation_compression -- how tool outputs are compressed
  3. retrieval_strategy -- how prior context is recalled
  4. task_decomposition -- how complex tasks are split

The agent also reuses the cross-session state persistence pattern of OperativeAgent (session_store, memory_backend, operator_id); it does not inherit it, because the class extends ToolUsingAgent directly.

Classes

MonitorOperativeAgent

MonitorOperativeAgent(engine: InferenceEngine, model: str, *, tools: Optional[List[BaseTool]] = None, bus: Optional[EventBus] = None, max_turns: int = 25, temperature: float = 0.3, max_tokens: int = 4096, system_prompt: Optional[str] = None, memory_extraction: str = 'causality_graph', observation_compression: str = 'summarize', retrieval_strategy: str = 'hybrid_with_self_eval', task_decomposition: str = 'phased', operator_id: Optional[str] = None, session_store: Optional[Any] = None, memory_backend: Optional[Any] = None, **kwargs: Any)

Bases: ToolUsingAgent

Long-horizon agent with configurable memory, compression, retrieval, and decomposition strategies.

The four strategy axes control how the agent manages information across turns and sessions:

  • memory_extraction: How findings are persisted (causality_graph, scratchpad, structured_json, none).
  • observation_compression: How tool outputs are compressed before being added to context (summarize, truncate, none).
  • retrieval_strategy: How prior context is recalled at the start of each run (hybrid_with_self_eval, keyword, semantic, none).
  • task_decomposition: How complex tasks are broken down (phased, monolithic, hierarchical).
Source code in src/openjarvis/agents/monitor_operative.py
def __init__(
    self,
    engine: InferenceEngine,
    model: str,
    *,
    tools: Optional[List[BaseTool]] = None,
    bus: Optional[EventBus] = None,
    max_turns: int = 25,
    temperature: float = 0.3,
    max_tokens: int = 4096,
    system_prompt: Optional[str] = None,
    # Strategy parameters
    memory_extraction: str = "causality_graph",
    observation_compression: str = "summarize",
    retrieval_strategy: str = "hybrid_with_self_eval",
    task_decomposition: str = "phased",
    # State persistence (OperativeAgent pattern)
    operator_id: Optional[str] = None,
    session_store: Optional[Any] = None,
    memory_backend: Optional[Any] = None,
    **kwargs: Any,
) -> None:
    """Initialise the agent and validate the four strategy selections.

    Args:
        engine: Inference engine, forwarded to the parent constructor.
        model: Model identifier, forwarded to the parent constructor.
        tools: Optional tools, forwarded to the parent constructor.
        bus: Optional event bus, forwarded to the parent constructor.
        max_turns: Turn limit, forwarded to the parent constructor.
        temperature: Sampling temperature, forwarded to the parent
            constructor.
        max_tokens: Token limit, forwarded to the parent constructor.
        system_prompt: Optional system prompt override, stored on the
            instance (it is not forwarded to the parent constructor).
        memory_extraction: Must be a member of
            ``VALID_MEMORY_EXTRACTION``.
        observation_compression: Must be a member of
            ``VALID_OBSERVATION_COMPRESSION``.
        retrieval_strategy: Must be a member of
            ``VALID_RETRIEVAL_STRATEGY``.
        task_decomposition: Must be a member of
            ``VALID_TASK_DECOMPOSITION``.
        operator_id: Optional operator identifier, stored on the instance.
        session_store: Optional session store, stored on the instance.
        memory_backend: Optional memory backend, stored on the instance.
        **kwargs: Accepted for call-site compatibility; not used by this
            constructor.

    Raises:
        ValueError: If any strategy value is not in its allowed set. The
            axes are checked in the order listed above and the first
            invalid one is reported.
    """
    super().__init__(
        engine,
        model,
        tools=tools,
        bus=bus,
        max_turns=max_turns,
        temperature=temperature,
        max_tokens=max_tokens,
    )

    # One row per strategy axis: (parameter name, supplied value, allowed set).
    # Row order fixes which invalid axis is reported first.
    strategy_axes = (
        ("memory_extraction", memory_extraction, VALID_MEMORY_EXTRACTION),
        (
            "observation_compression",
            observation_compression,
            VALID_OBSERVATION_COMPRESSION,
        ),
        ("retrieval_strategy", retrieval_strategy, VALID_RETRIEVAL_STRATEGY),
        ("task_decomposition", task_decomposition, VALID_TASK_DECOMPOSITION),
    )
    for axis, chosen, allowed in strategy_axes:
        if chosen not in allowed:
            raise ValueError(
                f"Invalid {axis} {chosen!r}, must be one of {allowed}"
            )

    # Validated strategy selections.
    self._memory_extraction = memory_extraction
    self._observation_compression = observation_compression
    self._retrieval_strategy = retrieval_strategy
    self._task_decomposition = task_decomposition

    # Prompt override and cross-session persistence handles.
    self._system_prompt = system_prompt
    self._operator_id = operator_id
    self._session_store = session_store
    self._memory_backend = memory_backend
Functions
run
run(input: str, context: Optional[AgentContext] = None, **kwargs: Any) -> AgentResult

Execute the agent on input with the configured strategies.

Source code in src/openjarvis/agents/monitor_operative.py
def run(
    self,
    input: str,
    context: Optional[AgentContext] = None,
    **kwargs: Any,
) -> AgentResult:
    """Execute the agent on *input* with the configured strategies.

    The run proceeds in these steps:

    1. Build the system prompt: either the explicit override or
       ``MONITOR_OPERATIVE_SYSTEM_PROMPT`` formatted with the strategy
       names and tool descriptions.
    2. Append any previous state returned by ``_recall_state``.
    3. Load session history and build the message list.
    4. Run the tool loop for at most ``self._max_turns`` turns, executing
       each requested tool call (subject to the loop guard, when set).
    5. Save the session, and call ``_auto_persist_state`` unless a
       ``memory_store`` tool call successfully wrote the operator's state
       key during the loop.

    Args:
        input: The user input for this run.
        context: Optional agent context, passed to
            ``_build_operative_messages``.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        An ``AgentResult`` with the final content, every tool result
        gathered during the run, and the number of turns taken. If the
        turn limit is reached without a final answer, the value of
        ``_max_turns_result`` is returned instead; on that path the
        session is saved but state is not auto-persisted.
    """
    self._emit_turn_start(input)

    # 1. Build system prompt with state context
    sys_parts: list[str] = []
    if self._system_prompt:
        sys_parts.append(self._system_prompt)
    else:
        tool_desc = self._build_tool_descriptions()
        try:
            sys_parts.append(
                MONITOR_OPERATIVE_SYSTEM_PROMPT.format(
                    memory_extraction=self._memory_extraction,
                    observation_compression=self._observation_compression,
                    retrieval_strategy=self._retrieval_strategy,
                    task_decomposition=self._task_decomposition,
                    tool_descriptions=tool_desc,
                ),
            )
        except (KeyError, IndexError, ValueError):
            # str.format raises KeyError for unknown named fields,
            # IndexError for positional fields such as "{}", and
            # ValueError for a lone brace. In every case fall back to the
            # unformatted template instead of aborting the run.
            sys_parts.append(MONITOR_OPERATIVE_SYSTEM_PROMPT)

    # 2. State recall from memory backend
    previous_state = self._recall_state()
    if previous_state:
        sys_parts.append(f"\n## Previous State\n{previous_state}")

    system_prompt = "\n\n".join(sys_parts) if sys_parts else None

    # 3. Load session history
    session_messages = self._load_session()

    # 4. Build messages
    messages = self._build_operative_messages(
        input, context,
        system_prompt=system_prompt,
        session_messages=session_messages,
    )

    # 5. Run function-calling tool loop
    openai_tools = self._executor.get_openai_tools() if self._tools else []
    all_tool_results: list[ToolResult] = []
    turns = 0
    content = ""
    state_stored_by_tool = False

    # The key under which explicit state writes are recognised. It does not
    # change during the run, so compute it once; None disables tracking.
    state_key = (
        f"monitor_operative:{self._operator_id}:state"
        if self._operator_id
        else None
    )

    for _turn in range(self._max_turns):
        turns += 1

        gen_kwargs: dict[str, Any] = {}
        if openai_tools:
            gen_kwargs["tools"] = openai_tools

        result = self._generate(messages, **gen_kwargs)
        # A present-but-null "content" would otherwise leak None into the
        # messages and into len(content) at the end of the run.
        content = result.get("content") or ""
        raw_tool_calls = result.get("tool_calls", [])

        # No tool calls -> check continuation, then final answer
        if not raw_tool_calls:
            content = self._check_continuation(result, messages)
            break

        # Build ToolCall objects from raw dicts
        tool_calls = [
            ToolCall(
                id=tc.get("id", f"call_{i}"),
                name=tc.get("name", ""),
                arguments=tc.get("arguments", "{}"),
            )
            for i, tc in enumerate(raw_tool_calls)
        ]

        # Append assistant message with tool calls
        messages.append(Message(
            role=Role.ASSISTANT,
            content=content,
            tool_calls=tool_calls,
        ))

        # Execute each tool
        for tc in tool_calls:
            # Loop guard check
            if self._loop_guard:
                verdict = self._loop_guard.check_call(
                    tc.name, tc.arguments,
                )
                if verdict.blocked:
                    # Report the block to the model as a failed tool
                    # result instead of executing the call.
                    tool_result = ToolResult(
                        tool_name=tc.name,
                        content=f"Loop guard: {verdict.reason}",
                        success=False,
                    )
                    all_tool_results.append(tool_result)
                    messages.append(Message(
                        role=Role.TOOL,
                        content=tool_result.content,
                        tool_call_id=tc.id,
                        name=tc.name,
                    ))
                    continue

            tool_result = self._executor.execute(tc)
            all_tool_results.append(tool_result)

            # Track explicit state storage. Only a *successful* write counts:
            # a failed memory_store must not suppress the auto-persist
            # fallback, or the state would be lost.
            if (
                state_key is not None
                and tc.name == "memory_store"
                and tool_result.success
            ):
                try:
                    args = json.loads(tc.arguments)
                except (json.JSONDecodeError, TypeError) as exc:
                    logger.debug(
                        "Failed to parse tool call arguments"
                        " for state tracking: %s", exc,
                    )
                else:
                    # Valid JSON need not be an object (e.g. "[]"); only
                    # a dict can carry the "key" field being checked.
                    if isinstance(args, dict) and args.get("key", "") == state_key:
                        state_stored_by_tool = True

            # Compress observation if strategy requires it
            observation_content = self._compress_observation(tool_result.content)

            messages.append(Message(
                role=Role.TOOL,
                content=observation_content,
                tool_call_id=tc.id,
                name=tc.name,
            ))

            # Extract and store findings based on memory strategy; note this
            # receives the uncompressed tool output.
            self._extract_and_store(tc.name, tool_result.content)
    else:
        # Max turns exceeded (the loop finished without a break)
        self._save_session(input, content)
        return self._max_turns_result(
            all_tool_results, turns, content=content,
        )

    # 6. Save session
    self._save_session(input, content)

    # 7. Auto-persist state if agent didn't do it explicitly
    if not state_stored_by_tool:
        self._auto_persist_state(content)

    self._emit_turn_end(turns=turns, content_length=len(content))
    return AgentResult(
        content=content,
        tool_results=all_tool_results,
        turns=turns,
    )