Skip to content

twitter_channel

twitter_channel

TwitterChannel — Twitter/X API v2 adapter using OAuth 1.0a.

Classes

TwitterChannel

TwitterChannel(bearer_token: str = '', *, api_key: str = '', api_secret: str = '', access_token: str = '', access_secret: str = '', bot_user_id: str = '', poll_interval: int = 60, bus: Optional[EventBus] = None)

Bases: BaseChannel

Native Twitter/X channel adapter using the v2 API.

PARAMETER DESCRIPTION
bearer_token

Bearer token for read endpoints. Falls back to TWITTER_BEARER_TOKEN env var.

TYPE: str DEFAULT: ''

api_key

OAuth 1.0a consumer credentials. Fall back to TWITTER_API_KEY / TWITTER_API_SECRET.

TYPE: str DEFAULT: ''

access_token

OAuth 1.0a user credentials. Fall back to TWITTER_ACCESS_TOKEN / TWITTER_ACCESS_SECRET.

TYPE: str DEFAULT: ''

bot_user_id

Numeric Twitter user ID for the bot account. Falls back to TWITTER_BOT_USER_ID.

TYPE: str DEFAULT: ''

poll_interval

Seconds between mention polls (default 60).

TYPE: int DEFAULT: 60

bus

Optional event bus for publishing channel events.

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/channels/twitter_channel.py
def __init__(
    self,
    bearer_token: str = "",
    *,
    api_key: str = "",
    api_secret: str = "",
    access_token: str = "",
    access_secret: str = "",
    bot_user_id: str = "",
    poll_interval: int = 60,
    bus: Optional[EventBus] = None,
) -> None:
    """Store credentials and set up the idle (disconnected) state.

    Every credential argument left empty is replaced by the value of its
    ``TWITTER_*`` environment variable, or ``""`` when that is unset too.
    No thread is started here.
    """
    # Runtime state: no handlers, no polling thread, nothing seen yet.
    self._handlers: List[ChannelHandler] = []
    self._listener_thread: Optional[threading.Thread] = None
    self._stop_event = threading.Event()
    self._since_id: Optional[str] = None
    self._status = ChannelStatus.DISCONNECTED

    self._poll_interval = poll_interval
    self._bus = bus

    # Explicit arguments win; the environment is consulted only when the
    # corresponding argument is falsy.
    env = os.environ
    self._bearer = (
        bearer_token if bearer_token else env.get("TWITTER_BEARER_TOKEN", "")
    )
    self._api_key = api_key if api_key else env.get("TWITTER_API_KEY", "")
    self._api_secret = (
        api_secret if api_secret else env.get("TWITTER_API_SECRET", "")
    )
    self._access_token = (
        access_token if access_token else env.get("TWITTER_ACCESS_TOKEN", "")
    )
    self._access_secret = (
        access_secret if access_secret else env.get("TWITTER_ACCESS_SECRET", "")
    )
    self._bot_user_id = (
        bot_user_id if bot_user_id else env.get("TWITTER_BOT_USER_ID", "")
    )
Functions
connect
connect() -> None

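Check that a bearer token is configured and start the mention-polling thread. If no bearer token is set, the status becomes ERROR and no thread is started.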

Source code in src/openjarvis/channels/twitter_channel.py
def connect(self) -> None:
    """Check for a bearer token and start the mention-polling thread.

    Without a bearer token the status is set to ``ChannelStatus.ERROR``
    and no thread is started. If a polling thread from an earlier call is
    still alive, the call returns without starting another one, so calling
    ``connect()`` repeatedly cannot leave several pollers running.
    """
    if not self._bearer:
        logger.warning("No Twitter bearer token configured")
        self._status = ChannelStatus.ERROR
        return

    # Starting a second thread on the same target while the first is still
    # alive would run two pollers side by side; treat that as a no-op.
    running = self._listener_thread
    if running is not None and running.is_alive():
        logger.debug("Twitter channel already polling; connect() ignored")
        return

    # Clear any stop request left over from a previous disconnect().
    self._stop_event.clear()
    self._status = ChannelStatus.CONNECTING

    # Daemon thread: it does not keep the interpreter alive at exit.
    self._listener_thread = threading.Thread(
        target=self._poll_mentions, daemon=True,
    )
    self._listener_thread.start()
    self._status = ChannelStatus.CONNECTED
    logger.info("Twitter channel connected (polling mentions)")
disconnect
disconnect() -> None

Stop the polling thread.

Source code in src/openjarvis/channels/twitter_channel.py
def disconnect(self) -> None:
    """Signal the polling thread to stop and mark the channel disconnected."""
    self._stop_event.set()

    worker = self._listener_thread
    if worker is not None:
        # Bounded wait: stop waiting after five seconds either way, then
        # drop the reference.
        worker.join(timeout=5.0)
        self._listener_thread = None

    self._status = ChannelStatus.DISCONNECTED
send
send(channel: str, content: str, *, conversation_id: str = '', metadata: Dict[str, Any] | None = None) -> bool

Post a tweet, optionally as a reply.

channel is ignored (tweets go to the bot's timeline). conversation_id is used as reply.in_reply_to_tweet_id when set. Content is truncated to 280 characters.

Source code in src/openjarvis/channels/twitter_channel.py
def send(
    self,
    channel: str,
    content: str,
    *,
    conversation_id: str = "",
    metadata: Dict[str, Any] | None = None,
) -> bool:
    """Post a tweet, optionally as a reply.

    Args:
        channel: Ignored for routing (tweets go to the bot's timeline); it
            is only passed on to ``_publish_sent``.
        content: Tweet text. Only the first 280 characters are sent.
        conversation_id: When non-empty, sent as
            ``reply.in_reply_to_tweet_id``.
        metadata: Accepted but not read by this method.

    Returns:
        ``True`` when the API responds with a status code below 300.
        ``False`` when any OAuth credential is missing, when the API
        responds with another status, or when sending raises an exception.
    """
    # OAuth 1.0a user-context requests need the consumer key/secret and the
    # access token/secret; checking only the API key would let a request
    # that cannot be signed go out.
    oauth_parts = (
        self._api_key,
        self._api_secret,
        self._access_token,
        self._access_secret,
    )
    if not all(oauth_parts):
        logger.warning("Cannot send: no Twitter OAuth credentials")
        return False

    text = content[:280]

    try:
        # Imported lazily; a missing httpx is handled like any other send
        # failure by the except clause below.
        import httpx

        payload: Dict[str, Any] = {"text": text}
        if conversation_id:
            payload["reply"] = {"in_reply_to_tweet_id": conversation_id}

        resp = httpx.post(
            f"{_API_BASE}/tweets",
            json=payload,
            auth=self._oauth(),
            timeout=10.0,
        )
        if resp.status_code < 300:
            self._publish_sent(channel, text, conversation_id)
            return True
        logger.warning(
            "Twitter API returned status %d: %s",
            resp.status_code,
            resp.text,
        )
        return False
    except Exception:
        # Best-effort send: report failure through the return value.
        logger.debug("Twitter send failed", exc_info=True)
        return False
status
status() -> ChannelStatus

Return the current connection status.

Source code in src/openjarvis/channels/twitter_channel.py
def status(self) -> ChannelStatus:
    """Report the connection state most recently recorded on this channel."""
    current = self._status
    return current
list_channels
list_channels() -> List[str]

Return available channel identifiers.

Source code in src/openjarvis/channels/twitter_channel.py
def list_channels(self) -> List[str]:
    """List the channel identifiers of this adapter: always ``["twitter"]``."""
    identifiers = ["twitter"]
    return identifiers
on_message
on_message(handler: ChannelHandler) -> None

Register a callback for incoming mentions.

Source code in src/openjarvis/channels/twitter_channel.py
def on_message(self, handler: ChannelHandler) -> None:
    """Add *handler* to the callbacks registered for incoming mentions."""
    registered = self._handlers
    registered.append(handler)