Run Spark on Ray cluster on Vertex AI

The RayDP Python library makes it possible to run Spark on a Ray cluster. This document covers installing, configuring and running RayDP on Ray on Vertex AI (Ray cluster on Vertex AI).

Installation

Ray on Vertex AI lets you run your applications using the open source Ray framework. RayDP provides APIs for running Spark on Ray. The prebuilt container images available for creating a Ray cluster on Vertex AI don't come with RayDP preinstalled, so you must build a custom container image for your Ray cluster on Vertex AI before you can run RayDP applications on it. The following section explains how to build a RayDP custom image.

Build a Ray on Vertex AI custom container image

Use this Dockerfile to create a custom container image for Ray on Vertex AI that has RayDP installed.

    FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

    RUN apt-get update -y \
        && pip install --no-cache-dir raydp pyarrow==14.0

You can use the latest Ray cluster on Vertex AI prebuilt image as the base for the RayDP custom image. You can also install other Python packages that you expect to use in your applications. The pyarrow==14.0 pin is required because of a dependency constraint of Ray 2.9.3.

Build and push the custom container image

Create a Docker repository in Artifact Registry before you build your custom image (see Work with container images for how to create and configure your Docker repository). After you create the Docker repository, build and push the custom container image using the Dockerfile.

    docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
    docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Where:

  • LOCATION : The location (for example, us-central1) of the Docker repository that you created in Artifact Registry.
  • PROJECT_ID : Your Google Cloud project ID.
  • DOCKER_REPOSITORY : The name of the Docker repository that you created.
  • IMAGE_NAME : The name of your custom container image.
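
As a rough illustration of how the four placeholders combine, the following sketch assembles the image URI in Python. The helper name image_uri and its optional tag parameter are hypothetical and not part of any Google Cloud library; the function only mirrors the URI pattern used in the docker commands above.

```python
def image_uri(location, project_id, repository, image_name, tag=None):
    """Assemble an Artifact Registry image URI from its parts (hypothetical helper)."""
    parts = {
        "location": location,
        "project_id": project_id,
        "repository": repository,
        "image_name": image_name,
    }
    for label, value in parts.items():
        # Catch empty values and placeholders such as "[PROJECT_ID]" left unfilled.
        if not value or "[" in value or "]" in value:
            raise ValueError(f"{label} is missing or still a placeholder: {value!r}")
    uri = f"{location}-docker.pkg.dev/{project_id}/{repository}/{image_name}"
    # The tag is optional; Docker falls back to "latest" when none is given.
    return f"{uri}:{tag}" if tag else uri


print(image_uri("us-central1", "my-project", "my-repo", "raydp-image"))
```

The same string is what you would later pass as CUSTOM_CONTAINER_IMAGE_URI when you create the cluster.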

Create a Ray cluster on Vertex AI

Use the custom container image built in the previous step to create a Ray cluster on Vertex AI. You can use the Vertex AI SDK for Python for creating a Ray cluster on Vertex AI.

If you haven't done so yet, install the required Python libraries.

    pip install --quiet google-cloud-aiplatform \
        ray[all]==2.9.3 \
        google-cloud-aiplatform[ray]

Configure the head and worker nodes and create the cluster using the Vertex AI SDK for Python. For example:

    import logging
    import ray
    from google.cloud import aiplatform
    from google.cloud.aiplatform import vertex_ray
    from vertex_ray import Resources

    head_node_type = Resources(
        machine_type="n1-standard-16",
        node_count=1,
        custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
    )

    worker_node_types = [Resources(
        machine_type="n1-standard-8",
        node_count=2,
        custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
    )]

    ray_cluster_resource_name = vertex_ray.create_ray_cluster(
        head_node_type=head_node_type,
        worker_node_types=worker_node_types,
        cluster_name=[CLUSTER_NAME],
    )

Where:

  • CUSTOM_CONTAINER_IMAGE_URI : The URI of the custom container image pushed to Artifact Registry.
  • CLUSTER_NAME : The name of your Ray cluster on Vertex AI.

Spark on Ray cluster on Vertex AI

Before you run your Spark application, create a Spark session using the RayDP API. You can do this interactively with the Ray client, or use the Ray Job API. The Ray Job API is recommended, especially for production and long-running applications. The RayDP API provides parameters to configure the Spark session and also supports Spark configuration properties. To learn more about the RayDP API for creating a Spark session, see Spark master actors node affinity.
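
When choosing values for num_executors, executor_cores, and executor_memory, it can help to check the total request against what the worker nodes offer. The sketch below is only an illustration: the helpers parse_memory, spark_request, and fits are hypothetical names, not part of RayDP, and the check ignores the Spark master actor and any memory overhead that Spark or Ray adds on top of the requested amounts.

```python
import re

# Spark-style memory suffixes use powers of 1024.
_UNITS = {"K": 1024, "M": 1024**2, "G": 1024**3}


def parse_memory(value):
    """Convert a memory string such as "500M" or "2g" to bytes."""
    match = re.fullmatch(r"(\d+)([KMG])B?", value.strip().upper())
    if match is None:
        raise ValueError(f"unsupported memory string: {value!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def spark_request(num_executors, executor_cores, executor_memory):
    """Total cores and memory requested by all executors together."""
    return {
        "cores": num_executors * executor_cores,
        "memory_bytes": num_executors * parse_memory(executor_memory),
    }


def fits(request, total_cores, total_memory_bytes):
    """True if the request does not exceed the given capacity."""
    return (request["cores"] <= total_cores
            and request["memory_bytes"] <= total_memory_bytes)


# Capacity figures are placeholders; substitute those of your own worker pool.
request = spark_request(num_executors=2, executor_cores=4, executor_memory="500M")
print(request, fits(request, total_cores=16, total_memory_bytes=60 * 1024**3))
```

A request that passes this check can still fail to schedule if other Ray workloads hold resources, so treat it as a first sanity check rather than a guarantee.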

RayDP with Ray client

To create a Spark session through the Ray client, you must run the RayDP calls inside a Ray task or actor on the Ray cluster on Vertex AI. The following code shows how a Ray actor can create a Spark session, run a Spark application, and stop the Spark cluster on a Ray cluster on Vertex AI using RayDP.

To interactively connect to the Ray cluster on Vertex AI, see Connect to a Ray cluster through Ray Client.

    @ray.remote
    class SparkExecutor:
        import pyspark

        spark: pyspark.sql.SparkSession = None

        def __init__(self):
            import ray
            import raydp

            self.spark = raydp.init_spark(
                app_name="RAYDP ACTOR EXAMPLE",
                num_executors=1,
                executor_cores=1,
                executor_memory="500M",
            )

        def get_data(self):
            df = self.spark.createDataFrame(
                [
                    ("sue", 32),
                    ("li", 3),
                    ("bob", 75),
                    ("heo", 13),
                ],
                ["first_name", "age"],
            )
            return df.toJSON().collect()

        def stop_spark(self):
            import raydp
            raydp.stop_spark()

    s = SparkExecutor.remote()
    data = ray.get(s.get_data.remote())
    print(data)
    ray.get(s.stop_spark.remote())

RayDP with Ray Job API

Ray client is useful for small experiments that require interactive connection with the Ray cluster. The Ray Job API is the recommended way to run long-running and production jobs on a Ray cluster. This also applies to running Spark applications on the Ray cluster on Vertex AI.

Create a Python script that contains your Spark application code. For example:

    import pyspark
    import raydp

    def get_data(spark: pyspark.sql.SparkSession):
        df = spark.createDataFrame(
            [
                ("sue", 32),
                ("li", 3),
                ("bob", 75),
                ("heo", 13),
            ],
            ["first_name", "age"],
        )
        return df.toJSON().collect()

    def stop_spark():
        raydp.stop_spark()

    if __name__ == '__main__':
        spark = raydp.init_spark(
            app_name="RAYDP JOB EXAMPLE",
            num_executors=1,
            executor_cores=1,
            executor_memory="500M",
        )
        print(get_data(spark))
        stop_spark()

Submit the job to run the Python script using the Ray Job API. For example:

    from ray.job_submission import JobSubmissionClient

    client = JobSubmissionClient(RAY_ADDRESS)
    job_id = client.submit_job(
        # Entrypoint shell command to execute
        entrypoint="python [SCRIPT_NAME].py",
        # Path to the local directory that contains the python script file.
        runtime_env={
            "working_dir": ".",
        }
    )

Where:

  • SCRIPT_NAME : The filename of the script that you created.
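
After submitting, you usually want to wait until the job reaches a terminal state. The following is a minimal sketch of one way to poll for that. It assumes a client object exposing get_job_status(job_id), as Ray's JobSubmissionClient does, and compares states by name (SUCCEEDED, FAILED, STOPPED). The function wait_for_job, its parameters, and the default intervals are hypothetical choices for illustration.

```python
import time

# Job states after which the status no longer changes.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(client, job_id, poll_seconds=5.0, timeout_seconds=600.0,
                 sleep=time.sleep):
    """Poll client.get_job_status(job_id) until a terminal state or timeout."""
    waited = 0.0
    while True:
        status = client.get_job_status(job_id)
        # Accept either an enum member with a .value or a plain string.
        name = str(getattr(status, "value", status))
        if name in TERMINAL_STATES:
            return name
        if waited >= timeout_seconds:
            raise TimeoutError(f"job {job_id} still {name} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

With the client and job_id from the submission example, a call such as wait_for_job(client, job_id) would block until the script finishes, after which client.get_job_logs(job_id) can be used to retrieve its output.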

Reading Cloud Storage files from Spark application

It's common practice to store data files in a Cloud Storage bucket. You can read these files in multiple ways from a Spark application that's running on the Ray cluster on Vertex AI. This section explains two techniques for reading Cloud Storage files from Spark applications running on a Ray cluster on Vertex AI.

Use the Google Cloud Storage Connector

You can use the Cloud Storage connector for Hadoop to read files from a Cloud Storage bucket in your Spark application. You set a few configuration parameters when you create the Spark session using RayDP, and can then read files directly. The following code shows how to read a CSV file stored in a Cloud Storage bucket from a Spark application on the Ray cluster on Vertex AI.

    import raydp

    spark = raydp.init_spark(
        app_name="RayDP Cloud Storage Example 1",
        configs={
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
            "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
            "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        },
        num_executors=2,
        executor_cores=4,
        executor_memory="500M",
    )

    spark_df = spark.read.csv([GCS_FILE_URI], header=True, inferSchema=True)

Where:

  • GCS_FILE_URI : The URI of a file stored in a Cloud Storage bucket. For example: gs://my-bucket/my-file.csv.
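
If several applications need the same connector settings, one option is to build the configs dictionary in a small helper. The sketch below reuses the three settings from the example above; the function name gcs_spark_configs and its extra parameter are hypothetical. It relies on the fact that spark.jars accepts a comma-separated list, so an additional JAR is appended rather than overwriting the connector.

```python
GCS_CONNECTOR_JAR = (
    "https://storage.googleapis.com/hadoop-lib/gcs/"
    "gcs-connector-hadoop3-2.2.22.jar"
)


def gcs_spark_configs(extra=None, jar=GCS_CONNECTOR_JAR):
    """Return Spark configs for the Cloud Storage connector, merged with extras."""
    configs = {
        "spark.jars": jar,
        "spark.hadoop.fs.AbstractFileSystem.gs.impl":
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl":
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    }
    for key, value in (extra or {}).items():
        if key == "spark.jars":
            # Append to the comma-separated JAR list instead of replacing it.
            configs[key] = f"{configs[key]},{value}"
        else:
            configs[key] = value
    return configs


print(gcs_spark_configs({"spark.sql.shuffle.partitions": "8"}))
```

The returned dictionary can be passed as the configs argument of raydp.init_spark.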

Use Ray Data

The Cloud Storage connector provides a way to read files from a Cloud Storage bucket, and it may be sufficient for most use cases. You might want to use Ray Data instead when you need Ray's distributed processing for reading data, or when you have trouble reading Cloud Storage files with the Cloud Storage connector. Such problems can happen because of Java dependency conflicts when other application dependencies are added to the Spark Java classpath using either spark.jars.packages or spark.jars.

    import raydp
    import ray

    spark = raydp.init_spark(
        app_name="RayDP Cloud Storage Example 2",
        configs={
            "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
            "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
            "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
            "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        },
        num_executors=2,
        executor_cores=4,
        executor_memory="500M",
    )

    # This doesn't work even though the Cloud Storage connector Jar and other
    # parameters have been added to the Spark configuration.
    # spark.read.csv([GCS_FILE_URI], header=True, inferSchema=True)

    ray_dataset = ray.data.read_csv(GCS_FILE_URI)
    spark_df = ray_dataset.to_spark(spark)

PySpark Pandas UDF on Ray cluster on Vertex AI

PySpark Pandas UDFs sometimes require additional code when you use them in a Spark application running on a Ray cluster on Vertex AI. This is usually the case when the Pandas UDF uses a Python library that isn't available on the Ray cluster on Vertex AI. You can package the Python dependencies of an application using the runtime environment with the Ray Job API. After you submit the Ray job to the cluster, Ray installs those dependencies in the Python virtual environment that it creates for running the job. The Pandas UDFs, however, don't use the same virtual environment. Instead, they use the default system Python environment. If a dependency isn't available in the system environment, you might need to install it within your Pandas UDF. The following example installs the statsmodels library within the UDF.

    import pandas as pd
    import pyspark
    import raydp
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import StringType

    def test_udf(spark: pyspark.sql.SparkSession):
        import pandas as pd

        df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
        return df.select(func('Lottery', 'Literacy', 'Pop1831')).collect()

    @pandas_udf(StringType())
    def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
        import numpy as np
        import subprocess
        import sys
        subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
        import statsmodels.api as sm
        import statsmodels.formula.api as smf

        d = {'Lottery': s1,
             'Literacy': s2,
             'Pop1831': s3}
        data = pd.DataFrame(d)

        # Fit regression model (using the natural log of one of the regressors)
        results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
        return results.summary().as_csv()

    if __name__ == '__main__':
        spark = raydp.init_spark(
            app_name="RayDP UDF Example",
            num_executors=2,
            executor_cores=4,
            executor_memory="1500M",
        )
        print(test_udf(spark))
        raydp.stop_spark()
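
Because the UDF body can run many times, calling pip on every invocation adds avoidable overhead. One possible refinement, sketched here under the assumption that the import name and the pip package name are known, is to install only when the module can't be found. The helper ensure_package and its installer parameter are hypothetical and not part of PySpark or RayDP.

```python
import importlib
import importlib.util
import subprocess
import sys


def ensure_package(module_name, pip_name=None, installer=None):
    """Install a package only if its top-level module is not importable.

    Returns True if an install was attempted, False if the module was found.
    """
    if importlib.util.find_spec(module_name) is not None:
        return False
    if installer is None:
        # Default: install into the interpreter that runs this code.
        def installer(package):
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", package]
            )
    installer(pip_name or module_name)
    # Make the freshly installed module visible to the import system.
    importlib.invalidate_caches()
    return True
```

Inside the UDF, a call such as ensure_package("statsmodels") could then stand in for the unconditional subprocess.check_call line, followed by the statsmodels imports as before.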