Skip to content

spotify

spotify

Spotify connector — recently played tracks via Spotify Web API.

Uses locally stored OAuth2 tokens. Requires the user-read-recently-played scope.

Classes

SpotifyConnector

SpotifyConnector(*, token_path: str = _DEFAULT_TOKEN_PATH)

Bases: BaseConnector

Sync recently played tracks from Spotify.

Source code in src/openjarvis/connectors/spotify.py
def __init__(self, *, token_path: str = _DEFAULT_TOKEN_PATH) -> None:
    """Create the connector with a fresh sync status and its token location.

    Args:
        token_path: Location of the token file; kept on the instance as a
            ``Path``.
    """
    # The two attributes are independent of each other.
    self._status = SyncStatus()
    self._token_path = Path(token_path)
Functions
auth_url
auth_url() -> str

Return Spotify OAuth authorization URL.

Source code in src/openjarvis/connectors/spotify.py
def auth_url(self) -> str:
    """Build the Spotify OAuth authorization URL.

    Returns:
        The provider's authorization endpoint followed by a query string
        carrying the client id, redirect URI, response type ("code") and
        the space-joined scopes. When no provider is registered for
        "spotify" or no client credentials are available, the Spotify
        developer dashboard URL is returned instead.
    """
    from urllib.parse import urlencode

    from openjarvis.connectors.oauth import (
        get_client_credentials,
        get_provider_for_connector,
    )

    provider = get_provider_for_connector("spotify")
    # Credentials are only looked up once a provider is known.
    credentials = get_client_credentials(provider) if provider else None
    if not provider or not credentials:
        return "https://developer.spotify.com/dashboard"

    # Only the client id goes into the authorization request.
    client_id, _client_secret = credentials
    callback = f"http://{provider.callback_host}:{provider.callback_port}{provider.callback_path}"
    query = urlencode(
        {
            "client_id": client_id,
            "redirect_uri": callback,
            "response_type": "code",
            "scope": " ".join(provider.scopes),
        }
    )
    return f"{provider.auth_endpoint}?{query}"
handle_callback
handle_callback(code: str) -> None

Exchange authorization code for tokens and save.

Source code in src/openjarvis/connectors/spotify.py
def handle_callback(self, code: str) -> None:
    """Trade an authorization code for tokens and persist them.

    The saved record holds the access token, the refresh token (each an
    empty string when absent from the exchange response), and the client
    id and secret. It is written once per entry in the provider's
    ``credential_files``.

    Args:
        code: Authorization code passed to the token exchange.

    Raises:
        RuntimeError: If no provider is registered for "spotify" or no
            client credentials are available for it.
    """
    from openjarvis.connectors.oauth import (
        _CONNECTORS_DIR,
        _exchange_token,
        get_client_credentials,
        get_provider_for_connector,
        save_tokens,
    )

    provider = get_provider_for_connector("spotify")
    if not provider:
        raise RuntimeError("Spotify client credentials not configured")
    credentials = get_client_credentials(provider)
    if not credentials:
        raise RuntimeError("Spotify client credentials not configured")

    client_id, client_secret = credentials
    callback = f"http://{provider.callback_host}:{provider.callback_port}{provider.callback_path}"
    token_response = _exchange_token(provider, code, client_id, client_secret, callback)

    # Token fields first, then the client credentials, so key order is stable.
    record = {
        field: token_response.get(field, "")
        for field in ("access_token", "refresh_token")
    }
    record["client_id"] = client_id
    record["client_secret"] = client_secret

    for filename in provider.credential_files:
        save_tokens(str(_CONNECTORS_DIR / filename), record)