Skip to content

webhook_routes

webhook_routes

Webhook endpoints for receiving messages from external platforms.

Functions

create_webhook_router

create_webhook_router(bridge: Any, twilio_auth_token: str = '', bluebubbles_password: str = '', whatsapp_verify_token: str = '', whatsapp_app_secret: str = '', sendblue_channel: Any = None) -> APIRouter

Create a FastAPI router with webhook endpoints.

Args: `bridge` — ChannelBridge instance for routing messages; `twilio_auth_token` — Twilio auth token for signatures; `bluebubbles_password` — BlueBubbles server password; `whatsapp_verify_token` — WhatsApp verification token; `whatsapp_app_secret` — WhatsApp app secret for HMAC; `sendblue_channel` — SendBlueChannel instance for reply-back.

Source code in src/openjarvis/server/webhook_routes.py
def create_webhook_router(
    bridge: Any,
    twilio_auth_token: str = "",
    bluebubbles_password: str = "",
    whatsapp_verify_token: str = "",
    whatsapp_app_secret: str = "",
    sendblue_channel: Any = None,
) -> APIRouter:
    """Create a FastAPI router with webhook endpoints.

    The router uses the ``/webhooks`` prefix and exposes ``POST /twilio``,
    ``POST /bluebubbles``, ``GET`` and ``POST /whatsapp`` and
    ``POST /sendblue``.  Every POST handler returns its HTTP response right
    away and processes the message in a worker thread.

    Args:
        bridge: ChannelBridge instance for routing messages.  When falsy,
            handlers fall back to ``request.app.state.channel_bridge``.
        twilio_auth_token: Twilio auth token for signatures.  Signature
            validation is skipped when empty.
        bluebubbles_password: BlueBubbles server password, compared with the
            ``Authorization`` header.  The check is skipped when empty.
        whatsapp_verify_token: WhatsApp verification token.  When empty,
            every verification request is rejected.
        whatsapp_app_secret: WhatsApp app secret for HMAC.  The
            ``X-Hub-Signature-256`` check is skipped when empty.
        sendblue_channel: SendBlueChannel instance for reply-back.  When
            falsy, ``request.app.state.sendblue_channel`` is used instead.

    Returns:
        The configured ``APIRouter``.
    """
    router = APIRouter(prefix="/webhooks", tags=["webhooks"])

    # The event loop keeps only weak references to tasks, so hold a strong
    # reference to every background job until it has finished.
    background_tasks: set = set()

    def _run_in_background(func: Any, *args: Any) -> None:
        """Run *func* in a worker thread without delaying the response."""
        task = asyncio.create_task(asyncio.to_thread(func, *args))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        task.add_done_callback(_log_task_exception)

    def _secrets_match(provided: str, expected: str) -> bool:
        """Compare two secrets in constant time.

        Both values are encoded first because ``hmac.compare_digest``
        rejects ``str`` arguments containing non-ASCII characters.
        """
        return hmac.compare_digest(provided.encode(), expected.encode())

    def _active_bridge(request: Any) -> Any:
        """Return the injected bridge, or the one stored on app state."""
        return bridge or getattr(
            request.app.state, "channel_bridge", None,
        )

    def _parse_json_object(raw: bytes) -> Any:
        """Decode *raw* as a JSON object; return ``None`` if it is not one."""
        try:
            parsed = json.loads(raw)
        except ValueError:
            return None
        return parsed if isinstance(parsed, dict) else None

    def _ok() -> Response:
        return Response("OK", status_code=200)

    def _empty_twiml() -> Response:
        return Response(
            content="<Response></Response>",
            media_type="application/xml",
        )

    def _twilio_credentials(app_state: Any) -> Any:
        """Return ``(sid, token, phone_number)`` from the agent bindings.

        The first ``twilio`` binding that carries both an account SID and an
        auth token wins, so the client and the sender number always come
        from the same binding.  Returns three empty strings when no such
        binding (or no agent manager) exists.
        """
        mgr = getattr(app_state, "agent_manager", None)
        if not mgr:
            return "", "", ""
        for agent in mgr.list_agents():
            aid = agent.get("id", "")
            for binding in mgr.list_channel_bindings(aid):
                if binding.get("channel_type") != "twilio":
                    continue
                cfg = binding.get("config") or {}
                sid = cfg.get("account_sid", "")
                tok = cfg.get("auth_token", "")
                if sid and tok:
                    return sid, tok, cfg.get("phone_number", "")
        return "", "", ""

    # ----------------------------------------------------------
    # Twilio SMS
    # ----------------------------------------------------------

    @router.post("/twilio")
    async def twilio_incoming(request: Request) -> Response:
        """Receive an inbound SMS, acknowledge it and reply asynchronously."""
        form = await request.form()
        params = dict(form)
        signature = request.headers.get("X-Twilio-Signature", "")
        url = str(request.url)

        if twilio_auth_token and not _validate_twilio_signature(
            twilio_auth_token, url, params, signature
        ):
            return Response("Invalid signature", status_code=403)

        from_number = params.get("From", "")
        body = params.get("Body", "")

        if not from_number or not body:
            return _empty_twiml()

        # Use bridge or app.state.channel_bridge
        active_bridge = _active_bridge(request)
        app_state = request.app.state

        def _handle_twilio() -> None:
            # Bound before the try block so the reply step below never hits
            # an unbound local when the twilio import or lookup fails.
            twilio_client = None
            twilio_from = ""

            # Send ack via Twilio API
            try:
                from twilio.rest import Client

                sid, tok, twilio_from = _twilio_credentials(app_state)
                if sid and tok:
                    twilio_client = Client(sid, tok)

                if twilio_client and twilio_from:
                    twilio_client.messages.create(
                        body=(
                            "Message received! "
                            "Working on it now..."
                        ),
                        from_=twilio_from,
                        to=from_number,
                    )
            except Exception as _e:
                logger.warning("Twilio ack failed: %s", _e)

            # Process via bridge or agent directly
            response = ""
            if active_bridge:
                response = active_bridge.handle_incoming(
                    from_number, body, "twilio",
                    max_length=1600,
                )
            else:
                # Direct agent fallback
                try:
                    from openjarvis.agents.deep_research import (
                        DeepResearchAgent,
                    )
                    from openjarvis.server.agent_manager_routes import (
                        _build_deep_research_tools,
                    )

                    engine = getattr(app_state, "engine", None)
                    if engine:
                        tools = _build_deep_research_tools(
                            engine=engine, model="",
                        )
                        agent = DeepResearchAgent(
                            engine=engine,
                            model=getattr(
                                engine, "_model", "",
                            ),
                            tools=tools,
                            max_turns=5,
                        )
                        result = agent.run(body)
                        response = result.content or ""
                except Exception as _exc:
                    logger.warning(
                        "Twilio direct agent fallback failed: %s", _exc,
                    )
                    response = f"Error: {_exc}"

            # Send response via Twilio
            if response and twilio_client and twilio_from:
                try:
                    clean = _format_for_sms(response)
                    # Twilio SMS limit is 1600 chars; leave room for the
                    # truncation marker.
                    if len(clean) > 1500:
                        clean = clean[:1500] + "\n\n(truncated)"
                    twilio_client.messages.create(
                        body=clean,
                        from_=twilio_from,
                        to=from_number,
                    )
                except Exception as _e:
                    logger.warning(
                        "Twilio reply failed: %s", _e,
                    )

        _run_in_background(_handle_twilio)

        return _empty_twiml()

    # ----------------------------------------------------------
    # BlueBubbles (iMessage)
    # ----------------------------------------------------------

    @router.post("/bluebubbles")
    async def bluebubbles_incoming(
        request: Request,
    ) -> Response:
        """Forward BlueBubbles ``new-message`` events to the bridge."""
        auth = request.headers.get("Authorization", "")
        if bluebubbles_password and not _secrets_match(
            auth, bluebubbles_password
        ):
            return Response("Invalid password", status_code=403)

        payload = _parse_json_object(await request.body())
        if payload is None:
            return Response("Invalid JSON", status_code=400)

        if payload.get("type", "") != "new-message":
            return _ok()

        # ``or {}`` also covers explicit JSON nulls.
        data = payload.get("data") or {}
        handle = data.get("handle") or {}
        sender = handle.get("address", "")
        text = data.get("text", "")

        # Same guard as the Twilio and SendBlue handlers: nothing to route.
        if not sender or not text:
            return _ok()

        active_bridge = _active_bridge(request)
        if not active_bridge:
            logger.warning(
                "No channel bridge — cannot process BlueBubbles msg"
            )
            return _ok()

        _run_in_background(
            active_bridge.handle_incoming,
            sender,
            text,
            "bluebubbles",
        )

        return _ok()

    # ----------------------------------------------------------
    # WhatsApp Cloud API
    # ----------------------------------------------------------

    @router.get("/whatsapp")
    async def whatsapp_verify(request: Request) -> Response:
        """Answer the subscription handshake by echoing ``hub.challenge``."""
        mode = request.query_params.get("hub.mode", "")
        token = request.query_params.get("hub.verify_token", "")
        challenge = request.query_params.get("hub.challenge", "")

        # An unconfigured (empty) verify token must never match a request
        # that simply omits ``hub.verify_token``.
        if (
            whatsapp_verify_token
            and mode == "subscribe"
            and _secrets_match(token, whatsapp_verify_token)
        ):
            return PlainTextResponse(challenge)
        return Response("Forbidden", status_code=403)

    @router.post("/whatsapp")
    async def whatsapp_incoming(
        request: Request,
    ) -> Response:
        """Verify the payload signature and route each text message."""
        body_bytes = await request.body()

        # Verify signature
        if whatsapp_app_secret:
            signature = request.headers.get("X-Hub-Signature-256", "")
            expected = (
                "sha256="
                + hmac.new(
                    whatsapp_app_secret.encode(),
                    body_bytes,
                    hashlib.sha256,
                ).hexdigest()
            )
            if not _secrets_match(signature, expected):
                return Response("Invalid signature", status_code=403)

        payload = _parse_json_object(body_bytes)
        if payload is None:
            return Response("Invalid JSON", status_code=400)

        active_bridge = _active_bridge(request)
        if not active_bridge:
            logger.warning(
                "No channel bridge — cannot process WhatsApp msg"
            )
            return _ok()

        for entry in payload.get("entry") or []:
            for change in entry.get("changes") or []:
                value = change.get("value") or {}
                for message in value.get("messages") or []:
                    if message.get("type") != "text":
                        continue
                    sender = message.get("from", "")
                    text = (message.get("text") or {}).get("body", "")
                    if not sender or not text:
                        continue

                    _run_in_background(
                        active_bridge.handle_incoming,
                        sender,
                        text,
                        "whatsapp",
                    )

        return _ok()

    # ----------------------------------------------------------
    # SendBlue (iMessage / SMS)
    # ----------------------------------------------------------

    @router.post("/sendblue")
    async def sendblue_incoming(request: Request) -> Response:
        """Acknowledge, process and answer an inbound SendBlue message."""
        import threading

        raw_body = await request.body()

        # Get the SendBlue channel — may be passed at init or set later
        sb = sendblue_channel or getattr(
            request.app.state, "sendblue_channel", None
        )

        # Verify webhook secret if configured
        if sb and sb.webhook_secret:
            header_secret = request.headers.get("x-sendblue-secret", "")
            if not _secrets_match(header_secret, str(sb.webhook_secret)):
                return Response("Invalid secret", status_code=403)
        elif sb:
            logger.warning(
                "SendBlue webhook received without secret verification. "
                "Set webhook_secret for HMAC validation."
            )

        payload = _parse_json_object(raw_body)
        if payload is None:
            return Response("Invalid JSON", status_code=400)

        # Ignore outbound status callbacks
        if payload.get("is_outbound", False):
            return _ok()

        from_number = payload.get("from_number", "")
        content = payload.get("content", "")

        if not from_number or not content:
            return _ok()

        # Capture sb for the closure
        reply_channel = sb
        # Also check for a dynamically-created bridge on app.state
        active_bridge = _active_bridge(request)

        if not active_bridge:
            logger.warning("No channel bridge — cannot process SendBlue msg")
            return _ok()

        # Message queue tracking (per-sender).  Created lazily on the event
        # loop thread and shared through app.state; the lock lives under the
        # "_lock" key next to the per-sender entries.
        _sendblue_queues = getattr(
            request.app.state, "_sendblue_queues", None
        )
        if _sendblue_queues is None:
            _sendblue_queues = {"_lock": threading.Lock()}
            request.app.state._sendblue_queues = _sendblue_queues

        def _handle_and_reply() -> None:
            lock = _sendblue_queues["_lock"]

            # Track queue depth for this sender
            with lock:
                q = _sendblue_queues.setdefault(
                    from_number, {"pending": 0}
                )
                q["pending"] += 1
                position = q["pending"]

            done_event = threading.Event()

            def _send_reminders() -> None:
                # Periodic "still working" reminders every 60s until done.
                while not done_event.wait(60):
                    if reply_channel:
                        reply_channel.send(
                            from_number,
                            "Still working! Will reply ASAP",
                        )

            # Everything after the increment sits inside try/finally so a
            # failing acknowledgment cannot leave the counter stuck.
            try:
                # Immediate acknowledgment
                if reply_channel:
                    if position > 1:
                        reply_channel.send(
                            from_number,
                            f"Message received! Message {position} in"
                            f" queue, will respond ASAP",
                        )
                    else:
                        reply_channel.send(
                            from_number,
                            "Message received! Working on it now...",
                        )

                reminder = threading.Thread(
                    target=_send_reminders, daemon=True
                )
                reminder.start()

                response = active_bridge.handle_incoming(
                    from_number, content, "sendblue"
                )
            finally:
                done_event.set()
                with lock:
                    q["pending"] = max(0, q["pending"] - 1)

            # Format response for clean text message display
            if response and reply_channel:
                reply_channel.send(
                    from_number, _format_for_sms(response)
                )

        _run_in_background(_handle_and_reply)

        return _ok()

    return router