Advanced Techniques

This guide covers advanced techniques for using Python A2A effectively.

Function Calling

Function calling allows agents to request specific actions from other systems. Here’s how to use it:

Creating Function Calls

from python_a2a import FunctionCallContent, Message, MessageRole, FunctionParameter

# Describe the call first: the function to invoke and its named arguments
weather_call = FunctionCallContent(
    name="get_weather",
    parameters=[
        FunctionParameter(name="location", value="New York"),
        FunctionParameter(name="unit", value="fahrenheit"),
    ],
)

# Wrap the call in a message sent with the agent role
message = Message(content=weather_call, role=MessageRole.AGENT)

Handling Function Calls

from python_a2a import A2AServer, FunctionResponseContent, Message, MessageRole

class WeatherAgent(A2AServer):
    """Agent that answers ``get_weather`` function calls with mock data."""

    def handle_message(self, message):
        """Reply to a ``get_weather`` function call; defer everything else.

        Args:
            message: Incoming message. Its ``content`` is inspected for a
                ``"function_call"`` type, a function ``name`` and a list of
                ``parameters`` (each with ``name`` and ``value``).

        Returns:
            A function-response message tied to the incoming message's id and
            conversation when the call is ``get_weather``; otherwise whatever
            the parent class's ``handle_message`` returns.
        """
        call = message.content

        # Anything that is not a function call goes to the default handler
        if call.type != "function_call":
            return super().handle_message(message)

        requested_function = call.name

        # Flatten the parameter list into a name -> value mapping
        arguments = {param.name: param.value for param in call.parameters}

        # Function calls this agent does not know also use the default handler
        if requested_function != "get_weather":
            return super().handle_message(message)

        # Arguments supplied by the caller; the mock data below does not
        # vary with them
        location = arguments.get("location", "")
        unit = arguments.get("unit", "celsius")

        # Mock weather data
        forecast = {"temp": 72, "condition": "Sunny"}

        # Answer within the same conversation, as a child of the request
        return Message(
            content=FunctionResponseContent(
                name="get_weather",
                response=forecast,
            ),
            role=MessageRole.AGENT,
            parent_message_id=message.message_id,
            conversation_id=message.conversation_id,
        )

Streaming Responses

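For long-running operations, you can stream responses to provide real-time feedback. The example below only simulates the progress updates in-process; a real implementation would send each update to the client, for example over server-sent events or websockets: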

from python_a2a import A2AServer, TaskStatus, TaskState
from python_a2a.models import AgentCard
import time

class StreamingAgent(A2AServer):
    """Agent that advertises streaming and reports progress on a task."""

    def __init__(self):
        """Register an agent card whose capabilities enable streaming."""
        card = AgentCard(
            name="Streaming Agent",
            description="Agent with streaming capabilities",
            url="http://localhost:5000",
            version="1.0.0",
            capabilities={"streaming": True},
        )
        super().__init__(agent_card=card)

    @staticmethod
    def _text_artifacts(text):
        """Build a fresh artifact list holding a single text part."""
        return [{"parts": [{"type": "text", "text": text}]}]

    def handle_task(self, task):
        """Walk a task through simulated progress updates to completion.

        The task is put into the waiting state, its artifacts are overwritten
        five times with a percentage (20% to 100%, one second apart), and it
        is finally marked completed.

        Args:
            task: Task whose ``status`` and ``artifacts`` are updated in place.

        Returns:
            The same task object, in the completed state.
        """
        # The task is waiting while work is in progress
        task.status = TaskStatus(state=TaskState.WAITING)
        task.artifacts = self._text_artifacts("Processing...")

        # Simplified stand-in for streaming: a real implementation would push
        # each update to the client via server-sent events or websockets
        for step in range(5):
            task.artifacts = self._text_artifacts(f"Processing... {(step+1)*20}%")

            # Simulate processing time
            time.sleep(1)

        # Final response
        task.artifacts = self._text_artifacts("Processing complete!")
        task.status = TaskStatus(state=TaskState.COMPLETED)

        return task

Authentication

You can add authentication to your A2A agents to protect them from unauthorized access:

from python_a2a import A2AServer, TaskStatus, TaskState
from python_a2a.models import AgentCard
from flask import request

class AuthenticatedAgent(A2AServer):
    """Agent that only serves requests carrying a known bearer token.

    The discovery paths stay readable without credentials, but only for
    read-only HTTP methods; every other request must present an
    ``Authorization: Bearer <api key>`` header whose key is in ``api_keys``.
    """

    # Prefix of an Authorization header that carries a bearer token
    _BEARER_PREFIX = "Bearer "

    # Paths that may be read without credentials (agent card / landing page)
    _PUBLIC_PATHS = frozenset({"/", "/a2a", "/agent.json", "/a2a/agent.json"})

    # Only read-only methods skip authentication on the public paths, so a
    # state-changing request to the same path still needs a token
    _PUBLIC_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})

    def __init__(self):
        """Register an agent card that declares bearer authentication."""
        agent_card = AgentCard(
            name="Authenticated Agent",
            description="Agent with authentication",
            url="http://localhost:5000",
            version="1.0.0",
            authentication="bearer"
        )
        super().__init__(agent_card=agent_card)

        # API keys mapped to the user each key identifies.
        # NOTE: hard-coded for the example only; load real keys from
        # configuration or a secret store.
        self.api_keys = {"MY_SECRET_KEY": "user1"}

    @classmethod
    def _bearer_token(cls, auth_header):
        """Return the token of a ``Bearer`` header, or ``None``.

        Args:
            auth_header: Raw ``Authorization`` header value, or ``None`` when
                the header is missing.

        Returns:
            Everything after the ``"Bearer "`` prefix, or ``None`` when the
            header is missing or does not start with that prefix.
        """
        if not auth_header or not auth_header.startswith(cls._BEARER_PREFIX):
            return None
        # Slice rather than split so a token that itself contains "Bearer "
        # is kept whole
        return auth_header[len(cls._BEARER_PREFIX):]

    def setup_routes(self, app):
        """Install the authentication check, then the parent's routes.

        Args:
            app: Flask application the routes are registered on.
        """
        @app.before_request
        def authenticate():
            # Skip authentication only for read-only access to public paths
            if (request.method in self._PUBLIC_METHODS
                    and request.path in self._PUBLIC_PATHS):
                return None

            # A missing or non-bearer Authorization header is rejected
            token = self._bearer_token(request.headers.get("Authorization"))
            if token is None:
                return {"error": "Unauthorized"}, 401

            # The token must be one of the known API keys
            if token not in self.api_keys:
                return {"error": "Invalid API key"}, 401

            # Token is valid: returning None lets the request proceed
            return None

        # Call parent setup_routes
        super().setup_routes(app)

    def handle_task(self, task):
        """Greet the authenticated user, or fail the task if there is none.

        Args:
            task: Task whose ``artifacts`` and ``status`` are updated in place.

        Returns:
            The same task: completed with a greeting when the request carries
            a known API key, failed with an "Unauthorized" message otherwise.
        """
        # Resolve the user from the request's bearer token. The header is
        # re-checked here so a request that reaches this handler without a
        # valid token fails cleanly instead of raising AttributeError.
        token = self._bearer_token(request.headers.get("Authorization"))
        user = self.api_keys.get(token) if token is not None else None

        if user is None:
            task.artifacts = [{
                "parts": [{"type": "text", "text": "Unauthorized"}]
            }]
            task.status = TaskStatus(state=TaskState.FAILED)
            return task

        # Create response
        task.artifacts = [{
            "parts": [{"type": "text", "text": f"Hello, {user}! This is a protected resource."}]
        }]
        task.status = TaskStatus(state=TaskState.COMPLETED)

        return task

Advanced Error Handling

Proper error handling ensures robustness in your agents:

from python_a2a import A2AServer, TaskStatus, TaskState

class RobustAgent(A2AServer):
    """Agent whose task handler maps processing failures onto task states."""

    @staticmethod
    def _conclude(task, text, state):
        """Store *text* as the task's only text artifact, then set *state*."""
        task.artifacts = [{"parts": [{"type": "text", "text": text}]}]
        task.status = TaskStatus(state=state)

    def handle_task(self, task):
        """Process the task's message text and record the outcome on the task.

        Outcomes:
            * success: the processed text, state ``COMPLETED``
            * ``ValueError``: a validation message, state ``INPUT_REQUIRED``
            * ``ConnectionError``: a service-unavailable message, state ``FAILED``
            * any other exception: a generic message, state ``FAILED``; the
              error and its traceback are printed

        Args:
            task: Task whose ``message`` is read and whose ``artifacts`` and
                ``status`` are updated in place.

        Returns:
            The same task object.
        """
        try:
            # Pull the text out of the message; a missing message or a
            # non-dict content yields an empty string
            payload = task.message or {}
            body = payload.get("content", {})
            if isinstance(body, dict):
                text = body.get("text", "")
            else:
                text = ""

            # Processing may raise; the handlers below map each failure kind
            self._conclude(task, self.process_message(text), TaskState.COMPLETED)

        except ValueError as exc:
            # Validation problems ask the caller for better input
            self._conclude(
                task, f"Validation error: {str(exc)}", TaskState.INPUT_REQUIRED
            )

        except ConnectionError as exc:
            # Connection problems fail the task
            self._conclude(
                task, f"Service unavailable: {str(exc)}", TaskState.FAILED
            )

        except Exception as exc:
            # Anything unexpected fails the task and is logged with its traceback
            import traceback
            self._conclude(
                task, f"An unexpected error occurred: {str(exc)}", TaskState.FAILED
            )

            print(f"Error: {str(exc)}")
            print(traceback.format_exc())

        return task

    def process_message(self, text):
        """Placeholder processing step; swap in the real logic here.

        Args:
            text: Message text to process.

        Returns:
            The text prefixed with ``"Processed: "``.

        Raises:
            ValueError: If *text* is empty.
            Exception: If *text* contains "error" in any letter case
                (simulated failure).
        """
        if not text:
            raise ValueError("Empty message")

        lowered = text.lower()
        if "error" in lowered:
            raise Exception("Simulated error")

        return f"Processed: {text}"

Custom Content Types

You can extend the A2A protocol with custom content types:

from python_a2a import A2AServer, Message, MessageRole, BaseModel
from dataclasses import dataclass
from typing import Dict, Any, List

# Custom content type describing a chart
@dataclass
class ChartContent(BaseModel):
    """Chart payload: a title, axis labels, data values and a chart kind."""
    type: str = "chart"
    title: str = ""
    labels: List[str] = None  # stays None until labels are supplied
    data: List[float] = None  # stays None until data points are supplied
    chart_type: str = "bar"  # bar, line, pie, etc.

    def to_dict(self) -> Dict[str, Any]:
        """Return the chart's fields as a plain dictionary."""
        exported = ("type", "title", "labels", "data", "chart_type")
        return {field_name: getattr(self, field_name) for field_name in exported}

# Agent that replies with the custom chart content type
class ChartAgent(A2AServer):
    """Agent that answers every message with a fixed sample bar chart."""

    def handle_message(self, message):
        """Return a sample chart as a reply to *message*.

        Args:
            message: Incoming message; only its ``message_id`` and
                ``conversation_id`` are read, to link the reply to it.

        Returns:
            An agent-role message whose content is a ``ChartContent``.
        """
        return Message(
            # The chart is the same for every request
            content=ChartContent(
                title="Sample Chart",
                labels=["A", "B", "C", "D"],
                data=[10, 20, 15, 25],
                chart_type="bar",
            ),
            role=MessageRole.AGENT,
            parent_message_id=message.message_id,
            conversation_id=message.conversation_id,
        )

Testing A2A Agents

Here’s how to write unit tests for A2A agents:

import unittest
from python_a2a import Message, TextContent, MessageRole
from your_project import YourAgent

class TestYourAgent(unittest.TestCase):
    """Unit tests covering the agent's message and task handlers."""

    def setUp(self):
        """Give every test its own agent instance."""
        self.agent = YourAgent()

    def test_greeting(self):
        """A user greeting gets a text reply from the agent containing "hello"."""
        greeting = Message(
            content=TextContent(text="Hello"),
            role=MessageRole.USER,
        )

        reply = self.agent.handle_message(greeting)

        self.assertEqual(reply.role, MessageRole.AGENT)
        reply_content = reply.content
        self.assertEqual(reply_content.type, "text")
        self.assertIn("hello", reply_content.text.lower())

    def test_task_handling(self):
        """A greeting task completes with a text artifact containing "hello"."""
        from python_a2a import Task

        greeting_payload = {
            "content": {"type": "text", "text": "Hello"},
            "role": "user",
        }

        result = self.agent.handle_task(Task(message=greeting_payload))

        # NOTE(review): compares the state with the literal "completed";
        # assumes the state value compares equal to that string - confirm
        self.assertEqual(result.status.state, "completed")
        self.assertTrue(result.artifacts)

        # Inspect the first part of the first artifact
        first_part = result.artifacts[0]["parts"][0]
        self.assertEqual(first_part["type"], "text")
        self.assertIn("hello", first_part["text"].lower())

# Run the test suite when this file is executed directly as a script
if __name__ == "__main__":
    unittest.main()

Next Steps

Now that you’ve learned advanced techniques, you can:

  • Build more sophisticated agents with robust error handling

  • Add authentication to protect your agents

  • Create custom content types for specialized applications

  • Write tests to ensure your agents work correctly

Check out the Advanced Examples for complete examples of these techniques.