Skip to content

Index

a2a

Agent-to-Agent protocol — Google A2A spec implementation.

Classes

A2AClient

A2AClient(base_url: str, *, timeout: float = 30.0)

Client for calling external A2A-compatible agents.

Discovers agent capabilities via /.well-known/agent.json and sends tasks via /a2a/tasks.

Source code in src/openjarvis/a2a/client.py
def __init__(self, base_url: str, *, timeout: float = 30.0) -> None:
    """Bind the client to a single remote agent.

    *base_url* is stored with any trailing ``/`` characters removed so that
    endpoint paths can be appended to it directly.  *timeout* is kept and
    handed to each HTTP call the client makes.
    """
    # Normalise first so a bad ``base_url`` fails before any state is set.
    trimmed_url = base_url.rstrip("/")

    self._base_url = trimmed_url
    self._timeout = timeout
    # Starts empty; ``discover()`` stores the fetched card here.
    self._card: Optional[AgentCard] = None
Functions
discover
discover() -> AgentCard

Fetch the agent card from /.well-known/agent.json.

Source code in src/openjarvis/a2a/client.py
def discover(self) -> AgentCard:
    """Fetch the agent card from /.well-known/agent.json.

    Performs a GET against ``<base_url>/.well-known/agent.json`` using the
    client's timeout, builds an ``AgentCard`` from the JSON body, caches it
    on ``self._card`` and returns it.

    Fields absent from the payload fall back to empty values; ``url`` falls
    back to the client's base URL.  The ``authentication`` block is copied
    too, so callers can tell whether the remote agent asks for credentials.

    Transport failures propagate from ``httpx``; 4xx/5xx replies raise via
    ``raise_for_status()``.
    """
    import httpx

    resp = httpx.get(
        f"{self._base_url}/.well-known/agent.json",
        timeout=self._timeout,
    )
    resp.raise_for_status()
    data = resp.json()
    self._card = AgentCard(
        name=data.get("name", ""),
        description=data.get("description", ""),
        url=data.get("url", self._base_url),
        version=data.get("version", ""),
        capabilities=data.get("capabilities", []),
        skills=data.get("skills", []),
        # Carry the advertised auth requirements through instead of
        # discarding them, so the card reflects what the agent published.
        authentication=data.get("authentication", {}),
    )
    return self._card
send_task
send_task(input_text: str, **kwargs: Any) -> A2ATask

Send a task to the remote agent and return the result.

Source code in src/openjarvis/a2a/client.py
def send_task(self, input_text: str, **kwargs: Any) -> A2ATask:
    """Send a task to the remote agent and return the result.

    POSTs a JSON-RPC ``tasks/send`` request to ``<base_url>/a2a/tasks`` with
    *input_text* as the single text part of a ``user`` message, then maps
    the reply's ``result`` object onto an ``A2ATask``.

    ``**kwargs`` is accepted but not used by this implementation.

    A reply without a usable ``result`` object (missing, ``null`` or not a
    mapping, as can happen on a JSON-RPC error reply) yields a task with an
    empty id, state ``"unknown"`` and the original *input_text*.

    Transport failures propagate from ``httpx``; 4xx/5xx replies raise via
    ``raise_for_status()``.
    """
    import httpx

    request = A2ARequest(
        method="tasks/send",
        params={
            "message": {
                "role": "user",
                "parts": [{"text": input_text}],
            },
        },
    )
    resp = httpx.post(
        f"{self._base_url}/a2a/tasks",
        json=request.to_dict(),
        timeout=self._timeout,
    )
    resp.raise_for_status()
    data = resp.json()
    # ``data.get("result", {})`` would hand back ``None`` for an explicit
    # ``"result": null`` and crash on the lookups below.
    result = data.get("result")
    if not isinstance(result, dict):
        result = {}
    return A2ATask(
        task_id=result.get("id", ""),
        state=result.get("state", "unknown"),
        input_text=result.get("input", input_text),
        output_text=result.get("output", ""),
        history=result.get("history", []),
    )
get_task
get_task(task_id: str) -> A2ATask

Get the status of a previously submitted task.

Source code in src/openjarvis/a2a/client.py
def get_task(self, task_id: str) -> A2ATask:
    """Get the status of a previously submitted task.

    POSTs a JSON-RPC ``tasks/get`` request for *task_id* to
    ``<base_url>/a2a/tasks`` and maps the reply's ``result`` object onto an
    ``A2ATask``.  Only the id, state and output are populated from the reply.

    A reply without a usable ``result`` object (missing, ``null`` or not a
    mapping, as can happen on a JSON-RPC error reply) yields a task carrying
    *task_id* with state ``"unknown"`` and empty output.

    Transport failures propagate from ``httpx``; 4xx/5xx replies raise via
    ``raise_for_status()``.
    """
    import httpx

    request = A2ARequest(
        method="tasks/get",
        params={"id": task_id},
    )
    resp = httpx.post(
        f"{self._base_url}/a2a/tasks",
        json=request.to_dict(),
        timeout=self._timeout,
    )
    resp.raise_for_status()
    data = resp.json()
    # ``data.get("result", {})`` would hand back ``None`` for an explicit
    # ``"result": null`` and crash on the lookups below.
    result = data.get("result")
    if not isinstance(result, dict):
        result = {}
    return A2ATask(
        task_id=result.get("id", task_id),
        state=result.get("state", "unknown"),
        output_text=result.get("output", ""),
    )
