Develop an Agent2Agent agent

This page shows you how to develop and test an Agent2Agent (A2A) agent. The A2A protocol is an open standard designed to enable seamless communication and collaboration between AI agents. This guide focuses on the local workflow, allowing you to define and verify your agent's functionality before deployment.

The core workflow involves the following steps:

  1. Define key components
  2. Create local agent
  3. Test the local agent

Define agent components

To create an A2A agent, you need to define the following components: an AgentCard , an AgentExecutor , and an ADK LlmAgent .

  • AgentCard contains a metadata document that describes your agent's capabilities. AgentCard is like a business card that other agents can use to discover what your agent can do. For more details, see the Agent Card specification .
  • AgentExecutor contains the agent's core logic and defines how it handles tasks. This is where you implement the agent's behavior. You can read more about it in the A2A protocol specification .
  • (Optional) LlmAgent defines the ADK agent, including its system instructions, generative model, and tools.

Define an AgentCard

The following code sample defines an AgentCard for a currency exchange rate agent:

  from 
  
 a2a.types 
  
 import 
 AgentCard 
 , 
 AgentSkill 
 from 
  
 vertexai.preview.reasoning_engines.templates.a2a 
  
 import 
 create_agent_card 
 # Define the skill for the CurrencyAgent 
 currency_skill 
 = 
 AgentSkill 
 ( 
 id 
 = 
 'get_exchange_rate' 
 , 
 name 
 = 
 'Get Currency Exchange Rate' 
 , 
 description 
 = 
 'Retrieves the exchange rate between two currencies on a specified date.' 
 , 
 tags 
 = 
 [ 
 'Finance' 
 , 
 'Currency' 
 , 
 'Exchange Rate' 
 ], 
 examples 
 = 
 [ 
 'What is the exchange rate from USD to EUR?' 
 , 
 'How many Japanese Yen is 1 US dollar worth today?' 
 , 
 ], 
 ) 
 # Create the agent card using the utility function 
 agent_card 
 = 
 create_agent_card 
 ( 
 agent_name 
 = 
 'Currency Exchange Agent' 
 , 
 description 
 = 
 'An agent that can provide currency exchange rates' 
 , 
 skills 
 = 
 [ 
 currency_skill 
 ] 
 ) 
 

Define an AgentExecutor

The following code example defines an AgentExecutor that responds with the currency exchange rate. It takes a CurrencyAgent instance and initializes the ADK Runner to execute requests.

  import 
  
 requests 
 from 
  
 a2a.server.agent_execution 
  
 import 
 AgentExecutor 
 , 
 RequestContext 
 from 
  
 a2a.server.events 
  
 import 
 EventQueue 
 from 
  
 a2a.server.tasks 
  
 import 
 TaskUpdater 
 from 
  
 a2a.types 
  
 import 
 TaskState 
 , 
 TextPart 
 , 
 UnsupportedOperationError 
 , 
 Part 
 from 
  
 a2a.utils 
  
 import 
 new_agent_text_message 
 from 
  
 a2a.utils.errors 
  
 import 
 ServerError 
 from 
  
 google.adk 
  
 import 
 Runner 
 from 
  
 google.adk.agents 
  
 import 
 LlmAgent 
 from 
  
 google.adk.artifacts 
  
 import 
 InMemoryArtifactService 
 from 
  
 google.adk.memory.in_memory_memory_service 
  
 import 
 InMemoryMemoryService 
 from 
  
 google.adk.sessions 
  
 import 
 InMemorySessionService 
 from 
  
 google.genai 
  
 import 
 types 
 class 
  
 CurrencyAgentExecutorWithRunner 
 ( 
 AgentExecutor 
 ): 
  
 """Executor that takes an LlmAgent instance and initializes the ADK Runner internally.""" 
 def 
  
 __init__ 
 ( 
 self 
 , 
 agent 
 : 
 LlmAgent 
 ): 
 self 
 . 
 agent 
 = 
 agent 
 self 
 . 
 runner 
 = 
 None 
 def 
  
 _init_adk 
 ( 
 self 
 ): 
 if 
 not 
 self 
 . 
 runner 
 : 
 self 
 . 
 runner 
 = 
 Runner 
 ( 
 app_name 
 = 
 self 
 . 
 agent 
 . 
 name 
 , 
 agent 
 = 
 self 
 . 
 agent 
 , 
 artifact_service 
 = 
 InMemoryArtifactService 
 (), 
 session_service 
 = 
 InMemorySessionService 
 (), 
 memory_service 
 = 
 InMemoryMemoryService 
 (), 
 ) 
 async 
 def 
  
 cancel 
 ( 
 self 
 , 
 context 
 : 
 RequestContext 
 , 
 event_queue 
 : 
 EventQueue 
 ): 
 raise 
 ServerError 
 ( 
 error 
 = 
 UnsupportedOperationError 
 ()) 
 async 
 def 
  
 execute 
 ( 
 self 
 , 
 context 
 : 
 RequestContext 
 , 
 event_queue 
 : 
 EventQueue 
 , 
 ) 
 - 
> None 
 : 
 self 
 . 
 _init_adk 
 () 
 # Initialize on first execute call 
 if 
 not 
 context 
 . 
 message 
 : 
 return 
 user_id 
 = 
 context 
 . 
 message 
 . 
 metadata 
 . 
 get 
 ( 
 'user_id' 
 ) 
 if 
 context 
 . 
 message 
 and 
 context 
 . 
 message 
 . 
 metadata 
 else 
 'a2a_user' 
 updater 
 = 
 TaskUpdater 
 ( 
 event_queue 
 , 
 context 
 . 
 task_id 
 , 
 context 
 . 
 context_id 
 ) 
 if 
 not 
 context 
 . 
 current_task 
 : 
 await 
 updater 
 . 
 submit 
 () 
 await 
 updater 
 . 
 start_work 
 () 
 query 
 = 
 context 
 . 
 get_user_input 
 () 
 content 
 = 
 types 
 . 
 Content 
 ( 
 role 
 = 
 'user' 
 , 
 parts 
 = 
 [ 
 types 
 . 
 Part 
 ( 
 text 
 = 
 query 
 )]) 
 try 
 : 
 session 
 = 
 await 
 self 
 . 
 runner 
 . 
 session_service 
 . 
 get_session 
 ( 
 app_name 
 = 
 self 
 . 
 runner 
 . 
 app_name 
 , 
 user_id 
 = 
 user_id 
 , 
 session_id 
 = 
 context 
 . 
 context_id 
 , 
 ) 
 or 
 await 
 self 
 . 
 runner 
 . 
 session_service 
 . 
 create_session 
 ( 
 app_name 
 = 
 self 
 . 
 runner 
 . 
 app_name 
 , 
 user_id 
 = 
 user_id 
 , 
 session_id 
 = 
 context 
 . 
 context_id 
 , 
 ) 
 final_event 
 = 
 None 
 async 
 for 
 event 
 in 
 self 
 . 
 runner 
 . 
 run_async 
 ( 
 session_id 
 = 
 session 
 . 
 id 
 , 
 user_id 
 = 
 user_id 
 , 
 new_message 
 = 
 content 
 ): 
 if 
 event 
 . 
 is_final_response 
 (): 
 final_event 
 = 
 event 
 if 
 final_event 
 and 
 final_event 
 . 
 content 
 and 
 final_event 
 . 
 content 
 . 
 parts 
 : 
 response_text 
 = 
 "" 
 . 
 join 
 ( 
 part 
 . 
 text 
 for 
 part 
 in 
 final_event 
 . 
 content 
 . 
 parts 
 if 
 hasattr 
 ( 
 part 
 , 
 'text' 
 ) 
 and 
 part 
 . 
 text 
 ) 
 if 
 response_text 
 : 
 await 
 updater 
 . 
 add_artifact 
 ( 
 [ 
 TextPart 
 ( 
 text 
 = 
 response_text 
 )], 
 name 
 = 
 'result' 
 , 
 ) 
 await 
 updater 
 . 
 complete 
 () 
 return 
 await 
 updater 
 . 
 update_status 
 ( 
 TaskState 
 . 
 failed 
 , 
 message 
 = 
 new_agent_text_message 
 ( 
 'Failed to generate a final response with text content.' 
 ), 
 final 
 = 
 True 
 ) 
 except 
 Exception 
 as 
 e 
 : 
 await 
 updater 
 . 
 update_status 
 ( 
 TaskState 
 . 
 failed 
 , 
 message 
 = 
 new_agent_text_message 
 ( 
 f 
 "An error occurred: 
 { 
 str 
 ( 
 e 
 ) 
 } 
 " 
 ), 
 final 
 = 
 True 
 , 
 ) 
 

