Advanced Examples

This page provides more complex examples of using Python A2A.

Multi-Agent Weather Information System

This example demonstrates a multi-agent system where different agents collaborate to provide weather-related information and travel recommendations:

  1. Weather Data Agent - Provides weather information

  2. Travel Recommendation Agent - Recommends destinations based on weather

  3. User Interface Agent - Orchestrates the other agents

First, create the Weather Data Agent:

# weather_agent.py
from python_a2a import A2AServer, skill, agent, run_server
from python_a2a import TaskStatus, TaskState
import random

@agent(
    name="Weather API",
    description="Provides weather information for locations",
    version="1.0.0"
)
class WeatherAgent(A2AServer):

    def __init__(self):
        super().__init__()
        # Mock weather data
        self.weather_data = {
            "new york": {"temp": 72, "condition": "Sunny", "humidity": 45},
            "london": {"temp": 60, "condition": "Cloudy", "humidity": 80},
            "tokyo": {"temp": 75, "condition": "Rainy", "humidity": 85},
            "paris": {"temp": 68, "condition": "Partly Cloudy", "humidity": 60},
            "sydney": {"temp": 82, "condition": "Sunny", "humidity": 50},
            "cairo": {"temp": 95, "condition": "Sunny", "humidity": 30},
            "rio": {"temp": 88, "condition": "Sunny", "humidity": 70},
            "bangkok": {"temp": 90, "condition": "Thunderstorms", "humidity": 95},
            "moscow": {"temp": 45, "condition": "Snowy", "humidity": 70},
            "dubai": {"temp": 105, "condition": "Sunny", "humidity": 20}
        }

    @skill(
        name="Get Weather",
        description="Get current weather for a location",
        tags=["weather", "current"]
    )
    def get_weather(self, location):
        """
        Get the current weather for a location.

        Args:
            location: The location to get weather for

        Returns:
            Weather information
        """
        location = location.lower()
        if location in self.weather_data:
            return self.weather_data[location]

        # Generate random weather for unknown locations
        return {
            "temp": random.randint(50, 90),
            "condition": random.choice(["Sunny", "Cloudy", "Rainy", "Partly Cloudy"]),
            "humidity": random.randint(30, 90)
        }

    @skill(
        name="Get Forecast",
        description="Get 5-day forecast for a location",
        tags=["weather", "forecast"]
    )
    def get_forecast(self, location):
        """
        Get a 5-day forecast for a location.

        Args:
            location: The location to get forecast for

        Returns:
            5-day forecast
        """
        base_weather = self.get_weather(location)

        # Generate a simple 5-day forecast
        forecast = []
        for day in range(1, 6):
            temp_change = random.randint(-10, 10)
            forecast.append({
                "day": day,
                "temp": base_weather["temp"] + temp_change,
                "condition": random.choice(["Sunny", "Cloudy", "Rainy", "Partly Cloudy"]),
                "humidity": base_weather["humidity"] + random.randint(-20, 20)
            })

        return forecast

    def handle_task(self, task):
        # Extract message text
        message_data = task.message or {}
        content = message_data.get("content", {})
        text = content.get("text", "") if isinstance(content, dict) else ""

        # Initialize response
        response_text = "I can provide weather information. Try asking for the weather or forecast in a specific location."

        # Check for location in the query
        location = None
        if "in" in text.lower():
            location = text.lower().split("in", 1)[1].strip().rstrip("?.")

        # Check for forecast vs current weather
        if location:
            if "forecast" in text.lower():
                forecast = self.get_forecast(location)
                response_text = f"5-day forecast for {location.title()}:\n"
                for day in forecast:
                    response_text += f"Day {day['day']}: {day['temp']}°F, {day['condition']}, {day['humidity']}% humidity\n"
            else:
                weather = self.get_weather(location)
                response_text = f"Current weather in {location.title()}: {weather['temp']}°F, {weather['condition']}, {weather['humidity']}% humidity"

        # Create response artifact
        task.artifacts = [{
            "parts": [{"type": "text", "text": response_text}]
        }]
        task.status = TaskStatus(state=TaskState.COMPLETED)

        return task

# Run the server
if __name__ == "__main__":
    agent = WeatherAgent()
    run_server(agent, port=5001)

Next, create the Travel Recommendation Agent:

# travel_agent.py
from python_a2a import A2AServer, skill, agent, run_server, A2AClient
from python_a2a import TaskStatus, TaskState
import json

