Develop an Agent2Agent agent

This page shows you how to develop and test an Agent2Agent (A2A) agent. The A2A protocol is an open standard designed to enable seamless communication and collaboration between AI agents. This guide focuses on the local workflow, allowing you to define and verify your agent's functionality before deployment.

The core workflow involves the following steps:

  1. Define key components
  2. Create local agent
  3. Test the local agent

Define agent components

To create an A2A agent, you need to define the following components: an AgentCard, an AgentExecutor, and an ADK LlmAgent.

  • AgentCard contains a metadata document that describes your agent's capabilities. AgentCard is like a business card that other agents can use to discover what your agent can do. For more details, see the Agent Card specification.
  • AgentExecutor contains the agent's core logic and defines how it handles tasks. This is where you implement the agent's behavior. You can read more about it in the A2A protocol specification.
  • (Optional) LlmAgent defines the ADK agent, including its system instructions, generative model, and tools.

Define an AgentCard

The following code sample defines an AgentCard for a currency exchange rate agent:

from a2a.types import AgentCard, AgentSkill from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card # Define the skill for the CurrencyAgent currency_skill = AgentSkill( id='get_exchange_rate', name='Get Currency Exchange Rate', description='Retrieves the exchange rate between two currencies on a specified date.', tags=['Finance', 'Currency', 'Exchange Rate'], examples=[ 'What is the exchange rate from USD to EUR?', 'How many Japanese Yen is 1 US dollar worth today?', ], ) # Create the agent card using the utility function agent_card = create_agent_card( agent_name='Currency Exchange Agent', description='An agent that can provide currency exchange rates', skills=[currency_skill] ) 

Define an AgentExecutor

The following code example defines an AgentExecutor that responds with the currency exchange rate. It takes a CurrencyAgent instance and initializes the ADK Runner to execute requests.

import requests from a2a.server.agent_execution import AgentExecutor, RequestContext from a2a.server.events import EventQueue from a2a.server.tasks import TaskUpdater from a2a.types import TaskState, TextPart, UnsupportedOperationError, Part from a2a.utils import new_agent_text_message from a2a.utils.errors import ServerError from google.adk import Runner from google.adk.agents import LlmAgent from google.adk.artifacts import InMemoryArtifactService from google.adk.memory.in_memory_memory_service import InMemoryMemoryService from google.adk.sessions import InMemorySessionService from google.genai import types class CurrencyAgentExecutorWithRunner(AgentExecutor):  """Executor that takes an LlmAgent instance and initializes the ADK Runner internally.""" def __init__(self, agent: LlmAgent): self.agent = agent self.runner = None def _init_adk(self): if not self.runner: self.runner = Runner( app_name=self.agent.name, agent=self.agent, artifact_service=InMemoryArtifactService(), session_service=InMemorySessionService(), memory_service=InMemoryMemoryService(), ) async def cancel(self, context: RequestContext, event_queue: EventQueue): raise ServerError(error=UnsupportedOperationError()) async def execute( self, context: RequestContext, event_queue: EventQueue, ) -> None: self._init_adk() # Initialize on first execute call if not context.message: return user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user' updater = TaskUpdater(event_queue, context.task_id, context.context_id) if not context.current_task: await updater.submit() await updater.start_work() query = context.get_user_input() content = types.Content(role='user', parts=[types.Part(text=query)]) try: session = await self.runner.session_service.get_session( app_name=self.runner.app_name, user_id=user_id, session_id=context.context_id, ) or await self.runner.session_service.create_session( app_name=self.runner.app_name, user_id=user_id, session_id=context.context_id, ) final_event = None async for event in self.runner.run_async( session_id=session.id, user_id=user_id, new_message=content ): if event.is_final_response(): final_event = event if final_event and final_event.content and final_event.content.parts: response_text = "".join( part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text ) if response_text: await updater.add_artifact( [TextPart(text=response_text)], name='result', ) await updater.complete() return await updater.update_status( TaskState.failed, message=new_agent_text_message('Failed to generate a final response with text content.'), final=True ) except Exception as e: await updater.update_status( TaskState.failed, message=new_agent_text_message(f"An error occurred: {str(e)}"), final=True, ) 

Define an LlmAgent

First, define a currency exchange tool for the LlmAgent to use:

def get_exchange_rate( currency_from: str = "USD", currency_to: str = "EUR", currency_date: str = "latest", ):  """Retrieves the exchange rate between two currencies on a specified date.  Uses the Frankfurter API (https://api.frankfurter.app/) to obtain  exchange rate data.  """ try: response = requests.get( f"https://api.frankfurter.app/{currency_date}", params={"from": currency_from, "to": currency_to}, ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: return {"error": str(e)} 

Then, define an ADK LlmAgent that uses the tool.

my_llm_agent = LlmAgent( model='gemini-2.0-flash', name='currency_exchange_agent', description='An agent that can provide currency exchange rates.', instruction="""You are a helpful currency exchange assistant.  Use the get_exchange_rate tool to answer user questions.  If the tool returns an error, inform the user about the error.""", tools=[get_exchange_rate], ) 

Create a local agent

Once you have defined your agent's components, create an instance of the A2aAgent class that uses the AgentCard, AgentExecutor, and LlmAgent to begin local testing.

from vertexai.preview.reasoning_engines import A2aAgent a2a_agent = A2aAgent( agent_card=agent_card, # Assuming agent_card is defined agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner( agent=my_llm_agent, ) ) a2a_agent.set_up() 

The A2A Agent template helps you create an A2A-compliant service. The service acts as a wrapper, abstracting away the converting layer from you.

Test the local agent

The currency exchange rate agent supports the following three methods:

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

Test handle_authenticated_agent_card

The following code retrieves the agent's authenticated card, which describes the agent's capabilities.

# Test the `authenticated_agent_card` endpoint. response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None) print(response_get_card) 

Test on_message_send

The following code simulates a client sending a new message to the agent. The A2aAgent creates a new task and returns the task's ID.

import json from starlette.requests import Request import asyncio # 1. Define the message payload you want to send. message_data = { "message": { "messageId": "local-test-message-id", "content":[ { "text": "What is the exchange rate from USD to EUR today?" } ], "role": "ROLE_USER", }, } # 2. Construct the request scope = { "type": "http", "http_version": "1.1", "method": "POST", "headers": [(b"content-type", b"application/json")], } async def receive(): byte_data = json.dumps(message_data).encode("utf-8") return {"type": "http.request", "body": byte_data, "more_body": False} post_request = Request(scope, receive=receive) # 3. Call the agent send_message_response = await a2a_agent.on_message_send(request=post_request, context=None) print(send_message_response) 

Test on_get_task

The following code retrieves the status and the result of a task. The output shows that the task is completed and includes the "Hello World" response artifact.

 from starlette.requests import Request import asyncio # 1. Provide the task_id from the previous step. # In a real application, you would store and retrieve this ID. task_id_to_get = send_message_response['task']['id'] # 2. Define the path parameters for the request. task_data = {"id": task_id_to_get} # 3. Construct the starlette.requests.Request object directly. scope = { "type": "http", "http_version": "1.1", "method": "GET", "headers": [], "query_string": b'', "path_params": task_data, } async def empty_receive(): return {"type": "http.disconnect"} get_request = Request(scope, empty_receive) # 4. Call the agent's handler to get the task status. task_status_response = await a2a_agent.on_get_task(request=get_request, context=None) print(f"Successfully retrieved status for Task ID: {task_id_to_get}") print("\nFull task status response:") print(task_status_response) 

What's next