cancel_task
cancel_task(task_id: str) -> A2ATask

Cancel a running task.

Source code in src/openjarvis/a2a/client.py
def cancel_task(self, task_id: str) -> A2ATask:
    """Cancel a running task.

    POSTs a JSON-RPC ``tasks/cancel`` request for *task_id* to
    ``<base_url>/a2a/tasks`` and returns an ``A2ATask`` built from the
    reply's ``result`` object.

    When the reply omits the id or state, *task_id* and ``"canceled"`` are
    used.  The same defaults apply when the reply has no usable ``result``
    object (missing, ``null`` or not a mapping, as can happen on a JSON-RPC
    error reply).

    Transport failures propagate from ``httpx``; 4xx/5xx replies raise via
    ``raise_for_status()``.
    """
    import httpx

    request = A2ARequest(
        method="tasks/cancel",
        params={"id": task_id},
    )
    resp = httpx.post(
        f"{self._base_url}/a2a/tasks",
        json=request.to_dict(),
        timeout=self._timeout,
    )
    resp.raise_for_status()
    data = resp.json()
    # ``data.get("result", {})`` would hand back ``None`` for an explicit
    # ``"result": null`` and crash on the lookups below.
    result = data.get("result")
    if not isinstance(result, dict):
        result = {}
    return A2ATask(
        task_id=result.get("id", task_id),
        state=result.get("state", "canceled"),
    )

A2ARequest dataclass

A2ARequest(method: str, params: Dict[str, Any] = dict(), request_id: str = (lambda: hex[:8])())

JSON-RPC 2.0 request for A2A.

A2AResponse dataclass

A2AResponse(result: Any = None, error: Optional[Dict[str, Any]] = None, request_id: str = '')

JSON-RPC 2.0 response for A2A.

A2ATask dataclass

A2ATask(task_id: str = (lambda: hex[:16])(), state: TaskState = SUBMITTED, input_text: str = '', output_text: str = '', history: List[Dict[str, str]] = list(), metadata: Dict[str, Any] = dict())

An A2A task with state machine.

AgentCard dataclass

AgentCard(name: str, description: str = '', url: str = '', version: str = '0.1.0', capabilities: List[str] = list(), skills: List[str] = list(), authentication: Dict[str, Any] = dict())

Agent discovery card served at /.well-known/agent.json.

A2AServer

A2AServer(agent_card: AgentCard, *, handler: Optional[Callable[[str], str]] = None, bus: Optional[EventBus] = None, auth_token: Optional[str] = None)

A2A server that processes incoming tasks via agent execution.

Can be mounted as routes in the FastAPI server.

When auth_token is set, every handle_request call must present a matching bearer token or it is rejected before any agent runs. The required scheme (bearer) — not the token itself — is advertised in the agent card's authentication field. When auth_token is unset, the server is unauthenticated — only mount it on a trusted network.

Source code in src/openjarvis/a2a/server.py
def __init__(
    self,
    agent_card: AgentCard,
    *,
    handler: Optional[Callable[[str], str]] = None,
    bus: Optional[EventBus] = None,
    auth_token: Optional[str] = None,
) -> None:
    """Set up the server around *agent_card*.

    *handler* and *bus* are stored as given.  *auth_token* enables bearer
    authentication; an empty string is treated the same as ``None`` (auth
    disabled).  The task registry starts out empty.

    When a token is configured, ``agent_card.authentication`` is overwritten
    in place so the card lists the ``bearer`` scheme.
    """
    # Collapse every falsy value ("" included) to None.
    configured_token = auth_token if auth_token else None

    self._card = agent_card
    self._handler = handler
    self._bus = bus
    self._auth_token = configured_token
    self._tasks: Dict[str, A2ATask] = {}

    if configured_token is not None:
        # Only the scheme name is published, never the secret itself.
        self._card.authentication = {"schemes": ["bearer"]}
Functions
authenticate
authenticate(token: Optional[str]) -> bool

Constant-time check of a presented bearer token.

Returns True when no auth_token is configured (auth disabled).

