Skip to content

slack_connector

slack_connector

Slack connector — bulk channel message sync via the Slack Web API.

Uses OAuth tokens stored locally (see the `openjarvis.connectors.oauth` module). All network calls are isolated in module-level functions (named `_slack_api_*`) so that they are trivially mockable in tests.

Classes

SlackConnector

SlackConnector(credentials_path: str = '')

Bases: BaseConnector

Connector that syncs channel message history from Slack via the Web API.

Authentication is handled through Slack OAuth 2.0. Tokens are stored locally in a JSON credentials file.

PARAMETER DESCRIPTION
credentials_path

Path to the JSON file where OAuth tokens are stored. Defaults to ~/.openjarvis/connectors/slack.json.

TYPE: str DEFAULT: ''

Source code in src/openjarvis/connectors/slack_connector.py
def __init__(self, credentials_path: str = "") -> None:
    """Create a connector with zeroed sync bookkeeping.

    Parameters
    ----------
    credentials_path:
        Location of the JSON credentials file. An empty string (the
        default) selects ``_DEFAULT_CREDENTIALS_PATH``.
    """
    # Any falsy path falls back to the module-level default location.
    if credentials_path:
        self._credentials_path = credentials_path
    else:
        self._credentials_path = _DEFAULT_CREDENTIALS_PATH

    # Bookkeeping fields; nothing has been synced yet.
    self._last_cursor: Optional[str] = None
    self._last_sync: Optional[datetime] = None
    self._items_total: int = 0
    self._items_synced: int = 0
Functions
is_connected
is_connected() -> bool

Return True if stored credentials exist and are non-empty. The token itself is not validated.

Source code in src/openjarvis/connectors/slack_connector.py
def is_connected(self) -> bool:
    """Report whether stored credentials are present.

    Returns ``True`` when ``load_tokens`` yields a non-empty value for the
    configured credentials path, and ``False`` when it yields ``None`` or
    an empty value. The token itself is not validated here.
    """
    stored = load_tokens(self._credentials_path)
    return stored is not None and bool(stored)
disconnect
disconnect() -> None

Delete the stored credentials file.

Source code in src/openjarvis/connectors/slack_connector.py
def disconnect(self) -> None:
    """Remove the persisted credentials by delegating to ``delete_tokens``."""
    credentials_file = self._credentials_path
    delete_tokens(credentials_file)
auth_url
auth_url() -> str

Return a Slack OAuth consent URL requesting channel history scopes.

Source code in src/openjarvis/connectors/slack_connector.py
def auth_url(self) -> str:
    """Build the Slack OAuth consent URL for the channel-history scopes.

    Returns
    -------
    str
        ``_SLACK_AUTH_ENDPOINT`` followed by ``?`` and a URL-encoded query
        string carrying the client id, requested scopes and redirect URI.
    """
    query = urlencode(
        {
            # Placeholder: the real client_id is expected to come from config.
            "client_id": "",
            "scope": _SLACK_SCOPES,
            "redirect_uri": "http://localhost:8789/callback",
        }
    )
    return f"{_SLACK_AUTH_ENDPOINT}?{query}"
handle_callback
handle_callback(code: str) -> None

Handle the OAuth callback by persisting the authorization code.

In a full implementation this would exchange the code for tokens. For now the code is saved directly as the token value.

Source code in src/openjarvis/connectors/slack_connector.py
def handle_callback(self, code: str) -> None:
    """Persist the authorization code received on the OAuth callback.

    No code-for-token exchange happens here yet: the raw ``code`` is stored
    under the ``"token"`` key of the credentials file.
    """
    stored_credentials = {"token": code}
    save_tokens(self._credentials_path, stored_credentials)
sync
sync(*, since: Optional[datetime] = None, cursor: Optional[str] = None) -> Iterator[Document]

Yield `Document` objects for Slack channel messages.

Builds a user map, then paginates through channels and retrieves message history for each channel.

PARAMETER DESCRIPTION
since

Not yet used (reserved for incremental sync).