Define an LlmAgent

First, define a currency exchange tool for the LlmAgent to use:

  def 
  
 get_exchange_rate 
 ( 
 currency_from 
 : 
 str 
 = 
 "USD" 
 , 
 currency_to 
 : 
 str 
 = 
 "EUR" 
 , 
 currency_date 
 : 
 str 
 = 
 "latest" 
 , 
 ): 
  
 """Retrieves the exchange rate between two currencies on a specified date. 
 Uses the Frankfurter API (https://api.frankfurter.app/) to obtain 
 exchange rate data. 
 """ 
 try 
 : 
 response 
 = 
 requests 
 . 
 get 
 ( 
 f 
 "https://api.frankfurter.app/ 
 { 
 currency_date 
 } 
 " 
 , 
 params 
 = 
 { 
 "from" 
 : 
 currency_from 
 , 
 "to" 
 : 
 currency_to 
 }, 
 ) 
 response 
 . 
 raise_for_status 
 () 
 return 
 response 
 . 
 json 
 () 
 except 
 requests 
 . 
 exceptions 
 . 
 RequestException 
 as 
 e 
 : 
 return 
 { 
 "error" 
 : 
 str 
 ( 
 e 
 )} 
 

Then, define an ADK LlmAgent that uses the tool.

  my_llm_agent 
 = 
 LlmAgent 
 ( 
 model 
 = 
 'gemini-2.0-flash' 
 , 
 name 
 = 
 'currency_exchange_agent' 
 , 
 description 
 = 
 'An agent that can provide currency exchange rates.' 
 , 
 instruction 
 = 
 """You are a helpful currency exchange assistant. 
 Use the get_exchange_rate tool to answer user questions. 
 If the tool returns an error, inform the user about the error.""" 
 , 
 tools 
 = 
 [ 
 get_exchange_rate 
 ], 
 ) 
 

Create a local agent

Once you have defined your agent's components, create an instance of the A2aAgent class that uses the AgentCard , AgentExecutor , and LlmAgent to begin local testing.

  from 
  
 vertexai.preview.reasoning_engines 
  
 import 
 A2aAgent 
 a2a_agent 
 = 
 A2aAgent 
 ( 
 agent_card 
 = 
 agent_card 
 , 
 # Assuming agent_card is defined 
 agent_executor_builder 
 = 
 lambda 
 : 
 CurrencyAgentExecutorWithRunner 
 ( 
 agent 
 = 
 my_llm_agent 
 , 
 ) 
 ) 
 a2a_agent 
 . 
 set_up 
 () 
 

The A2A Agent template helps you create an A2A-compliant service. The service acts as a wrapper, abstracting away the converting layer from you.

Test the local agent

The currency exchange rate agent supports the following three methods:

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

Test handle_authenticated_agent_card

