Skip to content

gmail

gmail

GmailChannel — native Gmail API adapter using OAuth2.

Classes

GmailChannel

GmailChannel(credentials_path: str = '', *, token_path: str = '', user_id: str = 'me', poll_interval: int = 30, bus: Optional[EventBus] = None)

Bases: BaseChannel

Native Gmail channel adapter using the Gmail API with OAuth2.

PARAMETER DESCRIPTION
credentials_path

Path to the OAuth2 credentials.json file. Falls back to GMAIL_CREDENTIALS_PATH env var.

TYPE: str DEFAULT: ''

token_path

Path to the stored OAuth2 token file. Falls back to GMAIL_TOKEN_PATH env var.

TYPE: str DEFAULT: ''

user_id

Gmail user ID (default "me" for the authenticated user).

TYPE: str DEFAULT: 'me'

poll_interval

Seconds between inbox polls (default 30).

TYPE: int DEFAULT: 30

bus

Optional event bus for publishing channel events.

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/channels/gmail.py
def __init__(
    self,
    credentials_path: str = "",
    *,
    token_path: str = "",
    user_id: str = "me",
    poll_interval: int = 30,
    bus: Optional[EventBus] = None,
) -> None:
    self._credentials_path = credentials_path or os.environ.get(
        "GMAIL_CREDENTIALS_PATH",
        "",
    )
    self._token_path = token_path or os.environ.get(
        "GMAIL_TOKEN_PATH",
        "",
    )
    self._user_id = user_id
    self._poll_interval = poll_interval
    self._bus = bus
    self._handlers: List[ChannelHandler] = []
    self._status = ChannelStatus.DISCONNECTED
    self._service: Any = None
    self._listener_thread: Optional[threading.Thread] = None
    self._stop_event = threading.Event()
Functions
connect
connect() -> None

Load OAuth2 credentials and build the Gmail API service.

Source code in src/openjarvis/channels/gmail.py
def connect(self) -> None:
    """Load OAuth2 credentials and build the Gmail API service."""
    if not self._credentials_path and not self._token_path:
        logger.warning("No Gmail credentials configured")
        self._status = ChannelStatus.ERROR
        return

    self._stop_event.clear()
    self._status = ChannelStatus.CONNECTING

    try:
        from google.oauth2.credentials import Credentials
        from google_auth_oauthlib.flow import InstalledAppFlow
        from googleapiclient.discovery import build

        SCOPES = ["https://www.googleapis.com/auth/gmail.modify"]

        creds: Any = None
        if self._token_path and os.path.exists(self._token_path):
            creds = Credentials.from_authorized_user_file(
                self._token_path,
                SCOPES,
            )

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                from google.auth.transport.requests import Request

                creds.refresh(Request())
            elif self._credentials_path and os.path.exists(
                self._credentials_path,
            ):
                flow = InstalledAppFlow.from_client_secrets_file(
                    self._credentials_path,
                    SCOPES,
                )
                creds = flow.run_local_server(port=0)
            else:
                logger.warning("Gmail credentials not found or invalid")
                self._status = ChannelStatus.ERROR
                return

            # Save token for future use
            if self._token_path and creds:
                with open(self._token_path, "w") as token_file:
                    token_file.write(creds.to_json())

        self._service = build("gmail", "v1", credentials=creds)
        self._status = ChannelStatus.CONNECTED
        logger.info("Gmail channel connected")

        # Start polling thread
        self._listener_thread = threading.Thread(
            target=self._poll_loop,
            daemon=True,
        )
        self._listener_thread.start()
    except ImportError:
        logger.warning(
            "Google API libraries not installed; "
            "install with: pip install openjarvis[channel-gmail]",
        )
        self._status = ChannelStatus.ERROR
    except Exception:
        logger.debug("Gmail connect failed", exc_info=True)
        self._status = ChannelStatus.ERROR
disconnect
disconnect() -> None

Stop the polling thread and clear the service.

Source code in src/openjarvis/channels/gmail.py
def disconnect(self) -> None:
    """Stop the polling thread and clear the service."""
    self._stop_event.set()
    if self._listener_thread is not None:
        self._listener_thread.join(timeout=5.0)
        self._listener_thread = None
    self._service = None
    self._status = ChannelStatus.DISCONNECTED
send
send(channel: str, content: str, *, conversation_id: str = '', metadata: Dict[str, Any] | None = None) -> bool

Send an email via the Gmail API.

channel is the recipient email address.

Source code in src/openjarvis/channels/gmail.py
def send(
    self,
    channel: str,
    content: str,
    *,
    conversation_id: str = "",
    metadata: Dict[str, Any] | None = None,
) -> bool:
    """Send an email via the Gmail API.

    ``channel`` is the recipient email address.
    """
    if self._service is None:
        logger.warning("Cannot send: Gmail service not connected")
        return False

    try:
        msg = MIMEText(content)
        msg["To"] = channel
        msg["From"] = self._user_id
        msg["Subject"] = (metadata or {}).get(
            "subject",
            "Message from OpenJarvis",
        )

        raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8")
        body: Dict[str, Any] = {"raw": raw}
        if conversation_id:
            body["threadId"] = conversation_id

        self._service.users().messages().send(
            userId=self._user_id,
            body=body,
        ).execute()

        self._publish_sent(channel, content, conversation_id)
        return True
    except Exception:
        logger.debug("Gmail send failed", exc_info=True)
        return False
status
status() -> ChannelStatus

Return the current connection status.

Source code in src/openjarvis/channels/gmail.py
def status(self) -> ChannelStatus:
    """Return the current connection status."""
    if self._status == ChannelStatus.CONNECTED and self._service is None:
        return ChannelStatus.ERROR
    return self._status
list_channels
list_channels() -> List[str]

Return available channel identifiers.

Source code in src/openjarvis/channels/gmail.py
def list_channels(self) -> List[str]:
    """Return available channel identifiers."""
    return ["inbox"]
on_message
on_message(handler: ChannelHandler) -> None

Register a callback for incoming Gmail messages.

Source code in src/openjarvis/channels/gmail.py
def on_message(self, handler: ChannelHandler) -> None:
    """Register a callback for incoming Gmail messages."""
    self._handlers.append(handler)