TwilioSMSChannel(*, account_sid: str = '', auth_token: str = '', phone_number: str = '', bus: Optional[EventBus] = None)
Bases: BaseChannel
Send and receive SMS via Twilio.
Sending uses the Twilio REST API. Receiving is handled by
the /webhooks/twilio FastAPI endpoint which calls
registered handlers.
Source code in src/openjarvis/channels/twilio_sms.py
| def __init__(
self,
*,
account_sid: str = "",
auth_token: str = "",
phone_number: str = "",
bus: Optional[EventBus] = None,
) -> None:
self._account_sid = account_sid or os.environ.get("TWILIO_ACCOUNT_SID", "")
self._auth_token = auth_token or os.environ.get("TWILIO_AUTH_TOKEN", "")
self._phone_number = phone_number or os.environ.get("TWILIO_PHONE_NUMBER", "")
self._bus = bus
self._client: Any = None
self._handlers: List[ChannelHandler] = []
self._status = ChannelStatus.DISCONNECTED
|
Functions
handle_webhook
handle_webhook(from_number: str, body: str, message_sid: str = '') -> None
Called by the webhook route for incoming SMS.
Source code in src/openjarvis/channels/twilio_sms.py
| def handle_webhook(
self,
from_number: str,
body: str,
message_sid: str = "",
) -> None:
"""Called by the webhook route for incoming SMS."""
msg = ChannelMessage(
channel="twilio",
sender=from_number,
content=body,
message_id=message_sid,
)
for handler in self._handlers:
handler(msg)
if self._bus:
self._bus.publish(
EventType.CHANNEL_MESSAGE_RECEIVED,
{
"channel": "twilio",
"sender": from_number,
"content": body,
"message_id": message_sid,
},
)
|