Skip to main content

Request/Response Structure

3. Request/Response Structure

The A2A protocol uses JSON-RPC 2.0 as the payload format for its requests and responses. JSON-RPC is a remote procedure call protocol encoded in JSON. It allows for calling methods on a remote server and getting responses. It's commonly used for web services and APIs to facilitate communication between a client and a server. Your agent should expose two main endpoints:

  1. GET /.well-known/agent.json The well-known endpoint which returns the information about the agent on the agent card

  2. POST /
    JSON-RPC simplifies API design by using a single endpoint for all method calls. The client requests all A2A RPC methods by sending an HTTP POST request to the A2A Server's base url (as specified in its AgentCard). The body of the HTTP POST request MUST be a JSON-RPC request object, and the Content-Type header MUST be application/json. Your agent's POST / endpoint will use the method field in the request to route to the appropriate function.

Sample JSON-RPC Request Object

{
"jsonrpc": "2.0",
"method": "message/send",
"id": 3,
"params": {
"message": {
"role": "user",
"parts": [
{
"type": "text",
"text": "ping"
}
]
}
}
}

Sample JSON-RPC Success Response

  {
"jsonrpc": "2.0",
"id": 3,
"result": {
"role": "agent",
"parts": [
{
"type": "text",
"text": "pong"
}
],
"kind": "message",
"message_id": "fbuvdhb4ke6vq8hbva9qh9ha"
}
}

Sample JSON-RPC Error Response

{
"jsonrpc": "2.0",
"id": 3,
"error": {
"code": -32600,
"message": "Invalid Request"
}
}

Code Example:

@app.post("/")
async def handle_rpc_request(request: Request):
body = await request.json()

method = body.get('method')

if method == "message/send":
#route to function handling that method
result = await handle_message_send(request)

elif method == "task/subscribe":
result = await handle_task_subscription(request)

else:
#throw return an error if method doesnt exist
error = {
"jsonrpc": "2.0",
"id": body.get("id", None),
"error": {
"code": -32601,
"message": "Method not found"
}
}

return error

return result

async def handle_message_send(request):
#Process the incoming message
return

async def handle_task_subscription(request):
#Handle subscription to task updates
return