Skip to content

oauth

oauth

Shared OAuth 2.0 helpers for all connectors.

Provides: - OAuthProvider registry with configs for Google, Strava, Spotify - Generic run_connector_oauth() that opens browser + catches callback - URL builder, token persistence, and token cleanup utilities

Classes

OAuthProvider dataclass

OAuthProvider(name: str, display_name: str, auth_endpoint: str, token_endpoint: str, scopes: List[str], setup_url: str, setup_hint: str, callback_port: int = 8789, callback_host: str = '127.0.0.1', callback_path: str = '/callback', token_auth: str = 'body', extra_auth_params: Dict[str, str] = dict(), connector_ids: Tuple[str, ...] = (), credential_files: Tuple[str, ...] = ())

Configuration for an OAuth 2.0 provider.

Functions

get_provider_for_connector

get_provider_for_connector(connector_id: str) -> Optional[OAuthProvider]

Return the OAuthProvider that covers connector_id, or None.

Source code in src/openjarvis/connectors/oauth.py
def get_provider_for_connector(connector_id: str) -> Optional[OAuthProvider]:
    """Return the OAuthProvider that covers *connector_id*, or ``None``."""
    for provider in OAUTH_PROVIDERS.values():
        if connector_id in provider.connector_ids:
            return provider
    return None

get_client_credentials

get_client_credentials(provider: OAuthProvider) -> Optional[Tuple[str, str]]

Load stored client_id and client_secret for provider.

Checks credential files in ~/.openjarvis/connectors/ and falls back to environment variables OPENJARVIS_{NAME}_CLIENT_ID and OPENJARVIS_{NAME}_CLIENT_SECRET.

Source code in src/openjarvis/connectors/oauth.py
def get_client_credentials(
    provider: OAuthProvider,
) -> Optional[Tuple[str, str]]:
    """Load stored client_id and client_secret for *provider*.

    Checks credential files in ``~/.openjarvis/connectors/`` and falls
    back to environment variables ``OPENJARVIS_{NAME}_CLIENT_ID`` and
    ``OPENJARVIS_{NAME}_CLIENT_SECRET``.
    """
    # Check credential files
    for filename in provider.credential_files:
        path = _CONNECTORS_DIR / filename
        tokens = load_tokens(str(path))
        if tokens and tokens.get("client_id") and tokens.get("client_secret"):
            return tokens["client_id"], tokens["client_secret"]

    # Check environment variables
    prefix = f"OPENJARVIS_{provider.name.upper()}"
    env_id = os.environ.get(f"{prefix}_CLIENT_ID", "")
    env_secret = os.environ.get(f"{prefix}_CLIENT_SECRET", "")
    if env_id and env_secret:
        return env_id, env_secret

    return None

save_client_credentials

save_client_credentials(provider: OAuthProvider, client_id: str, client_secret: str) -> None

Persist client credentials so the user never has to enter them again.

Source code in src/openjarvis/connectors/oauth.py
def save_client_credentials(
    provider: OAuthProvider,
    client_id: str,
    client_secret: str,
) -> None:
    """Persist client credentials so the user never has to enter them again."""
    for filename in provider.credential_files:
        path = _CONNECTORS_DIR / filename
        existing = load_tokens(str(path)) or {}
        existing["client_id"] = client_id
        existing["client_secret"] = client_secret
        save_tokens(str(path), existing)

build_google_auth_url

build_google_auth_url(client_id: str, redirect_uri: str = _DEFAULT_REDIRECT_URI, scopes: Optional[List[str]] = None) -> str

Build a Google OAuth2 consent URL.

PARAMETER DESCRIPTION
client_id

The OAuth 2.0 client ID from the Google Cloud Console.

TYPE: str

redirect_uri

Where Google should redirect after consent. Defaults to the local callback server at http://localhost:8789/callback.

TYPE: str DEFAULT: _DEFAULT_REDIRECT_URI

scopes

List of OAuth scopes to request. Defaults to ["openid", "email", "profile"].

TYPE: Optional[List[str]] DEFAULT: None

RETURNS DESCRIPTION
str

Full consent URL including query string.

