Use Apache Iceberg tables with Dataproc Metastore

This page explains how to use Apache Iceberg tables with a Dataproc Metastore service attached to a Dataproc cluster. Apache Iceberg is an open table format for large analytical datasets.

Compatibilities

Iceberg tables support the following features.

Drivers   Select   Insert   Create Table
Spark     ✓        ✓        ✓
Hive      ✓        ✓
Presto    ✓        ✓        ✓

Before you begin

Use Iceberg table with Spark

The following example shows how to use Iceberg tables with Spark.

Iceberg tables support read and write operations. For more information, see Apache Iceberg - Spark.

Spark Configurations

First, start the Spark shell and use a Cloud Storage bucket to store data. To include Iceberg in the Spark installation, add the Iceberg Spark runtime JAR file to Spark's JARs folder. To download the JAR file, see Apache Iceberg Downloads. The following command starts the Spark shell with support for Apache Iceberg:

 $ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
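
If the shell starts without errors, you can optionally confirm that the Iceberg runtime was picked up. This one-line check is only a sketch, not a required setup step:

 // Sketch: resolves only if the Iceberg runtime JAR is on the classpath.
 Class.forName("org.apache.iceberg.hive.HiveCatalog")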

Use Hive Catalog to create Iceberg tables

  1. Set up the Hive catalog configurations to create Iceberg tables in the Spark Scala shell:

      import org.apache.iceberg.hive.HiveCatalog
      import org.apache.iceberg.catalog._
      import org.apache.iceberg.Schema
      import org.apache.iceberg.types.Types._
      import org.apache.iceberg.PartitionSpec
      import org.apache.iceberg.spark.SparkSchemaUtil
      import org.apache.spark.sql._
      import java.util.HashMap
    
  2. Create a table to insert and update data. The following is an example.

    1. Create a table called example under the default database:

       val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example"); 
      
    2. Insert sample data:

       val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema); 
      
    3. Specify the partition strategy based on the id column:

       val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build; 
      
    4. Create the table:

       val table=catalog.createTable(name,df1_schema,partition_spec); 
      
    5. Add the Iceberg Storage Handler and SerDe as the table property:

       table.updateProperties().set("engine.hive.enabled", "true").commit(); 
      
    6. Write the data to the table:

       df1.write.format("iceberg").mode("overwrite").save("default.example"); 
      
    7. Read the data:

        val read_df1 = spark.read.format("iceberg").load("default.example");
        read_df1.show;
      
  3. Change the table schema. The following is an example.

    1. Get the table and add a new column grade:

        val table = catalog.loadTable(TableIdentifier.of("default", "example"));
        table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Check the new table schema:

       table.schema.toString; 
      
  4. Insert more data and view the schema evolution. The following is an example.

    1. Add new data to the table:

       val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example"); 
      
    2. Check the inserted new data:

        val read_df2 = spark.read.format("iceberg").load("default.example");
        read_df2.show;
      
    3. View the table history:

        spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. View the snapshots:

        spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. View the manifest files:

        spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. View the data files:

        spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Assume that you added the row with id=6 by mistake and want to go back to a correct version of the table:

        spark.read.format("iceberg").option("snapshot-id", "2273922295095144317").load("default.example").show();
      

      Replace snapshot-id with the version you want to go back to.
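
      To find a snapshot ID to pass to the snapshot-id option, you can query the history metadata table shown earlier. The following is a minimal sketch; made_current_at and snapshot_id are the standard Iceberg history columns:

       import org.apache.spark.sql.functions.col

       // Sketch: list the table's snapshot IDs newest-first so you can pick
       // the one that was current before the mistaken insert.
       spark.read.format("iceberg")
         .load("default.example.history")
         .orderBy(col("made_current_at").desc)
         .select("made_current_at", "snapshot_id")
         .show(truncate = false);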

Use Hadoop Tables to create Iceberg tables

  1. Set up the Hadoop table configurations to create Iceberg tables in the Spark Scala shell:

      import org.apache.hadoop.conf.Configuration
      import org.apache.iceberg.hadoop.HadoopTables
      import org.apache.iceberg.Table
      import org.apache.iceberg.Schema
      import org.apache.iceberg.types.Types._
      import org.apache.iceberg.PartitionSpec
      import org.apache.iceberg.spark.SparkSchemaUtil
      import org.apache.spark.sql._
    
  2. Create a table to insert and update data. The following is an example.

    1. Create a table called example under the default database:

       val conf = new Configuration();
      val tables = new HadoopTables(conf); 
      
    2. Insert sample data:

       val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema); 
      
    3. Specify the partition strategy based on the id column:

       val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build; 
      
    4. Create the table:

        val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
        val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Write the data to the table:

       df1.write.format("iceberg").mode("overwrite").save(table_location); 
      
    6. Read the data:

        val read_df1 = spark.read.format("iceberg").load(table_location);
        read_df1.show;
      
  3. Change the table schema. The following is an example.

    1. Get the table and add a new column grade:

        val table = tables.load(table_location);
        table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Check the new table schema:

       table.schema.toString; 
      
  4. Insert more data and view the schema evolution. The following is an example.

    1. Add new data to the table:

       val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location); 
      
    2. Check the inserted new data:

        val read_df2 = spark.read.format("iceberg").load(table_location);
        read_df2.show;
      
    3. View the table history:

        spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate = false);
      
    4. View the snapshots:

        spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate = false);
       
      
    5. View the manifest files:

        spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate = false);
       
      
    6. View the data files:

        spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate = false);
       
      
    7. Go back to see a specific version of the table:

        spark.read.format("iceberg").option("snapshot-id", 3943776515926014142L).load(table_location).show;
      

      Replace the snapshot ID with the version you want to go back to, and append L so that it is a Scala Long literal. For example, 3943776515926014142L.
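
      With Hadoop tables, you can also list snapshots through the Iceberg Table API instead of reading the #snapshots metadata table. The following is a minimal sketch that reuses the tables and table_location values from the steps above:

       import scala.collection.JavaConverters._

       // Sketch: print each snapshot's ID and commit timestamp via the Table API.
       val t = tables.load(table_location);
       t.snapshots().asScala.foreach { s =>
         println(s"snapshot ${s.snapshotId()} committed at ${s.timestampMillis()}")
       };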

Use Iceberg table on Hive

Iceberg supports tables that are read through Hive by using a StorageHandler. Note that only Hive 2.x and 3.1.2 are supported. For more information, see Apache Iceberg - Hive. In addition, add the Iceberg Hive runtime JAR file to the Hive classpath. To download the JAR file, see Apache Iceberg Downloads.

To overlay a Hive table on top of an Iceberg table, you must create the Iceberg table using either a Hive catalog or a Hadoop table. In addition, you must configure Hive accordingly so that it can read data from the Iceberg table.
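
For example, if you created the table through the Hive catalog but skipped the engine.hive.enabled table property, you can set it afterward from the Spark shell. The following is a minimal sketch that reuses the catalog value from the Spark section above:

 // Sketch: enable Hive reads on an existing Iceberg table by setting the
 // engine.hive.enabled table property (the same property set in the Spark steps).
 val t = catalog.loadTable(TableIdentifier.of("default", "example"));
 t.updateProperties().set("engine.hive.enabled", "true").commit();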

Read Iceberg table (Hive Catalog) on Hive

  1. Open the Hive client and set up the configurations to read Iceberg tables in the Hive client session:

     add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false; 
    
  2. Read table schema and data. The following is an example.

    1. Check the table schema and whether the table format is Iceberg:

       describe formatted example; 
      
    2. Read the data from the table:

       select * from example; 
      

Read Iceberg table (Hadoop Table) on Hive

  1. Open the Hive client and set up the configurations to read Iceberg tables in the Hive client session:

     add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false; 
    
  2. Read table schema and data. The following is an example.

    1. Create an external table (overlay a Hive table on top of the Iceberg table):

        CREATE EXTERNAL TABLE hadoop_table
        STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
        LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
        TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Check the table schema and whether the table format is Iceberg:

       describe formatted hadoop_table; 
      
    3. Read the data from the table:

       select * from hadoop_table; 
      

Use Iceberg table on Presto

Presto queries use the Hive connector to get partition locations, so you must configure Presto accordingly to read and write data in the Iceberg table. For more information, see Presto/Trino - Hive Connector and Presto/Trino - Iceberg Connector.

Presto Configurations

  1. On each Dataproc cluster node, create a file named iceberg.properties under /etc/presto/conf/catalog/ and configure hive.metastore.uri as follows:

     connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083> 
    

    Replace example.net:9083 with the correct host and port for your Hive metastore Thrift service.

  2. Restart the Presto service to apply the configurations:

     sudo systemctl restart presto.service 
    

Create Iceberg table on Presto

  1. Open the Presto client and use the iceberg connector to get the metastore:

     --catalog iceberg --schema default 
    
  2. Create a table to insert and update data. The following is an example.

    1. Create a table called example under the default database:

       CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']); 
      
    2. Insert sample data:

       INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman'); 
      
    3. Read data from the table:

       SELECT * FROM iceberg.default.example; 
      
    4. Insert more data to check the snapshots:

       INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore'); 
      
    5. View the snapshots:

       SELECT snapshot_id FROM iceberg.default."example$snapshots"; 
      

      By adding ORDER BY committed_at DESC LIMIT 1 to the query, you can find the latest snapshot ID.

    6. Roll back to a specific version of the table:

       CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448); 
      

      Replace the snapshot ID with the version you want to go back to.

What's next