TYPE: Optional[datetime] DEFAULT: None

cursor

Not yet used (reserved for pagination resumption).

TYPE: Optional[str] DEFAULT: None

Source code in src/openjarvis/connectors/slack_connector.py
def sync(
    self,
    *,
    since: Optional[datetime] = None,  # noqa: ARG002 — reserved for future use
    cursor: Optional[str] = None,  # noqa: ARG002 — reserved for future use
) -> Iterator[Document]:
    """Yield :class:`Document` objects for Slack channel messages.

    Builds a user map, then paginates through channels and retrieves
    message history for each channel. Public channels the caller is not a
    member of are joined first; private channels the caller is not a member
    of are skipped. Bot messages and non-content subtypes are not yielded.

    This is a generator: ``_items_synced`` and ``_last_sync`` are written
    only after it has been fully exhausted. If no credentials or no token
    are stored, nothing is yielded and no bookkeeping is updated.

    Parameters
    ----------
    since:
        Not yet used (reserved for incremental sync).
    cursor:
        Not yet used (reserved for pagination resumption).

    Yields
    ------
    Document
        One document per retained message, with ``doc_id`` of the form
        ``slack:<channel_id>:<ts>``.
    """
    tokens = load_tokens(self._credentials_path)
    if not tokens:
        return

    # Prefer "token" but fall back to "access_token" whenever "token" is
    # missing *or* empty. A nested ``dict.get`` default would not fall back
    # for a key that is present with an empty value.
    token: str = tokens.get("token") or tokens.get("access_token") or ""
    if not token:
        return

    # Subtypes that carry no user-authored content; hoisted out of the loops.
    skipped_subtypes = (
        "message_changed",
        "message_deleted",
        "bot_message",
        "channel_join",
        "channel_leave",
    )

    # Step 1: build user map
    users_resp = _slack_api_users_list(token)
    members: List[Dict[str, Any]] = users_resp.get("members", [])
    user_map = _build_user_map(members)

    synced = 0
    channels_cursor = ""

    # Step 2: paginate through channels
    while True:
        channels_resp = _slack_api_conversations_list(token, cursor=channels_cursor)
        channels: List[Dict[str, Any]] = channels_resp.get("channels", [])

        for channel in channels:
            chan_id: str = channel.get("id", "")
            chan_name: str = channel.get("name", chan_id)
            is_member: bool = channel.get("is_member", False)
            is_private: bool = channel.get("is_private", False)
            if not chan_id:
                continue

            # Auto-join public channels; skip private channels the bot isn't in
            if not is_member:
                if is_private:
                    continue  # Can't join private channels without invite
                # Try to join the public channel; joining is best-effort, so
                # any failure just skips this channel.
                try:
                    join_resp = _slack_api_with_retry(
                        "conversations.join", token, {"channel": chan_id},
                        http_method="POST",
                    )
                    if not join_resp.get("ok"):
                        continue
                except Exception:
                    continue

            # Step 3: paginate through message history
            history_cursor = ""
            while True:
                try:
                    history_resp = _slack_api_conversations_history(
                        token, chan_id, cursor=history_cursor
                    )
                except Exception:
                    break  # Skip channels we can't read
                # A response without an "ok" key is treated as successful.
                if not history_resp.get("ok", True):
                    break  # not_in_channel or other error
                messages: List[Dict[str, Any]] = history_resp.get("messages", [])

                for msg in messages:
                    # Skip bot messages and non-content subtypes
                    if msg.get("bot_id") or msg.get("subtype") in skipped_subtypes:
                        continue

                    ts: str = msg.get("ts", "")
                    user_id: str = msg.get("user", "")
                    text: str = msg.get("text", "")
                    thread_ts: Optional[str] = msg.get("thread_ts")

                    # Unknown users fall back to their raw user id as author.
                    user_info = user_map.get(user_id, {})
                    author = user_info.get("name", user_id)

                    timestamp = _ts_to_datetime(ts)
                    url = _slack_archive_url("", chan_id, ts)

                    doc = Document(
                        doc_id=f"slack:{chan_id}:{ts}",
                        source="slack",
                        doc_type="message",
                        content=text,
                        title=f"#{chan_name}",
                        author=author,
                        timestamp=timestamp,
                        thread_id=thread_ts,
                        url=url,
                        metadata={
                            "channel_id": chan_id,
                            "channel_name": chan_name,
                            "user_id": user_id,
                            "ts": ts,
                        },
                    )
                    synced += 1
                    yield doc

                next_history_cursor: str = (
                    history_resp.get("response_metadata", {}).get("next_cursor", "")
                    or ""
                )
                # Stop when Slack reports no more pages or gives no cursor.
                if not history_resp.get("has_more") or not next_history_cursor:
                    break
                history_cursor = next_history_cursor

        next_channels_cursor: str = (
            channels_resp.get("response_metadata", {}).get("next_cursor", "") or ""
        )
        if not next_channels_cursor:
            # Channel listing is complete; nothing left to resume from.
            self._last_cursor = None
            break
        channels_cursor = next_channels_cursor
        self._last_cursor = channels_cursor

    self._items_synced = synced
    self._last_sync = datetime.now()
sync_status
sync_status() -> SyncStatus

Return sync progress from the most recent `sync()` call.

Source code in src/openjarvis/connectors/slack_connector.py
def sync_status(self) -> SyncStatus:
    """Snapshot the bookkeeping recorded by :meth:`sync`.

    The reported state is always ``"idle"``; the item count, timestamp and
    cursor are whatever the instance currently holds.
    """
    snapshot = {
        "state": "idle",
        "items_synced": self._items_synced,
        "last_sync": self._last_sync,
        "cursor": self._last_cursor,
    }
    return SyncStatus(**snapshot)
mcp_tools
mcp_tools() -> List[ToolSpec]

Expose three MCP tool specs for real-time Slack queries.

Source code in src/openjarvis/connectors/slack_connector.py
def mcp_tools(self) -> List[ToolSpec]:
    """Describe the three Slack tools this connector offers over MCP.

    Returns
    -------
    List[ToolSpec]
        Specs for ``slack_search_messages``, ``slack_get_thread`` and
        ``slack_list_channels``, in that order. Each carries a JSON-schema
        style ``parameters`` mapping and the ``"communication"`` category.
    """
    # JSON-schema parameter definitions, one per tool.
    search_schema = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query string",
            },
            "max_results": {
                "type": "integer",
                "description": "Maximum number of messages to return",
                "default": 20,
            },
        },
        "required": ["query"],
    }
    thread_schema = {
        "type": "object",
        "properties": {
            "channel_id": {
                "type": "string",
                "description": "Slack channel ID",
            },
            "thread_ts": {
                "type": "string",
                "description": (
                    "Thread timestamp (ts of the parent message)"
                ),
            },
        },
        "required": ["channel_id", "thread_ts"],
    }
    channels_schema = {
        "type": "object",
        "properties": {
            "types": {
                "type": "string",
                "description": (
                    "Comma-separated channel types to include "
                    "(e.g. 'public_channel,private_channel')"
                ),
                "default": "public_channel,private_channel",
            },
            "max_results": {
                "type": "integer",
                "description": "Maximum number of channels to return",
                "default": 100,
            },
        },
        "required": [],
    }

    search_tool = ToolSpec(
        name="slack_search_messages",
        description=(
            "Search Slack messages using a query string. "
            "Returns matching messages across all accessible channels."
        ),
        parameters=search_schema,
        category="communication",
    )
    thread_tool = ToolSpec(
        name="slack_get_thread",
        description=(
            "Retrieve all messages in a Slack thread by channel ID "
            "and thread timestamp."
        ),
        parameters=thread_schema,
        category="communication",
    )
    channels_tool = ToolSpec(
        name="slack_list_channels",
        description=(
            "List accessible Slack channels, optionally filtered by type."
        ),
        parameters=channels_schema,
        category="communication",
    )
    return [search_tool, thread_tool, channels_tool]

Functions