Skip to content

gmail

gmail

Gmail connector — bulk email sync via the Gmail REST API.

Uses OAuth 2.0 tokens stored locally (see :mod:openjarvis.connectors.oauth). All network calls are isolated in module-level functions (_gmail_api_*) to make them trivially mockable in tests.

Classes

GmailConnector

GmailConnector(credentials_path: str = '')

Bases: BaseConnector

Connector that syncs emails from Gmail via the REST API.

Authentication is handled through Google OAuth 2.0. Tokens are stored locally in a JSON credentials file.

PARAMETER DESCRIPTION
credentials_path

Path to the JSON file where OAuth tokens are stored. Defaults to ~/.openjarvis/connectors/gmail.json.

TYPE: str DEFAULT: ''

Source code in src/openjarvis/connectors/gmail.py
def __init__(self, credentials_path: str = "") -> None:
    self._credentials_path = credentials_path or _DEFAULT_CREDENTIALS_PATH
    self._items_synced: int = 0
    self._items_total: int = 0
    self._last_sync: Optional[datetime] = None
    self._last_cursor: Optional[str] = None
Functions
is_connected
is_connected() -> bool

Return True if a credentials file with a valid token exists.

Source code in src/openjarvis/connectors/gmail.py
def is_connected(self) -> bool:
    """Return ``True`` if a credentials file with a valid token exists."""
    tokens = load_tokens(self._credentials_path)
    if tokens is None:
        return False
    # Accept any non-empty dict that contains at least one key
    # (simplified: real impl would also check expiry / refresh token)
    return bool(tokens)
disconnect
disconnect() -> None

Delete the stored credentials file.

Source code in src/openjarvis/connectors/gmail.py
def disconnect(self) -> None:
    """Delete the stored credentials file."""
    delete_tokens(self._credentials_path)
auth_url
auth_url() -> str

Return a Google OAuth consent URL requesting gmail.readonly scope.

Source code in src/openjarvis/connectors/gmail.py
def auth_url(self) -> str:
    """Return a Google OAuth consent URL requesting ``gmail.readonly`` scope."""
    return build_google_auth_url(
        client_id="",  # placeholder — real client_id from config
        scopes=[_GMAIL_SCOPE],
    )
handle_callback
handle_callback(code: str) -> None

Handle the OAuth callback by persisting the authorization code.

In a full implementation this would exchange the code for tokens. For now the code is saved directly as the token value.

Source code in src/openjarvis/connectors/gmail.py
def handle_callback(self, code: str) -> None:
    """Handle the OAuth callback by persisting the authorization code.

    In a full implementation this would exchange the code for tokens.
    For now the code is saved directly as the token value.
    """
    save_tokens(self._credentials_path, {"token": code})
sync
sync(*, since: Optional[datetime] = None, cursor: Optional[str] = None) -> Iterator[Document]

Yield :class:Document objects for Gmail messages.

Paginates through the messages.list API and fetches each message's full payload to extract headers and body.

PARAMETER DESCRIPTION
since

When provided, only messages received after this timestamp are returned. Translated to a Gmail after:<epoch> search query.

TYPE: Optional[datetime] DEFAULT: None

cursor

nextPageToken from a previous sync to resume pagination.

TYPE: Optional[str] DEFAULT: None

Source code in src/openjarvis/connectors/gmail.py
def sync(
    self,
    *,
    since: Optional[datetime] = None,
    cursor: Optional[str] = None,
) -> Iterator[Document]:
    """Yield :class:`Document` objects for Gmail messages.

    Paginates through the messages.list API and fetches each message's
    full payload to extract headers and body.

    Parameters
    ----------
    since:
        When provided, only messages received after this timestamp are
        returned.  Translated to a Gmail ``after:<epoch>`` search query.
    cursor:
        ``nextPageToken`` from a previous sync to resume pagination.
    """
    tokens = load_tokens(self._credentials_path)
    if not tokens:
        return

    token: str = tokens.get("token", tokens.get("access_token", ""))
    if not token:
        return

    query = ""
    if since is not None:
        # Gmail's after: operator accepts Unix epoch seconds.
        epoch = int(since.timestamp())
        query = f"after:{epoch}"

    page_token: Optional[str] = cursor
    synced = 0

    while True:
        list_resp = _gmail_api_list_messages(
            token, page_token=page_token, query=query
        )
        messages: List[Dict[str, Any]] = list_resp.get("messages", [])

        for msg_stub in messages:
            msg_id: str = msg_stub.get("id", "")
            if not msg_id:
                continue

            msg = _gmail_api_get_message(token, msg_id)
            payload: Dict[str, Any] = msg.get("payload", {})
            headers: List[Dict[str, str]] = payload.get("headers", [])

            from_header = _extract_header(headers, "From")
            subject = _extract_header(headers, "Subject")
            date_str = _extract_header(headers, "Date")
            to_header = _extract_header(headers, "To")

            body = _decode_body(payload)
            timestamp = _parse_date(date_str)

            participants: List[str] = []
            if from_header:
                participants.append(from_header)
            if to_header:
                participants.append(to_header)

            thread_id: Optional[str] = msg.get("threadId")

            doc = Document(
                doc_id=f"gmail:{msg_id}",
                source="gmail",
                doc_type="email",
                content=body,
                title=subject,
                author=from_header,
                participants=participants,
                timestamp=timestamp,
                thread_id=thread_id,
                metadata={
                    "message_id": msg_id,
                    "labels": msg.get("labelIds", []),
                },
            )
            synced += 1
            yield doc

        next_page: Optional[str] = list_resp.get("nextPageToken")
        if not next_page:
            self._last_cursor = None
            break
        page_token = next_page
        self._last_cursor = next_page

    self._items_synced = synced
    self._last_sync = datetime.now()
sync_status
sync_status() -> SyncStatus

Return sync progress from the most recent :meth:sync call.

Source code in src/openjarvis/connectors/gmail.py
def sync_status(self) -> SyncStatus:
    """Return sync progress from the most recent :meth:`sync` call."""
    return SyncStatus(
        state="idle",
        items_synced=self._items_synced,
        last_sync=self._last_sync,
        cursor=self._last_cursor,
    )
mcp_tools
mcp_tools() -> List[ToolSpec]

Expose three MCP tool specs for real-time Gmail queries.

Source code in src/openjarvis/connectors/gmail.py
def mcp_tools(self) -> List[ToolSpec]:
    """Expose three MCP tool specs for real-time Gmail queries."""
    return [
        ToolSpec(
            name="gmail_search_emails",
            description=(
                "Search Gmail messages using a query string. "
                "Supports the same syntax as the Gmail search box "
                "(e.g. 'from:alice subject:report is:unread')."
            ),
            parameters={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Gmail search query",
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of emails to return",
                        "default": 20,
                    },
                },
                "required": ["query"],
            },
            category="communication",
        ),
        ToolSpec(
            name="gmail_get_thread",
            description=("Retrieve all messages in a Gmail thread by thread ID."),
            parameters={
                "type": "object",
                "properties": {
                    "thread_id": {
                        "type": "string",
                        "description": "Gmail thread ID",
                    },
                },
                "required": ["thread_id"],
            },
            category="communication",
        ),
        ToolSpec(
            name="gmail_list_unread",
            description=(
                "List unread Gmail messages, optionally filtered by label."
            ),
            parameters={
                "type": "object",
                "properties": {
                    "label": {
                        "type": "string",
                        "description": "Gmail label to filter by (e.g. 'INBOX')",
                        "default": "INBOX",
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of messages to return",
                        "default": 20,
                    },
                },
                "required": [],
            },
            category="communication",
        ),
    ]

Functions