@agent(
    name="Travel Advisor",
    description="Provides travel recommendations based on weather",
    version="1.0.0"
)
class TravelAgent(A2AServer):

    def __init__(self):
        super().__init__()
        # Connect to the weather agent
        self.weather_client = A2AClient("http://localhost:5001")

        # Destination information
        self.destinations = {
            "new york": {"activities": ["Central Park", "Museums", "Broadway Shows"]},
            "london": {"activities": ["Big Ben", "Museums", "Thames River Cruise"]},
            "tokyo": {"activities": ["Temples", "Shopping", "Cherry Blossoms"]},
            "paris": {"activities": ["Eiffel Tower", "Louvre", "Cafes"]},
            "sydney": {"activities": ["Opera House", "Beaches", "Harbour Bridge"]},
            "cairo": {"activities": ["Pyramids", "Nile Cruise", "Markets"]},
            "rio": {"activities": ["Beaches", "Christ the Redeemer", "Samba"]},
            "bangkok": {"activities": ["Temples", "Street Food", "Markets"]},
            "moscow": {"activities": ["Red Square", "Museums", "Ballet"]},
            "dubai": {"activities": ["Shopping", "Desert Safari", "Burj Khalifa"]}
        }

    @skill(
        name="Recommend Destination",
        description="Recommend a destination based on weather preferences",
        tags=["travel", "recommendation"]
    )
    def recommend_destination(self, weather_pref, activity_pref=None):
        """
        Recommend a destination based on weather and activity preferences.

        Args:
            weather_pref: Weather preference (warm, cool, etc.)
            activity_pref: Optional activity preference

        Returns:
            Destination recommendation
        """
        # Get weather for all destinations
        destinations = []
        for dest in self.destinations.keys():
            try:
                weather = eval(self.weather_client.ask(f"What's the weather in {dest}?"))
                destinations.append({
                    "name": dest,
                    "weather": weather,
                    "activities": self.destinations[dest]["activities"]
                })
            except:
                # Skip if we can't get weather
                continue

        # Filter by weather preference
        filtered = []
        if weather_pref.lower() == "warm" or weather_pref.lower() == "hot":
            filtered = [d for d in destinations if d["weather"]["temp"] > 75]
        elif weather_pref.lower() == "cool" or weather_pref.lower() == "cold":
            filtered = [d for d in destinations if d["weather"]["temp"] < 60]
        elif weather_pref.lower() == "moderate" or weather_pref.lower() == "mild":
            filtered = [d for d in destinations if 60 <= d["weather"]["temp"] <= 75]
        else:
            filtered = destinations

        # Filter by activity preference if provided
        if activity_pref:
            activity_filtered = []
            for dest in filtered:
                for activity in dest["activities"]:
                    if activity_pref.lower() in activity.lower():
                        activity_filtered.append(dest)
                        break
            filtered = activity_filtered

        # Return results
        if filtered:
            return filtered
        else:
            return "No destinations match your preferences."

    def handle_task(self, task):
        # Extract message text
        message_data = task.message or {}
        content = message_data.get("content", {})
        text = content.get("text", "") if isinstance(content, dict) else ""

        # Initialize response
        response_text = "I can recommend destinations based on weather and activities. Try asking for recommendations for warm or cool places."

        # Extract preferences
        weather_pref = None
        activity_pref = None

        if "warm" in text.lower() or "hot" in text.lower():
            weather_pref = "warm"
        elif "cool" in text.lower() or "cold" in text.lower():
            weather_pref = "cool"
        elif "moderate" in text.lower() or "mild" in text.lower():
            weather_pref = "moderate"

        # Check for activities
        common_activities = ["beach", "museum", "food", "shopping", "nature", "cruise", "show"]
        for activity in common_activities:
            if activity in text.lower():
                activity_pref = activity
                break

        # Generate recommendations if preferences found
        if weather_pref:
            try:
                recommendations = self.recommend_destination(weather_pref, activity_pref)

                if isinstance(recommendations, str):
                    response_text = recommendations
                else:
                    response_text = f"Here are some {weather_pref} destinations"
                    if activity_pref:
                        response_text += f" with {activity_pref} activities"
                    response_text += ":\n\n"

                    for dest in recommendations[:3]:  # Limit to top 3
                        response_text += f"- {dest['name'].title()}: {dest['weather']['temp']}°F, {dest['weather']['condition']}\n"
                        response_text += f"  Activities: {', '.join(dest['activities'])}\n\n"
            except Exception as e:
                response_text = f"Sorry, I couldn't generate recommendations: {str(e)}"

        # Create response artifact
        task.artifacts = [{
            "parts": [{"type": "text", "text": response_text}]
        }]
        task.status = TaskStatus(state=TaskState.COMPLETED)

        return task

