This page shows you how to create a Dataproc cluster that uses the Spark Spanner connector to read data from and write data to Spanner using Apache Spark.
The Spanner connector works with Spark to read data from and write data to the Spanner database using the Spanner Java library. The Spanner connector supports reading Spanner tables and graphs into Spark DataFrames and GraphFrames, and writing DataFrame data into Spanner tables.
Costs
In this document, you use the following billable components of Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
To generate a cost estimate based on your projected usage, use the pricing calculator.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

  Roles required to select or create a project:
  - Select a project: Selecting a project doesn't require a specific IAM role; you can select any project that you've been granted a role on.
  - Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
- Verify that billing is enabled for your Google Cloud project.
- Enable the Spanner, Dataproc, and Cloud Storage APIs.

  Roles required to enable APIs: you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
- Grant required roles.
- Set up a Dataproc cluster.
- Set up a Spanner instance with a Singers database table.
Grant required roles
Certain IAM roles are required to run the examples on this page. Depending on organization policies, these roles may have already been granted. To check role grants, see Do you need to grant roles?
For more information about granting roles, see Manage access to projects, folders, and organizations.
To ensure that the Compute Engine default service account has the necessary permissions to create a Dataproc cluster, ask your administrator to grant the following IAM roles to the Compute Engine default service account on the project:
- Dataproc Worker (roles/dataproc.worker)
- Cloud Spanner Database User (roles/spanner.databaseUser)
- Cloud Spanner Database Reader with DataBoost (roles/spanner.databaseReaderWithDataBoost)
Set up a Dataproc cluster
Create a Dataproc cluster or use an existing Dataproc cluster that was created with a 2.1 or later Dataproc image. If the cluster was created with a 2.0 or earlier image, it must have been created with the scope property set to the cloud-platform scope.
Set up a Spanner instance with a Singers database table
Create a Spanner instance with a database that contains a Singers table. Note the Spanner instance ID and database ID.
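The exact Singers schema is defined when you set up the Spanner database, not on this page. As a rough sketch, a DDL consistent with the printSchema() output shown in the examples below might look like the following; the STRING column lengths are illustrative assumptions, not a canonical schema.

```python
# A plausible DDL for the Singers table used on this page. The column
# names and types match the example printSchema() output; the STRING
# lengths are illustrative assumptions.
SINGERS_DDL = """
CREATE TABLE Singers (
  SingerId    INT64 NOT NULL,
  FirstName   STRING(1024),
  LastName    STRING(1024),
  BirthDate   DATE,
  LastUpdated TIMESTAMP
) PRIMARY KEY (SingerId)
"""

# One way to apply DDL is with the gcloud CLI, for example:
#   gcloud spanner databases ddl update DATABASE_ID \
#       --instance=INSTANCE_ID --ddl="..."
print(SINGERS_DDL.strip().splitlines()[0])
# → CREATE TABLE Singers (
```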
Use the Spanner connector with Spark
The Spanner connector is available for Spark versions 3.1+. You specify the connector version as part of the Cloud Storage JAR file specification when you submit a job to a Dataproc cluster.
Example: gcloud CLI Spark job submission with the Spanner connector.

```shell
gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
```
Replace the following:

- CONNECTOR_VERSION: The Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.
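The JAR path follows a predictable naming pattern. As a small illustration, a helper (hypothetical, not part of the connector) can build the path from the connector version; the "1.1.0" version below is only an example value.

```python
def spanner_connector_jar(connector_version: str, spark_compat: str = "3.1") -> str:
    """Build the Cloud Storage path of the Spanner connector JAR.

    Follows the gs://spark-lib/spanner/spark-<SPARK>-spanner-<VERSION>.jar
    pattern used in the job submission examples on this page.
    """
    return (f"gs://spark-lib/spanner/"
            f"spark-{spark_compat}-spanner-{connector_version}.jar")

# Pass the result to the gcloud --jars flag or to spark.jars.
print(spanner_connector_jar("1.1.0"))
# → gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar
```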
Read Spanner tables
You can use Python or Scala to read Spanner table data into a Spark DataFrame using the Spark data source API.
PySpark
You can run the example PySpark code in this section on your cluster by submitting the job to the Dataproc service or by running the job with spark-submit on the cluster master node.
Dataproc job
- Create a singers.py file using a local text editor or in Cloud Shell using the pre-installed vi, vim, or nano text editor.
- After populating the placeholder variables, paste the following code into the singers.py file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.

```python
#!/usr/bin/env python
"""Spanner PySpark read example."""

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('yarn') \
    .appName('spark-spanner-demo') \
    .getOrCreate()

# Load data from Spanner.
singers = spark.read.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID") \
    .option("instanceId", "INSTANCE_ID") \
    .option("databaseId", "DATABASE_ID") \
    .option("table", "TABLE_NAME") \
    .option("enableDataBoost", "true") \
    .load()
singers.createOrReplaceTempView('Singers')

# Read from Singers
result = spark.sql('SELECT * FROM Singers')
result.show()
result.printSchema()
```
Replace the following:

- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, and TABLE_NAME: See Set up a Spanner instance with a Singers database table.

Then:

- Save the singers.py file.
- Submit the job to the Dataproc service using the Google Cloud console, gcloud CLI, or Dataproc API.
Example: gcloud CLI job submission with the Spanner connector.

```shell
gcloud dataproc jobs submit pyspark singers.py \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
```
Replace the following:

- CLUSTER_NAME: The name of the cluster.
- REGION: An available Compute Engine region to run the workload.
- CONNECTOR_VERSION: The Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.
spark-submit job
- Connect to the Dataproc cluster master node using SSH.
  - Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
  - On the Cluster details page, select the VM Instances tab, then click SSH to the right of the name of the cluster master node. A browser window opens at your home directory on the master node.

```
Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$
```
- Create a singers.py file on the master node using the pre-installed vi, vim, or nano text editor.
- After populating the placeholder variables, paste the following code into the singers.py file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.

```python
#!/usr/bin/env python
"""Spanner PySpark read example."""

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('yarn') \
    .appName('spark-spanner-demo') \
    .getOrCreate()

# Load data from Spanner.
singers = spark.read.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID") \
    .option("instanceId", "INSTANCE_ID") \
    .option("databaseId", "DATABASE_ID") \
    .option("table", "TABLE_NAME") \
    .option("enableDataBoost", "true") \
    .load()
singers.createOrReplaceTempView('Singers')

# Read from Singers
result = spark.sql('SELECT * FROM Singers')
result.show()
result.printSchema()
```

Replace the following:

- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, and TABLE_NAME: See Set up a Spanner instance with a Singers database table.

- Save the singers.py file.
- Run singers.py with spark-submit to read the Spanner Singers table.

```shell
spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
```
Replace the following:

- CONNECTOR_VERSION: The Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.

The output is:

```
...
+--------+---------+--------+---------+-----------+
|SingerId|FirstName|LastName|BirthDate|LastUpdated|
+--------+---------+--------+---------+-----------+
|       1|     Marc|Richards|     null|       null|
|       2| Catalina|   Smith|     null|       null|
|       3|    Alice| Trentor|     null|       null|
+--------+---------+--------+---------+-----------+

root
 |-- SingerId: long (nullable = false)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: date (nullable = true)
 |-- LastUpdated: timestamp (nullable = true)

only showing top 20 rows
```
Scala
To run the example Scala code on your cluster, complete the following steps:
- Connect to the Dataproc cluster master node using SSH.
  - Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
  - On the Cluster details page, select the VM Instances tab, then click SSH to the right of the name of the cluster master node. A browser window opens at your home directory on the master node.

```
Connected, host fingerprint: ssh-rsa 2048 ...
...
user@clusterName-m:~$
```
- Create a singers.scala file on the master node using the pre-installed vi, vim, or nano text editor.
- Paste the following code into the singers.scala file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.

```scala
object singers {
  def main(): Unit = {
    /*
     * Uncomment (use the following code) if you are not running in spark-shell.
     *
     * import org.apache.spark.sql.SparkSession
     * val spark = SparkSession.builder()
     *   .appName("spark-spanner-demo")
     *   .getOrCreate()
     */

    // Load data in from Spanner. See
    // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
    // for option information.
    val singersDF =
      (spark.read.format("cloud-spanner")
        .option("projectId", "PROJECT_ID")
        .option("instanceId", "INSTANCE_ID")
        .option("databaseId", "DATABASE_ID")
        .option("table", "TABLE_NAME")
        .option("enableDataBoost", true)
        .load()
        .cache())
    singersDF.createOrReplaceTempView("Singers")

    // Load the Singers table.
    val result = spark.sql("SELECT * FROM Singers")
    result.show()
    result.printSchema()
  }
}
```

Replace the following:

- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, and TABLE_NAME: See Set up a Spanner instance with a Singers database table.

- Save the singers.scala file.
- Launch the spark-shell REPL.

```shell
$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
```

Replace the following:

- CONNECTOR_VERSION: The Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.

- Run singers.scala with the :load singers.scala command to read the Spanner Singers table. The output displays example rows from the Singers table.

```
> :load singers.scala
Loading singers.scala...
defined object singers
> singers.main()
...
+--------+---------+--------+---------+-----------+
|SingerId|FirstName|LastName|BirthDate|LastUpdated|
+--------+---------+--------+---------+-----------+
|       1|     Marc|Richards|     null|       null|
|       2| Catalina|   Smith|     null|       null|
|       3|    Alice| Trentor|     null|       null|
+--------+---------+--------+---------+-----------+

root
 |-- SingerId: long (nullable = false)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: date (nullable = true)
 |-- LastUpdated: timestamp (nullable = true)
```
Read Spanner graphs
The Spanner connector supports exporting the graph into separate node and edge DataFrames, as well as exporting into GraphFrames directly.

The following example exports a Spanner graph into a GraphFrame. It uses the Python SpannerGraphConnector class, included in the Spanner connector jar, to read the Spanner graph.
```python
from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder
         .appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()
```
Replace the following:

- CONNECTOR_VERSION: The Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.
- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, and GRAPH_ID: Insert the instance, database, and graph IDs.
To export node and edge DataFrames instead of GraphFrames, use load_dfs instead:

```python
df_vertices, df_edges, df_id_map = connector.load_dfs()
```
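GraphFrames conventionally expects an `id` column on the vertex DataFrame and `src`/`dst` columns on the edge DataFrame. The following plain-Python sketch (no Spark required; it assumes small samples collected locally from `df_vertices` and `df_edges` as dicts) illustrates the referential check you might run to confirm every edge endpoint refers to a known vertex:

```python
def dangling_edges(vertices, edges):
    """Return edges whose src or dst is not a known vertex id.

    vertices: iterable of dicts with an 'id' key (GraphFrames convention).
    edges: iterable of dicts with 'src' and 'dst' keys.
    """
    ids = {v["id"] for v in vertices}
    return [e for e in edges if e["src"] not in ids or e["dst"] not in ids]

# Toy sample standing in for rows collected from df_vertices / df_edges.
vertices = [{"id": 1}, {"id": 2}]
edges = [{"src": 1, "dst": 2}, {"src": 2, "dst": 3}]
print(dangling_edges(vertices, edges))
# → [{'src': 2, 'dst': 3}]
```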
Write Spanner tables
The Spanner connector supports writing a Spark DataFrame to a Spanner table using the Spark data source API.

During preview, the Spark Spanner connector supports the append save mode only.
Write DataFrame to Spanner table example
Populate the variables before saving and running the code.
```python
"""Spanner PySpark write example."""

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Write App').getOrCreate()

columns = ['id', 'name', 'email']
data = [(1, 'John Doe', 'john.doe@example.com'),
        (2, 'Jane Doe', 'jane.doe@example.com')]
df = spark.createDataFrame(data, columns)

df.write.format('cloud-spanner') \
    .option("projectId", "PROJECT_ID") \
    .option("instanceId", "INSTANCE_ID") \
    .option("databaseId", "DATABASE_ID") \
    .option("table", "TABLE_NAME") \
    .mode("append") \
    .save()
```
Replace the following:

- PROJECT_ID: The Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, and TABLE_NAME: Insert the instance, database, and table IDs.
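Because only append mode is supported during preview, a quick structural check of the rows before building the DataFrame can catch simple mistakes early. The helper below is a hypothetical illustration in plain Python, not part of the connector:

```python
def validate_rows(columns, rows):
    """Check that each row tuple has one value per column before
    passing the data to spark.createDataFrame."""
    for i, row in enumerate(rows):
        if len(row) != len(columns):
            raise ValueError(
                f"row {i} has {len(row)} values, expected {len(columns)}")
    return rows

columns = ['id', 'name', 'email']
rows = [(1, 'John Doe', 'john.doe@example.com'),
        (2, 'Jane Doe', 'jane.doe@example.com')]
validate_rows(columns, rows)  # raises ValueError on a malformed row
```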
Clean up
To avoid incurring ongoing charges to your Google Cloud account, you can stop or delete your Dataproc cluster and delete your Spanner instance.
What's next
- Refer to the pyspark.sql.DataFrame examples.
- For Spark DataFrame language support, see the Spark documentation.
- Refer to the Spark Spanner Connector repository on GitHub.
- See the Spark job tuning tips.