Source code in src/openjarvis/a2a/server.py
def authenticate(self, token: Optional[str]) -> bool:
    """Constant-time check of a presented bearer *token*.

    Returns ``True`` when no ``auth_token`` is configured (auth disabled).
    Otherwise returns ``True`` only when *token* is a non-empty string equal
    to the configured token; a missing, empty or non-string token is
    rejected.

    The comparison is done on UTF-8 bytes because
    ``secrets.compare_digest`` raises ``TypeError`` for ``str`` arguments
    that contain non-ASCII characters.  A credential with such characters
    must be rejected, not crash the request.
    """
    if not self._auth_token:
        return True
    if not token or not isinstance(token, str):
        return False
    # ``surrogatepass`` keeps encoding total, so no presented string can
    # raise here; distinct strings still map to distinct byte sequences.
    presented = token.encode("utf-8", "surrogatepass")
    expected = self._auth_token.encode("utf-8", "surrogatepass")
    return secrets.compare_digest(presented, expected)
handle_request
handle_request(request_data: Dict[str, Any], *, token: Optional[str] = None) -> Dict[str, Any]

Process a JSON-RPC 2.0 A2A request.

token is the bearer credential extracted by the transport (e.g. the HTTP Authorization header). It is validated before dispatch when the server is configured with an auth_token.

Source code in src/openjarvis/a2a/server.py
def handle_request(
    self,
    request_data: Dict[str, Any],
    *,
    token: Optional[str] = None,
) -> Dict[str, Any]:
    """Dispatch one JSON-RPC 2.0 A2A request and return the response dict.

    *token* is the bearer credential the transport extracted (e.g. from the
    HTTP ``Authorization`` header).  It is checked with ``authenticate``
    before the method is looked at; a failed check produces an error
    response with code ``-32001``.

    ``tasks/send``, ``tasks/get`` and ``tasks/cancel`` are forwarded to the
    matching ``_handle_task_*`` method together with the request's
    ``params`` (``{}`` when absent) and ``id`` (``""`` when absent).  Any
    other method produces an error response with code ``-32601``.
    """
    req_id = request_data.get("id", "")

    # Gate first: nothing below may run for an unauthenticated caller.
    if not self.authenticate(token):
        denied = A2AResponse(
            error={"code": -32001, "message": "Unauthorized"},
            request_id=req_id,
        )
        return denied.to_dict()

    method = request_data.get("method", "")
    params = request_data.get("params", {})

    # Ordered (name, handler) pairs compared with ``==`` so that a method
    # value of any JSON type simply fails to match.
    routes = (
        ("tasks/send", self._handle_task_send),
        ("tasks/get", self._handle_task_get),
        ("tasks/cancel", self._handle_task_cancel),
    )
    for rpc_name, rpc_handler in routes:
        if method == rpc_name:
            return rpc_handler(params, req_id)

    unknown = A2AResponse(
        error={"code": -32601, "message": f"Method not found: {method}"},
        request_id=req_id,
    )
    return unknown.to_dict()
get_routes
get_routes() -> List[Dict[str, Any]]

Return route definitions for mounting in a web framework.

Source code in src/openjarvis/a2a/server.py
def get_routes(self) -> List[Dict[str, Any]]:
    """Describe the HTTP routes this server expects to be mounted.

    Returns a new list with two entries, each a dict with ``path``,
    ``method`` and ``handler`` keys: the discovery card (GET) and the task
    endpoint (POST).  ``handler`` is a string label, not a callable.
    """
    discovery_route = dict(
        path="/.well-known/agent.json",
        method="GET",
        handler="agent_card",
    )
    tasks_route = dict(
        path="/a2a/tasks",
        method="POST",
        handler="handle_request",
    )
    return [discovery_route, tasks_route]

A2AAgentTool

A2AAgentTool(client: A2AClient, *, name: str = '')

Bases: BaseTool

Wraps an external A2A agent as a BaseTool.

Follows the MCPToolAdapter pattern for external tool integration.

Source code in src/openjarvis/a2a/tool.py
def __init__(self, client: A2AClient, *, name: str = "") -> None:
    """Wrap *client* as a tool, naming it from the remote agent card if possible.

    The tool starts out named *name*, or ``"a2a_agent"`` when *name* is
    empty.  ``client.discover()`` is then attempted: on success the card's
    description becomes the tool description and, when no explicit *name*
    was given, the tool is renamed to ``a2a_<card name>`` (lower-cased,
    spaces replaced by underscores).  If anything in that step raises, the
    failure is logged at debug level and a generic description is used.
    """
    self._client = client

    initial_name = name if name else "a2a_agent"
    self._name = initial_name
    self.tool_id = self._name

    # Best-effort discovery: a failure here must not stop construction.
    try:
        agent_card = client.discover()
        if not name:
            slug = agent_card.name.lower().replace(" ", "_")
            self._name = f"a2a_{slug}"
            self.tool_id = self._name
        fallback_text = f"External A2A agent: {agent_card.name}"
        self._description = agent_card.description or fallback_text
    except Exception as exc:
        logger.debug("Failed to fetch A2A agent description: %s", exc)
        self._description = "External A2A agent"