Source code in src/openjarvis/connectors/oauth.py
def build_google_auth_url(
    client_id: str,
    redirect_uri: str = _DEFAULT_REDIRECT_URI,
    scopes: Optional[List[str]] = None,
) -> str:
    """Build a Google OAuth2 consent URL.

    Parameters
    ----------
    client_id:
        The OAuth 2.0 client ID from the Google Cloud Console.
    redirect_uri:
        Where Google should redirect after consent. Defaults to the local
        callback server at ``http://localhost:8789/callback``.
    scopes:
        List of OAuth scopes to request.  Defaults to
        ``["openid", "email", "profile"]``.

    Returns
    -------
    str
        Full consent URL including query string.
    """
    if scopes is None:
        scopes = _DEFAULT_SCOPES

    params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": " ".join(scopes),
        "access_type": "offline",
        "prompt": "consent",
    }
    return f"{_GOOGLE_AUTH_ENDPOINT}?{urlencode(params)}"

resolve_google_credentials

resolve_google_credentials(connector_path: str) -> str

Return the best available Google credentials file path.

Checks the connector-specific file first, then falls back to the shared google.json. Returns connector_path if neither exists (so is_connected() correctly returns False).

Source code in src/openjarvis/connectors/oauth.py
def resolve_google_credentials(connector_path: str) -> str:
    """Return the best available Google credentials file path.

    Checks the connector-specific file first, then falls back to the
    shared ``google.json``.  Returns *connector_path* if neither exists
    (so ``is_connected()`` correctly returns ``False``).
    """
    if Path(connector_path).exists():
        return connector_path
    if Path(_SHARED_GOOGLE_CREDENTIALS_PATH).exists():
        return _SHARED_GOOGLE_CREDENTIALS_PATH
    return connector_path

load_tokens

load_tokens(path: str) -> Optional[Dict[str, Any]]

Load OAuth tokens from a JSON file.

Returns None if the file is missing, unreadable, or contains invalid JSON.

Source code in src/openjarvis/connectors/oauth.py
def load_tokens(path: str) -> Optional[Dict[str, Any]]:
    """Load OAuth tokens from a JSON file.

    Returns ``None`` if the file is missing, unreadable, or contains
    invalid JSON.
    """
    p = Path(path)
    if not p.exists():
        return None
    try:
        raw = p.read_text(encoding="utf-8")
        return json.loads(raw)
    except (OSError, json.JSONDecodeError):
        return None

save_tokens

save_tokens(path: str, tokens: Dict[str, Any]) -> None

Persist tokens to path as JSON with owner-only (0o600) permissions.

Creates parent directories as needed.

Source code in src/openjarvis/connectors/oauth.py
def save_tokens(path: str, tokens: Dict[str, Any]) -> None:
    """Persist *tokens* to *path* as JSON with owner-only (0o600) permissions.

    Creates parent directories as needed.
    """
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(tokens, indent=2), encoding="utf-8")
    os.chmod(path, 0o600)

delete_tokens

delete_tokens(path: str) -> None

Delete the credentials file at path if it exists.

Source code in src/openjarvis/connectors/oauth.py
def delete_tokens(path: str) -> None:
    """Delete the credentials file at *path* if it exists."""
    p = Path(path)
    if p.exists():
        p.unlink()

exchange_google_token

exchange_google_token(code: str, client_id: str, client_secret: str, redirect_uri: str = _DEFAULT_REDIRECT_URI) -> Dict[str, Any]

Exchange an authorization code for access + refresh tokens.

PARAMETER DESCRIPTION
code

The authorization code received from Google's consent redirect.

TYPE: str

client_id

OAuth 2.0 client ID.

TYPE: str

client_secret

OAuth 2.0 client secret.

TYPE: str

redirect_uri

Must match the redirect URI used when obtaining the auth code.

TYPE: str DEFAULT: _DEFAULT_REDIRECT_URI

RETURNS DESCRIPTION
dict

Token response containing access_token, refresh_token, token_type, and expires_in.

Source code in src/openjarvis/connectors/oauth.py
def exchange_google_token(
    code: str,
    client_id: str,
    client_secret: str,
    redirect_uri: str = _DEFAULT_REDIRECT_URI,
) -> Dict[str, Any]:
    """Exchange an authorization code for access + refresh tokens.

    Parameters
    ----------
    code:
        The authorization code received from Google's consent redirect.
    client_id:
        OAuth 2.0 client ID.
    client_secret:
        OAuth 2.0 client secret.
    redirect_uri:
        Must match the redirect URI used when obtaining the auth code.

    Returns
    -------
    dict
        Token response containing ``access_token``, ``refresh_token``,
        ``token_type``, and ``expires_in``.
    """
    import httpx

    resp = httpx.post(
        "https://oauth2.googleapis.com/token",
        data={
            "code": code,
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": redirect_uri,
            "grant_type": "authorization_code",
        },
        timeout=30.0,
    )
    resp.raise_for_status()
    return resp.json()

