This page describes how to create a Dataproc cluster running Spark. You can use this cluster to work with Dataplex Universal Catalog metadata for lakes, zones, and assets.
Overview
Create the cluster after the Dataproc Metastore service instance is associated with the Dataplex Universal Catalog lake. This ensures that the cluster can rely on the Hive Metastore endpoint to access Dataplex Universal Catalog metadata.
Metadata managed within Dataplex Universal Catalog can be accessed using standard interfaces, such as Hive Metastore, to power Spark queries. The queries run on the Dataproc cluster.
For Parquet data, set the Spark property spark.sql.hive.convertMetastoreParquet to false to avoid execution errors.
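One place to apply this setting is in the cluster's --properties flag, which uses a prefix:key=value syntax. The following sketch shows how such a pair would be encoded; the helper function is illustrative, not part of any Google Cloud SDK.

```python
# Sketch: encode Spark properties in the gcloud --properties syntax
# ("prefix:key=value" pairs, comma-separated). The helper name is
# illustrative, not part of any Google Cloud SDK.
def format_dataproc_properties(props):
    """Render {(prefix, key): value} as a gcloud --properties string."""
    return ",".join(f"{prefix}:{key}={value}"
                    for (prefix, key), value in props.items())

properties = {
    # Avoid execution errors when querying Parquet-backed tables.
    ("spark", "spark.sql.hive.convertMetastoreParquet"): "false",
}
print(format_dataproc_properties(properties))
# spark:spark.sql.hive.convertMetastoreParquet=false
```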
Create a Dataproc cluster
Run the following commands to create a Dataproc cluster, specifying the Dataproc Metastore service associated with the Dataplex Universal Catalog lake:
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
  --location LOCATION \
  --format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
  --location LOCATION \
  --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
  --location LOCATION \
  --format "value(hiveMetastoreConfig.version)")
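The cut -c9- in the GRPC_ENDPOINT command keeps everything from the ninth character onward, which drops an eight-character scheme prefix (assuming the returned endpointUri starts with https://) so that only host:port remains for the gRPC proxy. A Python equivalent of that pipeline step:

```python
# Sketch: mirror the shell pipeline `... | cut -c9-`, which strips the
# first 8 characters (assumed to be the "https://" scheme) so only
# host:port remains.
def strip_scheme(endpoint_uri):
    # cut -c9- keeps characters 9..end (1-indexed), i.e. drops 8 characters.
    return endpoint_uri[8:]

print(strip_scheme("https://metastore.example.com:443"))
# metastore.example.com:443
```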
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata, and --properties flags are used to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
  --project PROJECT \
  --region LOCATION \
  --scopes "https://www.googleapis.com/auth/cloud-platform" \
  --image-version 2.0-debian10 \
  --optional-components=DOCKER \
  --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
  --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
  --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
Explore metadata
Run DQL queries to explore the metadata and run Spark queries to query data.
Before you begin
-
Open an SSH session on the Dataproc cluster's primary node:
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
  --project PROJECT \
  --region LOCATION \
  --format "value(config.gceClusterConfig.zoneUri)")
gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
-
At the primary node command prompt, open a new Python REPL:
python3
List databases
Each Dataplex Universal Catalog zone within the lake maps to a metastore database.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
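Because Hive identifiers can't contain hyphens, a zone ID such as raw-zone typically surfaces as a database named raw_zone. A minimal sketch of that assumed mapping:

```python
# Sketch: expected mapping from a Dataplex zone ID to its Hive metastore
# database name. Hive identifiers disallow hyphens, so hyphens are
# assumed to be replaced with underscores.
def zone_to_database(zone_id):
    return zone_id.replace("-", "_")

print(zone_to_database("raw-zone"))
# raw_zone
```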
List tables
List tables in one of the zones.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
Query data
Query the data in one of the tables.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
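The placeholders in the statement above can also be filled programmatically. A minimal sketch, where the helper is illustrative and performs no quoting or escaping of identifiers or predicates:

```python
# Sketch: fill the SELECT template's placeholders. Purely illustrative;
# identifiers and predicates are not validated or escaped.
def build_select(zone_id, table_id, columns, predicate, limit=10):
    cols = ", ".join(columns)
    return (f"SELECT {cols} FROM {zone_id}.{table_id} "
            f"WHERE {predicate} LIMIT {limit}")

print(build_select("my_zone", "orders", ["id", "total"], "total > 100"))
# SELECT id, total FROM my_zone.orders WHERE total > 100 LIMIT 10
```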
Create tables and partitions in metadata
Run DDL queries to create tables and partitions in Dataplex Universal Catalog metadata using Apache Spark.
For more information about the supported data types, file formats, and row formats, see Supported values.
Before you begin
Before you create a table, create a Dataplex Universal Catalog asset that maps to the Cloud Storage bucket containing the underlying data. For more information, see Add an asset.
Create a table
Parquet, ORC, AVRO, CSV, and JSON tables are supported.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS FILE_FORMAT LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
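With 'dataplex.entity.partition_style' = 'HIVE_COMPATIBLE', partition data is laid out in key=value subdirectories under the table location. A sketch of the resulting paths, using a hypothetical bucket and partition columns:

```python
# Sketch: Hive-compatible partition layout places each partition in a
# key=value subdirectory under the table location. Bucket and column
# names here are placeholders.
def partition_path(table_location, partition_spec):
    """partition_spec: ordered dict of partition column -> value."""
    parts = "/".join(f"{col}={val}" for col, val in partition_spec.items())
    return f"{table_location}/{parts}"

print(partition_path("gs://my-bucket/orders", {"year": "2023", "month": "07"}))
# gs://my-bucket/orders/year=2023/month=07
```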
Alter a table
Dataplex Universal Catalog does not allow you to alter the location of a table or edit the partition columns for a table. Altering a table does not automatically set userManaged to true.
In Spark SQL, you can rename a table, add columns, and set the file format of a table.
Rename a table
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
Add columns
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (COLUMN_NAME DATA_TYPE)")
df.show()
Set the file format
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
Drop a table
Dropping a table through the Dataplex Universal Catalog metadata API doesn't delete the underlying data in Cloud Storage.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
Add a partition
Dataplex Universal Catalog does not allow you to alter a partition after it is created. However, you can drop a partition.
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1 = VALUE1) PARTITION (COLUMN2 = VALUE2)")
df.show()
You can add multiple partitions with the same partition key and different partition values, as shown in the preceding example.
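The ADD PARTITION statement can be generated for any number of partition specs. A minimal sketch, where the helper is illustrative and does no escaping of values:

```python
# Sketch: build an ALTER TABLE ... ADD PARTITION statement for several
# partition specs at once. Values are single-quoted but not escaped,
# so this is for illustration only.
def add_partitions_sql(zone_id, table_id, specs):
    clauses = " ".join(
        "PARTITION (" + ", ".join(f"{c} = '{v}'" for c, v in spec.items()) + ")"
        for spec in specs
    )
    return f"ALTER TABLE {zone_id}.{table_id} ADD {clauses}"

print(add_partitions_sql("my_zone", "my_table",
                         [{"dt": "2023-01-01"}, {"dt": "2023-01-02"}]))
# ALTER TABLE my_zone.my_table ADD PARTITION (dt = '2023-01-01') PARTITION (dt = '2023-01-02')
```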
Drop a partition
To drop a partition, run the following command:
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN = VALUE)")
df.show()
Query Iceberg tables
You can query Iceberg tables using Apache Spark.
Before you begin
Set up a Spark SQL session with Iceberg.
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=$PWD/warehouse
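The long flag list can be assembled programmatically, which helps keep it readable and reusable. The conf keys and package version below are copied from the command above; the warehouse conf is omitted because it depends on shell expansion of $PWD, and the helper itself is illustrative:

```python
# Sketch: build a spark-sql argument list from a dict of Iceberg confs.
# Keys and values are copied from the command above; the helper is
# illustrative, not part of Spark or Iceberg.
ICEBERG_CONFS = {
    "spark.sql.extensions":
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog":
        "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
    "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.local.type": "hadoop",
}

def spark_sql_args(packages, confs):
    args = ["spark-sql", "--packages", packages]
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    return args

argv = spark_sql_args(
    "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1", ICEBERG_CONFS)
print(argv[:3])
```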
Create an Iceberg table
To create an Iceberg table, run the following command:
CREATE TABLE ZONE_ID.TABLE_ID (
    COLUMNS DATA_TYPE
)
USING ICEBERG
PARTITIONED BY (COLUMN)
LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID'
TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Explore Iceberg snapshot and history
You can get snapshots and history of Iceberg tables using Apache Spark.
Before you begin
Set up a PySpark session with Iceberg support:
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=$PWD/warehouse
Get history of Iceberg tables
To get the history of an Iceberg table, run the following command:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Get snapshots of Iceberg tables
To get a snapshot of an Iceberg table, run the following command:
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
Supported data types and file formats
The supported data types are defined as follows:
- TINYINT
- SMALLINT
- INT
- BIGINT
- BOOLEAN
- FLOAT
- DOUBLE
- DOUBLE PRECISION
- STRING
- BINARY
- TIMESTAMP
- DECIMAL
- DATE
- ARRAY<DATA_TYPE>
- STRUCT<COLUMN: DATA_TYPE>
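One way to guard DDL generation against unsupported types is to validate column definitions against this list. A sketch, where the helper and its error handling are illustrative (the complex ARRAY and STRUCT forms are omitted for brevity):

```python
# Sketch: validate column types against the supported scalar types listed
# above before generating a columns clause. The helper is illustrative,
# not a Dataplex API; ARRAY/STRUCT are omitted for brevity.
SUPPORTED_TYPES = {
    "TINYINT", "SMALLINT", "INT", "BIGINT", "BOOLEAN", "FLOAT", "DOUBLE",
    "DOUBLE PRECISION", "STRING", "BINARY", "TIMESTAMP", "DECIMAL", "DATE",
}

def columns_clause(columns):
    """columns: list of (name, type) pairs -> 'name TYPE, ...' string."""
    for name, col_type in columns:
        if col_type.upper() not in SUPPORTED_TYPES:
            raise ValueError(f"unsupported type for {name}: {col_type}")
    return ", ".join(f"{name} {col_type.upper()}" for name, col_type in columns)

print(columns_clause([("id", "bigint"), ("name", "string")]))
# id BIGINT, name STRING
```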
The following are the supported file formats:
- TEXTFILE
- ORC
- PARQUET
- AVRO
- JSONFILE
For more information about the file formats, see Storage formats.
The following are the supported row formats:
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
What's next
- Learn more about managing metadata for lakes, zones, and assets.