Skip to content

transport

transport

MCP transport implementations.

Classes

MCPTransport

Bases: ABC

Abstract transport layer for MCP communication.

Functions
send abstractmethod
send(request: MCPRequest) -> MCPResponse

Send a request and return the response.

Source code in src/openjarvis/mcp/transport.py
@abstractmethod
def send(self, request: MCPRequest) -> MCPResponse:
    """Send a request and return the response."""
send_notification
send_notification(request: MCPRequest) -> None

Send a JSON-RPC notification (no response expected).

The default implementation delegates to :meth:send and discards the response. Transports may override this when the server returns no body for notifications (e.g. HTTP 202 Accepted).

Source code in src/openjarvis/mcp/transport.py
def send_notification(self, request: MCPRequest) -> None:
    """Send a JSON-RPC notification (no response expected).

    The default implementation delegates to :meth:`send` and discards the
    response.  Transports may override this when the server returns no
    body for notifications (e.g. HTTP 202 Accepted).
    """
    self.send(request)
close abstractmethod
close() -> None

Release transport resources.

Source code in src/openjarvis/mcp/transport.py
@abstractmethod
def close(self) -> None:
    """Release transport resources."""

InProcessTransport

InProcessTransport(server: MCPServer)

Bases: MCPTransport

Direct in-process transport for testing.

Routes requests directly to an MCPServer instance without serialization overhead.

Source code in src/openjarvis/mcp/transport.py
def __init__(self, server: MCPServer) -> None:
    self._server = server
Functions
send
send(request: MCPRequest) -> MCPResponse

Dispatch request directly to the server.

Source code in src/openjarvis/mcp/transport.py
def send(self, request: MCPRequest) -> MCPResponse:
    """Dispatch request directly to the server."""
    return self._server.handle(request)
close
close() -> None

No resources to release.

Source code in src/openjarvis/mcp/transport.py
def close(self) -> None:
    """No resources to release."""

StdioTransport

StdioTransport(command: List[str])

Bases: MCPTransport

JSON-RPC over stdin/stdout subprocess transport.

Launches a subprocess and communicates via JSON lines on stdin/stdout.

Source code in src/openjarvis/mcp/transport.py
def __init__(self, command: List[str]) -> None:
    self._command = command
    self._process: Optional[subprocess.Popen[str]] = None
    self._start()
Functions
send
send(request: MCPRequest) -> MCPResponse

Write request as JSON line, read response line.

Source code in src/openjarvis/mcp/transport.py
def send(self, request: MCPRequest) -> MCPResponse:
    """Write request as JSON line, read response line."""
    proc = self._process
    if proc is None or proc.stdin is None or proc.stdout is None:
        raise RuntimeError("Transport process is not running")

    line = request.to_json() + "\n"
    proc.stdin.write(line)
    proc.stdin.flush()

    response_line = proc.stdout.readline()
    if not response_line:
        raise RuntimeError("No response from subprocess")
    return MCPResponse.from_json(response_line.strip())
close
close() -> None

Terminate the subprocess.

Source code in src/openjarvis/mcp/transport.py
def close(self) -> None:
    """Terminate the subprocess."""
    if self._process is not None:
        self._process.terminate()
        self._process.wait(timeout=5)
        self._process = None

StreamableHTTPTransport

StreamableHTTPTransport(url: str, *, connect_timeout: float = 10.0, request_timeout: float = 60.0)

Bases: MCPTransport

MCP Streamable HTTP transport (JSON-RPC over HTTP).

Uses a persistent httpx.Client session, tracks the Mcp-Session-Id header, and sends the Accept header required by the MCP Streamable HTTP specification.

Source code in src/openjarvis/mcp/transport.py
def __init__(
    self,
    url: str,
    *,
    connect_timeout: float = 10.0,
    request_timeout: float = 60.0,
) -> None:
    import httpx

    self._url = url
    self._session_id: Optional[str] = None
    self._client = httpx.Client(
        timeout=httpx.Timeout(
            connect=connect_timeout,
            read=request_timeout,
            write=request_timeout,
            pool=connect_timeout,
        ),
    )
Functions
send
send(request: MCPRequest) -> MCPResponse

Send request via HTTP POST following the MCP Streamable HTTP spec.

Handles both application/json and text/event-stream responses as allowed by the MCP Streamable HTTP specification.

Source code in src/openjarvis/mcp/transport.py
def send(self, request: MCPRequest) -> MCPResponse:
    """Send request via HTTP POST following the MCP Streamable HTTP spec.

    Handles both ``application/json`` and ``text/event-stream`` responses
    as allowed by the MCP Streamable HTTP specification.
    """
    response = self._post(request)
    content_type = response.headers.get("content-type", "")
    body = response.text
    if "text/event-stream" in content_type or body.lstrip().startswith("event:"):
        body = self._extract_json_from_sse(body)
    return MCPResponse.from_json(body)
send_notification
send_notification(request: MCPRequest) -> None

Send a notification — accept any 2xx, don't parse the body.

Source code in src/openjarvis/mcp/transport.py
def send_notification(self, request: MCPRequest) -> None:
    """Send a notification — accept any 2xx, don't parse the body."""
    # Track session id but don't try to parse a JSON-RPC response.
    # Servers may return 202 Accepted with an empty body.
    self._post(request)
close
close() -> None

Close the underlying httpx client.

Source code in src/openjarvis/mcp/transport.py
def close(self) -> None:
    """Close the underlying httpx client."""
    self._client.close()