run_oauth_flow

run_oauth_flow(client_id: str, client_secret: str, scopes: List[str], credentials_path: str, redirect_uri: str = _DEFAULT_REDIRECT_URI) -> Dict[str, Any]

Run the full OAuth flow: browser consent, callback, token exchange.

Steps:

  1. Build consent URL
  2. Start localhost callback server
  3. Open browser to consent URL
  4. Wait for Google to redirect with ?code=...
  5. Exchange code for access_token + refresh_token
  6. Save tokens to credentials_path
  7. Return the tokens dict
PARAMETER DESCRIPTION
client_id

OAuth 2.0 client ID.

TYPE: str

client_secret

OAuth 2.0 client secret.

TYPE: str

scopes

List of OAuth scopes to request.

TYPE: List[str]

credentials_path

Where to persist the resulting tokens.

TYPE: str

redirect_uri

Local callback URI. Defaults to http://localhost:8789/callback.

TYPE: str DEFAULT: _DEFAULT_REDIRECT_URI

RETURNS DESCRIPTION
dict

Token response from Google (access_token, refresh_token, etc.).

RAISES DESCRIPTION
RuntimeError

If the user denies authorization or the callback times out.

Source code in src/openjarvis/connectors/oauth.py
def run_oauth_flow(
    client_id: str,
    client_secret: str,
    scopes: List[str],
    credentials_path: str,
    redirect_uri: str = _DEFAULT_REDIRECT_URI,
) -> Dict[str, Any]:
    """Run the full OAuth flow: browser consent, callback, token exchange.

    Steps:

    1. Build consent URL
    2. Start localhost callback server
    3. Open browser to consent URL
    4. Wait for Google to redirect with ``?code=...``
    5. Exchange code for ``access_token`` + ``refresh_token``
    6. Save tokens to *credentials_path*
    7. Return the tokens dict

    Parameters
    ----------
    client_id:
        OAuth 2.0 client ID.
    client_secret:
        OAuth 2.0 client secret.
    scopes:
        List of OAuth scopes to request.
    credentials_path:
        Where to persist the resulting tokens.
    redirect_uri:
        Local callback URI.  Defaults to ``http://localhost:8789/callback``.

    Returns
    -------
    dict
        Token response from Google (``access_token``, ``refresh_token``, etc.).

    Raises
    ------
    RuntimeError
        If the user denies authorization or the callback times out.
    """
    import webbrowser
    from http.server import BaseHTTPRequestHandler, HTTPServer
    from urllib.parse import parse_qs, urlparse

    auth_url = build_google_auth_url(
        client_id=client_id,
        redirect_uri=redirect_uri,
        scopes=scopes,
    )

    # Mutable containers used by the callback handler closure.
    auth_code: List[str] = []
    error: List[str] = []

    class _CallbackHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:  # noqa: N802 — required override name
            parsed = urlparse(self.path)
            params = parse_qs(parsed.query)

            if "code" in params:
                auth_code.append(params["code"][0])
                self.send_response(200)
                self.send_header("Content-Type", "text/html")
                self.end_headers()
                self.wfile.write(
                    b"<html><body><h2>Authorization successful!</h2>"
                    b"<p>You can close this tab and return to OpenJarvis.</p>"
                    b"</body></html>"
                )
            elif "error" in params:
                error.append(params["error"][0])
                self.send_response(400)
                self.send_header("Content-Type", "text/html")
                self.end_headers()
                self.wfile.write(
                    b"<html><body><h2>Authorization failed</h2>"
                    b"<p>Please try again.</p></body></html>"
                )
            else:
                self.send_response(400)
                self.end_headers()

        def log_message(self, format: str, *args: Any) -> None:  # noqa: A002
            pass  # Suppress HTTP request logs

    # Parse port from redirect_uri
    port = int(urlparse(redirect_uri).port or 8789)

    # Kill any stale listener on the port before starting
    import socket

    test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        test_sock.bind(("127.0.0.1", port))
        test_sock.close()
    except OSError:
        # Port in use — try to free it
        test_sock.close()
        import subprocess

        subprocess.run(
            ["lsof", "-t", "-i", f":{port}"],
            capture_output=True,
        )
        # Wait briefly and retry
        import time

        time.sleep(1)

    server = HTTPServer(("127.0.0.1", port), _CallbackHandler)
    server.timeout = 120  # 2 minute timeout

    # Open the consent page in the user's default browser
    webbrowser.open(auth_url)

    # Wait for the callback (blocking, with per-request timeout)
    while not auth_code and not error:
        server.handle_request()

    server.server_close()

    if error:
        raise RuntimeError(f"OAuth authorization failed: {error[0]}")
    if not auth_code:
        raise RuntimeError("OAuth authorization timed out")

    # Exchange the authorization code for tokens
    tokens = exchange_google_token(
        code=auth_code[0],
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
    )

    # Persist tokens together with client credentials (needed for refresh)
    token_payload = {
        "access_token": tokens.get("access_token", ""),
        "refresh_token": tokens.get("refresh_token", ""),
        "token_type": tokens.get("token_type", "Bearer"),
        "expires_in": tokens.get("expires_in", 3600),
        "client_id": client_id,
        "client_secret": client_secret,
    }
    save_tokens(credentials_path, token_payload)

    # Also save to the shared Google credentials file so that all Google
    # connectors can use this token without a separate OAuth flow.
    if credentials_path != _SHARED_GOOGLE_CREDENTIALS_PATH:
        save_tokens(_SHARED_GOOGLE_CREDENTIALS_PATH, token_payload)

    return tokens

