Trigger DAGs in other environments and projects


This page demonstrates how to implement a DAG that triggers DAGs in other Cloud Composer environments and projects by using Airflow operators for Cloud Composer.

If you want to trigger DAGs in your own environment instead, see Schedule and trigger DAGs.

Configure IAM permissions

If the target environment is located in another project, then the service account of your environment needs roles that allow interacting with environments in that project.

Resource | Principal | Role
Project where the target environment is located | Environment's service account of the source environment | Composer Worker role (composer.worker)
Project where the target environment is located | Environment's service account of the source environment | A custom role with the composer.environments.executeAirflowCommand permission
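
As a rough illustration of the table above, the two grants can be written as IAM policy bindings on the target project. This is a minimal sketch rather than an official procedure: the helper name source_env_bindings, the service account email, and the custom role ID composerCliRunner are hypothetical placeholders. The code only builds the binding dictionaries; applying them to the project's IAM policy is not shown.

```python
def source_env_bindings(service_account_email, target_project_id, custom_role_id):
    """Build IAM bindings for the source environment's service account.

    Hypothetical helper: it only assembles dictionaries in the usual
    IAM policy binding shape ({"role": ..., "members": [...]}).
    """
    member = f"serviceAccount:{service_account_email}"
    return [
        # Predefined Composer Worker role from the table.
        {"role": "roles/composer.worker", "members": [member]},
        # Project-level custom role that is assumed to contain the
        # composer.environments.executeAirflowCommand permission.
        {
            "role": f"projects/{target_project_id}/roles/{custom_role_id}",
            "members": [member],
        },
    ]


bindings = source_env_bindings(
    "source-env-sa@source-project.iam.gserviceaccount.com",  # assumed email
    "target-project",  # project of the target environment
    "composerCliRunner",  # assumed custom role ID
)
for binding in bindings:
    print(binding["role"], "->", binding["members"][0])
```

Both bindings name the same principal, the service account of the source environment, because that account is the one that calls into the target project.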

Trigger a DAG in another environment

The example DAG described in this section does the following:

  1. Triggers a DAG in another Cloud Composer environment.
  2. Checks if the DAG run is completed.

After the DAG run in another environment is completed, the example DAG is marked as successful.

Run Airflow CLI commands with CloudComposerRunAirflowCLICommandOperator

You can use the CloudComposerRunAirflowCLICommandOperator operator to run Airflow CLI commands in another Cloud Composer environment. The example DAG executes the dags trigger command, which triggers a DAG.

This operator can run in deferrable mode. To enable it, set the deferrable parameter to True.

    run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
        task_id="run_airflow_cli_cmd",
        project_id="target-project",
        environment_id="target-composer-environment",
        region="us-central1",
        command="dags trigger -- target_dag",
        # You can run this operator in the deferrable mode:
        # deferrable=True
    )
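
The command parameter is a plain string, so quoting matters as soon as the command carries more than a DAG ID. The following is a minimal sketch of one way to assemble such a string. The helper name build_trigger_command is hypothetical, and the sketch assumes that the target environment's Airflow version accepts the --conf and --run-id options of dags trigger and that the operator splits the command string shell-style; verify both against your provider and Airflow versions.

```python
import json
import shlex


def build_trigger_command(dag_id, conf=None, run_id=None):
    """Build a 'dags trigger' command string (hypothetical helper).

    Options come first; '--' separates them from the DAG ID, the same
    way the example command above does.
    """
    parts = ["dags", "trigger"]
    if conf is not None:
        # Compact JSON keeps the payload a single shell token once quoted.
        parts += ["--conf", json.dumps(conf, separators=(",", ":"))]
    if run_id is not None:
        parts += ["--run-id", run_id]
    parts += ["--", dag_id]
    # shlex.join quotes every token that needs it (for example, the JSON).
    return shlex.join(parts)


plain_command = build_trigger_command("target_dag")
conf_command = build_trigger_command("target_dag", conf={"date": "2024-01-01"})
print(plain_command)
print(conf_command)
```

Without optional arguments, the helper produces the same string as the example above, so it can be passed as the command value unchanged.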
 

Check if a DAG run is completed

You can use the CloudComposerDAGRunSensor sensor to check if a DAG run is completed in another Cloud Composer environment.

This sensor can run in deferrable mode. To enable it, set the deferrable parameter to True.

    dag_run_sensor = CloudComposerDAGRunSensor(
        task_id="dag_run_sensor",
        project_id="target-project",
        environment_id="target-composer-environment",
        region="us-central1",
        composer_dag_id="target_dag",
        allowed_states=["success"],
        # You can run this sensor in the deferrable mode:
        # deferrable=True
    )
 

Full example code

The following is the full code example of a DAG that combines the two previously described tasks.

    from datetime import datetime, timedelta

    from airflow.models.dag import DAG
    from airflow.providers.google.cloud.operators.cloud_composer import (
        CloudComposerRunAirflowCLICommandOperator,
    )
    from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

    DAG_ID = "trigger_dag_in_another_composer_environment"
    TARGET_PROJECT_ID = "example-target-project"
    TARGET_REGION = "example-target-region"
    TARGET_ENV_ID = "example-target-composer-environment"
    TARGET_DAG = "example_target_dag_id"
    COMMAND = f"dags trigger -- {TARGET_DAG}"

    with DAG(
        DAG_ID,
        schedule="@once",
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=["example", "composer"],
    ) as dag:
        run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
            task_id="run_airflow_cli_cmd",
            project_id=TARGET_PROJECT_ID,
            environment_id=TARGET_ENV_ID,
            region=TARGET_REGION,
            command=COMMAND,
            # You can run this operator in the deferrable mode:
            # deferrable=True
        )

        dag_run_sensor = CloudComposerDAGRunSensor(
            task_id="dag_run_sensor",
            project_id=TARGET_PROJECT_ID,
            environment_id=TARGET_ENV_ID,
            region=TARGET_REGION,
            composer_dag_id=TARGET_DAG,
            allowed_states=["success"],
            execution_range=timedelta(minutes=5),
            # You can run this sensor in the deferrable mode:
            # deferrable=True
        )

        run_airflow_cli_cmd >> dag_run_sensor
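
Both tasks in the example take the same project_id, region, and environment_id values. The following is a minimal sketch, not part of the original example, of collecting them in one place so that the operator and the sensor cannot drift apart. The helper name target_env_kwargs is hypothetical; it returns a plain dictionary meant to be unpacked into either constructor.

```python
def target_env_kwargs(project_id, region, environment_id, deferrable=False):
    """Return keyword arguments shared by both tasks (hypothetical helper)."""
    kwargs = {
        "project_id": project_id,
        "region": region,
        "environment_id": environment_id,
    }
    if deferrable:
        # Only set the flag when requested, so each task otherwise
        # keeps its own default.
        kwargs["deferrable"] = True
    return kwargs


target = target_env_kwargs(
    "example-target-project",
    "example-target-region",
    "example-target-composer-environment",
    deferrable=True,
)
print(target)

# Inside the DAG, the dictionary would be unpacked into each task, for example:
#   CloudComposerRunAirflowCLICommandOperator(
#       task_id="run_airflow_cli_cmd", command=COMMAND, **target)
#   CloudComposerDAGRunSensor(
#       task_id="dag_run_sensor", composer_dag_id=TARGET_DAG,
#       allowed_states=["success"], **target)
```

Keeping the deferrable flag in the same dictionary switches both tasks together, which may or may not be what you want; pass it per task if they should differ.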
 
