Monitor pipeline status

This page describes how to publish Cloud Data Fusion pipeline events, such as pipeline status, to Pub/Sub topics. It also describes how to create Cloud Run functions that process the Pub/Sub messages and take actions, such as identifying and retrying failed pipelines.

Before you begin

  • Create a topic where Pub/Sub can publish Cloud Data Fusion pipeline events.
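
You can create the topic in the Google Cloud console, with the Google Cloud CLI, or programmatically. The following is a minimal sketch using the Pub/Sub client library for Python; PROJECT_ID and TOPIC_ID are placeholders for your own values.

    # Minimal sketch: create a Pub/Sub topic for Cloud Data Fusion pipeline events.
    # PROJECT_ID and TOPIC_ID are placeholders.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("PROJECT_ID", "TOPIC_ID")
    topic = publisher.create_topic(request={"name": topic_path})
    print(f"Created topic: {topic.name}")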

Required roles

To ensure that the Cloud Data Fusion Service Account has the necessary permissions to publish pipeline events to a Pub/Sub topic, ask your administrator to grant the Cloud Data Fusion Service Account the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the project where you create the Pub/Sub topic.

For more information about granting roles, see Manage access to projects, folders, and organizations.

Your administrator might also be able to grant the Cloud Data Fusion Service Account the required permissions through custom roles or other predefined roles.

Manage event publishing in a Cloud Data Fusion instance

You can manage event publishing in new and existing Cloud Data Fusion instances using the REST API in versions 6.7.0 and later.

Publish events in a new instance

Create a new instance and include the EventPublishConfig field. For more information about required fields for new instances, see the Instances resource reference.

    curl -X POST \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -H "Content-Type: application/json" \
      "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
      -d '{
        "version": "VERSION_NUMBER",
        "event_publish_config": {
          "enabled": true,
          "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
        }
      }'
 

Replace the following:

  • PROJECT_ID : the Google Cloud project ID
  • LOCATION : the location of your project
  • INSTANCE_ID : the ID of your Cloud Data Fusion instance
  • VERSION_NUMBER : the version of Cloud Data Fusion where you create the instance, for example, 6.10.1
  • TOPIC_ID : the ID of the Pub/Sub topic

Enable event publishing in an existing Cloud Data Fusion instance

Update the EventPublishConfig field in an existing Cloud Data Fusion instance:

    curl -X PATCH \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -H "Content-Type: application/json" \
      "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
      -d '{
        "event_publish_config": {
          "enabled": true,
          "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
        }
      }'
 

Replace the following:

  • PROJECT_ID : the Google Cloud project ID
  • LOCATION : the location of your project
  • INSTANCE_ID : the ID of your Cloud Data Fusion instance
  • TOPIC_ID : the ID of the Pub/Sub topic

Remove event publishing from an instance

To remove event publishing from an instance, update the enabled value in the event publishing configuration to false:

    curl -X PATCH \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -H "Content-Type: application/json" \
      "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
      -d '{ "event_publish_config": { "enabled": false } }'
 

Create functions to read Pub/Sub messages

Cloud Run functions can read Pub/Sub messages and act on them, for example by retrying failed pipelines. To create a Cloud Run function, do the following (a minimal function sketch follows these steps):

  1. In the Google Cloud console, go to the Cloud Run functions page.

    Go to Cloud Run functions

  2. Click Create function.

  3. Enter a function name and region.

  4. In the Trigger type field, select Cloud Pub/Sub.

  5. Enter the Pub/Sub topic ID.

  6. Click Next.

  7. Add functions to read the Pub/Sub messages and take other actions. For example, you can add functions for the following use cases:

    • Send alerts for pipeline failures.
    • Send alerts for KPIs, such as record count or run information.
    • Restart a failed pipeline that hasn't been rerun.

    For Cloud Run function examples, see the use case section.

  8. Click Deploy. For more information, see Deploy a Cloud Run function .
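
The following is a minimal sketch of what a Pub/Sub-triggered function deployed this way can look like; the handler name is arbitrary, and the fuller examples in the next section expand on it.

    import base64

    import functions_framework

    @functions_framework.cloud_event
    def handle_pipeline_event(cloud_event):
        # Pub/Sub delivers the event payload as base64-encoded message data.
        message = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
        print(f"Received pipeline event: {message}")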

Use case: Document pipeline status and retry failed pipelines

The following example Cloud Run functions read Pub/Sub messages about the pipeline run status, and then retry the failed pipelines in Cloud Data Fusion.
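
The exact payload schema isn't shown in this section, but based on the fields that the example functions read, a decoded status event has roughly the following shape (the values are illustrative only):

    # Illustrative shape of a decoded Cloud Data Fusion status event, inferred
    # from the fields the example functions below access. Values are placeholders.
    sample_event = {
        "projectName": "project1",
        "publishTime": "2024-01-01T12:00:00Z",
        "instanceName": "instance1",
        "programStatusEventDetails": {
            "namespace": "default",
            "applicationName": "my_pipeline",
            "status": "FAILED",
            "eventTime": 1704110400000,  # Milliseconds since the epoch.
            "error": "Error details; present only for failed runs.",
        },
    }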

These example functions refer to the following Google Cloud components:

  • Google Cloud project: the project where Cloud Run functions and Pub/Sub topics are created
  • Pub/Sub topic: the Pub/Sub topic linked to your Cloud Data Fusion instance
  • Cloud Data Fusion instance: the Cloud Data Fusion instance where you design and execute pipelines
  • BigQuery table: the BigQuery table that captures the pipeline status and the run and rerun details
  • Cloud Run function: the Cloud Run function where you deploy the code that retries failed pipelines
  1. The following Cloud Run function example reads the Pub/Sub messages about Cloud Data Fusion status events.

     # Imports used by the example functions in this section.
     import base64
     import json

     import functions_framework
     import pandas as pd
     import requests
     from google.cloud import bigquery

     # Triggered from a message on a Pub/Sub topic.
     @functions_framework.cloud_event
     def cdf_event_trigger(cloud_event):
         # Decode Pub/Sub message.
         decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8')
         pubsub_message = json.loads(decoded_message)

         # Extract pipeline run details.
         projectName = pubsub_message["projectName"]
         publishTime = pubsub_message["publishTime"]
         instanceName = pubsub_message["instanceName"]
         namespace = pubsub_message["programStatusEventDetails"]["namespace"]
         applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
         status = pubsub_message["programStatusEventDetails"]["status"]
         event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit='ms')

         print(f"projectName: {projectName}")
         print(f"publishTime: {publishTime}")
         print(f"instanceName: {instanceName}")
         print(f"namespace: {namespace}")
         print(f"applicationName: {applicationName}")
         print(f"status: {status}")
         print(f"event timestamp: {event_timestamp}")

         try:
             error = pubsub_message["programStatusEventDetails"]["error"]
             print(f"error: {error}")
         except KeyError:  # No error details are included for successful runs.
             print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. The following example function creates and saves a BigQuery table, and queries the pipeline run details.

     # Global variables.
     pipeline_rerun_count = 0
     has_pipeline_failed_and_rerun_recently = False  # Timeframe: within last 60 minutes.
     table_id = "bigquery-table-1"  # The BigQuery target table for storing pipeline run information.

     # Update BigQuery table with the pipeline status and rerun details.
     schema = [
         bigquery.SchemaField("Project_Name", "STRING"),
         bigquery.SchemaField("Instance_Name", "STRING"),
         bigquery.SchemaField("Namespace", "STRING"),
         bigquery.SchemaField("Pipeline_Name", "STRING"),
         bigquery.SchemaField("Pipeline_Status", "STRING"),
         bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
         bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
     ]

     # Prepare DataFrame to load the data in BigQuery.
     data = {
         'Project_Name': [projectName],
         'Instance_Name': [instanceName],
         'Namespace': [namespace],
         'Pipeline_Name': [applicationName],
         'Pipeline_Status': [status],
         'Event_Timestamp': [event_timestamp],
         'Pipeline_Rerun_Count': [pipeline_rerun_count],
     }
     dataframe = pd.DataFrame(data)

     # Prepare BigQuery data load job configuration.
     job_config = bigquery.LoadJobConfig(schema=schema)

     job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
     job.result()  # Wait for the job to complete.

     table = bq_client.get_table(table_id)  # Make an API request.
     print("BigQuery table: {} updated.".format(table_id))
    
  3. The following example function checks for pipelines that have failed and whether they were rerun in the last hour.

     bq_client = bigquery.Client()

     if status == "FAILED":
         print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")

         QUERY = f"""
             SELECT * FROM `{table_id}`
             WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
             AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
             AND Pipeline_Rerun_Count > 0
             """

         query_job = bq_client.query_and_wait(QUERY)  # API request.
         row_count = query_job.total_rows  # Waits for query to finish.
         print(f"Query job result row count: {row_count}")

         if row_count > 0:
             print("Pipeline has FAILED and rerun recently...")
             global has_pipeline_failed_and_rerun_recently
             has_pipeline_failed_and_rerun_recently = True
    
  4. If the failed pipeline hasn't run recently, the following example function reruns the failed pipeline. The get_access_token() helper isn't defined in these snippets; a sketch follows this list.

     if not has_pipeline_failed_and_rerun_recently:
         applicationName = applicationName
         auth_token = get_access_token()
         post_headers = {
             "Authorization": "Bearer " + auth_token,
             "Accept": "application/json",
         }
         cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
         run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)

         # Start the job.
         response = requests.post(run_pipeline_endpoint, headers=post_headers)
         print(f"Response for restarting the failed pipeline: {response}")

         global pipeline_rerun_count
         pipeline_rerun_count = 1
    
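
The get_access_token() helper called in the last snippet isn't defined in these examples. A minimal sketch, assuming the function's runtime service account is authorized to call the Cloud Data Fusion instance, might look like the following:

    # Hypothetical implementation of the get_access_token() helper used above.
    import google.auth
    import google.auth.transport.requests

    def get_access_token():
        # Obtain credentials for the function's runtime service account and
        # refresh them to get a short-lived OAuth 2.0 access token.
        credentials, _ = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        credentials.refresh(google.auth.transport.requests.Request())
        return credentials.token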

What's next
