Skip to content

Index

connectors

Data source connectors for Deep Research.

Classes

Attachment dataclass

Attachment(filename: str, mime_type: str, size_bytes: int, sha256: str = '', content: bytes = b'')

A file attached to a document (email attachment, shared file, etc.).

BaseConnector

Bases: ABC

Abstract base for data source connectors.

Each connector knows how to authenticate with a service, bulk-sync its data as Document objects, and optionally expose MCP tools for real-time agent queries.

Functions
is_connected abstractmethod
is_connected() -> bool

Return True if the connector has valid credentials.

Source code in src/openjarvis/connectors/_stubs.py
@abstractmethod
def is_connected(self) -> bool:
    """Return True if the connector has valid credentials.

    Abstract: each connector defines what "valid credentials" means for
    its own service.
    """
disconnect abstractmethod
disconnect() -> None

Revoke credentials and clean up.

Source code in src/openjarvis/connectors/_stubs.py
@abstractmethod
def disconnect(self) -> None:
    """Revoke credentials and clean up.

    After a successful call, implementations are expected to make
    ``is_connected()`` report False.
    """
sync abstractmethod
sync(*, since: Optional[datetime] = None, cursor: Optional[str] = None) -> Iterator[Document]

Yield documents from the data source.

If since is given, only return items created/modified after that time. If cursor is given, resume from a previous checkpoint.

Source code in src/openjarvis/connectors/_stubs.py
@abstractmethod
def sync(
    self, *, since: Optional[datetime] = None, cursor: Optional[str] = None
) -> Iterator[Document]:
    """Yield documents from the data source.

    If *since* is given, only return items created/modified after that time.
    If *cursor* is given, resume from a previous checkpoint.

    Yields:
        Document: items normalized to the universal ``Document`` schema.
    """
sync_status abstractmethod
sync_status() -> SyncStatus

Return current sync progress.

Source code in src/openjarvis/connectors/_stubs.py
@abstractmethod
def sync_status(self) -> SyncStatus:
    """Return current sync progress.

    The returned :class:`SyncStatus` carries the sync state, item counts,
    last-sync time, resume cursor, and any error message.
    """
auth_url
auth_url() -> str

Generate an OAuth consent URL. Only relevant for auth_type='oauth'.

Source code in src/openjarvis/connectors/_stubs.py
def auth_url(self) -> str:
    """Return an OAuth consent URL for the user to visit.

    Only meaningful for connectors with ``auth_type='oauth'``; this base
    implementation always rejects the call.
    """
    message = f"{self.connector_id} does not use OAuth"
    raise NotImplementedError(message)
handle_callback
handle_callback(code: str) -> None

Handle the OAuth callback. Only relevant for auth_type='oauth'.

Source code in src/openjarvis/connectors/_stubs.py
def handle_callback(self, code: str) -> None:
    """Complete the OAuth flow using the authorization *code*.

    Only meaningful for connectors with ``auth_type='oauth'``; this base
    implementation always rejects the call.
    """
    message = f"{self.connector_id} does not use OAuth"
    raise NotImplementedError(message)
mcp_tools
mcp_tools() -> List[ToolSpec]

Return MCP tool specs for real-time agent queries. Optional.

Source code in src/openjarvis/connectors/_stubs.py
def mcp_tools(self) -> List[ToolSpec]:
    """Expose MCP tool specifications for real-time agent queries.

    Optional hook: the default advertises no tools, so connectors without
    live-query support simply inherit this empty list.
    """
    tools: List[ToolSpec] = []
    return tools

Document dataclass

Document(doc_id: str, source: str, doc_type: str, content: str, title: str = '', author: str = '', participants: List[str] = list(), timestamp: datetime = now(), thread_id: Optional[str] = None, url: Optional[str] = None, attachments: List[Attachment] = list(), metadata: Dict[str, Any] = dict())

Universal schema for data from any connector.

All connectors normalize their output to this format before ingestion.

SyncStatus dataclass

SyncStatus(state: str = 'idle', items_synced: int = 0, items_total: int = 0, last_sync: Optional[datetime] = None, cursor: Optional[str] = None, error: Optional[str] = None)

Progress of a connector's sync operation.

KnowledgeStore

KnowledgeStore(db_path: Union[str, Path] = '')

Bases: MemoryBackend

Source-aware SQLite/FTS5 knowledge store for Deep Research.

Stores document chunks with rich provenance metadata and supports filtered BM25 retrieval by source, doc_type, author, and timestamp.

Source code in src/openjarvis/connectors/store.py
def __init__(self, db_path: Union[str, Path] = "") -> None:
    """Open (creating if missing) the knowledge database.

    Args:
        db_path: Path to the SQLite file.  An empty string selects the
            default ``knowledge.db`` under the config directory, and
            ``":memory:"`` gives a transient in-memory database.
    """
    if not db_path:
        # Function-scope import: only needed when no explicit path is given.
        from openjarvis.core.config import DEFAULT_CONFIG_DIR

        db_path = DEFAULT_CONFIG_DIR / "knowledge.db"

    self._db_path = str(db_path)
    # Ensure the parent directory exists (skip for :memory:)
    if self._db_path != ":memory:":
        from openjarvis.security.file_utils import secure_create

        secure_create(Path(self._db_path))

    # NOTE(review): check_same_thread=False permits cross-thread use, but no
    # lock guards self._conn here — presumably callers serialize access; confirm.
    self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
    self._conn.row_factory = sqlite3.Row  # enables name-based column access
    self._setup()
Functions
store
store(content: str, *, source: str = '', doc_type: str = '', doc_id: Optional[str] = None, title: str = '', author: str = '', participants: Optional[List[str]] = None, timestamp: Optional[Union[datetime, str]] = None, thread_id: Optional[str] = None, url: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, chunk_index: int = 0) -> str

Persist a content chunk and return its unique chunk id.

All source-level fields are merged into the stored metadata so that retrieve() results carry full provenance.

Source code in src/openjarvis/connectors/store.py
def store(  # type: ignore[override]
    self,
    content: str,
    *,
    source: str = "",
    doc_type: str = "",
    doc_id: Optional[str] = None,
    title: str = "",
    author: str = "",
    participants: Optional[List[str]] = None,
    timestamp: Optional[Union[datetime, str]] = None,
    thread_id: Optional[str] = None,
    url: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    chunk_index: int = 0,
) -> str:
    """Persist a content chunk and return its unique chunk id.

    All source-level fields are merged into the stored metadata so that
    ``retrieve()`` results carry full provenance.

    Args:
        content: Chunk text to store and index.
        source: Connector id the chunk came from (e.g. "gmail").
        doc_type: Document kind (e.g. "email").
        doc_id: Parent document id; a fresh UUID is generated when None.
        title: Document title, stored as provenance.
        author: Document author, stored as provenance.
        participants: People involved; serialized to JSON ([] when None).
        timestamp: datetime or string, normalized via ``_to_iso``.
        thread_id: Conversation/thread id; stored as "" when None.
        url: Canonical link to the source item; stored as "" when None.
        metadata: Extra key/value pairs.  The provenance keys written
            below override any same-named keys supplied here.
        chunk_index: Position of this chunk within its document.

    Returns:
        The newly generated chunk id (a UUID4 string).
    """
    # Every chunk gets its own id, distinct from the (possibly shared) doc_id.
    chunk_id = str(uuid.uuid4())
    if doc_id is None:
        doc_id = str(uuid.uuid4())

    ts_str = _to_iso(timestamp)
    participants_json = json.dumps(participants or [])

    # Merge provenance fields into metadata for easy access in results
    combined_meta: Dict[str, Any] = dict(metadata or {})
    combined_meta["chunk_id"] = chunk_id
    combined_meta["source"] = source
    combined_meta["doc_type"] = doc_type
    combined_meta["doc_id"] = doc_id
    combined_meta["title"] = title
    combined_meta["author"] = author
    combined_meta["participants"] = participants or []
    combined_meta["timestamp"] = ts_str
    combined_meta["thread_id"] = thread_id or ""
    combined_meta["url"] = url or ""
    combined_meta["chunk_index"] = chunk_index

    meta_json = json.dumps(combined_meta)

    self._conn.execute(
        """
        INSERT INTO knowledge_chunks
            (id, doc_id, content, source, doc_type, title, author,
             participants, timestamp, thread_id, url, metadata,
             chunk_index, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            chunk_id,
            doc_id,
            content,
            source,
            doc_type,
            title,
            author,
            participants_json,
            ts_str,
            thread_id or "",
            url or "",
            meta_json,
            chunk_index,
            time.time(),
        ),
    )
    # Commit per chunk: each store() call is immediately durable.
    self._conn.commit()

    # Notify event-bus listeners that a chunk was stored.
    get_event_bus().publish(
        EventType.MEMORY_STORE,
        {
            "backend": self.backend_id,
            "chunk_id": chunk_id,
            "doc_id": doc_id,
            "source": source,
            "doc_type": doc_type,
        },
    )
    return chunk_id
retrieve
retrieve(query: str, *, top_k: int = 5, source: Optional[str] = None, doc_type: Optional[str] = None, author: Optional[str] = None, since: Optional[Union[datetime, str]] = None, until: Optional[Union[datetime, str]] = None, **kwargs: Any) -> List[RetrievalResult]

Search using FTS5 BM25 with optional column filters.

PARAMETER DESCRIPTION
query

TYPE: str

top_k

TYPE: int DEFAULT: 5

source

TYPE: Optional[str] DEFAULT: None

doc_type

TYPE: Optional[str] DEFAULT: None

author

TYPE: Optional[str] DEFAULT: None

since

TYPE: Optional[Union[datetime, str]] DEFAULT: None

until

TYPE: Optional[Union[datetime, str]] DEFAULT: None

Source code in src/openjarvis/connectors/store.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    source: Optional[str] = None,
    doc_type: Optional[str] = None,
    author: Optional[str] = None,
    since: Optional[Union[datetime, str]] = None,
    until: Optional[Union[datetime, str]] = None,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Search using FTS5 BM25 with optional column filters.

    Parameters
    ----------
    query:    Full-text search query.
    top_k:    Maximum number of results.
    source:   Restrict to chunks from this source (e.g. "gmail").
    doc_type: Restrict to chunks of this type (e.g. "email").
    author:   Restrict to chunks authored by this person.
    since:    Exclude chunks whose timestamp is earlier than this value.
    until:    Exclude chunks whose timestamp is later than this value.

    Returns
    -------
    Up to *top_k* ``RetrievalResult`` objects, best match first.  A blank
    query or a malformed FTS expression yields an empty list.
    """
    # An empty/whitespace query cannot match anything; bail out early.
    if not query.strip():
        return []

    # Normalize time bounds to strings so they compare against the stored
    # `timestamp` column (assumes _to_iso emits the same sortable format
    # store() writes).
    since_str = _to_iso(since) if since is not None else None
    until_str = _to_iso(until) if until is not None else None

    # Build the WHERE clause for filter columns.  `filters` and `params`
    # are appended in lockstep so placeholders stay index-aligned.
    filters: List[str] = []
    params: List[Any] = []

    if source is not None:
        filters.append("kc.source = ?")
        params.append(source)
    if doc_type is not None:
        filters.append("kc.doc_type = ?")
        params.append(doc_type)
    if author is not None:
        filters.append("kc.author = ?")
        params.append(author)
    if since_str:
        filters.append("kc.timestamp >= ?")
        params.append(since_str)
    if until_str:
        filters.append("kc.timestamp <= ?")
        params.append(until_str)

    # Prefixed with "AND" because it follows the mandatory MATCH predicate.
    where_clause = ""
    if filters:
        where_clause = "AND " + " AND ".join(filters)

    # FTS5 bm25() returns negative scores; abs() gives a positive rank
    sql = f"""
        SELECT
            kc.id,
            kc.content,
            kc.source,
            kc.metadata,
            abs(bm25(knowledge_fts)) AS score
        FROM knowledge_fts
        JOIN knowledge_chunks kc ON knowledge_fts.rowid = kc.rowid
        WHERE knowledge_fts MATCH ?
        {where_clause}
        ORDER BY score DESC
        LIMIT ?
    """

    try:
        rows = self._conn.execute(sql, [query] + params + [top_k]).fetchall()
    except sqlite3.OperationalError:
        # Malformed FTS query — return empty rather than crash
        # NOTE(review): this also hides genuine schema/DB errors; consider
        # narrowing to FTS syntax failures.
        return []

    results: List[RetrievalResult] = []
    for row in rows:
        meta = json.loads(row["metadata"]) if row["metadata"] else {}
        # Ensure chunk_id is always present in metadata (backfill for
        # rows stored before this field was added to combined_meta).
        if "chunk_id" not in meta:
            meta["chunk_id"] = row["id"]
        results.append(
            RetrievalResult(
                content=row["content"],
                score=float(row["score"]),
                source=row["source"],
                metadata=meta,
            )
        )

    # Publish a retrieval event (query + applied filters) for observers.
    get_event_bus().publish(
        EventType.MEMORY_RETRIEVE,
        {
            "backend": self.backend_id,
            "query": query,
            "num_results": len(results),
            "filters": {
                "source": source,
                "doc_type": doc_type,
                "author": author,
                "since": since_str,
                "until": until_str,
            },
        },
    )
    return results
delete
delete(doc_id: str) -> bool

Delete all chunks with the given doc_id. Returns True if any existed.

Source code in src/openjarvis/connectors/store.py
def delete(self, doc_id: str) -> bool:
    """Remove every chunk belonging to *doc_id*.

    Returns:
        True when at least one chunk existed (and was deleted).
    """
    cursor = self._conn.execute(
        "DELETE FROM knowledge_chunks WHERE doc_id = ?", (doc_id,)
    )
    self._conn.commit()
    removed = cursor.rowcount
    return removed > 0
clear
clear() -> None

Remove all stored chunks.

Source code in src/openjarvis/connectors/store.py
def clear(self) -> None:
    """Wipe the chunk table and its FTS index in one shot."""
    script = "DELETE FROM knowledge_chunks; DELETE FROM knowledge_fts;"
    self._conn.executescript(script)
    self._conn.commit()
count
count() -> int

Return the total number of stored chunks.

Source code in src/openjarvis/connectors/store.py
def count(self) -> int:
    """Return how many chunks are currently stored."""
    row = self._conn.execute("SELECT COUNT(*) FROM knowledge_chunks").fetchone()
    if not row:
        return 0
    return row[0]
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/connectors/store.py
def close(self) -> None:
    """Close the underlying SQLite connection.

    Any error raised while closing (e.g. an already-closed or missing
    handle) is deliberately swallowed so shutdown paths never raise.
    """
    conn = self._conn
    try:
        conn.close()
    except Exception:
        pass