def create_webhook_router(
bridge: Any,
twilio_auth_token: str = "",
bluebubbles_password: str = "",
whatsapp_verify_token: str = "",
whatsapp_app_secret: str = "",
sendblue_channel: Any = None,
) -> APIRouter:
"""Create a FastAPI router with webhook endpoints.
Args:
bridge: ChannelBridge instance for routing messages.
twilio_auth_token: Twilio auth token for signatures.
bluebubbles_password: BlueBubbles server password.
whatsapp_verify_token: WhatsApp verification token.
whatsapp_app_secret: WhatsApp app secret for HMAC.
sendblue_channel: SendBlueChannel instance for reply-back.
"""
router = APIRouter(prefix="/webhooks", tags=["webhooks"])
# ----------------------------------------------------------
# Twilio SMS
# ----------------------------------------------------------
@router.post("/twilio")
async def twilio_incoming(request: Request) -> Response:
form = await request.form()
params = dict(form)
signature = request.headers.get("X-Twilio-Signature", "")
url = str(request.url)
if twilio_auth_token and not _validate_twilio_signature(
twilio_auth_token, url, params, signature
):
return Response("Invalid signature", status_code=403)
from_number = params.get("From", "")
body = params.get("Body", "")
if not from_number or not body:
return Response(
content="<Response></Response>",
media_type="application/xml",
)
# Use bridge or app.state.channel_bridge
active_bridge = bridge or getattr(
request.app.state, "channel_bridge", None,
)
def _handle_twilio() -> None:
# Send ack via Twilio API
try:
from twilio.rest import Client
# Get creds from app state bindings
mgr = getattr(
request.app.state, "agent_manager", None,
)
twilio_client = None
twilio_from = ""
if mgr:
for agent in mgr.list_agents():
aid = agent.get("id", "")
for b in mgr.list_channel_bindings(aid):
if b.get("channel_type") == "twilio":
cfg = b.get("config", {})
sid = cfg.get("account_sid", "")
tok = cfg.get("auth_token", "")
twilio_from = cfg.get(
"phone_number", "",
)
if sid and tok:
twilio_client = Client(
sid, tok,
)
break
if twilio_client and twilio_from:
twilio_client.messages.create(
body=(
"Message received! "
"Working on it now..."
),
from_=twilio_from,
to=from_number,
)
except Exception as _e:
logger.warning("Twilio ack failed: %s", _e)
# Process via bridge or agent directly
response = ""
if active_bridge:
response = active_bridge.handle_incoming(
from_number, body, "twilio",
max_length=1600,
)
else:
# Direct agent fallback
try:
from openjarvis.agents.deep_research import (
DeepResearchAgent,
)
from openjarvis.server.agent_manager_routes import (
_build_deep_research_tools,
)
engine = getattr(
request.app.state, "engine", None,
)
if engine:
tools = _build_deep_research_tools(
engine=engine, model="",
)
agent = DeepResearchAgent(
engine=engine,
model=getattr(
engine, "_model", "",
),
tools=tools,
max_turns=5,
)
result = agent.run(body)
response = result.content or ""
except Exception as _exc:
response = f"Error: {_exc}"
# Send response via Twilio
if response and twilio_client and twilio_from:
try:
clean = _format_for_sms(response)
# Twilio SMS limit is 1600 chars
if len(clean) > 1500:
clean = clean[:1500] + "\n\n(truncated)"
twilio_client.messages.create(
body=clean,
from_=twilio_from,
to=from_number,
)
except Exception as _e:
logger.warning(
"Twilio reply failed: %s", _e,
)
task = asyncio.create_task(
asyncio.to_thread(_handle_twilio),
)
task.add_done_callback(_log_task_exception)
return Response(
content="<Response></Response>",
media_type="application/xml",
)
# ----------------------------------------------------------
# BlueBubbles (iMessage)
# ----------------------------------------------------------
@router.post("/bluebubbles")
async def bluebubbles_incoming(
request: Request,
) -> Response:
auth = request.headers.get("Authorization", "")
if bluebubbles_password and auth != bluebubbles_password:
return Response("Invalid password", status_code=403)
payload = await request.json()
msg_type = payload.get("type", "")
if msg_type != "new-message":
return Response("OK", status_code=200)
data = payload.get("data", {})
handle = data.get("handle", {})
sender = handle.get("address", "")
text = data.get("text", "")
task = asyncio.create_task(
asyncio.to_thread(
bridge.handle_incoming,
sender,
text,
"bluebubbles",
)
)
task.add_done_callback(_log_task_exception)
return Response("OK", status_code=200)
# ----------------------------------------------------------
# WhatsApp Cloud API
# ----------------------------------------------------------
@router.get("/whatsapp")
async def whatsapp_verify(request: Request) -> Response:
mode = request.query_params.get("hub.mode", "")
token = request.query_params.get("hub.verify_token", "")
challenge = request.query_params.get("hub.challenge", "")
if mode == "subscribe" and token == whatsapp_verify_token:
return PlainTextResponse(challenge)
return Response("Forbidden", status_code=403)
@router.post("/whatsapp")
async def whatsapp_incoming(
request: Request,
) -> Response:
body_bytes = await request.body()
# Verify signature
if whatsapp_app_secret:
signature = request.headers.get("X-Hub-Signature-256", "")
expected = (
"sha256="
+ hmac.new(
whatsapp_app_secret.encode(),
body_bytes,
hashlib.sha256,
).hexdigest()
)
if not hmac.compare_digest(signature, expected):
return Response("Invalid signature", status_code=403)
payload = json.loads(body_bytes)
for entry in payload.get("entry", []):
for change in entry.get("changes", []):
value = change.get("value", {})
for message in value.get("messages", []):
if message.get("type") != "text":
continue
sender = message.get("from", "")
text = message.get("text", {}).get("body", "")
task = asyncio.create_task(
asyncio.to_thread(
bridge.handle_incoming,
sender,
text,
"whatsapp",
)
)
task.add_done_callback(_log_task_exception)
return Response("OK", status_code=200)
# ----------------------------------------------------------
# SendBlue (iMessage / SMS)
# ----------------------------------------------------------
@router.post("/sendblue")
async def sendblue_incoming(request: Request) -> Response:
payload = await request.json()
# Get the SendBlue channel — may be passed at init or set later
sb = sendblue_channel or getattr(
request.app.state, "sendblue_channel", None
)
# Verify webhook secret if configured
if sb and sb.webhook_secret:
header_secret = request.headers.get("x-sendblue-secret", "")
if header_secret != sb.webhook_secret:
return Response("Invalid secret", status_code=403)
elif sb:
logger.warning(
"SendBlue webhook received without secret verification. "
"Set webhook_secret for HMAC validation."
)
# Ignore outbound status callbacks
if payload.get("is_outbound", False):
return Response("OK", status_code=200)
from_number = payload.get("from_number", "")
content = payload.get("content", "")
if not from_number or not content:
return Response("OK", status_code=200)
# Capture sb for the closure
reply_channel = sb
# Also check for a dynamically-created bridge on app.state
active_bridge = bridge or getattr(
request.app.state, "channel_bridge", None
)
if not active_bridge:
logger.warning("No channel bridge — cannot process SendBlue msg")
return Response("OK", status_code=200)
# Message queue tracking (per-sender)
_sendblue_queues = getattr(
request.app.state, "_sendblue_queues", None
)
if _sendblue_queues is None:
import threading as _th
_sendblue_queues = {}
_sendblue_queues["_lock"] = _th.Lock()
request.app.state._sendblue_queues = _sendblue_queues
def _handle_and_reply() -> None:
import threading
lock = _sendblue_queues["_lock"]
# Track queue depth for this sender
with lock:
q = _sendblue_queues.setdefault(
from_number, {"pending": 0}
)
q["pending"] += 1
position = q["pending"]
# Immediate acknowledgment
if reply_channel:
if position > 1:
reply_channel.send(
from_number,
f"Message received! Message {position} in"
f" queue, will respond ASAP",
)
else:
reply_channel.send(
from_number,
"Message received! Working on it now...",
)
# Periodic "still working" reminders every 60s
done_event = threading.Event()
def _send_reminders() -> None:
while not done_event.wait(60):
if reply_channel:
reply_channel.send(
from_number,
"Still working! Will reply ASAP",
)
reminder = threading.Thread(
target=_send_reminders, daemon=True
)
reminder.start()
try:
response = active_bridge.handle_incoming(
from_number, content, "sendblue"
)
finally:
done_event.set()
with lock:
q["pending"] = max(0, q["pending"] - 1)
# Format response for clean text message display
if response and reply_channel:
reply_channel.send(
from_number, _format_for_sms(response)
)
task = asyncio.create_task(asyncio.to_thread(_handle_and_reply))
task.add_done_callback(_log_task_exception)
return Response("OK", status_code=200)
return router