Use inline Dataproc workflows

Unlike standard workflows that instantiate a previously created workflow template resource, inline workflows use a YAML file or an embedded WorkflowTemplate definition to run a workflow.

Create and run an inline workflow

gcloud

See Instantiate a workflow using a YAML file.

REST

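Before using any of the request data, make the following replacements:

- project-id: your Google Cloud project ID
- region: the Dataproc region in which to run the workflow (for example, us-central1)
- cluster-name: the name to give the managed cluster
- zone: a Compute Engine zone located in region (for example, us-central1-a)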

HTTP method and URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/workflowTemplates:instantiateInline

Request JSON body:

{
  "jobs": [
    {
      "hadoopJob": {
        "mainJarFileUri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": [
          "teragen",
          "1000",
          "hdfs:///gen/"
        ]
      },
      "stepId": "teragen"
    },
    {
      "hadoopJob": {
        "mainJarFileUri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": [
          "terasort",
          "hdfs:///gen/",
          "hdfs:///sort/"
        ]
      },
      "stepId": "terasort",
      "prerequisiteStepIds": [
        "teragen"
      ]
    }
  ],
  "placement": {
    "managedCluster": {
      "clusterName": "cluster-name",
      "config": {
        "gceClusterConfig": {
          "zoneUri": "zone"
        }
      }
    }
  }
}

To send your request, expand one of these options:

You should receive a JSON response similar to the following:

{
  "name": "projects/project-id/regions/region/operations/2fbd0dad-...",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.WorkflowMetadata",
    "graph": {
      "nodes": [
        {
          "stepId": "teragen",
          "state": "RUNNABLE"
        },
        {
          "stepId": "terasort",
          "prerequisiteStepIds": [
            "teragen"
          ],
          "state": "BLOCKED"
        }
      ]
    },
    "state": "PENDING",
    "startTime": "2020-04-02T22:50:44.826Z"
  }
}

Console

Currently, the creation of inline workflows is not supported in the Google Cloud console. Workflow templates and instantiated workflows can be viewed from the Dataproc Workflows page.

Go

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
      import 
      
     ( 
      
     "context" 
      
     "fmt" 
      
     "io" 
      
     dataproc 
      
     "cloud.google.com/go/dataproc/apiv1" 
      
     "cloud.google.com/go/dataproc/apiv1/dataprocpb" 
      
     "google.golang.org/api/option" 
     ) 
     func 
      
     instantiateInlineWorkflowTemplate 
     ( 
     w 
      
     io 
     . 
     Writer 
     , 
      
     projectID 
     , 
      
     region 
      
     string 
     ) 
      
     error 
      
     { 
      
     // projectID := "your-project-id" 
      
     // region := "us-central1" 
      
     ctx 
      
     := 
      
     context 
     . 
     Background 
     () 
      
     // Create the cluster client. 
      
     endpoint 
      
     := 
      
     region 
      
     + 
      
     "-dataproc.googleapis.com:443" 
      
     workflowTemplateClient 
     , 
      
     err 
      
     := 
      
     dataproc 
     . 
      NewWorkflowTemplateClient 
     
     ( 
     ctx 
     , 
      
     option 
     . 
     WithEndpoint 
     ( 
     endpoint 
     )) 
      
     if 
      
     err 
      
     != 
      
     nil 
      
     { 
      
     return 
      
     fmt 
     . 
     Errorf 
     ( 
     "dataproc.NewWorkflowTemplateClient: %w" 
     , 
      
     err 
     ) 
      
     } 
      
     defer 
      
     workflowTemplateClient 
     . 
     Close 
     () 
      
     // Create jobs for the workflow. 
      
     teragenJob 
      
     := 
      
    & dataprocpb 
     . 
     OrderedJob 
     { 
      
     JobType 
     : 
      
    & dataprocpb 
     . 
     OrderedJob_HadoopJob 
     { 
      
     HadoopJob 
     : 
      
    & dataprocpb 
     . 
     HadoopJob 
     { 
      
     Driver 
     : 
      
    & dataprocpb 
     . 
     HadoopJob_MainJarFileUri 
     { 
      
     MainJarFileUri 
     : 
      
     "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar" 
     , 
      
     }, 
      
     Args 
     : 
      
     [] 
     string 
     { 
      
     "teragen" 
     , 
      
     "1000" 
     , 
      
     "hdfs:///gen/" 
     , 
      
     }, 
      
     }, 
      
     }, 
      
     StepId 
     : 
      
     "teragen" 
     , 
      
     } 
      
     terasortJob 
      
     := 
      
    & dataprocpb 
     . 
     OrderedJob 
     { 
      
     JobType 
     : 
      
    & dataprocpb 
     . 
     OrderedJob_HadoopJob 
     { 
      
     HadoopJob 
     : 
      
    & dataprocpb 
     . 
     HadoopJob 
     { 
      
     Driver 
     : 
      
    & dataprocpb 
     . 
     HadoopJob_MainJarFileUri 
     { 
      
     MainJarFileUri 
     : 
      
     "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar" 
     , 
      
     }, 
      
     Args 
     : 
      
     [] 
     string 
     { 
      
     "terasort" 
     , 
      
     "hdfs:///gen/" 
     , 
      
     "hdfs:///sort/" 
     , 
      
     }, 
      
     }, 
      
     }, 
      
     StepId 
     : 
      
     "terasort" 
     , 
      
     PrerequisiteStepIds 
     : 
      
     [] 
     string 
     { 
      
     "teragen" 
     , 
      
     }, 
      
     } 
      
     // Create the cluster placement. 
      
     clusterPlacement 
      
     := 
      
    & dataprocpb 
     . 
     WorkflowTemplatePlacement 
     { 
      
     Placement 
     : 
      
    & dataprocpb 
     . 
     WorkflowTemplatePlacement_ManagedCluster 
     { 
      
     ManagedCluster 
     : 
      
    & dataprocpb 
     . 
     ManagedCluster 
     { 
      
     ClusterName 
     : 
      
     "my-managed-cluster" 
     , 
      
     Config 
     : 
      
    & dataprocpb 
     . 
     ClusterConfig 
     { 
      
     GceClusterConfig 
     : 
      
    & dataprocpb 
     . 
     GceClusterConfig 
     { 
      
     // Leave "ZoneUri" empty for "Auto Zone Placement" 
      
     // ZoneUri: "" 
      
     ZoneUri 
     : 
      
     "us-central1-a" 
     , 
      
     }, 
      
     }, 
      
     }, 
      
     }, 
      
     } 
      
     // Create the Instantiate Inline Workflow Template Request. 
      
     req 
      
     := 
      
    & dataprocpb 
     . 
     InstantiateInlineWorkflowTemplateRequest 
     { 
      
     Parent 
     : 
      
     fmt 
     . 
     Sprintf 
     ( 
     "projects/%s/regions/%s" 
     , 
      
     projectID 
     , 
      
     region 
     ), 
      
     Template 
     : 
      
    & dataprocpb 
     . 
     WorkflowTemplate 
     { 
      
     Jobs 
     : 
      
     [] 
     * 
     dataprocpb 
     . 
     OrderedJob 
     { 
      
     teragenJob 
     , 
      
     terasortJob 
     , 
      
     }, 
      
     Placement 
     : 
      
     clusterPlacement 
     , 
      
     }, 
      
     } 
      
     // Create the cluster. 
      
     op 
     , 
      
     err 
      
     := 
      
     workflowTemplateClient 
     . 
     InstantiateInlineWorkflowTemplate 
     ( 
     ctx 
     , 
      
     req 
     ) 
      
     if 
      
     err 
      
     != 
      
     nil 
      
     { 
      
     return 
      
     fmt 
     . 
     Errorf 
     ( 
     "InstantiateInlineWorkflowTemplate: %w" 
     , 
      
     err 
     ) 
      
     } 
      
     if 
      
     err 
      
     := 
      
     op 
     . 
     Wait 
     ( 
     ctx 
     ); 
      
     err 
      
     != 
      
     nil 
      
     { 
      
     return 
      
     fmt 
     . 
     Errorf 
     ( 
     "InstantiateInlineWorkflowTemplate.Wait: %w" 
     , 
      
     err 
     ) 
      
     } 
      
     // Output a success message. 
      
     fmt 
     . 
     Fprintf 
     ( 
     w 
     , 
      
     "Workflow created successfully." 
     ) 
      
     return 
      
     nil 
     } 
     
    

Java

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
      import 
      
     com.google.api.gax.longrunning. OperationFuture 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. ClusterConfig 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. GceClusterConfig 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. HadoopJob 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. ManagedCluster 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. OrderedJob 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. RegionName 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. WorkflowMetadata 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. WorkflowTemplate 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. WorkflowTemplatePlacement 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. WorkflowTemplateServiceClient 
     
     ; 
     import 
      
     com.google.cloud.dataproc.v1. WorkflowTemplateServiceSettings 
     
     ; 
     import 
      
     com.google.protobuf. Empty 
     
     ; 
     import 
      
     java.io.IOException 
     ; 
     import 
      
     java.util.concurrent.ExecutionException 
     ; 
     /**
      * Sample that instantiates an inline Dataproc workflow template.
      *
      * <p>The template contains two Hadoop steps that run the hadoop-mapreduce-examples jar:
      * "teragen" and "terasort", where terasort lists teragen as a prerequisite step. Both steps are
      * placed on a managed cluster named "my-managed-cluster".
      */
     public class InstantiateInlineWorkflowTemplate {

       /**
        * Runs the sample with placeholder values; replace them before running.
        *
        * @throws IOException propagated from the two-argument overload
        * @throws InterruptedException propagated from the two-argument overload
        */
       public static void instantiateInlineWorkflowTemplate() throws IOException, InterruptedException {
         // TODO(developer): Replace these variables before running the sample.
         String projectId = "your-project-id";
         String region = "your-project-region";
         instantiateInlineWorkflowTemplate(projectId, region);
       }

       /**
        * Submits the inline workflow template and blocks on the returned future.
        *
        * <p>An {@link ExecutionException} raised while waiting on the future is caught and reported on
        * standard error rather than rethrown.
        *
        * @param projectId project used to build the request parent
        * @param region region used for both the service endpoint and the request parent
        * @throws IOException declared by the client settings/creation calls
        * @throws InterruptedException declared by the blocking wait on the future
        */
       public static void instantiateInlineWorkflowTemplate(String projectId, String region)
           throws IOException, InterruptedException {
         String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

         // Configure the settings for the workflow template service client.
         WorkflowTemplateServiceSettings workflowTemplateServiceSettings =
             WorkflowTemplateServiceSettings.newBuilder().setEndpoint(myEndpoint).build();

         // Create a workflow template service client with the configured settings. The client only
         // needs to be created once and can be reused for multiple requests. Using a try-with-resources
         // closes the client, but this can also be done manually with the .close() method.
         try (WorkflowTemplateServiceClient workflowTemplateServiceClient =
             WorkflowTemplateServiceClient.create(workflowTemplateServiceSettings)) {

           // Configure the jobs within the workflow.
           HadoopJob teragenHadoopJob =
               HadoopJob.newBuilder()
                   .setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
                   .addArgs("teragen")
                   .addArgs("1000")
                   .addArgs("hdfs:///gen/")
                   .build();
           OrderedJob teragen =
               OrderedJob.newBuilder().setHadoopJob(teragenHadoopJob).setStepId("teragen").build();

           HadoopJob terasortHadoopJob =
               HadoopJob.newBuilder()
                   .setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
                   .addArgs("terasort")
                   .addArgs("hdfs:///gen/")
                   .addArgs("hdfs:///sort/")
                   .build();
           // terasort names teragen as a prerequisite step.
           OrderedJob terasort =
               OrderedJob.newBuilder()
                   .setHadoopJob(terasortHadoopJob)
                   .addPrerequisiteStepIds("teragen")
                   .setStepId("terasort")
                   .build();

           // Configure the cluster placement for the workflow.
           // The zone URI is intentionally left unset so that Dataproc "Auto Zone Placement" selects
           // a zone inside `region`. A fixed zone such as "us-central1-a" is rejected whenever the
           // caller passes a different region. To pin a zone, set one located in `region`:
           // GceClusterConfig gceClusterConfig =
           //     GceClusterConfig.newBuilder().setZoneUri("us-central1-a").build();
           GceClusterConfig gceClusterConfig = GceClusterConfig.newBuilder().build();
           ClusterConfig clusterConfig =
               ClusterConfig.newBuilder().setGceClusterConfig(gceClusterConfig).build();
           ManagedCluster managedCluster =
               ManagedCluster.newBuilder()
                   .setClusterName("my-managed-cluster")
                   .setConfig(clusterConfig)
                   .build();
           WorkflowTemplatePlacement workflowTemplatePlacement =
               WorkflowTemplatePlacement.newBuilder().setManagedCluster(managedCluster).build();

           // Create the inline workflow template.
           WorkflowTemplate workflowTemplate =
               WorkflowTemplate.newBuilder()
                   .addJobs(teragen)
                   .addJobs(terasort)
                   .setPlacement(workflowTemplatePlacement)
                   .build();

           // Submit the instantiated inline workflow template request.
           String parent = RegionName.format(projectId, region);
           OperationFuture<Empty, WorkflowMetadata> instantiateInlineWorkflowTemplateAsync =
               workflowTemplateServiceClient.instantiateInlineWorkflowTemplateAsync(
                   parent, workflowTemplate);
           // Block on the future before reporting success.
           instantiateInlineWorkflowTemplateAsync.get();

           // Print out a success message.
           System.out.printf("Workflow ran successfully.");

         } catch (ExecutionException e) {
           System.err.println(String.format("Error running workflow: %s ", e.getMessage()));
         }
       }
     }
     
    

Node.js

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
  const dataproc = require('@google-cloud/dataproc');

  // TODO(developer): Uncomment and set the following variables
  // const projectId = 'YOUR_PROJECT_ID';
  // const region = 'YOUR_REGION';

  // Create a client with the endpoint set to the desired region
  const client = new dataproc.v1.WorkflowTemplateServiceClient({
    apiEndpoint: `${region}-dataproc.googleapis.com`,
    projectId: projectId,
  });

  /**
   * Submits an inline workflow template with two Hadoop steps ("teragen",
   * then "terasort", which lists teragen as a prerequisite) placed on a
   * managed cluster named "my-managed-cluster", awaits the returned
   * operation, and logs a success message.
   */
  async function instantiateInlineWorkflowTemplate() {
    // Create the formatted parent.
    const parent = client.regionPath(projectId, region);

    // Create the template
    const template = {
      jobs: [
        {
          hadoopJob: {
            mainJarFileUri:
              'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar',
            args: ['teragen', '1000', 'hdfs:///gen/'],
          },
          stepId: 'teragen',
        },
        {
          hadoopJob: {
            mainJarFileUri:
              'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar',
            args: ['terasort', 'hdfs:///gen/', 'hdfs:///sort/'],
          },
          stepId: 'terasort',
          prerequisiteStepIds: ['teragen'],
        },
      ],
      placement: {
        managedCluster: {
          clusterName: 'my-managed-cluster',
          config: {
            gceClusterConfig: {
              // 'zoneUri' is intentionally left unset so that Dataproc
              // 'Auto Zone Placement' selects a zone inside `region`. A fixed
              // zone such as 'us-central1-a' is rejected whenever `region` is
              // a different region. To pin a zone, set one located in `region`:
              // zoneUri: 'us-central1-a',
            },
          },
        },
      },
    };

    const request = {
      parent: parent,
      template: template,
    };

    // Submit the request to instantiate the workflow from an inline template.
    const [operation] = await client.instantiateInlineWorkflowTemplate(request);
    await operation.promise();

    // Output a success message
    console.log('Workflow ran successfully.');
  }

  // Run the sample; report a rejected promise instead of leaving it unhandled.
  instantiateInlineWorkflowTemplate().catch(err => {
    console.error(err);
    process.exitCode = 1;
  });
 

Python

  1. Install the client library
  2. Set up application default credentials
  3. Run the code
      from 
      
     google.cloud 
      
     import 
     dataproc_v1 
     as 
     dataproc 
     def instantiate_inline_workflow_template(project_id, region):
         """Submit an inline Dataproc workflow template and wait for the result.

         The template contains two Hadoop steps that run the
         hadoop-mapreduce-examples jar: "teragen" and "terasort", where terasort
         lists teragen as a prerequisite step. Both steps are placed on a managed
         cluster named "my-managed-cluster".

         Args:
             project_id (string): Project to use for running the workflow.
             region (string): Region where the workflow resources should live.
                 It is used for both the API endpoint and the request parent.
         """
         # Create a client with the endpoint set to the desired region.
         workflow_template_client = dataproc.WorkflowTemplateServiceClient(
             client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
         )

         parent = f"projects/{project_id}/regions/{region}"

         template = {
             "jobs": [
                 {
                     "hadoop_job": {
                         "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/"
                         "hadoop-mapreduce-examples.jar",
                         "args": ["teragen", "1000", "hdfs:///gen/"],
                     },
                     "step_id": "teragen",
                 },
                 {
                     "hadoop_job": {
                         "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/"
                         "hadoop-mapreduce-examples.jar",
                         "args": ["terasort", "hdfs:///gen/", "hdfs:///sort/"],
                     },
                     "step_id": "terasort",
                     # terasort names teragen as a prerequisite step.
                     "prerequisite_step_ids": ["teragen"],
                 },
             ],
             "placement": {
                 "managed_cluster": {
                     "cluster_name": "my-managed-cluster",
                     "config": {
                         # 'zone_uri' is intentionally left unset so that Dataproc
                         # 'Auto Zone Placement' selects a zone inside `region`.
                         # A fixed zone such as "us-central1-a" is rejected
                         # whenever `region` is a different region. To pin a
                         # zone, add a zone located in `region`:
                         # "gce_cluster_config": {"zone_uri": "us-central1-a"}
                         "gce_cluster_config": {},
                     },
                 },
             },
         }

         # Submit the request to instantiate the workflow from an inline template.
         operation = workflow_template_client.instantiate_inline_workflow_template(
             request={"parent": parent, "template": template}
         )
         # Block on the operation before reporting success.
         operation.result()

         # Output a success message.
         print("Workflow ran successfully.")
     
    
Design a Mobile Site
View Site in Mobile | Classic
Share by: