Use BigLake metastore with Dataproc

This document explains how to use BigLake metastore with Dataproc on Compute Engine . This connection provides you with a single, shared metastore that works across open source software engines, such as Apache Spark or Apache Flink.

Before you begin

  1. Enable billing for your Google Cloud project. Learn how to check if billing is enabled on a project .
  2. Enable the BigQuery, and Dataproc APIs.

    Enable the APIs

  3. Optional: Understand how BigLake metastore works and why you should use it.

Required roles

To get the permissions that you need to use Spark or Flink and Dataproc with BigLake metastore as a metadata store, ask your administrator to grant you the following IAM roles:

  • Create a Dataproc cluster: Dataproc Worker ( roles/dataproc.worker ) on the Compute Engine default service account in the project
  • Create BigLake metastore tables in Spark or Flink:
    • Dataproc Worker ( roles/dataproc.worker ) on the Dataproc VM service account in the project
    • BigQuery Data Editor ( roles/bigquery.dataEditor ) on the Dataproc VM service account in the project
    • Storage Object Admin ( roles/storage.objectAdmin ) on the Dataproc VM service account in the project
  • Query BigLake metastore tables in BigQuery:

For more information about granting roles, see Manage access to projects, folders, and organizations .

You might also be able to get the required permissions through custom roles or other predefined roles .

General workflow

To use Dataproc on Compute Engine with BigLake metastore, you follow these general steps:

  1. Create a Dataproc cluster or configure an existing cluster.
  2. Connect to your preferred open source software engine, such as Spark or Flink.
  3. Use a JAR file to install the Apache Iceberg catalog plugin on the cluster.
  4. Create and manage your BigLake metastore resources as needed, depending on the open source software engine that you're using.
  5. In BigQuery, access and use your BigLake metastore resources.

Connect BigLake metastore to Spark

The following instructions show you how to connect Dataproc to BigLake metastore using interactive Spark SQL.

Download the Iceberg catalog plugin

To connect BigLake metastore with Dataproc and Spark, you must use the BigLake metastore Iceberg catalog plugin jar file.

This file is included by default in Dataproc image version 2.2. If your Dataproc clusters don't have direct access to the internet, you must download the plugin and upload it to a Cloud Storage bucket that your Dataproc cluster can access.

Download the BigLake metastore Iceberg catalog plugin .

Configure a Dataproc cluster

Before you connect to BigLake metastore, you must set up a Dataproc cluster.

To do this, you can create a new cluster or use an existing cluster. After, you use this cluster to run interactive Spark SQL and manage your BigLake metastore resources.

  • The subnet in the region where the cluster is created must have Private Google Access (PGA) enabled. By default, Dataproc cluster VMs, created with a 2.2 (default) or later image version, have internal IP addresses only . To allow cluster VMs to communicate with Google APIs, enable Private Google Access on the default (or user-specified network name, if applicable) network subnet in the region where the cluster is created.

  • If you want to run the Zeppelin web interface example in this guide, you must use or create a Dataproc cluster with the Zeppelin optional component enabled.

New cluster

To create a new Dataproc cluster, run the following gcloud dataproc clusters create command. This configuration contains the settings that you need to use BigLake metastore.

gcloud  
dataproc  
clusters  
create  
 CLUSTER_NAME 
  
 \ 
  
--project = 
 PROJECT_ID 
  
 \ 
  
--region = 
 LOCATION 
  
 \ 
  
--optional-components = 
ZEPPELIN  
 \ 
  
--enable-component-gateway  
 \ 
  
--single-node

Replace the following:

  • CLUSTER_NAME : a name for your Dataproc cluster.
  • PROJECT_ID : the ID of the Google Cloud project where you're creating the cluster.
  • LOCATION : the Google Cloud region where you're creating the cluster.

Existing cluster

To configure an existing cluster, add the following Iceberg Spark runtime to your cluster.

 org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1 

You can add the runtime using one of the following options:

Submit a Spark job

To submit a Spark job, use one of the following methods:

gcloud CLI

