Create and run a job that sends Pub/Sub status notifications

This document explains how to create a Batch job that sends Pub/Sub notifications. You can use Pub/Sub to get notifications when a job or task state changes, or when a job or task enters a specific state. For more information, see Monitor jobs using notifications.

Before you begin

  1. If you haven't used Batch before, review Get started with Batch and enable Batch by completing the prerequisites for projects and users.
  2. Create or identify a Pub/Sub topic for Batch notifications.
  3. Configure a subscription to receive and use the notifications.

Required roles

  • To get the permissions that you need to create and run a job that sends notifications, ask your administrator to grant you the following IAM roles:

    For more information about granting roles, see Manage access to projects, folders, and organizations.

    You might also be able to get the required permissions through custom roles or other predefined roles.

  • Unless you are using the default configuration for the job's service account, ensure that it has the necessary permissions.

    To ensure that the job's service account has the necessary permissions to publish Pub/Sub notifications, ask your administrator to grant the job's service account the Pub/Sub Publisher ( roles/pubsub.publisher ) IAM role on your Pub/Sub topic.

  • If you want a job to publish notifications to a Pub/Sub topic that is in a different project than the job, then the Batch service agent for the job's project must be granted permission to publish to that topic.

    To ensure that the Batch service agent for the job's project has the necessary permissions to publish Pub/Sub notifications to a Pub/Sub topic in another project, ask your administrator to grant the Batch service agent for the job's project the Pub/Sub Publisher ( roles/pubsub.publisher ) IAM role on the Pub/Sub topic.
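
As a rough illustration of what granting the Pub/Sub Publisher role on a topic amounts to, the following sketch adds a binding to a topic's IAM policy represented as a plain dictionary. The helper name `add_publisher_binding` and the service account email are hypothetical, and reading or writing the actual policy (for example with the Google Cloud CLI or a client library) is assumed to happen elsewhere:

```python
import json

PUBLISHER_ROLE = "roles/pubsub.publisher"


def add_publisher_binding(policy: dict, service_account_email: str) -> dict:
    """Return a copy of an IAM policy dict that grants Pub/Sub Publisher to the account."""
    member = f"serviceAccount:{service_account_email}"
    # Copy the bindings so that the caller's policy is left untouched.
    bindings = [
        {**binding, "members": list(binding.get("members", []))}
        for binding in policy.get("bindings", [])
    ]
    for binding in bindings:
        # Conditional bindings are skipped so that the new grant stays unconditional.
        if binding.get("role") == PUBLISHER_ROLE and "condition" not in binding:
            if member not in binding["members"]:
                binding["members"].append(member)
            break
    else:
        # No unconditional publisher binding exists yet, so add one.
        bindings.append({"role": PUBLISHER_ROLE, "members": [member]})
    return {**policy, "bindings": bindings}


if __name__ == "__main__":
    # Hypothetical service account email, for illustration only.
    updated = add_publisher_binding(
        {"bindings": []}, "batch-job-sa@example-project.iam.gserviceaccount.com"
    )
    print(json.dumps(updated, indent=2))
```

The same helper applies to either case above: pass the job's service account for a topic in the same project, or the Batch service agent of the job's project for a topic in another project.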

Create and run a job that sends notifications

You can create a Batch job that sends Pub/Sub notifications by doing the following:

gcloud

Use the Google Cloud CLI to create a job that includes the notifications field and one or more jobNotification objects in the main body of the JSON file:

    {
      ...
      "notifications": [
        {
          "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
          "message": {
            ATTRIBUTES
          }
        }
      ]
      ...
    }
 

Replace the following:

  • PROJECT_ID: the project ID of the project that contains the Pub/Sub topic.
  • TOPIC_ID: the Pub/Sub topic ID of the topic you created when you enabled Pub/Sub notifications.
  • ATTRIBUTES: specify one or more of the following attributes, which each let you receive notifications about the state of the job or all of its tasks.

    • For notifications about all job state changes, specify the following:

        "type": "JOB_STATE_CHANGED"
       
      
    • For notifications about a specific job state change, specify the following:

        "type": "JOB_STATE_CHANGED",
        "newJobState": "JOB_STATE"
       
      

      Replace JOB_STATE with one of the following job states:

      • QUEUED
      • SCHEDULED
      • RUNNING
      • SUCCEEDED
      • FAILED

      For more information about job states, see Job lifecycle.

    • For notifications about all task state changes, specify the following:

        "type": "TASK_STATE_CHANGED"
       
      
    • For notifications about specific task state changes, specify the following:

        "type": "TASK_STATE_CHANGED",
        "newTaskState": "TASK_STATE"
       
      

      Replace TASK_STATE with one of the following task states:

      • PENDING
      • ASSIGNED
      • RUNNING
      • SUCCEEDED
      • FAILED

      For more information about task states, see Job lifecycle.
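
If you generate job configurations programmatically, the attribute rules above can be captured in a small helper. The following is a minimal sketch that uses only the Python standard library; the function name `job_notification` is hypothetical, and the allowed states are simply the ones listed above:

```python
import json

# States copied from the lists in this document.
JOB_STATES = {"QUEUED", "SCHEDULED", "RUNNING", "SUCCEEDED", "FAILED"}
TASK_STATES = {"PENDING", "ASSIGNED", "RUNNING", "SUCCEEDED", "FAILED"}


def job_notification(project_id, topic_id, message_type, new_state=None):
    """Build one entry for the "notifications" array of a job configuration."""
    if message_type == "JOB_STATE_CHANGED":
        state_field, allowed = "newJobState", JOB_STATES
    elif message_type == "TASK_STATE_CHANGED":
        state_field, allowed = "newTaskState", TASK_STATES
    else:
        raise ValueError(f"unsupported message type: {message_type}")

    message = {"type": message_type}
    if new_state is not None:
        # A specific state narrows the notification to that transition only.
        if new_state not in allowed:
            raise ValueError(f"{new_state} is not a valid state for {message_type}")
        message[state_field] = new_state

    return {
        "pubsubTopic": f"projects/{project_id}/topics/{topic_id}",
        "message": message,
    }


if __name__ == "__main__":
    notifications = [
        job_notification("PROJECT_ID", "TOPIC_ID", "JOB_STATE_CHANGED"),
        job_notification("PROJECT_ID", "TOPIC_ID", "TASK_STATE_CHANGED", "FAILED"),
    ]
    print(json.dumps({"notifications": notifications}, indent=2))
```

Rejecting unknown states locally is a design choice of this sketch; it surfaces typos such as a task state passed to a job notification before the configuration is submitted.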

For example, suppose you want to receive notifications about all job state changes and any time a task fails. To do so, you can have a JSON configuration file that is similar to the following:

    {
      "taskGroups": [
        {
          "taskSpec": {
            "runnables": [
              {
                "script": {
                  "text": "echo Hello World! This is task $BATCH_TASK_INDEX."
                }
              }
            ]
          },
          "taskCount": 3
        }
      ],
      "logsPolicy": {
        "destination": "CLOUD_LOGGING"
      },
      "notifications": [
        {
          "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
          "message": {
            "type": "JOB_STATE_CHANGED"
          }
        },
        {
          "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
          "message": {
            "type": "TASK_STATE_CHANGED",
            "newTaskState": "FAILED"
          }
        }
      ]
    }
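
Before you pass a configuration file like this to the gcloud CLI, it can help to check locally that the file is strict JSON and that it declares the notifications you expect. The following is a small sketch under that assumption; `summarize_notifications` is a hypothetical helper, not part of any Google Cloud tool:

```python
import json


def summarize_notifications(config_text: str) -> list:
    """Parse a job configuration and list (topic, type, state) for each notification."""
    # json.loads is strict: syntax slips such as a stray trailing comma raise
    # json.JSONDecodeError here instead of failing later at submission time.
    config = json.loads(config_text)
    summary = []
    for entry in config.get("notifications", []):
        message = entry.get("message", {})
        # At most one of the two state fields is expected per message.
        state = message.get("newJobState") or message.get("newTaskState")
        summary.append((entry.get("pubsubTopic"), message.get("type"), state))
    return summary


if __name__ == "__main__":
    sample = """
    {
      "notifications": [
        {
          "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
          "message": {"type": "TASK_STATE_CHANGED", "newTaskState": "FAILED"}
        }
      ]
    }
    """
    for topic, message_type, state in summarize_notifications(sample):
        print(topic, message_type, state or "(all states)")
```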
 

API

Use the REST API to create a job that includes the notifications field and one or more jobNotification objects in the main body of the JSON file:

    {
      ...
      "notifications": [
        {
          "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
          "message": {
            ATTRIBUTES
          }
        }
      ]
      ...
    }
 

Replace the following:

  • PROJECT_ID: the project ID of the project that contains the Pub/Sub topic.
  • TOPIC_ID: the Pub/Sub topic ID of the topic you created when you enabled Pub/Sub notifications.
  • ATTRIBUTES: specify one or more of the following attributes, which each let you receive notifications about the state of the job or all of its tasks.

    • For notifications about all job state changes, specify the following:

        "type": "JOB_STATE_CHANGED"
       
      
    • For notifications about a specific job state change, specify the following:

        "type": "JOB_STATE_CHANGED",
        "newJobState": "JOB_STATE"
       
      

      Replace JOB_STATE with one of the following job states:

      • QUEUED
      • SCHEDULED
      • RUNNING
      • SUCCEEDED
      • FAILED

      For more information about job states, see Job lifecycle.

    • For notifications about all task state changes, specify the following:

        "type": "TASK_STATE_CHANGED"
       
      
    • For notifications about specific task state changes, specify the following:

        "type": "TASK_STATE_CHANGED",
        "newTaskState": "TASK_STATE"
       
      

      Replace TASK_STATE with one of the following task states:

      • PENDING
      • ASSIGNED
      • RUNNING
      • SUCCEEDED
      • FAILED

      For more information about task states, see Job lifecycle.

For example, suppose you want to receive notifications about all job state changes and any time a task fails. To do so, you can have a JSON configuration file that is similar to the following:

    {
      "taskGroups": [
        {
          "taskSpec": {
            "runnables": [
              {
                "script": {
                  "text": "echo Hello World! This is task $BATCH_TASK_INDEX."
                }
              }
            ]
          },
          "taskCount": 3
        }
      ],
      "logsPolicy": {
        "destination": "CLOUD_LOGGING"
      },
      "notifications": [
        {
          "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
          "message": {
            "type": "JOB_STATE_CHANGED"
          }
        },
        {
          "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
          "message": {
            "type": "TASK_STATE_CHANGED",
            "newTaskState": "FAILED"
          }
        }
      ]
    }
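
To make the REST call concrete, the following sketch prepares (but does not send) a create-job request with the Python standard library. It assumes the Batch v1 endpoint `https://batch.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/jobs` with a `job_id` query parameter and an OAuth 2.0 bearer token that you obtain separately; none of these details are stated in this document, and `build_create_job_request` is a hypothetical helper name:

```python
import json
import urllib.parse
import urllib.request


def build_create_job_request(project_id, location, job_id, job_config, access_token):
    """Prepare, without sending, a POST request that creates a Batch job."""
    # Assumed endpoint shape; check the Batch REST reference before relying on it.
    query = urllib.parse.urlencode({"job_id": job_id})
    url = (
        f"https://batch.googleapis.com/v1/projects/{project_id}"
        f"/locations/{location}/jobs?{query}"
    )
    return urllib.request.Request(
        url,
        data=json.dumps(job_config).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    config = {
        "notifications": [
            {
                "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
                "message": {"type": "JOB_STATE_CHANGED"},
            }
        ]
    }
    request = build_create_job_request(
        "PROJECT_ID", "us-central1", "example-job", config, "ACCESS_TOKEN"
    )
    print(request.get_method(), request.full_url)
    # To send the request: urllib.request.urlopen(request)
```

Keeping the request construction separate from sending it makes the URL and body easy to inspect before any job is created.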
 

Go

    import (
        "context"
        "fmt"
        "io"

        batch "cloud.google.com/go/batch/apiv1"
        "cloud.google.com/go/batch/apiv1/batchpb"
        durationpb "google.golang.org/protobuf/types/known/durationpb"
    )

    // Creates and runs a job with configured notifications
    func createJobWithNotifications(w io.Writer, projectID, region, jobName, topicName string) (*batchpb.Job, error) {
        ctx := context.Background()
        batchClient, err := batch.NewClient(ctx)
        if err != nil {
            return nil, fmt.Errorf("batchClient error: %w", err)
        }
        defer batchClient.Close()

        script := &batchpb.Runnable_Script_{
            Script: &batchpb.Runnable_Script{
                Command: &batchpb.Runnable_Script_Text{
                    Text: "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks.",
                },
            },
        }

        taskSpec := &batchpb.TaskSpec{
            ComputeResource: &batchpb.ComputeResource{
                // CpuMilli is milliseconds per cpu-second. This means the task requires 2 whole CPUs.
                CpuMilli:  2000,
                MemoryMib: 16,
            },
            MaxRunDuration: &durationpb.Duration{
                Seconds: 3600,
            },
            MaxRetryCount: 2,
            Runnables: []*batchpb.Runnable{{
                Executable: script,
            }},
        }

        taskGroups := []*batchpb.TaskGroup{
            {
                TaskCount: 4,
                TaskSpec:  taskSpec,
            },
        }

        labels := map[string]string{"env": "testing", "type": "container"}

        // Policies are used to define on what kind of virtual machines the tasks will run on.
        // In this case, we tell the system to use "e2-standard-4" machine type.
        // Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
        allocationPolicy := &batchpb.AllocationPolicy{
            Instances: []*batchpb.AllocationPolicy_InstancePolicyOrTemplate{{
                PolicyTemplate: &batchpb.AllocationPolicy_InstancePolicyOrTemplate_Policy{
                    Policy: &batchpb.AllocationPolicy_InstancePolicy{
                        MachineType: "e2-standard-4",
                    },
                },
            }},
        }

        // We use Cloud Logging as it's an out of the box available option
        logsPolicy := &batchpb.LogsPolicy{
            Destination: batchpb.LogsPolicy_CLOUD_LOGGING,
        }

        notifications := []*batchpb.JobNotification{
            {
                PubsubTopic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicName),
                Message: &batchpb.JobNotification_Message{
                    Type: batchpb.JobNotification_JOB_STATE_CHANGED,
                },
            },
            {
                PubsubTopic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicName),
                Message: &batchpb.JobNotification_Message{
                    Type:         batchpb.JobNotification_TASK_STATE_CHANGED,
                    NewTaskState: batchpb.TaskStatus_FAILED,
                },
            },
        }

        job := &batchpb.Job{
            Name:             jobName,
            TaskGroups:       taskGroups,
            AllocationPolicy: allocationPolicy,
            Labels:           labels,
            Notifications:    notifications,
            LogsPolicy:       logsPolicy,
        }

        request := &batchpb.CreateJobRequest{
            Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, region),
            JobId:  jobName,
            Job:    job,
        }

        createdJob, err := batchClient.CreateJob(ctx, request)
        if err != nil {
            return nil, fmt.Errorf("unable to create job: %w", err)
        }

        fmt.Fprintf(w, "Job created: %v\n", createdJob)

        return createdJob, nil
    }
 

Java

    import com.google.cloud.batch.v1.BatchServiceClient;
    import com.google.cloud.batch.v1.CreateJobRequest;
    import com.google.cloud.batch.v1.Job;
    import com.google.cloud.batch.v1.JobNotification;
    import com.google.cloud.batch.v1.JobNotification.Message;
    import com.google.cloud.batch.v1.JobNotification.Type;
    import com.google.cloud.batch.v1.LogsPolicy;
    import com.google.cloud.batch.v1.LogsPolicy.Destination;
    import com.google.cloud.batch.v1.Runnable;
    import com.google.cloud.batch.v1.Runnable.Script;
    import com.google.cloud.batch.v1.TaskGroup;
    import com.google.cloud.batch.v1.TaskSpec;
    import com.google.cloud.batch.v1.TaskStatus.State;
    import com.google.common.collect.Lists;
    import com.google.protobuf.Duration;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class CreateBatchNotification {

      public static void main(String[] args)
          throws IOException, ExecutionException, InterruptedException, TimeoutException {
        // TODO(developer): Replace these variables before running the sample.
        // Project ID or project number of the Google Cloud project you want to use.
        String projectId = "YOUR_PROJECT_ID";
        // Name of the region you want to use to run the job. Regions that are
        // available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
        String region = "europe-central2";
        // The name of the job that will be created.
        // It needs to be unique for each project and region pair.
        String jobName = "JOB_NAME";
        // The Pub/Sub topic ID to send the notifications to.
        String topicId = "TOPIC_ID";

        createBatchNotification(projectId, region, jobName, topicId);
      }

      // Create a Batch job that sends notifications to Pub/Sub
      public static Job createBatchNotification(
          String projectId, String region, String jobName, String topicId)
          throws IOException, ExecutionException, InterruptedException, TimeoutException {
        // Initialize client that will be used to send requests. This client only needs to be created
        // once, and can be reused for multiple requests.
        try (BatchServiceClient batchServiceClient = BatchServiceClient.create()) {
          // Define what will be done as part of the job.
          Runnable runnable =
              Runnable.newBuilder()
                  .setScript(
                      Script.newBuilder()
                          .setText(
                              "echo Hello world! This is task ${BATCH_TASK_INDEX}. "
                                  + "This job has a total of ${BATCH_TASK_COUNT} tasks.")
                          // You can also run a script from a file. Just remember, that needs to be a
                          // script that's already on the VM that will be running the job.
                          // Using setText() and setPath() is mutually exclusive.
                          // .setPath("/tmp/test.sh")
                          .build())
                  .build();

          TaskSpec task =
              TaskSpec.newBuilder()
                  // Jobs can be divided into tasks. In this case, we have only one task.
                  .addRunnables(runnable)
                  .setMaxRetryCount(2)
                  .setMaxRunDuration(Duration.newBuilder().setSeconds(3600).build())
                  .build();

          // Tasks are grouped inside a job using TaskGroups.
          // Currently, it's possible to have only one task group.
          TaskGroup taskGroup =
              TaskGroup.newBuilder()
                  .setTaskCount(3)
                  .setParallelism(1)
                  .setTaskSpec(task)
                  .build();

          Job job =
              Job.newBuilder()
                  .addTaskGroups(taskGroup)
                  .addAllNotifications(buildNotifications(projectId, topicId))
                  .putLabels("env", "testing")
                  .putLabels("type", "script")
                  // We use Cloud Logging as it's an out of the box available option.
                  .setLogsPolicy(
                      LogsPolicy.newBuilder().setDestination(Destination.CLOUD_LOGGING))
                  .build();

          CreateJobRequest createJobRequest =
              CreateJobRequest.newBuilder()
                  // The job's parent is the region in which the job will run.
                  .setParent(String.format("projects/%s/locations/%s", projectId, region))
                  .setJob(job)
                  .setJobId(jobName)
                  .build();

          Job result =
              batchServiceClient
                  .createJobCallable()
                  .futureCall(createJobRequest)
                  .get(5, TimeUnit.MINUTES);

          System.out.printf("Successfully created the job: %s", result.getName());

          return result;
        }
      }

      // Creates notification configurations to send messages to Pub/Sub when the state is changed
      private static Iterable<JobNotification> buildNotifications(String projectId, String topicId) {
        String pubsubTopic = String.format("projects/%s/topics/%s", projectId, topicId);

        JobNotification jobStateChanged =
            JobNotification.newBuilder()
                .setPubsubTopic(pubsubTopic)
                .setMessage(Message.newBuilder().setType(Type.JOB_STATE_CHANGED))
                .build();

        JobNotification taskStateChanged =
            JobNotification.newBuilder()
                .setPubsubTopic(pubsubTopic)
                .setMessage(
                    Message.newBuilder()
                        .setType(Type.TASK_STATE_CHANGED)
                        .setNewTaskState(State.FAILED))
                .build();

        return Lists.newArrayList(jobStateChanged, taskStateChanged);
      }
    }
 

Node.js

    // Imports the Batch library
    const batchLib = require('@google-cloud/batch');
    const batch = batchLib.protos.google.cloud.batch.v1;

    // Instantiates a client
    const batchClient = new batchLib.v1.BatchServiceClient();

    /**
     * TODO(developer): Update these variables before running the sample.
     */
    // Project ID or project number of the Google Cloud project you want to use.
    const PROJECT_ID = await batchClient.getProjectId();
    // Name of the region you want to use to run the job. Regions that are
    // available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
    const REGION = 'europe-central2';
    // The name of the job that will be created.
    // It needs to be unique for each project and region pair.
    const JOB_NAME = 'job-name-batch-notifications';
    // The Pub/Sub topic ID to send the notifications to.
    const TOPIC_ID = 'topic-id';

    // Define what will be done as part of the job.
    const task = new batch.TaskSpec();
    const runnable = new batch.Runnable();
    runnable.script = new batch.Runnable.Script();
    // A script runnable takes its shell commands in the text field.
    runnable.script.text = 'echo Hello world! This is task ${BATCH_TASK_INDEX}.';
    task.runnables = [runnable];
    task.maxRetryCount = 2;
    task.maxRunDuration = {seconds: 3600};

    // Tasks are grouped inside a job using TaskGroups.
    const group = new batch.TaskGroup();
    group.taskCount = 3;
    group.taskSpec = task;

    // Create batch notification when job state changed
    const notification1 = new batch.JobNotification();
    notification1.pubsubTopic = `projects/${PROJECT_ID}/topics/${TOPIC_ID}`;
    notification1.message = {
      type: 'JOB_STATE_CHANGED',
    };

    // Create batch notification when task state changed
    const notification2 = new batch.JobNotification();
    notification2.pubsubTopic = `projects/${PROJECT_ID}/topics/${TOPIC_ID}`;
    notification2.message = {
      type: 'TASK_STATE_CHANGED',
      newTaskState: 'FAILED',
    };

    const job = new batch.Job();
    job.name = JOB_NAME;
    job.taskGroups = [group];
    job.notifications = [notification1, notification2];
    job.labels = {env: 'testing', type: 'script'};
    // We use Cloud Logging as it's an option available out of the box
    job.logsPolicy = new batch.LogsPolicy();
    job.logsPolicy.destination = batch.LogsPolicy.Destination.CLOUD_LOGGING;

    // The job's parent is the project and region in which the job will run
    const parent = `projects/${PROJECT_ID}/locations/${REGION}`;

    async function callCreateBatchNotifications() {
      // Construct request
      const request = {
        parent,
        jobId: JOB_NAME,
        job,
      };

      // Run request
      const [response] = await batchClient.createJob(request);
      console.log(JSON.stringify(response));
    }

    await callCreateBatchNotifications();
 

Python

    from google.cloud import batch_v1


    def create_with_pubsub_notification_job(
        project_id: str, region: str, job_name: str, topic_name: str
    ) -> batch_v1.Job:
        """
        This method shows how to create a sample Batch Job that will run
        a simple command inside a container on Cloud Compute instances.

        Args:
            project_id: project ID or project number of the Cloud project you want to use.
            region: name of the region you want to use to run the job. Regions that are
                available for Batch are listed on: https://cloud.google.com/batch/docs/locations
            job_name: the name of the job that will be created.
                It needs to be unique for each project and region pair.
            topic_name: the name of the Pub/Sub topic to which the notification will be sent.
                The topic should be created in GCP Pub/Sub before running this method.
                The procedure for creating a topic is listed here: https://cloud.google.com/pubsub/docs/create-topic

        Returns:
            A job object representing the job created.
        """
        client = batch_v1.BatchServiceClient()

        # Define what will be done as part of the job.
        runnable = batch_v1.Runnable()
        runnable.container = batch_v1.Runnable.Container()
        runnable.container.image_uri = "gcr.io/google-containers/busybox"
        runnable.container.entrypoint = "/bin/sh"
        runnable.container.commands = [
            "-c",
            "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks.",
        ]

        # Jobs can be divided into tasks. In this case, we have only one task.
        task = batch_v1.TaskSpec()
        task.runnables = [runnable]

        # We can specify what resources are requested by each task.
        resources = batch_v1.ComputeResource()
        resources.cpu_milli = 2000  # in milliseconds per cpu-second. This means the task requires 2 whole CPUs.
        resources.memory_mib = 16  # in MiB
        task.compute_resource = resources

        task.max_retry_count = 2
        task.max_run_duration = "3600s"

        # Tasks are grouped inside a job using TaskGroups.
        # Currently, it's possible to have only one task group.
        group = batch_v1.TaskGroup()
        group.task_count = 4
        group.task_spec = task

        # Policies are used to define on what kind of virtual machines the tasks will run on.
        # In this case, we tell the system to use "e2-standard-4" machine type.
        # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
        policy = batch_v1.AllocationPolicy.InstancePolicy()
        policy.machine_type = "e2-standard-4"
        instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
        instances.policy = policy
        allocation_policy = batch_v1.AllocationPolicy()
        allocation_policy.instances = [instances]

        job = batch_v1.Job()
        job.task_groups = [group]
        job.allocation_policy = allocation_policy
        job.labels = {"env": "testing", "type": "container"}
        # We use Cloud Logging as it's an out of the box available option
        job.logs_policy = batch_v1.LogsPolicy()
        job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

        # Configuring the first notification
        notification1 = batch_v1.JobNotification()
        notification1.pubsub_topic = f"projects/{project_id}/topics/{topic_name}"
        # Define the message that will be sent to the topic
        first_message = batch_v1.JobNotification.Message()
        # Specify the new job state that will trigger the notification
        # In this case, the notification is triggered when the job state changes to SUCCEEDED
        first_message.type_ = batch_v1.JobNotification.Type.JOB_STATE_CHANGED
        first_message.new_job_state = batch_v1.JobStatus.State.SUCCEEDED
        # Assign the message to the notification
        notification1.message = first_message

        # Configuring the second notification
        notification2 = batch_v1.JobNotification()
        notification2.pubsub_topic = f"projects/{project_id}/topics/{topic_name}"
        second_message = batch_v1.JobNotification.Message()
        second_message.type_ = batch_v1.JobNotification.Type.TASK_STATE_CHANGED
        second_message.new_task_state = batch_v1.TaskStatus.State.FAILED
        notification2.message = second_message

        # Assign a list of notifications to the job.
        job.notifications = [notification1, notification2]

        create_request = batch_v1.CreateJobRequest()
        create_request.job = job
        create_request.job_id = job_name
        # The job's parent is the region in which the job will run
        create_request.parent = f"projects/{project_id}/locations/{region}"

        return client.create_job(create_request)
 

After the job starts running, you can use its notifications. For example, if the Pub/Sub topic for your job has a subscription that streams notifications to BigQuery, you can analyze the Pub/Sub notifications in BigQuery.
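
As another way to use the notifications, a pull subscription on the topic can be read directly. The following is a minimal sketch that assumes the `google-cloud-pubsub` client library is installed and that a pull subscription already exists; the subscription ID and the helper names are hypothetical. Because this document does not describe the notification payload, the sketch prints whatever attributes and data arrive instead of assuming specific field names:

```python
from concurrent.futures import TimeoutError


def describe_notification(attributes: dict, data: bytes) -> str:
    """Format a Pub/Sub message's attributes and payload as one log line."""
    attrs = ", ".join(f"{key}={value}" for key, value in sorted(attributes.items()))
    body = data.decode("utf-8", errors="replace")
    return f"[{attrs}] {body}"


def listen(project_id: str, subscription_id: str, timeout_seconds: float = 30.0) -> None:
    """Print the notifications that arrive on a pull subscription for a limited time."""
    # Imported here so that describe_notification works without the library installed.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message) -> None:
        print(describe_notification(dict(message.attributes), message.data))
        # Acknowledge the message so that Pub/Sub does not redeliver it.
        message.ack()

    streaming_pull = subscriber.subscribe(subscription_path, callback=callback)
    with subscriber:
        try:
            streaming_pull.result(timeout=timeout_seconds)
        except TimeoutError:
            streaming_pull.cancel()  # Stop pulling new messages.
            streaming_pull.result()  # Wait for the shutdown to complete.


if __name__ == "__main__":
    # Hypothetical subscription ID, for illustration only.
    listen("PROJECT_ID", "batch-notifications-sub")
```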

What's next
