Use Apache Spark with HBase on Dataproc


Objectives

This tutorial shows you how to:

  1. Create a Dataproc cluster, installing Apache HBase and Apache ZooKeeper on the cluster
  2. Create an HBase table using the HBase shell running on the master node of the Dataproc cluster
  3. Use Cloud Shell to submit a Java or PySpark Spark job to the Dataproc service that writes data to, then reads data from, the HBase table

Costs

In this document, you use the following billable components of Google Cloud:

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

Before you begin

If you haven't already done so, create a Google Cloud Platform project.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

Create a Dataproc cluster

  1. Run the following command in a Cloud Shell session terminal to:

    • Install the HBase and ZooKeeper components
    • Provision three worker nodes (three to five workers are recommended to run the code in this tutorial)
    • Enable the Component Gateway
    • Use image version 2.0
    • Use the --properties flag to add the HBase config and HBase library to the Spark driver and executor classpaths.

    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
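
The --properties value is the easiest part of this command to mistype. The cluster-level form prefixes each key with spark:, while the job-level form shown later in this tutorial omits the prefix. The following Python sketch assembles both forms and the full argument list. The helper names and the sample values my-cluster and us-central1 are hypothetical and chosen only for illustration; the flags and paths are the ones used in this tutorial.

```python
import shlex

# Classpath entries used in this tutorial: HBase config dir and HBase libraries.
HBASE_CLASSPATH = "/etc/hbase/conf:/usr/lib/hbase/*"


def classpath_properties(cluster_level):
    """Build the --properties value.

    Cluster creation uses the 'spark:' file prefix on each key;
    job submission uses the bare Spark property names.
    """
    prefix = "spark:" if cluster_level else ""
    keys = ["spark.driver.extraClassPath", "spark.executor.extraClassPath"]
    return ",".join(f"{prefix}{key}={HBASE_CLASSPATH}" for key in keys)


def create_cluster_args(cluster_name, region, num_workers=3):
    """Return the gcloud argument list for the cluster described above."""
    return [
        "gcloud", "dataproc", "clusters", "create", cluster_name,
        f"--region={region}",
        "--optional-components=HBASE,ZOOKEEPER",
        f"--num-workers={num_workers}",
        "--enable-component-gateway",
        "--image-version=2.0",
        f"--properties={classpath_properties(cluster_level=True)}",
    ]


if __name__ == "__main__":
    # Hypothetical cluster name and region; substitute your own values.
    print(shlex.join(create_cluster_args("my-cluster", "us-central1")))
```

Printing the command with shlex.join quotes the argument that contains the * wildcard, so the shell does not expand it before gcloud sees it.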

Verify connector installation

  1. From the Google Cloud console or a Cloud Shell session terminal, SSH into the Dataproc cluster master node.

  2. Verify the installation of the Apache HBase Spark connector on the master node:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    Sample output:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
  3. Keep the SSH session terminal open to:

    1. Create an HBase table
    2. (Java users): run commands on the master node of the cluster to determine the versions of components installed on the cluster
    3. Scan your HBase table after you run the code
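
If you prefer to script the connector check instead of reading the ls output, the following Python sketch lists the matching jar files. The function name is hypothetical; the default directory is the one used in the ls command above.

```python
from pathlib import Path


def find_connector_jars(jars_dir="/usr/lib/spark/jars"):
    """Return the names of files in jars_dir that contain 'hbase-spark'."""
    directory = Path(jars_dir)
    if not directory.is_dir():
        # Not on a cluster node, or Spark is installed elsewhere.
        return []
    return sorted(p.name for p in directory.glob("*hbase-spark*"))


if __name__ == "__main__":
    jars = find_connector_jars()
    print(jars if jars else "No hbase-spark connector jar found")
```

An empty result on the master node suggests that the connector is not installed where this tutorial expects it.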

Create an HBase table

Run the commands listed in this section in the master node SSH session terminal that you opened in the previous step.

  1. Open the HBase shell:

    hbase shell
  2. Create an HBase table named 'my_table' with a 'cf' column family:

    create 'my_table','cf'
    1. To confirm table creation, click HBase in the Component Gateway links in the Google Cloud console to open the Apache HBase UI. my_table is listed in the Tables section on the Home page.

View the Spark code

Java

    package hbase;

    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class SparkHBaseMain {
      public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
          this.key = key;
          this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
          return name;
        }

        public void setName(String name) {
          this.name = name;
        }

        public String getKey() {
          return key;
        }

        public void setKey(String key) {
          this.key = key;
        }
      }

      public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
            .builder()
            .master("yarn")
            .appName("spark-hbase-tutorial")
            .getOrCreate();

        // Data Schema
        String catalog = "{"
            + "\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"},"
            + "\"rowkey\":\"key\","
            + "\"columns\":{"
            + "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},"
            + "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}"
            + "}"
            + "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds = spark.createDataFrame(Arrays.asList(
            new SampleData("key1", "foo"),
            new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
            .format("org.apache.hadoop.hbase.spark")
            .options(optionsMap)
            .option("hbase.spark.use.hbasecontext", "false")
            .mode("overwrite")
            .save();

        // Read from HBase
        Dataset<Row> dataset = spark.read()
            .format("org.apache.hadoop.hbase.spark")
            .options(optionsMap)
            .option("hbase.spark.use.hbasecontext", "false")
            .load();
        dataset.show();
      }
    }
 

Python

    from pyspark.sql import SparkSession

    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()

    data_source_format = ''

    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )

    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())

    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()
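
Both listings pass the connector a catalog: a JSON string that names the HBase table, the row key field, and the column family and qualifier for each DataFrame column. Hand-written JSON strings are easy to break, so one option is to build the catalog from a dictionary. The following is a minimal sketch; build_catalog is a hypothetical helper that is not part of the tutorial repository, and it needs only the Python standard library.

```python
import json


def build_catalog(table, rowkey, columns, namespace="default"):
    """Build a catalog JSON string for the HBase Spark connector.

    columns maps each DataFrame column name to a
    (column family, qualifier, type) tuple. As in the tutorial code,
    the row key column uses the special family name "rowkey".
    """
    return json.dumps(
        {
            "table": {"namespace": namespace, "name": table},
            "rowkey": rowkey,
            "columns": {
                name: {"cf": cf, "col": col, "type": col_type}
                for name, (cf, col, col_type) in columns.items()
            },
        },
        separators=(",", ":"),  # compact output, no whitespace
    )


catalog = build_catalog(
    table="my_table",
    rowkey="key",
    columns={
        "key": ("rowkey", "key", "string"),
        "name": ("cf", "name", "string"),
    },
)
print(catalog)
```

The resulting string can be passed to options(catalog=catalog) in place of the hand-written catalog in the PySpark listing.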
 

Run the code

  1. Open a Cloud Shell session terminal.

  2. Clone the GitHub GoogleCloudDataproc/cloud-dataproc repository into your Cloud Shell session terminal:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
  3. Change to the cloud-dataproc/spark-hbase directory:

    cd cloud-dataproc/spark-hbase
    Sample output:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
  4. Submit the Dataproc job.

Java

  1. Set the component versions in the pom.xml file.
    1. The Dataproc 2.0.x release versions page lists the Scala, Spark, and HBase component versions installed with the most recent and last four image 2.0 subminor versions.
      1. To find the subminor version of your 2.0 image version cluster, click the cluster name on the Clusters page in the Google Cloud console to open the Cluster details page, where the cluster Image version is listed.
    2. Alternatively, you can run the following commands in an SSH session terminal from the master node of your cluster to determine component versions:
      1. Check the Scala version:
        scala -version
      2. Check Spark version (control-D to exit):
        spark-shell
      3. Check HBase version:
        hbase version
      4. Identify the Spark, Scala, and HBase version dependencies in the Maven pom.xml:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0</hbase-spark.version>
        </properties>
        Note: The hbase-spark.version is the current Apache HBase Spark Connector version; leave this version number unchanged.
    3. Edit the pom.xml file in the Cloud Shell editor to insert the correct Scala, Spark, and HBase version numbers. Click Open Terminal when you finish editing to return to the Cloud Shell terminal command line.
      cloudshell edit .
    4. Switch to Java 8 in Cloud Shell. This JDK version is needed to build the code (you can ignore any plugin warning messages):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
    5. Verify Java 8 installation:
      java -version
      Sample output:
      openjdk version "1.8..."
  2. Build the jar file:
    mvn clean package
    The .jar file is placed in the /target subdirectory (for example, target/spark-hbase-1.0-SNAPSHOT.jar).
  3. Submit the job.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name

    • --jars: Insert the name of your .jar file after "target/" and before ".jar".
    • If you did not set the Spark driver and executor HBase classpaths when you created your cluster, you must set them with each job submission by including the following --properties flag in your job submit command:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
  4. View the HBase table output in the Cloud Shell session terminal:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
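
For Java users, the pom.xml values set in step 1 follow directly from the three version strings reported by the cluster: the Scala main version is the first two components of the full Scala version. The following Python sketch derives the values and renders the properties block. The function names are hypothetical, and the version numbers are the example values from this tutorial, not necessarily those of your cluster.

```python
def pom_properties(scala_full, spark, hbase, connector="1.0.0"):
    """Map cluster component versions to the pom.xml property values."""
    # "2.12.14" -> "2.12": keep only the major and minor components.
    scala_main = ".".join(scala_full.split(".")[:2])
    return {
        "scala.version": scala_full,
        "scala.main.version": scala_main,
        "spark.version": spark,
        "hbase.client.version": hbase,
        "hbase-spark.version": connector,  # left unchanged per the tutorial
    }


def render_properties(props):
    """Render the properties as a Maven <properties> block."""
    lines = ["<properties>"]
    lines += [f"  <{name}>{value}</{name}>" for name, value in props.items()]
    lines.append("</properties>")
    return "\n".join(lines)


if __name__ == "__main__":
    # Example versions from this tutorial; replace with your cluster's values.
    print(render_properties(pom_properties("2.12.14", "3.1.2", "2.2.7")))
```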

Python

  1. Submit the job.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name

    • If you did not set the Spark driver and executor HBase classpaths when you created your cluster, you must set them with each job submission by including the following --properties flag in your job submit command:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
  2. View the HBase table output in the Cloud Shell session terminal:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+

Scan the HBase table

You can scan the content of your HBase table by running the following commands in the master node SSH session terminal that you opened in Verify connector installation:

  1. Open the HBase shell:
    hbase shell
  2. Scan 'my_table':
    scan 'my_table'
    Sample output:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
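
The scan shows only cf:name cells because the catalog maps the key column to the HBase row key rather than to a stored column. The following Python sketch models that mapping without Spark or HBase. It is a simplified illustration of how the catalog in this tutorial relates DataFrame rows to cells, not the connector's actual implementation; rows_to_cells is a hypothetical helper.

```python
import json

# The catalog used by the tutorial code.
CATALOG = json.loads("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""")


def rows_to_cells(rows, catalog):
    """Return (row key, 'family:qualifier', value) tuples for each row."""
    rowkey_field = catalog["rowkey"]
    cells = []
    for row in rows:
        for field, spec in catalog["columns"].items():
            if spec["cf"] == "rowkey":
                continue  # the row key identifies the row; it is not a cell
            column = f'{spec["cf"]}:{spec["col"]}'
            cells.append((row[rowkey_field], column, row[field]))
    return cells


rows = [{"key": "key1", "name": "foo"}, {"key": "key2", "name": "bar"}]
for cell in rows_to_cells(rows, CATALOG):
    print(cell)
```

Each tuple corresponds to one line of the scan output above, minus the timestamp that HBase assigns on write.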

Clean up

After you finish the tutorial, you can clean up the resources that you created so that they stop using quota and incurring charges. The following sections describe how to delete or turn off these resources.

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the cluster

  • To delete your cluster:
    gcloud dataproc clusters delete cluster-name \
        --region=region