Bidirectional streaming with Vertex AI Agent Engine Runtime

This page describes how to use bidirectional streaming with Vertex AI Agent Engine Runtime.

Overview

Bidirectional streaming provides a persistent, two-way communication channel between your application and the agent, letting you move beyond turn-based, request-response patterns. Bidirectional streaming works for use cases where your agent needs to process information and respond continuously, such as interacting with audio or video inputs with low latency.

Bidirectional streaming with Vertex AI Agent Engine Runtime supports interactive, real-time agent use cases and data exchange for multimodal live APIs. Bidirectional streaming is supported for all frameworks, and custom bidirectional streaming methods are available by registering custom methods. You can use bidirectional streaming to interact with the Gemini Live API using Agent Development Kit (ADK) on Vertex AI Agent Engine.

Deploying a remote agent with bidirectional query methods is only supported through the Google Gen AI SDK. When bidirectional query methods are detected, the Gen AI SDK automatically sets the EXPERIMENTAL agent server mode when calling the REST API.

Develop an agent

While developing an agent, use the following steps to implement bidirectional streaming:

Define a bidirectional streaming query method

You can define a bidi_stream_query method that asynchronously consumes streamed requests from a queue and yields streaming responses. As an example, the following template extends the basic template to stream requests and responses, and is deployable on Agent Engine:

import asyncio
from typing import Any, AsyncIterable


class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self, request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration; you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)


agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)

agent.set_up()

Keep the following in mind when using the bidirectional streaming API:

  • asyncio.Queue: You can put any data type into the request queue; each item waits in the queue until it is sent to the model API.

  • Maximum timeout: The maximum timeout for a bidirectional streaming query is 10 minutes. If your agent requires longer processing times, consider breaking the task into smaller chunks and using sessions or memory to persist state.

  • Throttle content consumption: When consuming content from a bidirectional stream, it's important to manage the rate at which your agent processes incoming data. If your agent consumes data too slowly, it can lead to issues like increased latency or memory pressure on the server side. Implement mechanisms to actively pull data when your agent is ready to process it, and avoid blocking operations that could halt content consumption.

  • Throttle content generation: If you encounter backpressure issues (where the producer generates data faster than the consumer can process it), throttle your content generation rate. This helps prevent buffer overflows and ensures a smooth streaming experience, as shown in the sketch after this list.
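
The following is a minimal sketch of one way to apply both throttling recommendations using only the Python standard library; the names and queue size here are illustrative, not part of the Agent Engine API. A bounded asyncio.Queue blocks the producer whenever the consumer falls behind, which throttles generation, while the consumer pulls items only when it is ready to process them:

import asyncio

# Illustrative only: a bounded queue provides backpressure, because
# `await queue.put(...)` blocks while the queue is full.
request_queue: asyncio.Queue = asyncio.Queue(maxsize=8)


async def producer():
    for i in range(20):
        # Blocks when the queue is full, throttling content generation.
        await request_queue.put({"input": f"request {i}"})
    await request_queue.put("END")


async def consumer():
    while True:
        # Pull data only when ready to process it.
        request = await request_queue.get()
        if request == "END":
            break
        # Keep per-item processing non-blocking so consumption never stalls.
        await asyncio.sleep(0.1)  # Stand-in for real processing work.


async def main():
    await asyncio.gather(producer(), consumer())


asyncio.run(main())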

Test the bidirectional streaming query method

You can test the bidirectional streaming query locally by calling the bidi_stream_query method and iterating through the results:

import asyncio
import pprint

request_queue = asyncio.Queue()


async def generate_input():
    # This is just an illustration; you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dollars to Swedish currency"}
    )
    # Use asyncio.sleep so the event loop isn't blocked between requests.
    await asyncio.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dollars to Euro currency"}
    )
    await asyncio.sleep(5)
    request_queue.put_nowait("END")


async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)


input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)

The same bidirectional query connection can handle multiple requests and responses. For each request taken from the queue, the agent generates a stream of chunks containing different information about the response:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK.\n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR.\n'}

(Optional) Register custom methods

Operations can be registered in one of three execution modes: standard (represented by an empty string ""), streaming (stream), or bidirectional streaming (bidi_stream).

import asyncio
from typing import Any, AsyncIterable


class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definitions.

    async def get_state_bidi_mode(
        self, request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered.
            "": ["query", "get_state"],
            # The list of streaming operations to be registered.
            "stream": ["stream_query", "get_state_history"],
            # The list of bidirectional streaming operations to be registered.
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"],
        }
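
Once registered and deployed, a custom bidirectional streaming operation is called the same way as bidi_stream_query: you pass its registered name as class_method when opening the connection (see Use an agent below). The following is a minimal sketch, assuming the agent above is deployed as remote_live_agent as shown in the next section; the request payload is hypothetical and should be whatever your graph's get_state expects:

async with client.aio.live.agent_engines.connect(
    agent_engine=remote_live_agent.api_resource.name,
    config={"class_method": "get_state_bidi_mode"},
) as connection:
    # Hypothetical payload; substitute whatever your graph's get_state expects.
    await connection.send({"configurable": {"thread_id": "thread-1"}})
    # Terminate the request stream using the agent's "END" convention.
    await connection.send("END")
    print(await connection.receive())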

Deploy an agent

After you develop your agent as live_agent, you can deploy it to Agent Engine by creating an Agent Engine instance.

Note that with the Gen AI SDK, all deployment configurations (additional packages and customized resource controls) are passed as the value of config when creating the Agent Engine instance.

Initialize the Gen AI client:

import vertexai

client = vertexai.Client(project=PROJECT, location=LOCATION)

Deploy the agent to Agent Engine:

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets",
        ],
    },
)

For information on the steps happening in the background during deployment, see Create an AgentEngine instance.

Get the agent resource ID:

remote_live_agent.api_resource.name

Use an agent

If you defined a bidi_stream_query operation when developing your agent, you can query the agent asynchronously over a bidirectional stream using the Gen AI SDK for Python.

You can modify the following example with any data recognizable by your agent, using any applicable termination logic for the input and output streams:

async with client.aio.live.agent_engines.connect(
    agent_engine=remote_live_agent.api_resource.name,
    config={"class_method": "bidi_stream_query"},
) as connection:
    while True:
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})
        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

Vertex AI Agent Engine Runtime streams responses as a sequence of iteratively generated objects. For example, a set of two responses in the first turn might look like the following:

Enter your question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your question: exit

Use an Agent Development Kit agent

If you developed your agent using Agent Development Kit (ADK), you can use bidirectional streaming to interact with the Gemini Live API .

The following example creates a conversational agent that takes text questions from the user and receives audio response data from the Gemini Live API:

import asyncio

import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types
from IPython.display import Audio, display  # Assumes a notebook environment.


def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)


async with client.aio.live.agent_engines.connect(
    agent_engine=remote_live_agent.api_resource.name,
    config={
        "class_method": "bidi_stream_query",
        "input": {"input_str": "hello"},
    },
) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text == "exit":
            break
        if first_req:
            # The first request must carry the user ID alongside the payload.
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict(),
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.create_task(receive())
            # Assumption: treat a 10-second silence as the end of the turn.
            # Without a timeout, this wait would block until a response
            # arrives and the loop would never exit.
            done, _ = await asyncio.wait([receiving], timeout=10.0)
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]
            if part and part.inline_data and part.inline_data.data:
                # Collect raw 16-bit PCM audio chunks from the Live API.
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            # Play back the concatenated audio at the Live API's 24 kHz rate.
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))
