Run a Data Analytics DAG in Google Cloud using data from AWS

Cloud Composer 3  |  Cloud Composer 2  |  Cloud Composer 1

This tutorial is a modified version of Run a Data Analytics DAG in Google Cloud that shows how to connect your Cloud Composer environment to Amazon Web Services (AWS) and use data stored there. You use Cloud Composer to create an Apache Airflow DAG. The DAG joins data from a BigQuery public dataset and a CSV file stored in an AWS S3 bucket, and then runs a Dataproc Serverless batch job to process the joined data.

The BigQuery public dataset in this tutorial is ghcn_d, an integrated database of climate summaries across the globe. The CSV file contains information about the dates and names of US holidays from 1997 to 2021.

The question we want to answer using the DAG is: "How warm was it in Chicago on Thanksgiving for the past 25 years?"

Objectives

  • Create a Cloud Composer environment in the default configuration
  • Create a bucket in AWS S3
  • Create an empty BigQuery dataset
  • Create a new Cloud Storage bucket
  • Create and run a DAG that includes the following tasks:
    • Load an external dataset from S3 to Cloud Storage
    • Load an external dataset from Cloud Storage to BigQuery
    • Join two datasets in BigQuery
    • Run a data analytics PySpark job

Before you begin

Manage permissions in AWS

  1. Create an AWS account.

  2. Follow the "Creating policies with the visual editor" section of the Creating IAM Policies AWS tutorial to create a customized IAM policy for AWS S3 with the following configuration:

    • Service: S3
    • Actions:
      • ListAllMyBuckets (s3:ListAllMyBuckets), for viewing your S3 bucket
      • CreateBucket (s3:CreateBucket), for creating a bucket
      • PutBucketOwnershipControls (s3:PutBucketOwnershipControls), for creating a bucket
      • ListBucket (s3:ListBucket), for granting permission to list objects in an S3 bucket
      • PutObject (s3:PutObject), for uploading files to a bucket
      • GetBucketVersioning (s3:GetBucketVersioning), for deleting an object in a bucket
      • DeleteObject (s3:DeleteObject), for deleting an object in a bucket
      • ListBucketVersions (s3:ListBucketVersions), for deleting a bucket
      • DeleteBucket (s3:DeleteBucket), for deleting a bucket
    • Resources: Choose "Any" next to "bucket" and "object" to grant permissions to any resources of those types.
    • Tags: None
    • Name: TutorialPolicy

    Refer to the list of actions supported in Amazon S3 for more information about each action listed above.

  3. Add the TutorialPolicy IAM policy to your identity.
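
If you prefer to script these two steps instead of using the visual editor, the following is a minimal sketch that creates an equivalent TutorialPolicy with boto3 and attaches it to an IAM user. The user name is a placeholder, and this scripted approach is an alternative of our own rather than part of the AWS tutorial referenced above.

# Optional sketch (not part of the AWS tutorial): create TutorialPolicy with
# boto3 and attach it to an IAM user. Replace "your-iam-user" with your own
# identity; the user name is a placeholder.
import json

import boto3

policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:CreateBucket",
                "s3:PutBucketOwnershipControls",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:GetBucketVersioning",
                "s3:DeleteObject",
                "s3:ListBucketVersions",
                "s3:DeleteBucket",
            ],
            "Resource": "*",
        }
    ],
}

iam = boto3.client("iam")
policy = iam.create_policy(
    PolicyName="TutorialPolicy", PolicyDocument=json.dumps(policy_document)
)
iam.attach_user_policy(UserName="your-iam-user", PolicyArn=policy["Policy"]["Arn"])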

Enable APIs

Enable the following APIs:

Console

Enable the Dataproc, Cloud Composer, BigQuery, and Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, and Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com \
  composer.googleapis.com \
  bigquery.googleapis.com \
  storage.googleapis.com

Grant permissions

Grant the following roles and permissions to your user account:

Create and prepare your Cloud Composer environment

  1. Create a Cloud Composer environment with default parameters.

  2. Grant the following roles to the service account used in your Cloud Composer environment so that the Airflow workers can successfully run DAG tasks:

    • BigQuery User (roles/bigquery.user)
    • BigQuery Data Owner (roles/bigquery.dataOwner)
    • Service Account User (roles/iam.serviceAccountUser)
    • Dataproc Editor (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)

  3. Install the apache-airflow-providers-amazon PyPI package in your Cloud Composer environment.

  4. Create an empty BigQuery dataset with the following parameters:

    • Name: holiday_weather
    • Region: US

  5. Create a new Cloud Storage bucket in the US multiregion.

  6. Run the following command to enable Private Google Access on the default subnet in the region where you want to run Dataproc Serverless, to fulfill networking requirements. We recommend using the same region as your Cloud Composer environment.

     gcloud compute networks subnets update default \
       --region DATAPROC_SERVERLESS_REGION \
       --enable-private-ip-google-access

Create an S3 bucket with default settings in your preferred region.
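
The console steps are all you need, but if you prefer to create the BigQuery dataset, the Cloud Storage bucket, and the S3 bucket from a script, the following is a minimal sketch using the google-cloud-bigquery, google-cloud-storage, and boto3 client libraries. The project ID and bucket names are placeholders.

# Optional sketch (placeholder names): create the holiday_weather dataset,
# a Cloud Storage bucket, and an S3 bucket programmatically instead of
# through the Google Cloud and AWS consoles.
import boto3
from google.cloud import bigquery, storage

PROJECT_ID = "your-project-id"        # placeholder
GCS_BUCKET_NAME = "your-gcs-bucket"   # placeholder
S3_BUCKET_NAME = "your-s3-bucket"     # placeholder

# BigQuery dataset in the US multi-region
bq_client = bigquery.Client(project=PROJECT_ID)
dataset = bigquery.Dataset(f"{PROJECT_ID}.holiday_weather")
dataset.location = "US"
bq_client.create_dataset(dataset, exists_ok=True)

# Cloud Storage bucket in the US multi-region
storage_client = storage.Client(project=PROJECT_ID)
storage_client.create_bucket(GCS_BUCKET_NAME, location="US")

# S3 bucket with default settings (us-east-1 needs no LocationConstraint)
s3 = boto3.client("s3")
s3.create_bucket(Bucket=S3_BUCKET_NAME)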

Connect to AWS from Cloud Composer

  1. Get your AWS access key ID and secret access key.
  2. Add your AWS S3 connection using the Airflow UI:

    1. Go to Admin > Connections.
    2. Create a new connection with the following configuration:

      • Connection Id: aws_s3_connection
      • Connection Type: Amazon S3
      • Extras: {"aws_access_key_id": "your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
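
Optionally, you can verify that the connection works before building the full pipeline by running a small DAG such as the sketch below, which uses the Amazon provider's S3Hook with the aws_s3_connection ID. The DAG ID and bucket name are placeholders of our own, not part of this tutorial.

# Optional sketch (placeholder DAG ID and bucket name): verify that
# aws_s3_connection can reach your S3 bucket from an Airflow worker.
import datetime

from airflow import models
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def check_s3_bucket():
    hook = S3Hook(aws_conn_id="aws_s3_connection")
    # check_for_bucket returns True when the stored credentials can see the bucket
    if not hook.check_for_bucket("your-s3-bucket-name"):
        raise ValueError("aws_s3_connection cannot access the S3 bucket")


with models.DAG(
    "verify_aws_s3_connection",
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonOperator(task_id="check_s3_bucket", python_callable=check_s3_bucket)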

Data processing using Dataproc Serverless

Explore the example PySpark Job

The code shown below is an example PySpark job that converts the temperature data from the GHCN dataset from tenths of a degree Celsius to degrees Celsius.

import sys

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)

        print("Data written to BigQuery")

Upload the PySpark file to Cloud Storage

To upload the PySpark file to Cloud Storage:

  1. Save data_analytics_process.py to your local machine.

  2. In the Google Cloud console, go to the Cloud Storage browser page:

    Go to Cloud Storage browser

  3. Click the name of the bucket you created earlier.

  4. In the Objects tab for the bucket, click the Upload files button, select data_analytics_process.py in the dialog that appears, and click Open.
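
Alternatively, the following is a minimal sketch of the same upload using the google-cloud-storage client library; the bucket name is a placeholder.

# Optional sketch (placeholder bucket name): upload data_analytics_process.py
# with the google-cloud-storage client library instead of the console.
from google.cloud import storage

bucket = storage.Client().bucket("your-gcs-bucket")
blob = bucket.blob("data_analytics_process.py")
blob.upload_from_filename("data_analytics_process.py")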

Upload the CSV file to AWS S3

To upload the holidays.csv file:

  1. Save holidays.csv on your local machine.
  2. Follow the AWS guide to upload the file to your bucket.
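
Alternatively, the following is a minimal boto3 sketch of the same upload; the bucket name is a placeholder.

# Optional sketch (placeholder bucket name): upload holidays.csv to S3
# with boto3 instead of the AWS console.
import boto3

s3 = boto3.client("s3")
s3.upload_file("holidays.csv", "your-s3-bucket", "holidays.csv")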

Data analytics DAG

Explore the example DAG

The DAG uses multiple operators to transform and unify the data:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

    s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Use the Airflow UI to add variables

In Airflow, variables are a universal way to store and retrieve arbitrary settings or configurations as a simple key-value store. This DAG uses Airflow variables to store common values. To add them to your environment:

  1. Access the Airflow UI from the Cloud Composer console.

  2. Go to Admin > Variables.

  3. Add the following variables:

    • s3_bucket: the name of the S3 bucket you created earlier.

    • gcp_project: your project ID.

    • gcs_bucket: the name of the Cloud Storage bucket you created earlier (without the gs:// prefix).

    • gce_region: the region where you want to run your Dataproc Serverless job; it must meet the Dataproc Serverless networking requirements. This is the region where you enabled Private Google Access earlier.

    • dataproc_service_account: the service account for your Cloud Composer environment. You can find this service account on the environment configuration tab for your Cloud Composer environment.
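
If you would rather define these values in a file, the following sketch writes a variables.json that you can then import on the Variables page, provided your Airflow version offers an import option there. All values are placeholders that you must replace with your own.

# Optional sketch (all values are placeholders): write a variables.json file
# for bulk import in the Airflow UI under Admin > Variables.
import json

variables = {
    "s3_bucket": "your-s3-bucket",          # placeholder
    "gcp_project": "your-project-id",       # placeholder
    "gcs_bucket": "your-gcs-bucket",        # placeholder, no gs:// prefix
    "gce_region": "us-central1",            # example region
    "dataproc_service_account": "your-composer-sa@your-project-id.iam.gserviceaccount.com",
}

with open("variables.json", "w") as f:
    json.dump(variables, f, indent=2)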

Upload the DAG to your environment's bucket

Cloud Composer schedules DAGs that are located in the /dags folder in your environment's bucket. To upload the DAG using the Google Cloud console:

  1. On your local machine, save s3togcsoperator_tutorial.py.

  2. In the Google Cloud console, go to the Environments page.

    Go to Environments

  3. In the list of environments, in the DAG folder column, click the DAGs link. The DAGs folder of your environment opens.

  4. Click Upload files.

  5. Select s3togcsoperator_tutorial.py on your local machine and click Open.

Trigger the DAG

  1. In your Cloud Composer environment, click the DAGs tab.

  2. Click the s3_to_gcs_dag DAG.

  3. Click Trigger DAG.

  4. Wait about five to ten minutes until you see a green check indicating the tasks have been completed successfully.

Validate the DAG's success

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the Explorer panel, click your project name.

  3. Click holidays_weather_joined.

  4. Click Preview to view the resulting table. Note that the numbers in the value column are in tenths of a degree Celsius.

  5. Click holidays_weather_normalized.

  6. Click Preview to view the resulting table. Note that the numbers in the value column are in degrees Celsius.
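
To answer the original question, "How warm was it in Chicago on Thanksgiving for the past 25 years?", you can query the normalized table, for example with the BigQuery Python client as sketched below. The project ID is a placeholder, and the filter assumes the holiday name used in holidays.csv contains the word "Thanksgiving".

# Optional sketch (placeholder project ID; the filter assumes the holiday
# name in holidays.csv contains "Thanksgiving").
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT Date, Holiday, value AS tmax_celsius
    FROM `your-project-id.holiday_weather.holidays_weather_normalized`
    WHERE Holiday LIKE '%Thanksgiving%'
    ORDER BY Date
"""
for row in client.query(query).result():
    print(row.Date, row.Holiday, row.tmax_celsius)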

Cleanup

Delete the individual resources that you created for this tutorial: the Cloud Composer environment, the BigQuery dataset, the Cloud Storage bucket, the AWS S3 bucket, and the TutorialPolicy IAM policy.
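
If you want to script the cleanup of the datasets and buckets, the following is a minimal sketch; the project ID and bucket names are placeholders, and deleting the Cloud Composer environment itself is not shown.

# Optional cleanup sketch (placeholder names): delete the BigQuery dataset,
# the Cloud Storage bucket, and the S3 bucket created for this tutorial.
# Delete the Cloud Composer environment separately.
import boto3
from google.cloud import bigquery, storage

PROJECT_ID = "your-project-id"        # placeholder
GCS_BUCKET_NAME = "your-gcs-bucket"   # placeholder
S3_BUCKET_NAME = "your-s3-bucket"     # placeholder

# BigQuery dataset and all tables in it
bigquery.Client(project=PROJECT_ID).delete_dataset(
    f"{PROJECT_ID}.holiday_weather", delete_contents=True, not_found_ok=True
)

# Cloud Storage bucket, including the uploaded PySpark file and the CSV copy
storage.Client(project=PROJECT_ID).bucket(GCS_BUCKET_NAME).delete(force=True)

# S3 bucket: objects (and versions, if any) must be removed before the bucket
s3_bucket = boto3.resource("s3").Bucket(S3_BUCKET_NAME)
s3_bucket.object_versions.delete()
s3_bucket.objects.all().delete()
s3_bucket.delete()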

What's next