# Run the server
if __name__ == "__main__":
    agent = TravelAgent()
    run_server(agent, port=5002)

Finally, create the User Interface Agent:

# ui_agent.py
from python_a2a import A2AServer, skill, agent, run_server, A2AClient
from python_a2a import TaskStatus, TaskState

@agent(
    name="Travel Assistant",
    description="Your personal travel assistant",
    version="1.0.0"
)
class AssistantAgent(A2AServer):

    def __init__(self):
        super().__init__()
        # Connect to other agents
        self.weather_client = A2AClient("http://localhost:5001")
        self.travel_client = A2AClient("http://localhost:5002")

    def handle_task(self, task):
        # Extract message text
        message_data = task.message or {}
        content = message_data.get("content", {})
        text = content.get("text", "") if isinstance(content, dict) else ""

        # Initialize response
        response_text = "I'm your travel assistant. I can help with weather information and travel recommendations."

        # Determine which agent to route to
        if "weather" in text.lower() or "forecast" in text.lower() or "temperature" in text.lower():
            # Route to weather agent
            response_text = self.weather_client.ask(text)
        elif "recommend" in text.lower() or "suggest" in text.lower() or "destination" in text.lower() or "where should" in text.lower():
            # Route to travel agent
            response_text = self.travel_client.ask(text)
        elif text.lower() in ["hi", "hello", "hey"]:
            # Greeting
            response_text = "Hello! I'm your travel assistant. I can help with weather information and travel recommendations. Try asking about the weather in a city or for recommendations for warm places with beaches."
        elif "help" in text.lower() or "what can you do" in text.lower():
            # Help message
            response_text = """I can help you with:

            1. Weather information: "What's the weather in Paris?" or "Get me the forecast for Tokyo"
            2. Travel recommendations: "Recommend warm destinations" or "Suggest cool places with museums"

            Just let me know what you're interested in!"""

        # Create response artifact
        task.artifacts = [{
            "parts": [{"type": "text", "text": response_text}]
        }]
        task.status = TaskStatus(state=TaskState.COMPLETED)

        return task

# Run the server
if __name__ == "__main__":
    agent = AssistantAgent()
    run_server(agent, port=5000)

To run this multi-agent system:

  1. Start the weather agent: python weather_agent.py

  2. Start the travel agent: python travel_agent.py

  3. Start the UI agent: python ui_agent.py

You can then interact with the UI agent at http://localhost:5000.

MCP Integration with LLM

This example demonstrates how to integrate an LLM-based agent with MCP tools:

# llm_mcp_agent.py
import os
from python_a2a import A2AServer, A2AMCPAgent, run_server, AgentCard
from python_a2a import OpenAIA2AServer, TaskStatus, TaskState
from python_a2a.mcp import FastMCP, text_response

# Create MCP server with calculation tools
calculator_mcp = FastMCP(
    name="Calculator MCP",
    description="Provides calculation functions"
)

@calculator_mcp.tool()
def add(a: float, b: float) -> float:
    """Add two numbers together."""
    return a + b

@calculator_mcp.tool()
def subtract(a: float, b: float) -> float:
    """Subtract b from a."""
    return a - b

@calculator_mcp.tool()
def multiply(a: float, b: float) -> float:
    """Multiply two numbers together."""
    return a * b

@calculator_mcp.tool()
def divide(a: float, b: float) -> float:
    """Divide a by b."""
    if b == 0:
        return "Cannot divide by zero"
    return a / b

# Create MCP server with data lookup tools
data_mcp = FastMCP(
    name="Data MCP",
    description="Provides data lookup functions"
)

@data_mcp.tool()
def get_country_capital(country: str) -> str:
    """
    Get the capital city of a country.

    Args:
        country: The country to look up

    Returns:
        The capital city
    """
    capitals = {
        "usa": "Washington, D.C.",
        "uk": "London",
        "france": "Paris",
        "germany": "Berlin",
        "japan": "Tokyo",
        "china": "Beijing",
        "india": "New Delhi",
        "brazil": "Brasília",
        "australia": "Canberra",
        "canada": "Ottawa"
    }

    country = country.lower()
    if country in capitals:
        return capitals[country]
    elif country == "united states" or country == "united states of america":
        return capitals["usa"]
    elif country == "united kingdom":
        return capitals["uk"]
    else:
        return f"I don't know the capital of {country}"

