I have written a sample script using the python-a2a package and am hitting the following error. I am not sure whether the issue is with the specific version of the package.
A2ACalcClient initialized for URL: http://127.0.0.1:5001/a2a
Sending calculation request: add(a=5, b=3)
Unexpected response type from A2A agent: error
Full response content: ErrorContent(message='Failed to communicate with agent at http://127.0.0.1:5001/a2a/a2a. Tried multiple endpoint variations.', type=<ContentType.ERROR: 'error'>)
A2ACalcClient initialized for URL: http://127.0.0.1:5001
Sending calculation request: add(a=5, b=3)
Unexpected response type from A2A agent: error
Full response content: ErrorContent(message='Failed to communicate with agent at http://127.0.0.1:5001/tasks/send. Tried multiple endpoint variations.', type=<ContentType.ERROR: 'error'>)
Server code:
import math
from python_a2a import (
A2AServer, Message, TextContent, FunctionCallContent,
FunctionResponseContent, FunctionParameter, MessageRole, run_server,
Task
)
class CalculatorAgent(A2AServer):
    """A2A agent that exposes basic arithmetic and square root as functions.

    Text messages receive a short help reply. ``function_call`` messages are
    dispatched to ``calculate`` or ``sqrt``. Failures (bad operands, divide by
    zero, unknown operation or function) are reported back to the caller as a
    function response carrying an ``"error"`` key instead of being raised.

    Helper methods carry a ``_calc_`` prefix to avoid accidentally overriding
    private methods of the ``A2AServer`` base class.
    """

    def handle_message(self, message):
        """Build the agent's reply to one incoming message.

        Args:
            message: Incoming message; ``message.content.type`` selects the
                branch ("text" or "function_call").

        Returns:
            A ``Message`` with role ``AGENT`` whose ``parent_message_id`` and
            ``conversation_id`` are copied from ``message``.
        """
        content_type = message.content.type
        if content_type == "text":
            return self._calc_reply(
                message,
                TextContent(
                    text="I'm a [Python A2A](python-a2a.html) calculator agent. You can call my functions:\n"
                    "- calculate: Basic arithmetic (operation, a, b)\n"
                    "- sqrt: Square root (value)"
                ),
            )
        if content_type == "function_call":
            return self._calc_reply(message, self._calc_dispatch(message.content))
        # Any other content type used to fall through and return None; answer
        # explicitly so the caller always gets a well-formed reply.
        return self._calc_reply(
            message,
            TextContent(text=f"Unsupported content type: {content_type}"),
        )

    @staticmethod
    def _calc_reply(message, content):
        """Wrap ``content`` in an agent ``Message`` threaded onto ``message``."""
        return Message(
            content=content,
            role=MessageRole.AGENT,
            parent_message_id=message.message_id,
            conversation_id=message.conversation_id,
        )

    def _calc_dispatch(self, call):
        """Run the requested function and return a ``FunctionResponseContent``.

        The response payload is ``{"result": value}`` on success and
        ``{"error": text}`` when the function raises or is not recognised.
        """
        function_name = call.name
        params = {p.name: p.value for p in call.parameters}
        try:
            if function_name == "calculate":
                result = self._calc_arithmetic(params)
            elif function_name == "sqrt":
                result = self._calc_sqrt(params)
            else:
                # Previously an unknown name produced no reply at all (None).
                raise ValueError(f"Unknown function: {function_name}")
        except Exception as e:
            # Deliberately broad: every failure is reported to the caller
            # rather than crashing the request handler.
            return FunctionResponseContent(
                name=function_name,
                response={"error": str(e)},
            )
        return FunctionResponseContent(
            name=function_name,
            response={"result": result},
        )

    @staticmethod
    def _calc_arithmetic(params):
        """Apply ``params["operation"]`` (default "add") to operands a and b.

        Raises:
            ValueError: On division by zero, an unknown operation, or an
                operand that cannot be converted to ``float``.
        """
        operation = params.get("operation", "add")
        a = float(params.get("a", 0))
        b = float(params.get("b", 0))
        if operation == "add":
            return a + b
        if operation == "subtract":
            return a - b
        if operation == "multiply":
            return a * b
        if operation == "divide":
            if b == 0:
                raise ValueError("Cannot divide by zero")
            return a / b
        raise ValueError(f"Unknown operation: {operation}")

    @staticmethod
    def _calc_sqrt(params):
        """Return the square root of ``params["value"]`` (default 0).

        Raises:
            ValueError: If the value is negative or not convertible to float.
        """
        value = float(params.get("value", 0))
        if value < 0:
            raise ValueError("Cannot calculate square root of negative number")
        return math.sqrt(value)
if __name__ == "__main__":
    # Serve the calculator agent on all local interfaces, port 5001.
    run_server(CalculatorAgent(), host="0.0.0.0", port=5001)
Client code:
import python_a2a
from python_a2a import (
A2AClient, Message, FunctionCallContent,
FunctionParameter, MessageRole
)
# Card, Message, Task, SendMessageSuccessResponse
#
class A2ACalcClient:
"""
A class to encapsulate the communication with a Python A2A agent.
"""
def __init__(self, a2a_url: str):
"""
Initializes the A2ACalcClient with the A2A agent's URL.
Args:
a2a_url: The URL of the Python A2A agent (e.g., "http://127.0.0.1:5001").
"""
self.client = A2AClient(a2a_url)
print(f"A2ACalcClient initialized for URL: {a2a_url}")
def send_calculation_request(self, operation: str, a: int, b: int) -> int | None:
"""
Sends a function call message to the A2A agent to perform a calculation.
Args:
operation: The mathematical operation (e.g., "add", "subtract", "multiply", "divide").
a: The first operand.
b: The second operand.
Returns:
The result of the calculation if successful, otherwise None.
"""
print(f"Sending calculation request: {operation}(a={a}, b={b})")
function_call_content = FunctionCallContent(
name="calculate",
parameters=[
FunctionParameter(name="operation", value=operation),
FunctionParameter(name="a", value=a),
FunctionParameter(name="b", value=b)
]
)
function_call_message = Message(
content=function_call_content,
role=MessageRole.USER
)
try:
response = self.client.send_message(function_call_message)
if response.content.type == "function_response":
result = response.content.response.get("result")
if result is not None:
print(f"Received result from A2A agent: {result}")
return result
else:
print("Function response received, but 'result' key is missing.")
return None
else:
print(f"Unexpected response type from A2A agent: {response.content.type}")
print(f"Full response content: {response.content}")
return None
except Exception as e:
print(f"An error occurred while communicating with the A2A agent: {e}")
return None
# --- Example Usage ---
if __name__ == "__main__":
    # Base URL of the locally running agent.
    # NOTE: "http://127.0.0.1:5001/a2a" was also tried as the URL.
    agent_url = "http://127.0.0.1:5001"
    calc_client = A2ACalcClient(agent_url)

    # Ask the agent for 5 + 3; None means the request failed.
    total = calc_client.send_calculation_request(operation="add", a=5, b=3)
    if total is not None:
        print(f"Addition Result: {total}")  # Expected: 8
    print("-" * 30)
I ran this client and server code locally and got the following response:
A2ACalcClient initialized for URL: http://127.0.0.1:5001
Sending calculation request: add(a=5, b=3)
Received result from A2A agent: 8.0
Addition Result: 8.0
------------------------------
So, I don't think this is an issue with your client or server code. I think this is a network error on whichever machine is behind 10.188.106.32.
Are you sure this is the right URL you are referencing, and traffic is allowed to this address on the correct ports? Can you check if the a2a agent is running at all in your browser at this address, and check your firewall/ingress rules on this machine? It's very probable this is a network error if you're running this on a different machine and you need to allow inbound/outbound traffic on port 5001.