This page describes how to use bidirectional streaming with Vertex AI Agent Engine Runtime.
Overview
Bidirectional streaming provides a persistent, two-way communication channel between your application and the agent, letting you move beyond turn-based, request-response patterns. Bidirectional streaming works for use cases where your agent needs to process information and respond continuously, such as interacting with audio or video inputs with low latency.
Bidirectional streaming with Vertex AI Agent Engine Runtime supports interactive, real-time agent use cases and data exchange for multimodal live APIs. Bidirectional streaming is supported for all frameworks, and custom bidirectional streaming methods are available by registering custom methods. You can use bidirectional streaming to interact with the Gemini Live API using Agent Development Kit (ADK) on Vertex AI Agent Engine.
Deploying a remote agent with bidirectional query methods is only supported through the Google Gen AI SDK. When bidirectional query methods are detected, the Gen AI SDK automatically sets the EXPERIMENTAL agent server mode when calling the REST API.
Develop an agent
While developing an agent, use the following steps to implement bidirectional streaming:
- Define a bidirectional streaming query method
- Register custom methods (optional)
Define a bidirectional streaming query method
You can define a bidi_stream_query method that asynchronously takes streaming requests as input and outputs streaming responses. As an example, the following template extends the basic template to stream requests and responses and is deployable on Agent Engine:
import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any],
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration; you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                  # Required.
    tools=[get_exchange_rate],    # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)

agent.set_up()
Keep the following in mind when using the bidirectional streaming API:
- asyncio.Queue: You can put any data type in this request queue to wait to be sent to the model API.
- Maximum timeout: The maximum timeout for a bidirectional streaming query is 10 minutes. If your agent requires longer processing times, consider breaking the task into smaller chunks and using sessions or memory to persist state.
- Throttle content consumption: When consuming content from a bidirectional stream, it's important to manage the rate at which your agent processes incoming data. If your agent consumes data too slowly, it can lead to issues like increased latency or memory pressure on the server side. Implement mechanisms to actively pull data when your agent is ready to process it, and avoid blocking operations that could halt content consumption.
- Throttle content generation: If you encounter backpressure issues (where the producer generates data faster than the consumer can process it), throttle your content generation rate. This can help prevent buffer overflows and ensure a smooth streaming experience. A bounded queue is one way to do this; see the sketch after this list.
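As a minimal illustration of the last two points, a bounded asyncio.Queue makes the producer wait whenever the consumer falls behind, which caps memory use and applies backpressure automatically. This sketch is not part of the Agent Engine API; the producer, consumer, and queue size are hypothetical stand-ins for your own request generation and processing logic:

import asyncio

async def produce(queue: asyncio.Queue) -> None:
    for i in range(10):
        # await queue.put() blocks when the queue is full, so the producer
        # is automatically throttled to the consumer's pace.
        await queue.put({"input": f"request {i}"})
    await queue.put("END")

async def consume(queue: asyncio.Queue) -> None:
    while True:
        request = await queue.get()
        if request == "END":
            break
        # Simulate slow downstream processing of each request.
        await asyncio.sleep(0.5)
        print("processed", request)

async def main() -> None:
    # maxsize bounds the in-flight buffer; the value here is illustrative.
    queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(produce(queue), consume(queue))

asyncio.run(main())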
Test the bidirectional streaming query method
You can test the bidirectional streaming query locally by calling the bidi_stream_query method and iterating through the results:
import asyncio
import pprint

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration; you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dollars to Swedish currency"}
    )
    # Use asyncio.sleep (not time.sleep) so the consumer task can run while we wait.
    await asyncio.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dollars to Euro currency"}
    )
    await asyncio.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
The same bidirectional query connection can handle multiple requests and responses. For each new request from the queue, the following example generates a stream of chunks containing different information about the response:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK.\n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR.\n'}
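If you only care about the final answer for each request, you can filter the chunks for the 'output' key. This is a minimal sketch that assumes the LangChain-style chunk layout shown above and reuses the agent and request_queue names from the previous example:

async def print_final_answers():
    # Only chunks carrying an 'output' key contain the final answer text;
    # intermediate chunks carry actions, messages, and steps.
    async for chunk in agent.bidi_stream_query(request_queue):
        if isinstance(chunk, dict) and "output" in chunk:
            print(chunk["output"])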
(Optional) Register custom methods
Operations can be registered as either standard (represented by an empty string ""), streaming (stream), or bidirectional streaming (bidi_stream) execution modes.
from typing import Any, AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any],
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchronous operations to be registered
            "": ["query", "get_state"],
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"],
            # The list of bidirectional streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"],
        }
Deploy an agent
Once you develop your agent as live_agent, you can deploy it to Agent Engine by creating an Agent Engine instance. Note that with the Gen AI SDK, all deployment configurations (additional packages and customized resource controls) are assigned as values of config when creating the Agent Engine instance.
Initialize the Gen AI client:
import vertexai

client = vertexai.Client(project=PROJECT, location=LOCATION)
Deploy the agent to Agent Engine:
remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets",
        ],
    },
)
For information on the steps happening in the background during deployment, see Create an AgentEngine instance.
Get the agent resource ID:
remote_live_agent.api_resource.name
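The returned resource name identifies the deployed instance; it typically follows the format projects/PROJECT_NUMBER/locations/LOCATION/reasoningEngines/RESOURCE_ID. You pass this name to the connect call shown in the next section.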
Use an agent
If you defined a bidi_stream_query operation when developing your agent, you can query the agent with bidirectional streaming asynchronously using the Gen AI SDK for Python. You can modify the following example with any data recognizable by your agent, using any applicable termination logic for the input and output streams:
async with client.aio.live.agent_engines.connect(
    agent_engine=remote_live_agent.api_resource.name,
    config={"class_method": "bidi_stream_query"},
) as connection:
    while True:
        # Send requests until the user types "exit".
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})
        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break
Vertex AI Agent Engine Runtime streams responses as a sequence of iteratively generated objects. For example, a set of two responses in the first turn might look like the following:
Enter your question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your question: exit
Use an Agent Development Kit agent
If you developed your agent using Agent Development Kit (ADK), you can use bidirectional streaming to interact with the Gemini Live API. The following example creates a conversation agent that takes user text questions and receives Gemini Live API response audio data:
import asyncio

import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types
from IPython.display import Audio, display

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
    agent_engine=remote_live_agent.api_resource.name,
    config={
        "class_method": "bidi_stream_query",
        "input": {"input_str": "hello"},
    },
) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text == "exit":
            break
        if first_req:
            # The first request on the connection must carry the user ID.
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict(),
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            # Stop receiving for this turn when no new output arrives
            # within the timeout window.
            done, _ = await asyncio.wait([receiving], timeout=10)
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]
            if part and part.inline_data and part.inline_data.data:
                # Collect raw 16-bit PCM audio chunks.
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            # Concatenate the chunks and play the audio back at 24 kHz.
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))
What's next
- Learn more about the Gemini Live API.