hybrid_search

Hybrid retrieval over the KnowledgeStore: metadata filter + BM25 + vector cosine.

A single search entrypoint that the agentic research loop calls as a tool. Structured WHERE-clause filters (person, time range, sources) narrow the candidate set, then BM25 (FTS5) and dense cosine similarity score the survivors. The two rankings are fused with Reciprocal Rank Fusion (RRF), which combines rank positions rather than raw scores and is therefore robust to the very different score scales the two signals produce (BM25 ~ [0, 20], cosine ~ [0.4, 0.9] for nomic-embed-text).
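The fusion step can be sketched roughly as follows. This is a minimal illustration of weighted RRF, not the module's actual `_fuse` implementation, which is not shown on this page; the function name `rrf_fuse`, the tie-breaking rule, and the exact way the weights enter the formula are assumptions.

```python
from typing import Dict, List, Sequence, Tuple


def rrf_fuse(
    bm25_ranked: Sequence[str],
    vector_ranked: Sequence[str],
    *,
    bm25_weight: float = 0.5,
    vector_weight: float = 0.5,
    rrf_k: int = 60,
) -> List[Tuple[str, float]]:
    """Fuse two best-first id lists with weighted Reciprocal Rank Fusion.

    Hypothetical helper: each list contributes weight / (rrf_k + rank),
    with ranks starting at 1.
    """
    scores: Dict[str, float] = {}
    legs = ((bm25_weight, bm25_ranked), (vector_weight, vector_ranked))
    for weight, ranked in legs:
        for rank, chunk_id in enumerate(ranked, start=1):
            # Only the rank position is used, so raw score scales never mix.
            scores[chunk_id] = scores.get(chunk_id, 0.0) + weight / (rrf_k + rank)
    # Highest fused score first; ties broken by id for a stable order.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


# "b" is near the top of both lists, so it outranks "a", which is
# first in one list but third in the other.
fused = rrf_fuse(["a", "b", "c"], ["b", "d", "a"])
fused_ids = [chunk_id for chunk_id, _ in fused]
```

Ids that appear in both lists accumulate two terms, which is why overlap between the legs matters more than any single raw score.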

Each result is enriched with its thread context: when a hit belongs to a thread_id, the surrounding chunks are attached so the synthesis model sees the conversation, not an isolated fragment.
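One way the thread enrichment could work is sketched below against an in-memory SQLite table. The table and column names follow the `knowledge_chunks` query shown later on this page, but the helper `thread_context`, the ordering, the exclusion of the hit itself, and the meaning of the cap are all assumptions rather than the module's confirmed behaviour.

```python
import sqlite3
from typing import Any, Dict, List


def thread_context(
    conn: sqlite3.Connection, thread_id: str, hit_chunk_id: str, cap: int = 20
) -> List[Dict[str, Any]]:
    """Hypothetical sketch: fetch up to ``cap`` sibling chunks of a hit."""
    if not thread_id:
        return []  # the hit is not part of any thread
    rows = conn.execute(
        """
        SELECT id, content, timestamp
        FROM knowledge_chunks
        WHERE thread_id = ? AND id != ?
        ORDER BY timestamp ASC, chunk_index ASC
        LIMIT ?
        """,
        (thread_id, hit_chunk_id, cap),
    ).fetchall()
    return [dict(row) for row in rows]


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets rows convert to dicts by column name
conn.execute(
    "CREATE TABLE knowledge_chunks ("
    "id TEXT PRIMARY KEY, content TEXT, timestamp TEXT, "
    "thread_id TEXT, chunk_index INTEGER)"
)
conn.executemany(
    "INSERT INTO knowledge_chunks VALUES (?, ?, ?, ?, ?)",
    [
        ("c1", "Can we move the review?", "2024-05-01T09:00:00", "t1", 0),
        ("c2", "Thursday works for me.", "2024-05-01T09:05:00", "t1", 0),
        ("c3", "Booked, see invite.", "2024-05-01T09:10:00", "t1", 0),
        ("c4", "Unrelated note.", "2024-05-02T08:00:00", "", 0),
    ],
)

context = thread_context(conn, "t1", "c2")
context_ids = [row["id"] for row in context]
```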

Brute-force vector scan is fine at the current corpus size: ~5k chunks × 768 dims is about 15 MB at 4 bytes per value, and the matrix multiply takes under 50 ms. Swap in an ANN index when that stops being true.
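For illustration, a brute-force cosine scan looks roughly like the sketch below. It uses only the standard library so it runs anywhere; the real module presumably scores all chunks with a single matrix multiply instead. The names `cosine` and `brute_force_top_k` are hypothetical, and the footprint figure assumes 4-byte floats.

```python
import heapq
import math
from typing import List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def brute_force_top_k(
    query_vec: Sequence[float],
    corpus: Sequence[Tuple[str, Sequence[float]]],
    k: int,
) -> List[Tuple[str, float]]:
    """Score every chunk against the query and keep the k best."""
    scored = ((cosine(query_vec, vec), chunk_id) for chunk_id, vec in corpus)
    return [(chunk_id, score) for score, chunk_id in heapq.nlargest(k, scored)]


corpus = [("a", [1.0, 0.0]), ("b", [0.6, 0.8]), ("c", [0.0, 1.0])]
top = brute_force_top_k([1.0, 0.0], corpus, k=2)
top_ids = [chunk_id for chunk_id, _ in top]

# Memory estimate behind the "~15 MB" figure, assuming float32 storage.
footprint_mb = 5_000 * 768 * 4 / 1e6
```

The scan is linear in corpus size, so both memory and latency grow in proportion to the number of chunks; that is the point at which an ANN index starts to pay off.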

Classes

SearchHit dataclass

SearchHit(chunk_id: str, document_id: str, chunk_idx: int, title: str, content_snippet: str, source: str, timestamp: str, participants: List[str], score: float, bm25_score: float, vector_score: float, thread_id: str = '', thread_context: List[Dict[str, Any]] = list(), url: str = '')

A single hybrid-search result with enough context for citation.

HybridSearch

HybridSearch(store: KnowledgeStore, embedder: Optional[OllamaEmbedder] = None, *, bm25_weight: float = 0.5, vector_weight: float = 0.5, rrf_k: int = 60, recall_k: int = 200, thread_context_cap: int = 20)

Hybrid BM25 + dense-cosine retrieval over a KnowledgeStore.

PARAMETER DESCRIPTION
store

The store to query.

TYPE: KnowledgeStore

embedder

Embedding client used to encode the query. When None, search falls back to BM25 only and reports vector_score=0.

TYPE: Optional[OllamaEmbedder] DEFAULT: None

bm25_weight

Weight on the BM25 (lexical) RRF term. Defaults to 0.5, equal to vector_weight; raise it to bias retrieval toward lexical matches.

TYPE: float DEFAULT: 0.5

vector_weight

Weight on the vector (semantic) RRF term. Defaults to 0.5, equal to bm25_weight; raise it to bias retrieval toward semantic matches.

TYPE: float DEFAULT: 0.5

rrf_k

RRF damping constant. Larger values flatten the contribution of deeper ranks; 60 is the canonical value from the original paper.

TYPE: int DEFAULT: 60

recall_k

How deep each individual ranker recalls before fusion. Should be at least a few times the limit passed to search so the fuser has overlap to work with.

TYPE: int DEFAULT: 200
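The effect of rrf_k can be made concrete with a little arithmetic. The sketch below assumes each leg contributes weight / (rrf_k + rank) with ranks starting at 1, which is the standard RRF form; the helper names are hypothetical and the module's exact formula is not shown on this page.

```python
def rrf_term(rank: int, rrf_k: int, weight: float = 0.5) -> float:
    """Contribution of one ranker for an item at the given 1-based rank."""
    return weight / (rrf_k + rank)


def top_vs_tenth(rrf_k: int) -> float:
    """How many times more a rank-1 hit contributes than a rank-10 hit."""
    return rrf_term(1, rrf_k) / rrf_term(10, rrf_k)


ratio_k60 = top_vs_tenth(60)  # 70 / 61, roughly 1.15: ranks are nearly flat
ratio_k1 = top_vs_tenth(1)    # 11 / 2 = 5.5: the top rank dominates
```

With the default of 60, a first-place hit from one ranker is worth only about 15% more than a tenth-place hit, so agreement between the two rankers tends to decide the final order.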

Source code in src/openjarvis/connectors/hybrid_search.py
def __init__(
    self,
    store: KnowledgeStore,
    embedder: Optional[OllamaEmbedder] = None,
    *,
    bm25_weight: float = 0.5,
    vector_weight: float = 0.5,
    rrf_k: int = 60,
    recall_k: int = 200,
    thread_context_cap: int = 20,
) -> None:
    self._store = store
    self._embedder = embedder
    self._bm25_weight = float(bm25_weight)
    self._vector_weight = float(vector_weight)
    self._rrf_k = int(rrf_k)
    self._recall_k = int(recall_k)
    self._thread_context_cap = int(thread_context_cap)
Functions
search
search(query: str, *, person: Optional[str] = None, time_range: Optional[Tuple[Optional[datetime], Optional[datetime]]] = None, sources: Optional[Sequence[str]] = None, limit: int = 20) -> List[SearchHit]

Run the hybrid pipeline and return up to limit hits.

See module docstring for ranking semantics. query may be empty when callers want a pure metadata filter (e.g. "all mail from X in May"). In that case neither ranking leg runs: the structured filter is applied directly and the most recent matching rows are returned. The same fallback applies when a non-empty query yields no hits from either leg.

Source code in src/openjarvis/connectors/hybrid_search.py
def search(
    self,
    query: str,
    *,
    person: Optional[str] = None,
    time_range: Optional[Tuple[Optional[datetime], Optional[datetime]]] = None,
    sources: Optional[Sequence[str]] = None,
    limit: int = 20,
) -> List[SearchHit]:
    """Run the hybrid pipeline and return up to ``limit`` hits.

    See module docstring for ranking semantics. ``query`` may be empty
    when callers want a pure metadata filter (e.g. "all mail from X in
    May"). In that case neither ranking leg runs: the structured filter
    is applied directly and the most recent matching rows are returned.
    The same fallback applies when a non-empty query yields no hits
    from either leg.
    """
    bm25_filter_sql, bm25_filter_params = self._build_filters(
        person=person, time_range=time_range, sources=sources, alias="kc"
    )
    unaliased_filter_sql, unaliased_filter_params = self._build_filters(
        person=person, time_range=time_range, sources=sources
    )

    bm25 = (
        self._bm25_recall(query, bm25_filter_sql, bm25_filter_params)
        if query.strip()
        else []
    )
    vector = (
        self._vector_recall(query, unaliased_filter_sql, unaliased_filter_params)
        if query.strip()
        else []
    )
    fused = self._fuse(bm25, vector)

    # Metadata-only fallback: empty query, or both legs produced nothing
    # despite a non-empty query. Return the most recent rows matching the
    # filter so the agent still gets a useful corpus snapshot.
    if not fused:
        sql = f"""
            SELECT id FROM knowledge_chunks
            WHERE {unaliased_filter_sql}
            ORDER BY timestamp DESC, created_at DESC
            LIMIT ?
        """
        rows = self._store._conn.execute(
            sql, [*unaliased_filter_params, limit]
        ).fetchall()
        fused = [(row["id"], 0.0, 0.0, 0.0) for row in rows]

    # Materialise the top-N rows in one IN-clause round trip.
    top = fused[:limit]
    if not top:
        return []
    ids = [cid for cid, *_ in top]
    placeholders = ",".join("?" for _ in ids)
    meta_rows = self._store._conn.execute(
        f"""
        SELECT id, doc_id, content, source, title, author, participants,
               timestamp, thread_id, chunk_index, url
        FROM knowledge_chunks
        WHERE id IN ({placeholders})
        """,
        ids,
    ).fetchall()
    by_id = {r["id"]: r for r in meta_rows}

    hits: List[SearchHit] = []
    for chunk_id, fused_score, bm25_score, vec_score in top:
        r = by_id.get(chunk_id)
        if r is None:
            continue
        hits.append(
            SearchHit(
                chunk_id=chunk_id,
                document_id=r["doc_id"],
                chunk_idx=int(r["chunk_index"]),
                title=r["title"] or "",
                content_snippet=_snippet(r["content"]),
                source=r["source"] or "",
                timestamp=r["timestamp"] or "",
                participants=_parse_participants(r["participants"]),
                score=fused_score,
                bm25_score=bm25_score,
                vector_score=vec_score,
                thread_id=r["thread_id"] or "",
                thread_context=self._thread_context(r["thread_id"] or "", chunk_id),
                url=r["url"] or "",
            )
        )
    return hits
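
The listing above relies on `_build_filters`, which is not shown on this page. A minimal sketch of what such a helper might look like, together with the metadata-only fallback query, is given below. Everything here is an assumption for illustration: the function name `build_filters`, the always-true `1=1` base clause, the half-open time range, the matching rule for person, and ISO-8601 string timestamps.

```python
import sqlite3
from datetime import datetime
from typing import Any, List, Optional, Sequence, Tuple

TimeRange = Tuple[Optional[datetime], Optional[datetime]]


def build_filters(
    *,
    person: Optional[str] = None,
    time_range: Optional[TimeRange] = None,
    sources: Optional[Sequence[str]] = None,
    alias: str = "",
) -> Tuple[str, List[Any]]:
    """Hypothetical stand-in for ``_build_filters``: returns (sql, params)."""
    col = f"{alias}." if alias else ""
    clauses: List[str] = ["1=1"]  # keeps the WHERE clause valid with no filters
    params: List[Any] = []
    if person:
        clauses.append(f"({col}author = ? OR {col}participants LIKE ?)")
        params += [person, f"%{person}%"]
    start, end = time_range or (None, None)
    if start is not None:  # either bound may be None for an open-ended range
        clauses.append(f"{col}timestamp >= ?")
        params.append(start.isoformat())
    if end is not None:
        clauses.append(f"{col}timestamp < ?")
        params.append(end.isoformat())
    if sources:
        marks = ",".join("?" for _ in sources)
        clauses.append(f"{col}source IN ({marks})")
        params += list(sources)
    return " AND ".join(clauses), params


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE knowledge_chunks (id TEXT PRIMARY KEY, author TEXT, "
    "participants TEXT, source TEXT, timestamp TEXT, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO knowledge_chunks VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("c1", "alice", '["alice", "bob"]', "gmail", "2024-05-02T09:00:00", "2024-05-02T09:00:01"),
        ("c2", "alice", '["alice"]', "gmail", "2024-05-20T09:00:00", "2024-05-20T09:00:01"),
        ("c3", "alice", '["alice"]', "slack", "2024-05-21T09:00:00", "2024-05-21T09:00:01"),
        ("c4", "bob", '["bob"]', "gmail", "2024-06-01T09:00:00", "2024-06-01T09:00:01"),
    ],
)

# "All mail from alice in May": values travel as bound parameters, never
# interpolated into the SQL text.
where_sql, where_params = build_filters(
    person="alice",
    time_range=(datetime(2024, 5, 1), datetime(2024, 6, 1)),
    sources=["gmail"],
)
rows = conn.execute(
    f"SELECT id FROM knowledge_chunks WHERE {where_sql} "
    "ORDER BY timestamp DESC, created_at DESC LIMIT ?",
    [*where_params, 10],
).fetchall()
recent_ids = [row["id"] for row in rows]
```

Returning an always-true clause when no filter is set is one way to keep the `WHERE {unaliased_filter_sql}` interpolation in the fallback query valid; whether the real helper does this is not confirmed by the source shown here.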

Functions