Access metadata in Apache Spark

This page describes how to create a Dataproc cluster running Spark. You can use this cluster to work with Dataplex Universal Catalog metadata for lakes, zones, and assets.

Overview

Create the cluster after the Dataproc Metastore service instance has been associated with the Dataplex Universal Catalog lake, so that the cluster can rely on the Hive Metastore endpoint to access Dataplex Universal Catalog metadata.

Metadata managed within Dataplex Universal Catalog can be accessed using standard interfaces, such as Hive Metastore, to power Spark queries. The queries run on the Dataproc cluster.

For Parquet data, set the Spark property spark.sql.hive.convertMetastoreParquet to false to avoid execution errors.
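
For example, one way to set this property is on the SparkSession builder when you start a session on the cluster. This is a minimal sketch; you can also pass the property through the cluster's or job's Spark properties instead.

  import pyspark.sql as sql

  # Sketch: disable Spark's built-in Parquet reader for Hive metastore tables
  # so that the Hive SerDe is used instead.
  session = (
      sql.SparkSession.builder
      .enableHiveSupport()
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .getOrCreate()
  )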

Create a Dataproc cluster

Run the following commands to create a Dataproc cluster, specifying the Dataproc Metastore service associated with the Dataplex Universal Catalog lake:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
      --location LOCATION \
      --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
      --location LOCATION \
      --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
      --location LOCATION \
      --format "value(hiveMetastoreConfig.version)")

  # This command creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata, and --properties flags are used to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --scopes "https://www.googleapis.com/auth/cloud-platform" \
      --image-version 2.0-debian10 \
      --optional-components=DOCKER \
      --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
      --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
      --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Run DQL queries to explore the metadata and run Spark queries to query data.

  1. Open an SSH session on the Dataproc cluster's primary node.

      VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
          --project PROJECT \
          --region LOCATION \
          --format "value(config.gceClusterConfig.zoneUri)")
      gcloud compute ssh CLUSTER_ID-m \
          --project PROJECT \
          --zone $VM_ZONE
     
    
  2. At the primary node command prompt, open a new Python REPL.

     python3 
    

List databases

Each Dataplex Universal Catalog zone within the lake maps to a metastore database.

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("SHOW DATABASES")
  df.show()

List tables

List tables in one of the zones.

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Query data

Query the data in one of the tables.

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Create tables and partitions in metadata

Run DDL queries to create tables and partitions in Dataplex Universal Catalog metadata using Apache Spark.

For more information about the supported data types, file formats, and row formats, see the Supported data types and file formats section later on this page.

Before you begin

Before you create a table, create a Dataplex Universal Catalog asset that maps to the Cloud Storage bucket containing the underlying data. For more information, see Add an asset.

Create a table

Parquet, ORC, AVRO, CSV, and JSON tables are supported.

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Alter a table

Dataplex Universal Catalog does not allow you to alter the location of a table or edit the partition columns for a table. Altering a table does not automatically set userManaged to true.

In Spark SQL, you can rename a table, add columns, and set the file format of a table.

Rename a table

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Add columns

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (COLUMN_NAME DATA_TYPE)")
  df.show()

Set the file format

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Drop a table

Dropping a table from the Dataplex Universal Catalog metadata API doesn't delete the underlying data in Cloud Storage.

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()
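
Because only the metadata entry is removed, the objects under the table's Cloud Storage location are still present after the DROP TABLE. The following is a sketch that confirms this, assuming the google-cloud-storage client library is available in the session and using the MY_GCP_BUCKET and TABLE_LOCATION placeholders from the create-table example.

  from google.cloud import storage

  # Sketch: list the underlying objects, which remain after DROP TABLE.
  # MY_GCP_BUCKET and TABLE_LOCATION are placeholders.
  client = storage.Client()
  for blob in client.list_blobs("MY_GCP_BUCKET", prefix="TABLE_LOCATION/"):
      print(blob.name)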
 

Add a partition

Dataplex Universal Catalog does not allow you to alter a partition after it is created. However, you can drop a partition.

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1 = VALUE1) PARTITION (COLUMN2 = VALUE2)")
  df.show()

You can add multiple partitions with the same partition key and different partition values, as shown in the preceding example.

Drop a partition

To drop a partition, run the following command:

  import pyspark.sql as sql
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN = VALUE)")
  df.show()

Query Iceberg tables

You can query Iceberg tables using Apache Spark.

Before you begin

Set up a Spark SQL session with Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse

Create an Iceberg table

To create an Iceberg table, run the following command:

   
  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
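
After the table is created, you can query it like any other table in the zone. The following is a minimal sketch, using the ZONE_ID and TABLE_ID placeholders above and assuming a PySpark session started with the same Iceberg packages and catalog settings as the spark-sql session shown earlier.

  import pyspark.sql as sql

  # Sketch: query the Iceberg table created above. ZONE_ID and TABLE_ID are
  # placeholders for your zone and table names.
  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
  session.sql("SELECT * FROM ZONE_ID.TABLE_ID LIMIT 10").show()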
 

Explore Iceberg snapshot and history

You can get snapshots and history of Iceberg tables using Apache Spark.

Before you begin

Set up a PySpark session with Iceberg support:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse

Get history of Iceberg tables

To get the history of an Iceberg table, run the following command:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Get snapshots of Iceberg tables

To get a snapshot of an Iceberg table, run the following command:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Supported data types and file formats

The supported data types are defined as follows:

Data type: Primitive
Values:
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE

Data type: Array
Values: ARRAY<DATA_TYPE>

Data type: Structure
Values: STRUCT<COLUMN : DATA_TYPE>

The following are the supported file formats:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

For more information about the file formats, see Storage formats.

The following are the supported row formats; a combined example follows this list:

  • DELIMITED [FIELDS TERMINATED BY CHAR ]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES ( PROPERTY_NAME = PROPERTY_VALUE , PROPERTY_NAME = PROPERTY_VALUE , ...)]
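
As a quick illustration of how these values fit together, the following sketch creates a hypothetical table that combines primitive, ARRAY, and STRUCT column types with the DELIMITED row format and the TEXTFILE file format. The table name, column names, and bucket path are placeholders, not values from this guide.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Hypothetical table: ZONE_ID, the column names, and MY_GCP_BUCKET are
  # placeholders. The column types come from the data type list above; the
  # row format and file format come from the supported-value lists.
  session.sql(
      "CREATE TABLE ZONE_ID.example_events ("
      " event_id BIGINT,"
      " tags ARRAY<STRING>,"
      " device STRUCT<os: STRING, version: STRING>"
      ") "
      "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "
      "STORED AS TEXTFILE "
      "LOCATION 'gs://MY_GCP_BUCKET/example_events'"
  )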