The following code retrieves the agent's authenticated card, which describes the agent's capabilities.

  # Test the `authenticated_agent_card` endpoint. 
 response_get_card 
 = 
 await 
 a2a_agent 
 . 
 handle_authenticated_agent_card 
 ( 
 request 
 = 
 None 
 , 
 context 
 = 
 None 
 ) 
 print 
 ( 
 response_get_card 
 ) 
 

Test on_message_send

The following code simulates a client sending a new message to the agent. The A2aAgent creates a new task and returns the task's ID.

  import 
  
 json 
 from 
  
 starlette.requests 
  
 import 
 Request 
 import 
  
 asyncio 
 # 1. Define the message payload you want to send. 
 message_data 
 = 
 { 
 "message" 
 : 
 { 
 "messageId" 
 : 
 "local-test-message-id" 
 , 
 "content" 
 :[ 
 { 
 "text" 
 : 
 "What is the exchange rate from USD to EUR today?" 
 } 
 ], 
 "role" 
 : 
 "ROLE_USER" 
 , 
 }, 
 } 
 # 2. Construct the request 
 scope 
 = 
 { 
 "type" 
 : 
 "http" 
 , 
 "http_version" 
 : 
 "1.1" 
 , 
 "method" 
 : 
 "POST" 
 , 
 "headers" 
 : 
 [( 
 b 
 "content-type" 
 , 
 b 
 "application/json" 
 )], 
 } 
 async 
 def 
  
 receive 
 (): 
 byte_data 
 = 
 json 
 . 
 dumps 
 ( 
 message_data 
 ) 
 . 
 encode 
 ( 
 "utf-8" 
 ) 
 return 
 { 
 "type" 
 : 
 "http.request" 
 , 
 "body" 
 : 
 byte_data 
 , 
 "more_body" 
 : 
 False 
 } 
 post_request 
 = 
 Request 
 ( 
 scope 
 , 
 receive 
 = 
 receive 
 ) 
 # 3. Call the agent 
 send_message_response 
 = 
 await 
 a2a_agent 
 . 
 on_message_send 
 ( 
 request 
 = 
 post_request 
 , 
 context 
 = 
 None 
 ) 
 print 
 ( 
 send_message_response 
 ) 
 

Test on_get_task

The following code retrieves the status and the result of a task. The output shows that the task is completed and includes the "Hello World" response artifact.

  from 
  
 starlette.requests 
  
 import 
 Request 
 import 
  
 asyncio 
 # 1. Provide the task_id from the previous step. 
 # In a real application, you would store and retrieve this ID. 
 task_id_to_get 
 = 
 send_message_response 
 [ 
 'task' 
 ][ 
 'id' 
 ] 
 # 2. Define the path parameters for the request. 
 task_data 
 = 
 { 
 "id" 
 : 
 task_id_to_get 
 } 
 # 3. Construct the starlette.requests.Request object directly. 
 scope 
 = 
 { 
 "type" 
 : 
 "http" 
 , 
 "http_version" 
 : 
 "1.1" 
 , 
 "method" 
 : 
 "GET" 
 , 
 "headers" 
 : 
 [], 
 "query_string" 
 : 
 b 
 '' 
 , 
 "path_params" 
 : 
 task_data 
 , 
 } 
 async 
 def 
  
 empty_receive 
 (): 
 return 
 { 
 "type" 
 : 
 "http.disconnect" 
 } 
 get_request 
 = 
 Request 
 ( 
 scope 
 , 
 empty_receive 
 ) 
 # 4. Call the agent's handler to get the task status. 
 task_status_response 
 = 
 await 
 a2a_agent 
 . 
 on_get_task 
 ( 
 request 
 = 
 get_request 
 , 
 context 
 = 
 None 
 ) 
 print 
 ( 
 f 
 "Successfully retrieved status for Task ID: 
 { 
 task_id_to_get 
 } 
 " 
 ) 
 print 
 ( 
 " 
 \n 
 Full task status response:" 
 ) 
 print 
 ( 
 task_status_response 
 ) 
 

What's next

Create a Mobile Website
View Site in Mobile | Classic
Share by: