
research_loop

Agentic research loop over the hybrid-search tool.

A small, self-contained planner-executor loop:

  • the planner is a local Ollama chat model (default gemma4:31b),
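  • it can call two tools: HybridSearch.search and a clarify tool that asks the user a question (clarify is accepted only after at least one search),
  • it gets up to max_iterations tool calls (search and clarify combined),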
  • tool results are trimmed before re-entering the context window, and
  • the final reply must cite specific hits.
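The budget logic in the bullets above can be sketched as a stripped-down loop. This is a hypothetical illustration, not this module's API: plan_execute, the Generate alias, and the plain-dict messages are assumptions. It shows only the control flow: tools are offered while calls remain, withdrawn once the budget is spent, results are trimmed before re-entering the context, and a final tool-less call forces a synthesis if the loop falls through.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical stand-in for engine.generate: takes the message list and the
# tool specs (None once the budget is spent) and returns a dict with
# "content" and "tool_calls", mirroring the keys research_loop reads.
Generate = Callable[[List[Dict[str, Any]], Optional[List[str]]], Dict[str, Any]]


def plan_execute(
    query: str,
    generate: Generate,
    search: Callable[[str], List[str]],
    max_iterations: int = 5,
) -> Dict[str, Any]:
    """Minimal planner-executor loop: call tools until the budget runs out."""
    messages: List[Dict[str, Any]] = [{"role": "user", "content": query}]
    calls = 0
    for _ in range(max_iterations + 1):
        tools = ["search"] if calls < max_iterations else None
        reply = generate(messages, tools)
        if not reply.get("tool_calls"):
            return {"answer": reply.get("content", ""), "tool_calls": calls}
        for tc in reply["tool_calls"]:
            calls += 1
            hits = search(tc["arguments"]["query"])
            # Trim results before they re-enter the context window.
            messages.append({"role": "tool", "content": hits[:3]})
    # Budget exhausted: one final tool-less call forces a synthesis.
    final = generate(messages, None)
    return {"answer": final.get("content", ""), "tool_calls": calls}


def scripted(messages: List[Dict[str, Any]], tools: Optional[List[str]]) -> Dict[str, Any]:
    # Toy planner: keeps searching while tools are offered, then answers.
    if tools:
        return {"tool_calls": [{"arguments": {"query": "q"}}]}
    return {"content": "done [1]"}


result = plan_execute("q", scripted, lambda q: ["a", "b", "c", "d"], max_iterations=2)
```

With max_iterations=2 the toy planner searches twice, sees tools withdrawn on the third turn, and answers.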

The loop is deliberately decoupled from the rest of the agent scaffolding (ToolUsingAgent, EventBus, AgentContext, etc.) so the surface stays small. Anything that wants tracing or registry integration can wrap it.

Classes

ToolInvocation dataclass

ToolInvocation(arguments: Dict[str, Any], num_results: int = 0, top_titles: List[str] = list(), raw_hits: List[SearchHit] = list(), tool_name: str = 'search', response: str = '')

A single tool call: the arguments the planner passed and what came back.

tool_name is "search" or "clarify". For search calls, num_results, top_titles and raw_hits are populated; for clarify calls, response holds the user's answer.
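A simplified mirror of the record, together with the search-before-clarify check that run applies, might look like the sketch below. ToolInvocationSketch and clarify_allowed are hypothetical names. raw_hits is omitted because SearchHit is not defined here. The `list()` defaults in the rendered signature are assumed to come from `field(default_factory=list)`, which is the usual way to give a dataclass a per-instance mutable default.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToolInvocationSketch:
    """Simplified mirror of ToolInvocation (raw_hits omitted)."""

    arguments: Dict[str, Any]
    num_results: int = 0
    # default_factory gives each instance its own list instead of a shared one.
    top_titles: List[str] = field(default_factory=list)
    tool_name: str = "search"
    response: str = ""


def clarify_allowed(invocations: List[ToolInvocationSketch]) -> bool:
    """run() only accepts clarify after at least one search call."""
    return any(inv.tool_name == "search" for inv in invocations)


history: List[ToolInvocationSketch] = []
before = clarify_allowed(history)  # no search yet
history.append(
    ToolInvocationSketch({"query": "budget"}, num_results=2,
                         top_titles=["Q3 budget", "Budget v2"])
)
history.append(
    ToolInvocationSketch({"question": "Which quarter?"},
                         tool_name="clarify", response="Q3")
)
after = clarify_allowed(history)
```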

ResearchAgent

ResearchAgent(engine: InferenceEngine, search: HybridSearch, *, model: str = DEFAULT_PLANNER_MODEL, max_iterations: int = 5, temperature: float = 0.3, max_tokens: int = 1500, num_ctx: int = 16384, clarify_handler: Optional[Callable[[str], str]] = None, on_event: Optional[Callable[[Dict[str, Any]], None]] = None, available_sources: Optional[List[str]] = None)

Planner-executor loop over the hybrid-search tool, plus a clarify tool for asking the user follow-up questions.

PARAMETER DESCRIPTION
engine

An InferenceEngine that supports OpenAI-style tools in generate (Ollama with a tool-capable model).

TYPE: InferenceEngine

search

The HybridSearch instance the planner can call.

TYPE: HybridSearch

model

Planner model tag (default gemma4:31b).

TYPE: str DEFAULT: DEFAULT_PLANNER_MODEL

max_iterations

Hard ceiling on tool calls before the loop is forced into synthesis.

TYPE: int DEFAULT: 5

temperature

Sampling temperature, passed through to engine.generate.

TYPE: float DEFAULT: 0.3

max_tokens

Maximum number of tokens to generate per call, passed through to engine.generate.

TYPE: int DEFAULT: 1500

num_ctx

Context-window size in tokens (Ollama's num_ctx option), passed through to engine.generate.

TYPE: int DEFAULT: 16384

clarify_handler

Called with the planner's clarifying question; returns the user's answer as a string. When omitted, the agent falls back to a module-level _default_clarify_handler.

TYPE: Optional[Callable[[str], str]] DEFAULT: None

on_event

Optional callback fired at loop milestones so callers (e.g. the SSE research router) can stream progress without rewriting the loop. It receives a dict in one of these shapes:

  • about to call search: {"type": "search_call", "arguments": {...}}
  • search returned: {"type": "search_result", "num_hits": N, "top_titles": [...], "sources": [{"ref": 1, "title": ..., "sender": ..., "date": ..., "source": ..., "source_id": ..., "url": ...}, ...]}
  • about to ask for clarification: {"type": "clarify_call", "question": "..."}
  • clarification received: {"type": "clarify_response", "response": "..."}
  • synthesis ready: {"type": "final_answer", "text": "...", "sources": [...]}, with sources renumbered to match the citations in text

The callback runs on the same thread as run, so it must return quickly and never block.

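TYPE: Optional[Callable[[Dict[str, Any]], None]] DEFAULT: None

available_sources

Explicit list of source names to advertise in the system prompt. When omitted, sources are discovered from the KnowledgeStore on each run() call, so the prompt stays accurate as the user connects new connectors mid-session.

TYPE: Optional[List[str]] DEFAULT: None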
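A callback suitable for on_event could look like the sketch below (make_progress_logger is a hypothetical helper). It only appends a line to a list, so it returns immediately and does not stall the loop. A real SSE router would more likely push each event onto a queue that another thread drains.

```python
from typing import Any, Callable, Dict, List


def make_progress_logger(lines: List[str]) -> Callable[[Dict[str, Any]], None]:
    """Build an on_event callback that records one progress line per event."""

    def on_event(event: Dict[str, Any]) -> None:
        kind = event.get("type")
        if kind == "search_call":
            lines.append(f"searching: {event['arguments']}")
        elif kind == "search_result":
            lines.append(f"{event['num_hits']} hits")
        elif kind == "clarify_call":
            lines.append(f"asking: {event['question']}")
        elif kind == "clarify_response":
            lines.append(f"user said: {event['response']}")
        elif kind == "final_answer":
            lines.append(f"answer with {len(event.get('sources', []))} sources")

    return on_event


log: List[str] = []
handler = make_progress_logger(log)
handler({"type": "search_call", "arguments": {"query": "offsite"}})
handler({"type": "search_result", "num_hits": 3, "top_titles": [], "sources": []})
handler({"type": "final_answer", "text": "See [1].", "sources": [{"ref": 1}]})
```

It would be passed as `ResearchAgent(engine, search, on_event=handler)`.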

Source code in src/openjarvis/agents/research_loop.py
def __init__(
    self,
    engine: InferenceEngine,
    search: HybridSearch,
    *,
    model: str = DEFAULT_PLANNER_MODEL,
    max_iterations: int = 5,
    temperature: float = 0.3,
    max_tokens: int = 1500,
    num_ctx: int = 16384,
    clarify_handler: Optional[Callable[[str], str]] = None,
    on_event: Optional[Callable[[Dict[str, Any]], None]] = None,
    available_sources: Optional[List[str]] = None,
) -> None:
    self._engine = engine
    self._search = search
    self._model = model
    self._max_iterations = int(max_iterations)
    self._temperature = float(temperature)
    self._max_tokens = int(max_tokens)
    self._num_ctx = int(num_ctx)
    self._clarify_handler = clarify_handler or _default_clarify_handler
    self._on_event = on_event
    # Explicit list wins; otherwise we'll discover sources from the
    # KnowledgeStore on each run() call so the prompt stays accurate
    # even as the user connects new connectors mid-session.
    self._available_sources_override = available_sources
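The comment above describes a simple resolution rule: an explicit list wins, and otherwise sources are rediscovered on every run. The body of _resolve_available_sources is not shown on this page. The sketch below is a hypothetical stand-in (resolve_sources and discover are illustrative names). It also assumes that "explicit" means "not None", so an empty list would pin an empty source set.

```python
from typing import Callable, List, Optional


def resolve_sources(
    override: Optional[List[str]], discover: Callable[[], List[str]]
) -> List[str]:
    """Explicit list wins; otherwise call discover() fresh on every run."""
    if override is not None:
        return list(override)
    return discover()


connected = ["gmail"]
discover_calls: List[int] = []


def discover() -> List[str]:
    # Stand-in for querying the KnowledgeStore for connected sources.
    discover_calls.append(1)
    return list(connected)


first = resolve_sources(None, discover)
connected.append("slack")  # the user connects Slack mid-session
second = resolve_sources(None, discover)
pinned = resolve_sources(["notion"], discover)  # discover() is not called
```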
Functions
run
run(query: str) -> ResearchResult

Run the loop end-to-end and return the synthesis plus a trace.

Source code in src/openjarvis/agents/research_loop.py
def run(self, query: str) -> ResearchResult:
    """Run the loop end-to-end and return the synthesis plus a trace."""
    sources_list = self._resolve_available_sources()
    if sources_list:
        sources_blurb = ", ".join(sources_list)
    else:
        sources_blurb = (
            "(no connected sources — tell the user to connect a "
            "connector before searching)"
        )
    sys_msg = Message(
        role=Role.SYSTEM,
        content=SYSTEM_PROMPT.format(
            today=datetime.now().isoformat(timespec="minutes"),
            available_sources=sources_blurb,
        ),
    )
    messages: List[Message] = [sys_msg, Message(role=Role.USER, content=query)]

    invocations: List[ToolInvocation] = []
    total_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}

    # Global ref counter: each search increments by the number of hits
    # it returned so the planner sees unique refs across calls. The
    # accumulator lets us renumber whatever the synthesis cites at the
    # end into a single deduped client-facing sources list.
    next_ref: int = 1
    ref_to_source: Dict[int, Dict[str, Any]] = {}

    def _finalize(text: str) -> Tuple[str, List[Dict[str, Any]]]:
        return renumber_citations(text, ref_to_source)

    iterations = 0
    for _ in range(self._max_iterations + 1):
        iterations += 1
        tools_arg = (
            [SEARCH_TOOL_SPEC, CLARIFY_TOOL_SPEC]
            if len(invocations) < self._max_iterations
            else None
        )
        result = self._engine.generate(
            messages,
            model=self._model,
            temperature=self._temperature,
            max_tokens=self._max_tokens,
            num_ctx=self._num_ctx,
            tools=tools_arg,
        )
        for k in total_usage:
            total_usage[k] += int(result.get("usage", {}).get(k, 0))

        content = result.get("content", "") or ""
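        tool_calls_raw = result.get("tool_calls", []) or []
        # Pin an id on every call up front so the assistant message and the
        # matching tool-result message agree even when the engine omits ids.
        tool_calls_raw = [
            {**tc, "id": tc.get("id") or f"call_{i}"}
            for i, tc in enumerate(tool_calls_raw)
        ]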

        if not tool_calls_raw:
            if content.strip():
                answer, final_sources = _finalize(content.strip())
                self._emit(
                    {
                        "type": "final_answer",
                        "text": answer,
                        "sources": final_sources,
                    }
                )
                return ResearchResult(
                    answer=answer,
                    iterations=iterations,
                    tool_calls=invocations,
                    usage=total_usage,
                )
            # Empty content with no tool call — push a synthesis prod
            if invocations:
                messages.append(Message(role=Role.ASSISTANT, content=content))
                messages.append(
                    Message(
                        role=Role.USER,
                        content=(
                            "Write your final answer now based on the search "
                            "results above. Cite sources as [1], [2], etc."
                        ),
                    )
                )
                continue
            fallback = "(model returned no content and no tool calls)"
            self._emit(
                {"type": "final_answer", "text": fallback, "sources": []}
            )
            return ResearchResult(
                answer=fallback,
                iterations=iterations,
                tool_calls=invocations,
                usage=total_usage,
            )

        assistant_msg = Message(
            role=Role.ASSISTANT,
            content=content,
            tool_calls=[
                ToolCall(
                    id=tc.get("id", f"call_{i}"),
                    name=tc.get("name", "search"),
                    arguments=tc.get("arguments", "{}") or "{}",
                )
                for i, tc in enumerate(tool_calls_raw)
            ],
        )
        messages.append(assistant_msg)

        for tc in tool_calls_raw:
            name = tc.get("name", "")
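            raw_args = tc.get("arguments", "{}") or "{}"
            try:
                args = json.loads(raw_args) if isinstance(raw_args, str) else dict(raw_args)
            except (TypeError, ValueError):  # ValueError covers json.JSONDecodeError
                args = {}
            if not isinstance(args, dict):
                # e.g. the model sent a bare JSON string or list as arguments
                args = {}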

            if name == "search":
                # Run the search, stream its sources, then hand the planner a
                # compact payload whose refs continue from earlier searches.
                # The search-before-clarify rule is enforced in the clarify
                # branch below, not here.
                self._emit({"type": "search_call", "arguments": args})
                inv = self._execute_search(args)
                invocations.append(inv)
                offset = next_ref - 1
                sources_for_search = build_sources_for_client(
                    inv.raw_hits, ref_offset=offset
                )
                self._emit(
                    {
                        "type": "search_result",
                        "num_hits": inv.num_results,
                        "top_titles": inv.top_titles,
                        "sources": sources_for_search,
                    }
                )
                for src in sources_for_search:
                    ref_to_source[int(src["ref"])] = src
                next_ref += len(sources_for_search)
                tool_output = json.dumps(
                    shape_results_for_model(inv.raw_hits, ref_offset=offset),
                    ensure_ascii=False,
                )
            elif name == "clarify":
                # Enforce the "search first" rule at runtime so we don't
                # surprise the user with a clarification before showing any
                # work. If the planner jumps to clarify with no searches
                # behind it, return an error and let the loop try again.
                if not any(i.tool_name == "search" for i in invocations):
                    tool_output = json.dumps(
                        {
                            "error": (
                                "clarify is only available after at least "
                                "one search call. Run search first, then "
                                "use clarify if the results are ambiguous "
                                "or empty."
                            )
                        }
                    )
                else:
                    self._emit(
                        {"type": "clarify_call", "question": str(args.get("question", ""))}
                    )
                    inv = self._execute_clarify(args)
                    invocations.append(inv)
                    self._emit(
                        {"type": "clarify_response", "response": inv.response}
                    )
                    tool_output = json.dumps(
                        {
                            "question": inv.arguments.get("question", ""),
                            "user_response": inv.response,
                        }
                    )
            else:
                tool_output = json.dumps(
                    {
                        "error": (
                            f"unknown tool {name!r}; available tools are "
                            "'search' and 'clarify'"
                        )
                    }
                )

            messages.append(
                Message(
                    role=Role.TOOL,
                    content=tool_output,
                    tool_call_id=tc.get("id", ""),
                    name=name,
                )
            )

        if len(invocations) >= self._max_iterations:
            messages.append(
                Message(
                    role=Role.USER,
                    content=(
                        "You have used your tool-call budget (search + "
                        "clarify combined). Write the final synthesis now "
                        "using only the search results and clarifications "
                        "above. Cite sources as [1], [2], etc."
                    ),
                )
            )

    # Loop fell through without the model producing a text response.
    # Force one final tool-less synthesis call so the caller always gets
    # an answer — bailing out with a sentinel string is never useful to
    # the user, who already paid for the searches.
    messages.append(
        Message(
            role=Role.USER,
            content=(
                "You've used all your search attempts. Synthesize your "
                "findings now from whatever you've found so far. Do not "
                "request more tool calls — write the final answer as "
                "plain text, citing sources as [1], [2], etc. where you can. "
                "If the searches returned nothing usable, say so plainly."
            ),
        )
    )
    iterations += 1
    final = self._engine.generate(
        messages,
        model=self._model,
        temperature=self._temperature,
        max_tokens=self._max_tokens,
        num_ctx=self._num_ctx,
        tools=None,
    )
    for k in total_usage:
        total_usage[k] += int(final.get("usage", {}).get(k, 0))
    answer = (final.get("content", "") or "").strip()
    if not answer:
        answer = (
            "(no synthesis available — the search budget was exhausted "
            "and the model returned no text response)"
        )
    answer, final_sources = _finalize(answer)
    self._emit(
        {"type": "final_answer", "text": answer, "sources": final_sources}
    )
    return ResearchResult(
        answer=answer,
        iterations=iterations,
        tool_calls=invocations,
        usage=total_usage,
    )
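The ref bookkeeping in run (next_ref, ref_offset) can be isolated in a few lines. This hypothetical assign_refs helper reproduces only the arithmetic: each search's refs start at ref_offset + 1, and next_ref advances by the number of sources that search produced. That number is capped at 20 because build_sources_for_client and shape_results_for_model both default to total_cap=20.

```python
from typing import List


def assign_refs(hit_counts: List[int], cap: int = 20) -> List[List[int]]:
    """Give each search's hits globally unique refs, as run() does."""
    next_ref = 1
    out: List[List[int]] = []
    for n in hit_counts:
        offset = next_ref - 1
        shown = min(n, cap)  # only the first `cap` hits get a ref
        out.append([i + 1 + offset for i in range(shown)])
        next_ref += shown
    return out


refs = assign_refs([3, 25, 2])
```

A 25-hit search consumes only 20 refs (4 through 23), so the next search starts at 24 rather than 29.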

Functions

shape_results_for_model

shape_results_for_model(hits: List[SearchHit], *, detailed_top: int = 5, thread_ctx_per_hit: int = 3, total_cap: int = 20, ref_offset: int = 0) -> Dict[str, Any]

Compact a hit list into a JSON payload the planner can chew through.

The first detailed_top rows keep their content snippet and trimmed thread context; the remainder are summarised to title + sender + date so the planner still sees the breadth of what's available without blowing the context window. Each hit gets a numeric ref (1-indexed, plus ref_offset) so the synthesis can cite it as [N]. The offset lets multi-search runs hand the planner globally unique refs across calls so a later renumbering pass can dedupe by first appearance.

Source code in src/openjarvis/agents/research_loop.py
def shape_results_for_model(
    hits: List[SearchHit],
    *,
    detailed_top: int = 5,
    thread_ctx_per_hit: int = 3,
    total_cap: int = 20,
    ref_offset: int = 0,
) -> Dict[str, Any]:
    """Compact a hit list into a JSON payload the planner can chew through.

    The first ``detailed_top`` rows keep their content snippet and trimmed
    thread context; the remainder are summarised to title + sender + date so
    the planner still sees the breadth of what's available without blowing the
    context window. Each hit gets a numeric ``ref`` (1-indexed, plus
    ``ref_offset``) so the synthesis can cite it as ``[N]``. The offset lets
    multi-search runs hand the planner globally unique refs across calls so
    a later renumbering pass can dedupe by first appearance.
    """
    out_hits: List[Dict[str, Any]] = []
    visible = hits[:total_cap]
    for i, h in enumerate(visible):
        sender = h.participants[0] if h.participants else ""
        base = {
            "ref": i + 1 + ref_offset,
            "title": h.title,
            "sender": sender,
            "timestamp": h.timestamp,
            "source": h.source,
            "score": round(h.score, 4),
        }
        if i < detailed_top:
            base["snippet"] = h.content_snippet
            if h.thread_context:
                base["thread"] = _trim_thread_context(h.thread_context, thread_ctx_per_hit)
        out_hits.append(base)
    return {
        "num_results": len(hits),
        "shown": len(visible),
        "truncated": len(hits) > total_cap,
        "hits": out_hits,
    }
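A self-contained approximation of the payload's tiering is sketched below. Hit is a hypothetical stand-in for SearchHit with just the fields the shaper reads. Thread context is left out because _trim_thread_context is not shown here. The example shows refs shifted by ref_offset, snippets only on the first detailed_top rows, and the truncated flag once the cap bites.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Hit:
    """Stand-in for SearchHit with only the fields the shaper reads."""

    title: str
    participants: List[str] = field(default_factory=list)
    timestamp: str = ""
    source: str = "gmail"
    score: float = 0.0
    content_snippet: str = ""


def shape(hits: List[Hit], *, detailed_top: int = 5, total_cap: int = 20,
          ref_offset: int = 0) -> Dict[str, Any]:
    """Same tiering as shape_results_for_model, minus thread context."""
    rows: List[Dict[str, Any]] = []
    for i, h in enumerate(hits[:total_cap]):
        row: Dict[str, Any] = {
            "ref": i + 1 + ref_offset,
            "title": h.title,
            "sender": h.participants[0] if h.participants else "",
            "timestamp": h.timestamp,
            "source": h.source,
            "score": round(h.score, 4),
        }
        if i < detailed_top:  # only the top rows carry a snippet
            row["snippet"] = h.content_snippet
        rows.append(row)
    return {"num_results": len(hits), "shown": len(rows),
            "truncated": len(hits) > total_cap, "hits": rows}


hits = [Hit(f"doc {n}", ["ana@example.com"], score=1 / (n + 1),
            content_snippet=f"text {n}") for n in range(4)]
payload = shape(hits, detailed_top=2, total_cap=3, ref_offset=10)
```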

renumber_citations

renumber_citations(text: str, ref_to_source: Dict[int, Dict[str, Any]]) -> Tuple[str, List[Dict[str, Any]]]

Renumber [N] citations in text by first-appearance order.

The planner sees globally-offset refs across multiple search calls (search 1 returns 1..20, search 2 returns 21..40, …). When the synthesis arrives, the first ref the model actually cited becomes [1], the second unique one becomes [2], and so on. Repeats map to the same new ref. Refs the synthesis never cites are dropped from the returned sources list — only the ones the user can actually click on get carried through.

PARAMETER DESCRIPTION
text

Synthesis text containing inline [N] references.

TYPE: str

ref_to_source

Mapping from the original (offset) ref to the source dict that build_sources_for_client produced for that hit.

TYPE: Dict[int, Dict[str, Any]]

RETURNS DESCRIPTION
(new_text, ordered_sources)

new_text has every cited [N] rewritten to its new sequence number. ordered_sources is the deduped list of source dicts in the order they appear in the synthesis, each with its ref field set to the new sequence number.

Source code in src/openjarvis/agents/research_loop.py
def renumber_citations(
    text: str,
    ref_to_source: Dict[int, Dict[str, Any]],
) -> Tuple[str, List[Dict[str, Any]]]:
    """Renumber ``[N]`` citations in ``text`` by first-appearance order.

    The planner sees globally-offset refs across multiple search calls
    (search 1 returns 1..20, search 2 returns 21..40, …). When the
    synthesis arrives, the first ref the model actually cited becomes
    ``[1]``, the second unique one becomes ``[2]``, and so on. Repeats
    map to the same new ref. Refs the synthesis never cites are dropped
    from the returned ``sources`` list — only the ones the user can
    actually click on get carried through.

    Parameters
    ----------
    text:
        Synthesis text containing inline ``[N]`` references.
    ref_to_source:
        Mapping from the original (offset) ref to the source dict that
        ``build_sources_for_client`` produced for that hit.

    Returns
    -------
    (new_text, ordered_sources)
        ``new_text`` has every cited ``[N]`` rewritten to its new
        sequence number. ``ordered_sources`` is the deduped list of
        source dicts in the order they appear in the synthesis, each
        with its ``ref`` field set to the new sequence number.
    """
    old_to_new: Dict[int, int] = {}
    ordered: List[Dict[str, Any]] = []
    for m in _CITE_RE.finditer(text):
        try:
            old = int(m.group(1))
        except ValueError:
            continue
        if old in old_to_new:
            continue
        src = ref_to_source.get(old)
        if src is None:
            # Synthesis cited a ref that doesn't exist in the corpus —
            # leave the literal text alone, drop the source entry.
            continue
        new_ref = len(ordered) + 1
        old_to_new[old] = new_ref
        renumbered_src = dict(src)
        renumbered_src["ref"] = new_ref
        ordered.append(renumbered_src)

    def _replace(match: "re.Match[str]") -> str:
        try:
            old = int(match.group(1))
        except ValueError:
            return match.group(0)
        new = old_to_new.get(old)
        return f"[{new}]" if new is not None else match.group(0)

    new_text = _CITE_RE.sub(_replace, text)
    return new_text, ordered
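A condensed, runnable version of the renumbering pass follows. _CITE_RE is not defined on this page, so the sketch assumes it matches bracketed integers such as "[12]". renumber is a hypothetical name for the condensed function. The worked input shows first-appearance ordering, repeats mapping to the same new ref, and an unknown ref left untouched.

```python
import re
from typing import Any, Dict, List, Tuple

# Assumption: the module's _CITE_RE matches bracketed integers like "[12]".
_CITE_RE = re.compile(r"\[(\d+)\]")


def renumber(text: str, ref_to_source: Dict[int, Dict[str, Any]]
             ) -> Tuple[str, List[Dict[str, Any]]]:
    old_to_new: Dict[int, int] = {}
    ordered: List[Dict[str, Any]] = []
    for m in _CITE_RE.finditer(text):
        old = int(m.group(1))
        if old in old_to_new or old not in ref_to_source:
            continue  # a repeat, or a ref no search ever returned
        old_to_new[old] = len(ordered) + 1
        ordered.append({**ref_to_source[old], "ref": old_to_new[old]})
    new_text = _CITE_RE.sub(
        lambda m: f"[{old_to_new[int(m.group(1))]}]"
        if int(m.group(1)) in old_to_new else m.group(0),
        text,
    )
    return new_text, ordered


sources = {3: {"ref": 3, "title": "Kickoff"}, 22: {"ref": 22, "title": "Budget"}}
text, cited = renumber(
    "Budget moved [22], per the kickoff [3] and [22]; see [99].", sources
)
```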

build_sources_for_client

build_sources_for_client(hits: List[SearchHit], *, total_cap: int = 20, ref_offset: int = 0) -> List[Dict[str, Any]]

Produce the citation-friendly sources list streamed to the frontend.

One entry per hit, in the same order the planner sees them, so a [N] citation maps to sources[N - 1 - ref_offset] (simply sources[N - 1] for a single search with ref_offset=0). We don't deduplicate by document_id: separate chunks of the same email each get their own citation slot, since the planner may quote different parts.

Source code in src/openjarvis/agents/research_loop.py
def build_sources_for_client(
    hits: List[SearchHit],
    *,
    total_cap: int = 20,
    ref_offset: int = 0,
) -> List[Dict[str, Any]]:
    """Produce the citation-friendly sources list streamed to the frontend.

    One entry per hit, in the same order the planner sees them — so a
    ``[N]`` citation in the synthesis maps to ``sources[N - 1]`` on the
    client. We don't deduplicate by ``document_id``: separate chunks of the
    same email each get their own citation slot since the planner may quote
    different parts.
    """
    out: List[Dict[str, Any]] = []
    for i, h in enumerate(hits[:total_cap]):
        sender = h.participants[0] if h.participants else ""
        # Prefer the URL the connector stored at ingest time (Granola's
        # ``web_url``, Notion's page URL, etc.) — it's the only reliable
        # link for sources whose web URL doesn't derive from the doc_id.
        # Fall back to the doc_id-based reconstruction for sources where
        # that still works (Slack, Gmail).
        url = h.url or _hit_url(h.source, h.document_id)
        out.append(
            {
                "ref": i + 1 + ref_offset,
                "title": h.title,
                "sender": sender,
                "date": _hit_date(h.timestamp),
                "source": h.source,
                "source_id": _bare_doc_id(h.source, h.document_id),
                "url": url,
            }
        )
    return out
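The offset mapping and URL preference can be illustrated without the real helpers. In this sketch, client_sources is a trimmed, hypothetical stand-in that keeps only ref, title and url. fake_rebuild stands in for _hit_url, and its URL format is invented. The example shows that with ref_offset=20 the citation [22] resolves to sources[22 - 1 - 20], and that a stored URL beats the reconstructed one.

```python
from typing import Any, Callable, Dict, List


def client_sources(
    hits: List[Dict[str, Any]],
    *,
    ref_offset: int = 0,
    total_cap: int = 20,
    fallback_url: Callable[[str, str], str],
) -> List[Dict[str, Any]]:
    """One entry per hit; stored URL first, reconstructed URL second."""
    out: List[Dict[str, Any]] = []
    for i, h in enumerate(hits[:total_cap]):
        out.append({
            "ref": i + 1 + ref_offset,
            "title": h["title"],
            # Prefer the connector's stored URL; rebuild from the id otherwise.
            "url": h.get("url") or fallback_url(h["source"], h["document_id"]),
        })
    return out


def fake_rebuild(source: str, doc_id: str) -> str:
    """Hypothetical stand-in for _hit_url; real formats differ per source."""
    return f"https://example.invalid/{source}/{doc_id}"


hits = [
    {"title": "Standup notes", "source": "granola", "document_id": "g1",
     "url": "https://example.invalid/notes/g1"},
    {"title": "Re: launch", "source": "gmail", "document_id": "m7"},
]
sources = client_sources(hits, ref_offset=20, fallback_url=fake_rebuild)
```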