@data_mcp.tool()
def get_country_population(country: str) -> str:
    """
    Get the population of a country.

    Args:
        country: The country to look up

    Returns:
        The population (approximate, as of 2023)
    """
    populations = {
        "usa": "331 million",
        "uk": "67 million",
        "france": "65 million",
        "germany": "83 million",
        "japan": "126 million",
        "china": "1.4 billion",
        "india": "1.38 billion",
        "brazil": "212 million",
        "australia": "25 million",
        "canada": "38 million"
    }

    country = country.lower()
    if country in populations:
        return populations[country]
    elif country == "united states" or country == "united states of america":
        return populations["usa"]
    elif country == "united kingdom":
        return populations["uk"]
    else:
        return f"I don't know the population of {country}"

# Create the OpenAI-based MCP-enabled agent
class SmartAssistant(OpenAIA2AServer, A2AMCPAgent):
    def __init__(self, api_key):
        # Create agent card
        agent_card = AgentCard(
            name="Smart Assistant",
            description="A smart assistant that can calculate and look up information",
            url="http://localhost:5000",
            version="1.0.0"
        )

        # Initialize OpenAI A2A server
        OpenAIA2AServer.__init__(
            self,
            api_key=api_key,
            model="gpt-4",
            system_prompt="""You are a helpful assistant that can calculate and look up information.

            When a user asks for calculations, use the calculator tools to perform the calculation.
            When a user asks for country information, use the data lookup tools.

            Make your responses concise and helpful."""
        )

        # Initialize MCP agent
        A2AMCPAgent.__init__(
            self,
            name="Smart Assistant",
            description="A smart assistant that can calculate and look up information",
            mcp_servers={
                "calc": calculator_mcp,
                "data": data_mcp
            }
        )

    async def handle_message_async(self, message):
        """Route all messages through OpenAI but add MCP capabilities"""
        try:
            # First try normal OpenAI processing
            response = OpenAIA2AServer.handle_message(self, message)

            # Check if the response is a function call
            if response.content.type == "function_call":
                # Use our MCP handler to execute the function call
                return await super().handle_message_async(message)

            return response
        except Exception as e:
            # Fall back to default handling
            return await super().handle_message_async(message)

    def handle_message(self, message):
        """Override to use our async handler"""
        import asyncio
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.handle_message_async(message))

    async def handle_task_async(self, task):
        """Handle a task by converting to message and back"""
        # Convert task to message
        message_data = task.message or {}

        # Create a Message object
        from python_a2a import Message, TextContent, MessageRole
        message = Message(
            content=TextContent(text=message_data.get("content", {}).get("text", "")),
            role=MessageRole.USER,
            conversation_id=task.id
        )

        # Process with handle_message
        response = await self.handle_message_async(message)

        # Convert response to task
        if response.content.type == "text":
            task.artifacts = [{
                "parts": [{"type": "text", "text": response.content.text}]
            }]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        elif response.content.type == "function_response":
            task.artifacts = [{
                "parts": [{"type": "text", "text": str(response.content.response)}]
            }]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.artifacts = [{
                "parts": [{"type": "text", "text": f"Response type: {response.content.type}"}]
            }]
            task.status = TaskStatus(state=TaskState.COMPLETED)

        return task

    def handle_task(self, task):
        """Override to use our async handler"""
        import asyncio
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.handle_task_async(task))

# Run the agent
if __name__ == "__main__":
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("Please set the OPENAI_API_KEY environment variable.")

    agent = SmartAssistant(api_key)
    run_server(agent, port=5000)

This agent can:

  1. Answer questions using OpenAI’s LLM

  2. Perform calculations using the calculator MCP

  3. Look up country information using the data MCP

Example interactions:

  • “What is 125 × 37?”

  • “What is the capital of France?”

  • “What is the population of China?”

Next Steps

These advanced examples demonstrate the power and flexibility of Python A2A. You can:

  • Build complex multi-agent systems

  • Integrate LLMs with external tools and data

  • Create specialized agents for different tasks

For even more examples, check out the [GitHub repository](https://github.com/themanojdesai/python-a2a/tree/main/examples).