gcloud dataproc jobs submit spark-sql \
--project= PROJECT_ID 
\
--cluster= CLUSTER_NAME 
\
--region== REGION 
\
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog. CATALOG_NAME 
=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog. CATALOG_NAME 
.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog. CATALOG_NAME 
.gcp_project= PROJECT_ID 
, \
spark.sql.catalog. CATALOG_NAME 
.gcp_location= LOCATION 
, \
spark.sql.catalog. CATALOG_NAME 
.warehouse= WAREHOUSE_DIRECTORY 
\
--execute=" SPARK_SQL_COMMAND 
"

Replace the following:

  • PROJECT_ID : the ID of the Google Cloud project that contains the Dataproc cluster.
  • CLUSTER_NAME : the name of the Dataproc cluster that you're using to run the Spark SQL job.
  • REGION : the Compute Engine region where your cluster is located.
  • LOCATION : the location of the BigQuery resources.
  • CATALOG_NAME : the name of the Spark catalog to that you're using with your SQL job.
  • WAREHOUSE_DIRECTORY : the Cloud Storage folder that contains your data warehouse. This value starts with gs:// .
  • SPARK_SQL_COMMAND : the Spark SQL query that you want to run. This query includes the commands to create your resources. For example, to create a namespace and table.

Interactive Spark

Connect to Spark and install the catalog plugin

To install the catalog plugin for BigLake metastore, connect to your Dataproc cluster using SSH.

  1. In the Google Cloud console, go to the VM Instances page.
  2. To connect to a Dataproc VM instance, click SSHin the list of virtual machine instances. The output is similar to the following:

     Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$ 
    
  3. In the terminal, run the following BigLake metastore initialization command:

    spark-sql  
     \ 
    --jars  
    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar  
     \ 
    --conf  
    spark.sql.catalog. CATALOG_NAME 
     = 
    org.apache.iceberg.spark.SparkCatalog  
     \ 
    --conf  
    spark.sql.catalog. CATALOG_NAME 
    .catalog-impl = 
    org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog  
     \ 
    --conf  
    spark.sql.catalog. CATALOG_NAME 
    .gcp_project = 
     PROJECT_ID 
      
     \ 
    --conf  
    spark.sql.catalog. CATALOG_NAME 
    .gcp_location = 
     LOCATION 
      
     \ 
    --conf  
    spark.sql.catalog. CATALOG_NAME 
    .warehouse = 
     WAREHOUSE_DIRECTORY 
    

    Replace the following:

    • CATALOG_NAME : the name of the Spark catalog to that you're using with your SQL job.
    • PROJECT_ID : the Google Cloud project ID of the BigLake metastore catalog that your Spark catalog links with.
    • LOCATION : the Google Cloud location of the BigLake metastore.
    • WAREHOUSE_DIRECTORY : the Cloud Storage folder that contains your data warehouse. This value starts with gs:// .

    After you successfully connect to a cluster, your Spark terminal displays the spark-sql prompt.

     spark-sql (default)> 
    

Manage BigLake metastore resources

You are now connected to BigLake metastore. You can view your existing resources or create new resources based on your metadata stored in BigLake metastore.

For example, try running the following commands in the interactive Spark SQL session to create an Iceberg namespace and table.

  • Use the custom Iceberg catalog:

     USE 
      
     ` CATALOG_NAME 
    ` 
     ; 
    
  • Create a namespace:

     CREATE 
      
     NAMESPACE 
      
     IF 
      
     NOT 
      
     EXISTS 
      
      NAMESPACE_NAME 
     
     ; 
    
  • Use the created namespace:

     USE 
      
      NAMESPACE_NAME 
     
     ; 
    
  • Create an Iceberg table:

     CREATE 
      
     TABLE 
      
      TABLE_NAME 
     
      
     ( 
     id 
      
     int 
     , 
      
     data 
      
     string 
     ) 
      
     USING 
      
     ICEBERG 
     ; 
    
  • Insert a table row:

     INSERT 
      
     INTO 
      
      TABLE_NAME 
     
      
     VALUES 
      
     ( 
     1 
     , 
      
     "first row" 
     ); 
    
  • Add a table column:

     ALTER 
      
     TABLE 
      
      TABLE_NAME 
     
      
     ADD 
      
     COLUMNS 
      
     ( 
     newDoubleCol 
      
     double 
     ); 
    
  • View table metadata:

     DESCRIBE 
      
     EXTENDED 
      
      TABLE_NAME 
     
     ; 
    
  • List tables in the namespace:

     SHOW 
      
     TABLES 
     ; 
    

