gmail

Gmail connector — bulk email sync via the Gmail REST API.

Uses OAuth 2.0 tokens stored locally (see openjarvis.connectors.oauth). All network calls are isolated in module-level functions (_gmail_api_*) to make them trivially mockable in tests.

Classes

GmailConnector

GmailConnector(credentials_path: str = '')

Bases: BaseConnector

Connector that syncs emails from Gmail via the REST API.

Authentication is handled through Google OAuth 2.0. Tokens are stored locally in a JSON credentials file.

PARAMETER DESCRIPTION
credentials_path

Path to the JSON file where OAuth tokens are stored. When empty (the default), ~/.openjarvis/connectors/gmail.json is used.

TYPE: str DEFAULT: ''

Source code in src/openjarvis/connectors/gmail.py
def __init__(self, credentials_path: str = "") -> None:
    self._credentials_path = resolve_google_credentials(
        credentials_path or _DEFAULT_CREDENTIALS_PATH
    )
    self._items_synced: int = 0
    self._items_total: int = 0
    self._last_sync: Optional[datetime] = None
    self._last_cursor: Optional[str] = None
Functions
is_connected
is_connected() -> bool

Return True if a credentials file containing a non-empty access token exists. The check covers presence only; it does not verify that the token is still accepted by Google.

An earlier version treated any non-empty dict as connected, so it returned True for files containing only client_id/client_secret (no actual OAuth token). That made jarvis connect gmail short-circuit with "already connected" before any OAuth flow ran.
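
The check can be reproduced in isolation. The sketch below is a minimal stand-alone version that takes an already-loaded token dict instead of reading the credentials file; has_access_token is a hypothetical name used only for illustration.

```python
from typing import Any, Dict, Optional


def has_access_token(tokens: Optional[Dict[str, Any]]) -> bool:
    """Return True only when a non-empty token value is present."""
    if tokens is None:
        return False
    # Either key counts; client_id/client_secret alone do not.
    return bool(tokens.get("access_token") or tokens.get("token"))


print(has_access_token({"client_id": "abc", "client_secret": "xyz"}))  # False
print(has_access_token({"access_token": "example-token"}))  # True
print(has_access_token(None))  # False
```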

Source code in src/openjarvis/connectors/gmail.py
def is_connected(self) -> bool:
    """Return ``True`` if a credentials file with a valid access token exists.

    The previous "any non-empty dict counts" check returned True for
    files containing only client_id/client_secret (no actual OAuth
    token), which made `jarvis connect gmail` short-circuit with
    "already connected" before any OAuth flow ran.
    """
    tokens = load_tokens(self._credentials_path)
    if tokens is None:
        return False
    return bool(tokens.get("access_token") or tokens.get("token"))
disconnect
disconnect() -> None

Delete the stored credentials file.

Source code in src/openjarvis/connectors/gmail.py
def disconnect(self) -> None:
    """Delete the stored credentials file."""
    delete_tokens(self._credentials_path)
auth_url
auth_url() -> str

Return a Google OAuth consent URL for the shared Google scopes.
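
For orientation, a consent URL of this kind is typically assembled as sketched below. This is not the code of build_google_auth_url, whose internals are not shown here; sketch_auth_url, the redirect_uri parameter, and the example values are assumptions made for illustration. The endpoint and query parameter names follow Google's OAuth 2.0 authorization-code flow.

```python
from typing import List
from urllib.parse import parse_qs, urlencode, urlsplit

# Google's OAuth 2.0 authorization endpoint.
GOOGLE_AUTH_ENDPOINT = "https://accounts.google.com/o/oauth2/v2/auth"


def sketch_auth_url(client_id: str, scopes: List[str], redirect_uri: str) -> str:
    """Hypothetical stand-in for build_google_auth_url (illustration only)."""
    params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,  # assumed parameter, not in the source
        "response_type": "code",       # authorization-code flow
        "scope": " ".join(scopes),     # scopes are space-separated
        "access_type": "offline",      # request a refresh token as well
    }
    return f"{GOOGLE_AUTH_ENDPOINT}?{urlencode(params)}"


url = sketch_auth_url(
    "my-client-id",
    ["https://www.googleapis.com/auth/gmail.readonly"],
    "http://localhost:8080/callback",
)
query = parse_qs(urlsplit(url).query)
print(query["response_type"])  # ['code']
```

Note that the method above passes an empty client_id as a placeholder, so the URL it returns is presumably only usable once a real client ID is supplied from configuration.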

Source code in src/openjarvis/connectors/gmail.py
def auth_url(self) -> str:
    """Return a Google OAuth consent URL for the shared Google scopes."""
    return build_google_auth_url(
        client_id="",  # placeholder — real client_id from config
        scopes=GOOGLE_ALL_SCOPES,
    )
handle_callback
handle_callback(code: str) -> None

Handle the OAuth callback by persisting the authorization code.

In a full implementation this would exchange the code for tokens. For now the code is saved directly as the token value.
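
A code-for-token exchange could look roughly like the sketch below. It only builds the request body and filters the response, without touching the network; build_token_request, tokens_to_store, and all example values are hypothetical. The endpoint and form fields follow Google's OAuth 2.0 token endpoint.

```python
from typing import Any, Dict
from urllib.parse import parse_qs, urlencode

# Google's OAuth 2.0 token endpoint; the form below would be POSTed here.
GOOGLE_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"


def build_token_request(
    code: str, client_id: str, client_secret: str, redirect_uri: str
) -> bytes:
    """Build the form-encoded body for the authorization-code exchange."""
    form = {
        "code": code,
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": redirect_uri,
        "grant_type": "authorization_code",
    }
    return urlencode(form).encode("ascii")


def tokens_to_store(response: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the fields worth persisting from the token response."""
    keep = ("access_token", "refresh_token", "expires_in", "token_type")
    return {key: response[key] for key in keep if key in response}


body = build_token_request("auth-code", "my-client-id", "my-secret",
                           "http://localhost:8080/callback")
print(parse_qs(body.decode("ascii"))["grant_type"])  # ['authorization_code']
```

Storing a dict with an access_token key instead of {"token": code} would still satisfy is_connected, which accepts either key.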

Source code in src/openjarvis/connectors/gmail.py
def handle_callback(self, code: str) -> None:
    """Handle the OAuth callback by persisting the authorization code.

    In a full implementation this would exchange the code for tokens.
    For now the code is saved directly as the token value.
    """
    save_tokens(self._credentials_path, {"token": code})
sync
sync(*, since: Optional[datetime] = None, cursor: Optional[str] = None, query_extra: str = '') -> Iterator[Document]

Yield Document objects for Gmail messages.

Paginates through the messages.list API and fetches each message's full payload to extract headers and body.
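
The pagination pattern can be sketched without any network access. In the example below, iter_message_ids and the PAGES fixture are hypothetical; list_page stands in for a call to messages.list and returns the same response shape ("messages" plus an optional "nextPageToken").

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_message_ids(
    list_page: Callable[[Optional[str]], Dict[str, Any]],
    cursor: Optional[str] = None,
) -> Iterator[str]:
    """Yield message IDs page by page until no nextPageToken is returned."""
    page_token = cursor
    while True:
        resp = list_page(page_token)
        for stub in resp.get("messages", []):
            msg_id = stub.get("id", "")
            if msg_id:  # skip stubs without an ID
                yield msg_id
        page_token = resp.get("nextPageToken")
        if not page_token:
            break


# Fake two-page listing keyed by page token (None is the first page).
PAGES: Dict[Optional[str], Dict[str, Any]] = {
    None: {"messages": [{"id": "a1"}, {"id": "a2"}], "nextPageToken": "p2"},
    "p2": {"messages": [{"id": "b1"}, {}]},
}

print(list(iter_message_ids(PAGES.__getitem__)))        # ['a1', 'a2', 'b1']
print(list(iter_message_ids(PAGES.__getitem__, "p2")))  # ['b1']
```

Passing a saved token as cursor resumes from that page, which is the role of the cursor parameter described below.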

PARAMETER DESCRIPTION
since

When provided, only messages received after this timestamp are returned. Translated to a Gmail after:<epoch> search query.

TYPE: Optional[datetime] DEFAULT: None

cursor

nextPageToken from a previous sync to resume pagination.

TYPE: Optional[str] DEFAULT: None

query_extra

Additional Gmail search operators appended to the base query, e.g. "is:unread" to restrict to unread messages only.

TYPE: str DEFAULT: ''
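
How since and query_extra combine into a single search string can be shown with a small stand-alone function. build_query is a hypothetical name; the logic mirrors the query construction in the source below.

```python
from datetime import datetime, timezone
from typing import List, Optional


def build_query(since: Optional[datetime] = None, query_extra: str = "") -> str:
    """Join the optional after:<epoch> filter and extra operators with a space."""
    parts: List[str] = []
    if since is not None:
        # Gmail's after: operator accepts Unix epoch seconds.
        parts.append(f"after:{int(since.timestamp())}")
    if query_extra:
        parts.append(query_extra)
    return " ".join(parts)


since = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(build_query(since, "is:unread"))  # after:1704067200 is:unread
print(repr(build_query()))              # ''
```

An empty query means no filter at all. Because datetime.timestamp() interprets a naive datetime as local time, passing a timezone-aware value for since avoids an offset that depends on the machine's time zone.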

Source code in src/openjarvis/connectors/gmail.py
def sync(
    self,
    *,
    since: Optional[datetime] = None,
    cursor: Optional[str] = None,
    query_extra: str = "",
) -> Iterator[Document]:
    """Yield :class:`Document` objects for Gmail messages.

    Paginates through the messages.list API and fetches each message's
    full payload to extract headers and body.

    Parameters
    ----------
    since:
        When provided, only messages received after this timestamp are
        returned.  Translated to a Gmail ``after:<epoch>`` search query.
    cursor:
        ``nextPageToken`` from a previous sync to resume pagination.
    query_extra:
        Additional Gmail search operators appended to the base query,
        e.g. ``"is:unread"`` to restrict to unread messages only.
    """
    # Existence check only — the actual access token is reloaded on every
    # API call by _call_with_refresh so a mid-sync refresh is picked up
    # transparently.
    tokens = load_tokens(self._credentials_path)
    if not tokens or not (tokens.get("token") or tokens.get("access_token")):
        return

    # Default to no filter so SENT, labeled, and category-tabbed mail
    # all flow in. The previous "category:primary" default excluded
    # ~95% of a typical mailbox (sent mail, Promotions, Updates, etc.)
    # which made any C2-style "what did I say to X" query impossible.
    query_parts: List[str] = []
    if since is not None:
        # Gmail's after: operator accepts Unix epoch seconds.
        query_parts.append(f"after:{int(since.timestamp())}")
    if query_extra:
        query_parts.append(query_extra)
    query = " ".join(query_parts)

    page_token: Optional[str] = cursor
    synced = 0

    while True:
        list_resp = _call_with_refresh(
            _gmail_api_list_messages,
            self._credentials_path,
            page_token=page_token,
            query=query,
        )
        messages: List[Dict[str, Any]] = list_resp.get("messages", [])

        for msg_stub in messages:
            msg_id: str = msg_stub.get("id", "")
            if not msg_id:
                continue

            msg = _call_with_refresh(
                _gmail_api_get_message,
                self._credentials_path,
                msg_id,
            )
            payload: Dict[str, Any] = msg.get("payload", {})
            headers: List[Dict[str, str]] = payload.get("headers", [])

            from_header = _extract_header(headers, "From")
            to_header = _extract_header(headers, "To")
            cc_header = _extract_header(headers, "Cc")
            subject = _extract_header(headers, "Subject")
            date_str = _extract_header(headers, "Date")
            rfc_message_id = _extract_header(headers, "Message-ID")

            body = _decode_body(payload)
            timestamp = _parse_date(date_str)

            # Raw header values, exactly as Gmail returned them — preserved
            # so re-normalisation against an updated alias map doesn't need
            # a re-fetch from the API.
            participants_raw: List[str] = [
                h for h in (from_header, to_header, cc_header) if h
            ]
            # Lowercase email addresses, multi-recipient-aware.
            participants: List[str] = []
            for header in (from_header, to_header, cc_header):
                participants.extend(_normalize_addresses(header))

            label_ids: List[str] = msg.get("labelIds", [])
            channel = _select_channel(label_ids)
            thread_id: Optional[str] = msg.get("threadId")

            doc = Document(
                doc_id=f"gmail:{msg_id}",
                source="gmail",
                source_id=msg_id,
                doc_type="email",
                content=body,
                title=subject,
                author=from_header,
                participants=participants,
                participants_raw=participants_raw,
                timestamp=timestamp,
                thread_id=thread_id,
                channel=channel,
                # Deep-link straight to the message. ``msg_id`` is Gmail's
                # internal hex id, which the ``#all/<id>`` permalink
                # resolves directly — so citations have a working URL
                # without relying on _hit_url reconstruction at query time.
                url=f"https://mail.google.com/mail/u/0/#all/{msg_id}",
                metadata={
                    "message_id": msg_id,
                    "rfc_message_id": rfc_message_id,
                    "labels": label_ids,
                    "snippet": msg.get("snippet", ""),
                    "history_id": msg.get("historyId", ""),
                    "size_estimate": msg.get("sizeEstimate", 0),
                },
            )
            synced += 1
            yield doc

        next_page: Optional[str] = list_resp.get("nextPageToken")
        if not next_page:
            self._last_cursor = None
            break
        page_token = next_page
        self._last_cursor = next_page

    self._items_synced = synced
    self._last_sync = datetime.now()
delete_message
delete_message(msg_id: str) -> None

Move a message to Trash (recoverable for 30 days).

Source code in src/openjarvis/connectors/gmail.py
def delete_message(self, msg_id: str) -> None:
    """Move a message to Trash (recoverable for 30 days)."""
    self._call_with_refresh(_gmail_api_trash_message, msg_id)
archive_message
archive_message(msg_id: str) -> None

Archive a message by removing the INBOX label.
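
Assuming _gmail_api_modify_message wraps the Gmail REST messages.modify endpoint (its body is not shown here), the request for an archive operation would look roughly like this sketch. modify_request is a hypothetical helper that only builds the URL and JSON body; it sends nothing.

```python
import json
from typing import List, Optional, Tuple

# Base path for message operations on the authenticated user ("me").
GMAIL_MESSAGES_BASE = "https://gmail.googleapis.com/gmail/v1/users/me/messages"


def modify_request(
    msg_id: str,
    add_labels: Optional[List[str]] = None,
    remove_labels: Optional[List[str]] = None,
) -> Tuple[str, str]:
    """Return the (url, json_body) pair for a messages.modify POST."""
    body = {
        "addLabelIds": add_labels or [],
        "removeLabelIds": remove_labels or [],
    }
    return f"{GMAIL_MESSAGES_BASE}/{msg_id}/modify", json.dumps(body)


url, body = modify_request("18c0ffee", remove_labels=["INBOX"])
print(url)
print(body)
```

Archiving removes only the INBOX label; the message keeps its other labels and remains reachable through search.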

Source code in src/openjarvis/connectors/gmail.py
def archive_message(self, msg_id: str) -> None:
    """Archive a message by removing the INBOX label."""
    self._call_with_refresh(
        _gmail_api_modify_message, msg_id, remove_labels=["INBOX"]
    )
sync_status
sync_status() -> SyncStatus

Return sync progress from the most recent sync call.

Source code in src/openjarvis/connectors/gmail.py
def sync_status(self) -> SyncStatus:
    """Return sync progress from the most recent :meth:`sync` call."""
    return SyncStatus(
        state="idle",
        items_synced=self._items_synced,
        last_sync=self._last_sync,
        cursor=self._last_cursor,
    )
mcp_tools
mcp_tools() -> List[ToolSpec]

Expose three MCP tool specs for real-time Gmail queries.
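
Each spec carries a JSON-Schema-style parameters dict with required names and per-property defaults. The sketch below shows one way a caller might resolve arguments against such a schema; resolve_arguments is hypothetical and not part of the connector, and the schema is copied from the gmail_list_unread spec in the source below.

```python
from typing import Any, Dict

LIST_UNREAD_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "label": {"type": "string", "default": "INBOX"},
        "max_results": {"type": "integer", "default": 20},
    },
    "required": [],
}


def resolve_arguments(schema: Dict[str, Any], args: Dict[str, Any]) -> Dict[str, Any]:
    """Check required names, then fill in schema defaults for omitted ones."""
    missing = [name for name in schema.get("required", []) if name not in args]
    if missing:
        raise ValueError(f"missing required arguments: {missing}")
    resolved = {
        name: spec["default"]
        for name, spec in schema.get("properties", {}).items()
        if "default" in spec
    }
    resolved.update(args)  # explicit arguments override defaults
    return resolved


print(resolve_arguments(LIST_UNREAD_SCHEMA, {"max_results": 5}))
# {'label': 'INBOX', 'max_results': 5}
```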

Source code in src/openjarvis/connectors/gmail.py
def mcp_tools(self) -> List[ToolSpec]:
    """Expose three MCP tool specs for real-time Gmail queries."""
    return [
        ToolSpec(
            name="gmail_search_emails",
            description=(
                "Search Gmail messages using a query string. "
                "Supports the same syntax as the Gmail search box "
                "(e.g. 'from:alice subject:report is:unread')."
            ),
            parameters={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Gmail search query",
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of emails to return",
                        "default": 20,
                    },
                },
                "required": ["query"],
            },
            category="communication",
        ),
        ToolSpec(
            name="gmail_get_thread",
            description=("Retrieve all messages in a Gmail thread by thread ID."),
            parameters={
                "type": "object",
                "properties": {
                    "thread_id": {
                        "type": "string",
                        "description": "Gmail thread ID",
                    },
                },
                "required": ["thread_id"],
            },
            category="communication",
        ),
        ToolSpec(
            name="gmail_list_unread",
            description=(
                "List unread Gmail messages, optionally filtered by label."
            ),
            parameters={
                "type": "object",
                "properties": {
                    "label": {
                        "type": "string",
                        "description": "Gmail label to filter by (e.g. 'INBOX')",
                        "default": "INBOX",
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of messages to return",
                        "default": 20,
                    },
                },
                "required": [],
            },
            category="communication",
        ),
    ]

Functions