Skip to content

connectors_router

connectors_router

FastAPI router for /v1/connectors — connector management endpoints.

Classes

ConnectRequest

Bases: BaseModel

Credentials / connection parameters for a connector.

Functions

create_connectors_router

create_connectors_router()

Return an APIRouter with /connectors endpoints.

Importing FastAPI inside the factory avoids a hard import-time dependency and mirrors the pattern used by other optional routers in this package.

Source code in src/openjarvis/server/connectors_router.py
def create_connectors_router():
    """Return an APIRouter with /connectors endpoints.

    Importing FastAPI inside the factory avoids a hard import-time
    dependency and mirrors the pattern used by other optional routers in
    this package.
    """
    try:
        from fastapi import APIRouter, HTTPException
    except ImportError as exc:
        raise ImportError(
            "fastapi and pydantic are required for the connectors router"
        ) from exc

    if ConnectRequest is None:
        raise ImportError("pydantic is required for the connectors router")

    from openjarvis.core.registry import ConnectorRegistry

    router = APIRouter(prefix="/v1/connectors", tags=["connectors"])

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _get_or_create(connector_id: str) -> Any:
        """Return a cached connector instance, creating it if needed."""
        if connector_id not in _instances:
            cls = ConnectorRegistry.get(connector_id)
            _instances[connector_id] = cls()
        return _instances[connector_id]

    def _connector_summary(connector_id: str, instance: Any) -> Dict[str, Any]:
        """Build the dict returned by GET /connectors."""
        chunks = 0
        try:
            from openjarvis.connectors.store import KnowledgeStore

            store = KnowledgeStore()
            rows = store._conn.execute(
                "SELECT COUNT(*) FROM knowledge_chunks WHERE source = ?",
                (connector_id,),
            ).fetchone()
            chunks = rows[0] if rows else 0
        except Exception:
            pass

        return {
            "connector_id": connector_id,
            "display_name": getattr(instance, "display_name", connector_id),
            "auth_type": getattr(instance, "auth_type", "unknown"),
            "connected": instance.is_connected(),
            "chunks": chunks,
        }

    # ------------------------------------------------------------------
    # Endpoints
    # ------------------------------------------------------------------

    @router.get("")
    async def list_connectors():
        """List all registered connectors with their connection status."""
        _ensure_connectors_registered()
        results = []
        for key in sorted(ConnectorRegistry.keys()):
            try:
                instance = _get_or_create(key)
                results.append(_connector_summary(key, instance))
            except Exception:
                results.append(
                    {
                        "connector_id": key,
                        "display_name": key,
                        "auth_type": "unknown",
                        "connected": False,
                    }
                )
        return {"connectors": results}

    @router.get("/{connector_id}")
    async def connector_detail(connector_id: str):
        """Return detail for a single connector."""
        _ensure_connectors_registered()
        if not ConnectorRegistry.contains(connector_id):
            raise HTTPException(
                status_code=404,
                detail=f"Connector '{connector_id}' not found",
            )
        instance = _get_or_create(connector_id)

        # Try to get an OAuth URL if applicable; ignore errors for non-OAuth
        # connectors.
        auth_url: Optional[str] = None
        try:
            auth_url = instance.auth_url()
        except (NotImplementedError, Exception):
            pass

        # Serialise MCP tool names only (ToolSpec objects are not JSON-safe).
        mcp_tools = []
        try:
            mcp_tools = [t.name for t in instance.mcp_tools()]
        except Exception:
            pass

        return {
            "connector_id": connector_id,
            "display_name": getattr(instance, "display_name", connector_id),
            "auth_type": getattr(instance, "auth_type", "unknown"),
            "connected": instance.is_connected(),
            "auth_url": auth_url,
            "mcp_tools": mcp_tools,
        }

    @router.post("/{connector_id}/connect")
    async def connect_connector(connector_id: str, req: ConnectRequest):
        """Connect a connector using the supplied credentials."""
        _ensure_connectors_registered()
        if not ConnectorRegistry.contains(connector_id):
            raise HTTPException(
                status_code=404,
                detail=f"Connector '{connector_id}' not found",
            )
        instance = _get_or_create(connector_id)

        try:
            auth_type = getattr(instance, "auth_type", "unknown")

            if auth_type == "filesystem":
                # Filesystem connectors accept a vault / directory path.
                if req.path:
                    instance._vault_path = req.path
                    from pathlib import Path

                    instance._connected = Path(req.path).is_dir()

            elif auth_type == "oauth":
                if req.code:
                    instance.handle_callback(req.code)
                elif req.token:
                    # Some OAuth connectors accept a pre-existing token.
                    if hasattr(instance, "_token"):
                        instance._token = req.token

            else:
                # Generic: try to store token or credentials if the instance
                # exposes the relevant attributes.
                if req.token and hasattr(instance, "_token"):
                    instance._token = req.token
                if req.email and hasattr(instance, "_email"):
                    instance._email = req.email
                if req.password and hasattr(instance, "_password"):
                    instance._password = req.password

        except Exception as exc:
            raise HTTPException(status_code=400, detail=str(exc))

        # Auto-ingest after successful connection
        if instance.is_connected():
            import threading

            def _ingest() -> None:
                try:
                    from openjarvis.connectors.pipeline import (
                        IngestionPipeline,
                    )
                    from openjarvis.connectors.store import KnowledgeStore
                    from openjarvis.connectors.sync_engine import SyncEngine

                    store = KnowledgeStore()
                    pipeline = IngestionPipeline(store)
                    engine = SyncEngine(pipeline)
                    engine.sync(instance)
                    logger.info(
                        "Auto-ingested %s after connect",
                        connector_id,
                    )
                except Exception as exc:
                    logger.warning(
                        "Auto-ingest failed for %s: %s",
                        connector_id,
                        exc,
                    )

            threading.Thread(target=_ingest, daemon=True).start()

        return {
            "connector_id": connector_id,
            "connected": instance.is_connected(),
            "status": "connected" if instance.is_connected() else "pending",
        }

    @router.post("/{connector_id}/disconnect")
    async def disconnect_connector(connector_id: str):
        """Disconnect a connector and clear its credentials."""
        _ensure_connectors_registered()
        if not ConnectorRegistry.contains(connector_id):
            raise HTTPException(
                status_code=404,
                detail=f"Connector '{connector_id}' not found",
            )
        instance = _get_or_create(connector_id)
        try:
            instance.disconnect()
        except Exception as exc:
            raise HTTPException(status_code=500, detail=str(exc))
        return {
            "connector_id": connector_id,
            "connected": False,
            "status": "disconnected",
        }

    # Track background sync state per connector
    _sync_threads: Dict[str, Any] = {}
    _sync_state: Dict[str, Dict[str, Any]] = {}  # {connector_id: {state, error}}

    @router.post("/{connector_id}/sync")
    def trigger_sync(connector_id: str) -> Dict[str, Any]:
        """Trigger a sync in the background and return immediately."""
        import threading

        _ensure_connectors_registered()
        if not ConnectorRegistry.contains(connector_id):
            raise HTTPException(
                status_code=404,
                detail=f"Connector '{connector_id}' not found",
            )
        inst = _get_or_create(connector_id)
        if not inst.is_connected():
            raise HTTPException(
                status_code=400,
                detail=f"Connector '{connector_id}' is not connected",
            )

        # If already syncing, don't start another
        existing = _sync_threads.get(connector_id)
        if existing and existing.is_alive():
            return {
                "connector_id": connector_id,
                "status": "already_syncing",
            }

        # Mark as syncing immediately so the UI picks it up
        _sync_state[connector_id] = {"state": "syncing", "error": None}

        def _run_sync() -> None:
            try:
                from openjarvis.connectors.pipeline import IngestionPipeline
                from openjarvis.connectors.store import KnowledgeStore
                from openjarvis.connectors.sync_engine import SyncEngine

                store = KnowledgeStore()
                pipeline = IngestionPipeline(store=store)
                engine = SyncEngine(pipeline=pipeline)
                engine.sync(inst)
                logger.info("Sync completed for %s", connector_id)
                _sync_state[connector_id] = {"state": "complete", "error": None}
            except Exception as exc:
                error_msg = str(exc)
                if "401" in error_msg or "Unauthorized" in error_msg:
                    error_msg = "Authentication failed — credentials may have expired."
                elif "403" in error_msg or "Forbidden" in error_msg:
                    error_msg = "Permission denied — check API scopes."
                elif "429" in error_msg or "Too Many Requests" in error_msg:
                    error_msg = "Rate limited — wait a minute and try again."
                elif "timeout" in error_msg.lower():
                    error_msg = "Connection timed out."
                logger.error("Sync failed for %s: %s", connector_id, error_msg)
                _sync_state[connector_id] = {"state": "error", "error": error_msg}

        t = threading.Thread(target=_run_sync, daemon=True)
        t.start()
        _sync_threads[connector_id] = t

        return {
            "connector_id": connector_id,
            "status": "started",
        }

    @router.get("/{connector_id}/sync")
    async def sync_status(connector_id: str):
        """Return the current sync status for a connector."""
        _ensure_connectors_registered()
        if not ConnectorRegistry.contains(connector_id):
            raise HTTPException(
                status_code=404,
                detail=f"Connector '{connector_id}' not found",
            )
        instance = _get_or_create(connector_id)
        try:
            status = instance.sync_status()
        except Exception as exc:
            raise HTTPException(status_code=500, detail=str(exc))

        # Override with router-level sync state (background thread tracking)
        bg = _sync_state.get(connector_id, {})
        bg_thread = _sync_threads.get(connector_id)
        is_bg_running = bg_thread is not None and bg_thread.is_alive()

        # Determine effective state
        if is_bg_running:
            effective_state = "syncing"
        elif bg.get("state") == "error":
            effective_state = "error"
        elif status.state != "idle":
            effective_state = status.state
        else:
            effective_state = status.state

        # Use the bg error if the connector doesn't have one
        effective_error = status.error or bg.get("error")

        return {
            "connector_id": connector_id,
            "state": effective_state,
            "items_synced": status.items_synced,
            "items_total": status.items_total,
            "last_sync": (status.last_sync.isoformat() if status.last_sync else None),
            "error": effective_error,
        }

    return router