A2AServer(agent_card: AgentCard, *, handler: Optional[Callable[[str], str]] = None, bus: Optional[EventBus] = None, auth_token: Optional[str] = None)
A2A server that processes incoming tasks via agent execution.
Can be mounted as routes in the FastAPI server.
When `auth_token` is set, every `handle_request` call must present a
matching bearer token or it is rejected before any agent runs. The required
`bearer` scheme (not the token itself) is advertised on the agent card's
`authentication` field. When unset, the server is unauthenticated — only mount it on a trusted network.
Source code in src/openjarvis/a2a/server.py
def __init__(
    self,
    agent_card: AgentCard,
    *,
    handler: Optional[Callable[[str], str]] = None,
    bus: Optional[EventBus] = None,
    auth_token: Optional[str] = None,
) -> None:
    """Set up the server state around *agent_card*.

    Args:
        agent_card: Discovery card for this agent. It is stored by
            reference and, when a token is configured, its
            ``authentication`` attribute is overwritten in place.
        handler: Optional callable taking and returning a string; stored
            for later use by the task handlers.
        bus: Optional event bus; stored as-is.
        auth_token: Bearer token callers must present. Any falsy value
            (``None`` or ``""``) leaves authentication disabled.
    """
    # Collapse every falsy token to None so "auth disabled" has exactly
    # one representation on the instance.
    normalized_token = auth_token if auth_token else None

    self._tasks: Dict[str, A2ATask] = {}
    self._bus = bus
    self._handler = handler
    self._card = agent_card
    self._auth_token = normalized_token

    if normalized_token is not None:
        # Advertise the required scheme (not the secret) on the discovery card.
        self._card.authentication = {"schemes": ["bearer"]}
Functions
authenticate
authenticate(token: Optional[str]) -> bool
Constant-time check of a presented bearer token.
Returns True when no auth_token is configured (auth disabled).
Source code in src/openjarvis/a2a/server.py
def authenticate(self, token: Optional[str]) -> bool:
    """Constant-time check of a presented bearer *token*.

    Args:
        token: Credential supplied by the caller, or ``None`` when the
            transport found none.

    Returns:
        ``True`` when no ``auth_token`` is configured (auth disabled) or
        when *token* equals the configured token; ``False`` otherwise,
        including for missing, empty, or non-string tokens.
    """
    expected = self._auth_token
    if not expected:
        return True
    # The token comes from the client: anything that is not a non-empty
    # string is simply a failed authentication, never an exception.
    if not token or not isinstance(token, str):
        return False
    # compare_digest only accepts str operands that are pure ASCII and
    # raises TypeError otherwise; comparing the UTF-8 bytes keeps the
    # check constant-time while tolerating arbitrary Unicode input.
    return secrets.compare_digest(
        token.encode("utf-8"),
        expected.encode("utf-8"),
    )
handle_request
handle_request(request_data: Dict[str, Any], *, token: Optional[str] = None) -> Dict[str, Any]
Process a JSON-RPC 2.0 A2A request.
token is the bearer credential extracted by the transport (e.g. the
HTTP Authorization header). It is validated before dispatch when
the server is configured with an auth_token.
Source code in src/openjarvis/a2a/server.py
def handle_request(
    self,
    request_data: Dict[str, Any],
    *,
    token: Optional[str] = None,
) -> Dict[str, Any]:
    """Dispatch one JSON-RPC 2.0 A2A request to its task handler.

    Args:
        request_data: Decoded request object. ``id`` defaults to ``""``,
            ``method`` to ``""`` and ``params`` to ``{}`` when absent.
        token: Bearer credential extracted by the transport (e.g. the
            HTTP ``Authorization`` header); checked via
            :meth:`authenticate` before anything is dispatched.

    Returns:
        The selected handler's result, or a serialized error response:
        code ``-32001`` when authentication fails, ``-32601`` when the
        method is not one of ``tasks/send``, ``tasks/get``,
        ``tasks/cancel``.
    """
    req_id = request_data.get("id", "")

    # Guard: refuse unauthenticated callers before looking at the payload.
    if not self.authenticate(token):
        denied = A2AResponse(
            error={"code": -32001, "message": "Unauthorized"},
            request_id=req_id,
        )
        return denied.to_dict()

    method = request_data.get("method", "")
    params = request_data.get("params", {})

    # Ordered (rpc name, handler attribute) pairs. Matching uses ``==``
    # rather than a dict lookup so an unhashable ``method`` value still
    # falls through to the "method not found" reply.
    dispatch = (
        ("tasks/send", "_handle_task_send"),
        ("tasks/get", "_handle_task_get"),
        ("tasks/cancel", "_handle_task_cancel"),
    )
    for rpc_name, handler_attr in dispatch:
        if method == rpc_name:
            return getattr(self, handler_attr)(params, req_id)

    unknown = A2AResponse(
        error={"code": -32601, "message": f"Method not found: {method}"},
        request_id=req_id,
    )
    return unknown.to_dict()
get_routes
get_routes() -> List[Dict[str, Any]]
Return route definitions for mounting in a web framework.
Source code in src/openjarvis/a2a/server.py
def get_routes(self) -> List[Dict[str, Any]]:
    """Describe the HTTP routes a web framework should mount.

    Returns:
        A fresh list of route dicts, each with ``path``, ``method`` and
        ``handler`` keys: the agent-card discovery endpoint followed by
        the task endpoint.
    """
    discovery_route = {
        "path": "/.well-known/agent.json",
        "method": "GET",
        "handler": "agent_card",
    }
    tasks_route = {
        "path": "/a2a/tasks",
        "method": "POST",
        "handler": "handle_request",
    }
    return [discovery_route, tasks_route]