Skip to content

Index

a2a

Agent-to-Agent protocol — Google A2A spec implementation.

Classes

A2AClient

A2AClient(base_url: str, *, timeout: float = 30.0)

Client for calling external A2A-compatible agents.

Discovers agent capabilities via /.well-known/agent.json and sends tasks via /a2a/tasks.

Source code in src/openjarvis/a2a/client.py
def __init__(self, base_url: str, *, timeout: float = 30.0) -> None:
    """Set up a client bound to one remote A2A agent.

    Args:
        base_url: Root URL of the remote agent. Trailing ``/`` characters
            are removed so endpoint paths can be appended directly.
        timeout: Timeout value handed to every HTTP call made by this client.
    """
    normalized_url = base_url.rstrip("/")
    self._base_url = normalized_url
    self._timeout = timeout
    # Filled in by discover(); stays None until the card has been fetched.
    self._card: Optional[AgentCard] = None
Functions
discover
discover() -> AgentCard

Fetch the agent card from /.well-known/agent.json.

Source code in src/openjarvis/a2a/client.py
def discover(self) -> AgentCard:
    """Retrieve the remote agent's card and remember it on the client.

    Performs a GET on ``<base_url>/.well-known/agent.json``, builds an
    ``AgentCard`` from the JSON body and stores it in ``self._card``.

    Returns:
        The newly built ``AgentCard``.

    Raises:
        Whatever ``httpx`` raises for transport failures, plus the error
        raised by ``raise_for_status()`` on an unsuccessful HTTP status.
    """
    import httpx

    card_url = f"{self._base_url}/.well-known/agent.json"
    response = httpx.get(card_url, timeout=self._timeout)
    response.raise_for_status()
    payload = response.json()

    # Absent keys fall back to empty values; ``url`` falls back to the
    # base URL this client was configured with.
    card_fields = {
        "name": payload.get("name", ""),
        "description": payload.get("description", ""),
        "url": payload.get("url", self._base_url),
        "version": payload.get("version", ""),
        "capabilities": payload.get("capabilities", []),
        "skills": payload.get("skills", []),
    }
    self._card = AgentCard(**card_fields)
    return self._card
send_task
send_task(input_text: str, **kwargs: Any) -> A2ATask

Send a task to the remote agent and return the result.

Source code in src/openjarvis/a2a/client.py
def send_task(self, input_text: str, **kwargs: Any) -> A2ATask:
    """Submit *input_text* to the remote agent as a ``tasks/send`` request.

    Args:
        input_text: Text sent as the single part of a ``user`` message.
        **kwargs: Accepted for call-site compatibility; not used by this
            method and not forwarded to the remote agent.

    Returns:
        An ``A2ATask`` built from the ``result`` member of the response.
        Fields missing from ``result`` fall back to defaults (empty id,
        state ``"unknown"``, the original input text, empty output and
        history). An ``error`` member in the response is not inspected.

    Raises:
        Whatever ``httpx`` raises for transport failures, plus the error
        raised by ``raise_for_status()`` on an unsuccessful HTTP status.
    """
    import httpx

    user_message = {
        "role": "user",
        "parts": [{"text": input_text}],
    }
    rpc_request = A2ARequest(
        method="tasks/send",
        params={"message": user_message},
    )

    endpoint = f"{self._base_url}/a2a/tasks"
    body = rpc_request.to_dict()
    response = httpx.post(endpoint, json=body, timeout=self._timeout)
    response.raise_for_status()

    task_fields = response.json().get("result", {})
    task_kwargs = {
        "task_id": task_fields.get("id", ""),
        "state": task_fields.get("state", "unknown"),
        "input_text": task_fields.get("input", input_text),
        "output_text": task_fields.get("output", ""),
        "history": task_fields.get("history", []),
    }
    return A2ATask(**task_kwargs)
get_task
get_task(task_id: str) -> A2ATask

Get the status of a previously submitted task.

Source code in src/openjarvis/a2a/client.py
def get_task(self, task_id: str) -> A2ATask:
    """Query the remote agent for the current state of a task.

    Args:
        task_id: Identifier of a task submitted earlier.

    Returns:
        An ``A2ATask`` carrying the id, state and output reported in the
        ``result`` member of the response. Missing fields fall back to
        *task_id*, ``"unknown"`` and ``""`` respectively. An ``error``
        member in the response is not inspected.

    Raises:
        Whatever ``httpx`` raises for transport failures, plus the error
        raised by ``raise_for_status()`` on an unsuccessful HTTP status.
    """
    import httpx

    status_request = A2ARequest(method="tasks/get", params={"id": task_id})
    endpoint = f"{self._base_url}/a2a/tasks"
    http_response = httpx.post(
        endpoint,
        json=status_request.to_dict(),
        timeout=self._timeout,
    )
    http_response.raise_for_status()

    reply = http_response.json()
    task_info = reply.get("result", {})
    reported_id = task_info.get("id", task_id)
    reported_state = task_info.get("state", "unknown")
    reported_output = task_info.get("output", "")
    return A2ATask(
        task_id=reported_id,
        state=reported_state,
        output_text=reported_output,
    )
cancel_task
cancel_task(task_id: str) -> A2ATask

Cancel a running task.

Source code in src/openjarvis/a2a/client.py
def cancel_task(self, task_id: str) -> A2ATask:
    """Ask the remote agent to cancel a task via ``tasks/cancel``.

    Args:
        task_id: Identifier of the task to cancel.

    Returns:
        An ``A2ATask`` with the id and state found in the ``result``
        member of the response.

    Raises:
        Whatever ``httpx`` raises for transport failures, plus the error
        raised by ``raise_for_status()`` on an unsuccessful HTTP status.
    """
    import httpx

    cancel_request = A2ARequest(method="tasks/cancel", params={"id": task_id})
    target = f"{self._base_url}/a2a/tasks"
    wire_body = cancel_request.to_dict()
    reply = httpx.post(target, json=wire_body, timeout=self._timeout)
    reply.raise_for_status()

    outcome = reply.json().get("result", {})
    # NOTE: when the response has no ``result`` (or it omits ``state``), the
    # task is reported as "canceled"; an ``error`` member is not inspected.
    final_id = outcome.get("id", task_id)
    final_state = outcome.get("state", "canceled")
    return A2ATask(task_id=final_id, state=final_state)

A2ARequest dataclass

A2ARequest(method: str, params: Dict[str, Any] = dict(), request_id: str = (lambda: hex[:8])())

JSON-RPC 2.0 request for A2A.

A2AResponse dataclass

A2AResponse(result: Any = None, error: Optional[Dict[str, Any]] = None, request_id: str = '')

JSON-RPC 2.0 response for A2A.

A2ATask dataclass

A2ATask(task_id: str = (lambda: hex[:16])(), state: TaskState = SUBMITTED, input_text: str = '', output_text: str = '', history: List[Dict[str, str]] = list(), metadata: Dict[str, Any] = dict())

An A2A task with state machine.

AgentCard dataclass

AgentCard(name: str, description: str = '', url: str = '', version: str = '0.1.0', capabilities: List[str] = list(), skills: List[str] = list(), authentication: Dict[str, Any] = dict())

Agent discovery card served at /.well-known/agent.json.

A2AServer

A2AServer(agent_card: AgentCard, *, handler: Optional[Callable[[str], str]] = None, bus: Optional[EventBus] = None)

A2A server that processes incoming tasks via agent execution.

Can be mounted as routes in the FastAPI server.

Source code in src/openjarvis/a2a/server.py
def __init__(
    self,
    agent_card: AgentCard,
    *,
    handler: Optional[Callable[[str], str]] = None,
    bus: Optional[EventBus] = None,
) -> None:
    """Create a server for the agent described by *agent_card*.

    Args:
        agent_card: Discovery card describing this agent.
        handler: Optional callable taking a string and returning a string.
        bus: Optional event bus instance.
    """
    self._card, self._handler, self._bus = agent_card, handler, bus
    # Starts empty; annotated as a mapping from string keys to A2ATask.
    self._tasks: Dict[str, A2ATask] = dict()
Functions
handle_request
handle_request(request_data: Dict[str, Any]) -> Dict[str, Any]

Process a JSON-RPC 2.0 A2A request.

Source code in src/openjarvis/a2a/server.py
def handle_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch one JSON-RPC 2.0 A2A request to the matching handler.

    Args:
        request_data: Decoded request; the ``method``, ``params`` and
            ``id`` keys are read, defaulting to ``""``, ``{}`` and ``""``.

    Returns:
        The dict produced by the matching ``tasks/*`` handler, or an error
        response with code ``-32601`` when the method is not recognised.
    """
    rpc_method = request_data.get("method", "")
    rpc_params = request_data.get("params", {})
    rpc_id = request_data.get("id", "")

    # Compared with ``==`` rather than looked up in a dict, so a malformed
    # (e.g. unhashable) ``method`` value still reaches the error response.
    routes = (
        ("tasks/send", "_handle_task_send"),
        ("tasks/get", "_handle_task_get"),
        ("tasks/cancel", "_handle_task_cancel"),
    )
    for route_name, handler_attr in routes:
        if rpc_method == route_name:
            return getattr(self, handler_attr)(rpc_params, rpc_id)

    not_found = A2AResponse(
        error={"code": -32601, "message": f"Method not found: {rpc_method}"},
        request_id=rpc_id,
    )
    return not_found.to_dict()
get_routes
get_routes() -> List[Dict[str, Any]]

Return route definitions for mounting in a web framework.

Source code in src/openjarvis/a2a/server.py
def get_routes(self) -> List[Dict[str, Any]]:
    """Describe the HTTP routes this server expects to be mounted.

    Returns:
        A new list with two route descriptions, each a dict with ``path``,
        ``method`` and ``handler`` keys: the agent-card GET endpoint
        followed by the JSON-RPC task POST endpoint.
    """
    card_route: Dict[str, Any] = {
        "path": "/.well-known/agent.json",
        "method": "GET",
        "handler": "agent_card",
    }
    tasks_route: Dict[str, Any] = {
        "path": "/a2a/tasks",
        "method": "POST",
        "handler": "handle_request",
    }
    return [card_route, tasks_route]

A2AAgentTool

A2AAgentTool(client: A2AClient, *, name: str = '')

Bases: BaseTool

Wraps an external A2A agent as a BaseTool.

Follows the MCPToolAdapter pattern for external tool integration.

Source code in src/openjarvis/a2a/tool.py
def __init__(self, client: A2AClient, *, name: str = "") -> None:
    """Wrap *client* as a tool, taking name and description from its card.

    Args:
        client: A2A client for the remote agent; ``client.discover()`` is
            called once here to fetch the agent card.
        name: Explicit tool name. When empty, the name is derived from the
            discovered card as ``a2a_<card name lowercased, spaces -> _>``;
            if the card has no name either, ``"a2a_agent"`` is kept.

    Discovery is best-effort: any exception raised while fetching or
    reading the card is logged at debug level and the generic
    name/description are used, so construction does not fail.
    """
    self._client = client
    self._name = name or "a2a_agent"
    self.tool_id = self._name
    # Try to discover agent info
    try:
        card = client.discover()
        agent_name = card.name
        if agent_name:
            if not name:
                self._name = f"a2a_{agent_name.lower().replace(' ', '_')}"
                self.tool_id = self._name
            fallback_description = f"External A2A agent: {agent_name}"
        else:
            # A card without a name would otherwise yield the tool id
            # "a2a_" and a description ending in a dangling colon.
            fallback_description = "External A2A agent"
        self._description = card.description or fallback_description
    except Exception as exc:
        logger.debug("Failed to fetch A2A agent description: %s", exc)
        self._description = "External A2A agent"