run_connector_oauth

run_connector_oauth(connector_id: str, client_id: str = '', client_secret: str = '') -> Dict[str, Any]

Run a complete OAuth flow for connector_id.

  1. Look up the OAuthProvider
  2. Resolve client credentials (arg → stored → env)
  3. Build auth URL and open the user's browser
  4. Start localhost callback server and wait for the code
  5. Exchange the code for tokens
  6. Save tokens to all relevant credential files

Returns the raw token response dict.

Source code in src/openjarvis/connectors/oauth.py
def run_connector_oauth(
    connector_id: str,
    client_id: str = "",
    client_secret: str = "",
) -> Dict[str, Any]:
    """Run a complete OAuth flow for *connector_id*.

    1. Look up the ``OAuthProvider``
    2. Resolve client credentials (arg → stored → env)
    3. Build auth URL and open the user's browser
    4. Start localhost callback server and wait for the code
    5. Exchange the code for tokens
    6. Save tokens to all relevant credential files

    Returns the raw token response dict.
    """
    import webbrowser

    provider = get_provider_for_connector(connector_id)
    if provider is None:
        raise ValueError(f"No OAuth provider configured for '{connector_id}'")

    # Resolve credentials
    if not (client_id and client_secret):
        creds = get_client_credentials(provider)
        if creds:
            client_id, client_secret = creds
    if not (client_id and client_secret):
        raise RuntimeError(
            f"No client credentials for {provider.display_name}. "
            f"Set them up at: {provider.setup_url}"
        )

    redirect_uri = (
        f"http://{provider.callback_host}:{provider.callback_port}"
        f"{provider.callback_path}"
    )

    # Build auth URL
    params: Dict[str, str] = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": " ".join(provider.scopes),
        **provider.extra_auth_params,
    }
    auth_url = f"{provider.auth_endpoint}?{urlencode(params)}"

    # Open browser and wait for callback
    webbrowser.open(auth_url)
    code = _wait_for_callback_code(
        host=provider.callback_host,
        port=provider.callback_port,
        path=provider.callback_path,
    )

    # Exchange code for tokens
    tokens = _exchange_token(provider, code, client_id, client_secret, redirect_uri)

    # Build payload with client credentials included (needed for refresh)
    payload = {
        "access_token": tokens.get("access_token", ""),
        "refresh_token": tokens.get("refresh_token", ""),
        "token_type": tokens.get("token_type", "Bearer"),
        "expires_in": tokens.get("expires_in", 3600),
        "client_id": client_id,
        "client_secret": client_secret,
    }

    # Save to all credential files for this provider
    for filename in provider.credential_files:
        save_tokens(str(_CONNECTORS_DIR / filename), payload)

    return tokens