Zeppelin notebook

  1. In the Google Cloud console, go to the Dataproc Clusterspage.

    Go to Dataproc clusters

  2. Click the name of cluster that you want to use.

    The Cluster Detailspage opens.

  3. In the navigation menu, click Web interfaces.

  4. Under Component gateway, click Zeppelin. The Zeppelinnotebook page opens.

  5. In the navigation menu, click Notebookand then click +Create new note.

  6. In the dialog, enter a notebook name. Leave Sparkselected as the default interpreter.

  7. Click Create. A new notebook is created.

  8. In the notebook, click the settings menu and then click Interpreter.

  9. In the Search interpretersfield, search for Spark.

  10. Click Edit.

  11. In the Spark.jarsfield, enter the URI of the Spark jar.

     https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar 
    
  12. Click Save.

  13. Click OK.

  14. Copy the following PySpark code into your Zeppelin notebook .

     % 
     pyspark 
     from 
      
     pyspark 
     . 
     sql 
      
     import 
      
     SparkSession 
     spark 
      
     = 
      
     SparkSession 
     . 
     builder 
      
     \ 
     . 
     appName 
     ( 
     "BigLake Metastore Iceberg" 
     ) 
      
     \ 
     . 
     config 
     ( 
     "spark.sql.catalog. CATALOG_NAME 
    " 
     , 
      
     "org.apache.iceberg.spark.SparkCatalog" 
     ) 
      
     \ 
     . 
     config 
     ( 
     "spark.sql.catalog. CATALOG_NAME 
    .catalog-impl" 
     , 
      
     "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" 
     ) 
      
     \ 
     . 
     config 
     ( 
     "spark.sql.catalog. CATALOG_NAME 
    .gcp_project" 
     , 
      
     " PROJECT_ID 
    " 
     ) 
      
     \ 
     . 
     config 
     ( 
     "spark.sql.catalog. CATALOG_NAME 
    .gcp_location" 
     , 
      
     " LOCATION 
    " 
     ) 
      
     \ 
     . 
     config 
     ( 
     "spark.sql.catalog. CATALOG_NAME 
    .warehouse" 
     , 
      
     " WAREHOUSE_DIRECTORY 
    " 
     ) 
      
     \ 
     . 
     getOrCreate 
     () 
     spark 
     . 
     sql 
     ( 
     "select version()" 
     ). 
     show 
     () 
     spark 
     . 
     sql 
     ( 
     "USE ` CATALOG_NAME 
    `;" 
     ) 
     spark 
     . 
     sql 
     ( 
     "CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME 
    ;" 
     ) 
     spark 
     . 
     sql 
     ( 
     "USE NAMESPACE_NAME 
    ;" 
     ) 
     spark 
     . 
     sql 
     ( 
     "CREATE TABLE TABLE_NAME 
    (id int, data string) USING ICEBERG;" 
     ) 
     spark 
     . 
     sql 
     ( 
     "DESCRIBE TABLE_NAME 
    ;" 
     ). 
     show 
     () 
    

    Replace the following:

    • CATALOG_NAME : the name of the Spark catalog to use for the SQL job.
    • PROJECT_ID : the ID of the Google Cloud project that contains the Dataproc cluster.
    • WAREHOUSE_DIRECTORY : the Cloud Storage folder that contains your data warehouse. This value starts with gs:// .
    • NAMESPACE_NAME : the namespace name that references your Spark table.
    • WAREHOUSE_DIRECTORY : the URI of the Cloud Storage folder where your data warehouse is stored.
    • TABLE_NAME : a table name for your Spark table.
  15. Click the run icon or press Shift-Enter to run the code. When the job completes, the status message shows "Spark Job Finished", and the output displays the table contents:

The following instructions show you how to connect Dataproc to BigLake metastore using the Flink SQL client.

To connect BigLake metastore to Flink, do the following:

  1. Create a Dataproc cluster with the optional Flink component enabled , and ensure that you're using Dataproc 2.2 or later.
  2. In the Google Cloud console, go to the VM instancespage.

    Go to VM instances

  3. In the list of virtual machine instances, click SSHto connect to a Dataproc VM instance.

  4. Configure the Iceberg custom catalog plugin for BigLake metastore:

     FLINK_VERSION 
     = 
     1 
    .17 ICEBERG_VERSION 
     = 
     1 
    .5.2 cd 
      
    /usr/lib/flink
    
    sudo  
    wget  
    -c  
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime- ${ 
     FLINK_VERSION 
     } 
    / ${ 
     ICEBERG_VERSION 
     } 
    /iceberg-flink-runtime- ${ 
     FLINK_VERSION 
     } 
    - ${ 
     ICEBERG_VERSION 
     } 
    .jar  
    -P  
    lib
    
    sudo  
    gcloud  
    storage  
    cp  
    gs://spark-lib/bigquery/iceberg-bigquery-catalog- ${ 
     ICEBERG_VERSION 
     } 
    -1.0.1-beta.jar  
    lib/
  5. Start the Flink session on YARN:

     HADOOP_CLASSPATH 
     = 
     ` 
    hadoop  
    classpath ` 
    sudo  
    bin/yarn-session.sh  
    -nm  
    flink-dataproc  
    -d
    
    sudo  
    bin/sql-client.sh  
    embedded  
     \ 
      
    -s  
    yarn-session
  6. Create a catalog in Flink:

     CREATE 
      
     CATALOG 
      
      CATALOG_NAME 
     
      
     WITH 
      
     ( 
      
     'type' 
     = 
     'iceberg' 
     , 
      
     'warehouse' 
     = 
     ' WAREHOUSE_DIRECTORY 
    ' 
     , 
      
     'catalog-impl' 
     = 
     'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog' 
     , 
      
     'gcp_project' 
     = 
     ' PROJECT_ID 
    ' 
     , 
      
     'gcp_location' 
     = 
     ' LOCATION 
    ' 
     ); 
    

    Replace the following:

    • CATALOG_NAME : the Flink catalog identifier, which is linked to a BigLake metastore catalog.
    • WAREHOUSE_DIRECTORY : the base path for the warehouse directory (the Cloud Storage folder where Flink creates files). This value starts with gs:// .
    • PROJECT_ID : the project ID of the BigLake metastore catalog that the Flink catalog links with.
    • LOCATION : the location of the BigQuery resources.

Your Flink session is now connected to BigLake metastore, and you can run Flink SQL commands.

Now that you're connected to BigLake metastore, you can create and view resources based on the metadata stored in BigLake metastore.

For example, try running the following commands in your interactive Flink SQL session to create an Iceberg database and table.

  1. Use the custom Iceberg catalog:

     USE 
      
     CATALOG 
      
      CATALOG_NAME 
     
     ; 
    

    Replace CATALOG_NAME with your Flink catalog identifier.

  2. Create a database, which creates a dataset in BigQuery:

     CREATE 
      
     DATABASE 
      
     IF 
      
     NOT 
      
     EXISTS 
      
      DATABASE_NAME 
     
     ; 
    

    Replace DATABASE_NAME with the name of your new database.

  3. Use the database that you created:

     USE 
      
      DATABASE_NAME 
     
     ; 
    
  4. Create an Iceberg table. The following creates an example sales table:

     CREATE 
      
     TABLE 
      
     IF 
      
     NOT 
      
     EXISTS 
      
      ICEBERG_TABLE_NAME 
     
      
     ( 
      
     order_number 
      
     BIGINT 
     , 
      
     price 
      
     DECIMAL 
     ( 
     32 
     , 
     2 
     ), 
      
     buyer 
      
     ROW 
     < 
     first_name 
      
     STRING 
     , 
      
     last_name 
      
     STRING 
     > 
     , 
      
     order_time 
      
     TIMESTAMP 
     ( 
     3 
     ) 
     ); 
    

    Replace ICEBERG_TABLE_NAME with a name for your new table.

  5. View table metadata:

     DESCRIBE 
      
     EXTENDED 
      
      ICEBERG_TABLE_NAME 
     
     ; 
    
  6. List tables in the database:

     SHOW 
      
     TABLES 
     ; 
    

Ingest data into your table

After you create an Iceberg table in the previous section, you can use Flink DataGen as a data source to ingest real-time data into your table. The following steps are an example of this workflow:

  1. Create a temporary table using DataGen:

     CREATE 
      
     TEMPORARY 
      
     TABLE 
      
      DATABASE_NAME 
     
     . 
      TEMP_TABLE_NAME 
     
     WITH 
      
     ( 
      
     'connector' 
      
     = 
      
     'datagen' 
     , 
      
     'rows-per-second' 
      
     = 
      
     '10' 
     , 
      
     'fields.order_number.kind' 
      
     = 
      
     'sequence' 
     , 
      
     'fields.order_number.start' 
      
     = 
      
     '1' 
     , 
      
     'fields.order_number.end' 
      
     = 
      
     '1000000' 
     , 
      
     'fields.price.min' 
      
     = 
      
     '0' 
     , 
      
     'fields.price.max' 
      
     = 
      
     '10000' 
     , 
      
     'fields.buyer.first_name.length' 
      
     = 
      
     '10' 
     , 
      
     'fields.buyer.last_name.length' 
      
     = 
      
     '10' 
     ) 
     LIKE 
      
      DATABASE_NAME 
     
     . 
      ICEBERG_TABLE_NAME 
     
      
     ( 
     EXCLUDING 
      
     ALL 
     ); 
    

    Replace the following:

    • DATABASE_NAME : the name of the database to store your temporary table.
    • TEMP_TABLE_NAME : a name for your temporary table.
    • ICEBERG_TABLE_NAME : the name of the Iceberg table that you created in the previous section.
  2. Set the parallelism to 1:

     SET 
      
     'parallelism.default' 
      
     = 
      
     '1' 
     ; 
    
  3. Set the checkpoint interval:

     SET 
      
     'execution.checkpointing.interval' 
      
     = 
      
     '10second' 
     ; 
    
  4. Set the checkpoint:

     SET 
      
     'state.checkpoints.dir' 
      
     = 
      
     'hdfs:///flink/checkpoints' 
     ; 
    
  5. Start the real-time streaming job:

     INSERT 
      
     INTO 
      
      ICEBERG_TABLE_NAME 
     
      
     SELECT 
      
     * 
      
     FROM 
      
      TEMP_TABLE_NAME 
     
     ; 
    

    The output is similar to the following:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
  6. To check the status of the streaming job, do the following:

    1. In the Google Cloud console, go to the Clusterspage.

      Go to Clusters

    2. Select your cluster.

    3. Click the Web interfacestab.

    4. Click the YARN ResourceManagerlink.

    5. In the YARN ResourceManagerinterface, find your Flink session, and click the ApplicationMasterlink under Tracking UI.

    6. In the Statuscolumn, confirm that your job status is Running.

  7. Query streaming data in the Flink SQL client:

     SELECT 
      
     * 
      
     FROM 
      
      ICEBERG_TABLE_NAME 
     
     /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ 
     ORDER 
      
     BY 
      
     order_time 
      
     desc 
     LIMIT 
      
     20 
     ; 
    
  8. Query streaming data in BigQuery:

     SELECT 
      
     * 
      
     FROM 
      
     ` DATABASE_NAME 
    . ICEBERG_TABLE_NAME 
    ` 
     ORDER 
      
     BY 
      
     order_time 
      
     desc 
     LIMIT 
      
     20 
     ; 
    
  9. Terminate the streaming job in the Flink SQL client:

     STOP 
      
     JOB 
      
     ' JOB_ID 
    ' 
     ; 
    

    Replace JOB_ID with the job ID that was displayed in the output when you created the streaming job.

What's next

Create a Mobile Website
View Site in Mobile | Classic
Share by: