Use Ray on Vertex AI with BigQuery

When you run a Ray application on Vertex AI, you can use BigQuery as your cloud database. This section covers how to read from and write to BigQuery from your Ray cluster on Vertex AI. The steps in this section assume that you use the Vertex AI SDK for Python.

To read from a BigQuery dataset, create a new BigQuery dataset or use an existing dataset.

Import and initialize the Ray on Vertex AI client

If you're connected to your Ray cluster on Vertex AI, restart your kernel and run the following code. The runtime_env variable is necessary at connection time to run BigQuery commands.

    import ray
    from google.cloud import aiplatform

    # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
    address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

    runtime_env = {
        "pip": ["google-cloud-aiplatform[ray]", "ray==2.47.1"]
    }

    ray.init(address=address, runtime_env=runtime_env)
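The connection arguments above can also be assembled by small helper functions, which makes a malformed cluster name fail early instead of at connection time. The following is a minimal sketch, not part of the SDK: `build_ray_address` and `build_runtime_env` are hypothetical names, and the `projects/.../locations/.../persistentResources/...` pattern is an assumption about the shape of the resource name that `vertex_ray.create_ray_cluster` returns.

```python
import re

# Assumed shape of a Ray on Vertex AI cluster resource name (not confirmed by this page).
_RESOURCE_NAME_RE = re.compile(
    r"^projects/[^/]+/locations/[^/]+/persistentResources/[^/]+$"
)


def build_ray_address(cluster_resource_name: str) -> str:
    """Return the vertex_ray:// address for a cluster resource name (hypothetical helper)."""
    if not _RESOURCE_NAME_RE.match(cluster_resource_name):
        raise ValueError(
            "Unexpected cluster resource name: {!r}".format(cluster_resource_name)
        )
    return "vertex_ray://{}".format(cluster_resource_name)


def build_runtime_env(ray_version: str = "2.47.1", extra_pip=()) -> dict:
    """Return a runtime_env dict with the pip packages used in this section."""
    return {
        "pip": [
            "google-cloud-aiplatform[ray]",
            "ray=={}".format(ray_version),
            *extra_pip,
        ]
    }
```

The results can then be passed to `ray.init(address=..., runtime_env=...)` exactly as in the snippet above.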

Read data from BigQuery

Read data from your BigQuery dataset. A Ray Task must perform the read operation.

    aiplatform.init(project=PROJECT_ID, location=LOCATION)

    @ray.remote
    def run_remotely():
        import vertex_ray
        dataset = DATASET
        parallelism = PARALLELISM
        query = QUERY

        ds = vertex_ray.data.read_bigquery(
            dataset=dataset,
            parallelism=parallelism,
            query=query
        )
        ds.materialize()

Where:

  • PROJECT_ID: Google Cloud project ID. Find the project ID in the Google Cloud console welcome page.

  • LOCATION: The location where the dataset is stored. For example, us-central1.

  • DATASET: BigQuery dataset. It must be in the format dataset.table. Set to None if you provide a query.

  • PARALLELISM: An integer that influences how many read tasks are created in parallel. Fewer read streams might be created than you requested.

  • QUERY: A string containing a SQL query to read from the BigQuery database. Set to None if no query is required.
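The parameter notes above imply a few constraints that can be checked locally before submitting the Ray Task. The following is a minimal sketch of such a check, assuming that exactly one of DATASET and QUERY should be set (this is how the sketch reads the descriptions above; it is not the library's own validation). `check_read_args` is a hypothetical helper name.

```python
from typing import Optional


def check_read_args(
    dataset: Optional[str], query: Optional[str], parallelism: int
) -> None:
    """Raise ValueError if the arguments contradict the parameter notes (hypothetical helper)."""
    # Assumption: provide either a dataset or a query, and leave the other as None.
    if (dataset is None) == (query is None):
        raise ValueError("Set exactly one of dataset or query; leave the other as None.")
    if dataset is not None:
        # DATASET must be in the format dataset.table.
        parts = dataset.split(".")
        if len(parts) != 2 or not all(parts):
            raise ValueError("dataset must be in the format dataset.table")
    if not isinstance(parallelism, int):
        raise ValueError("parallelism must be an integer")
```

For example, `check_read_args("my_dataset.my_table", None, 4)` passes, while passing both a dataset and a query raises `ValueError`. The dataset and table names here are placeholders.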

Transform data

Update and delete rows and columns from your BigQuery tables using pyarrow or pandas. If you want to use pandas transformations, keep the input type as pyarrow and convert to pandas within the user-defined function (UDF), so that you can catch any pandas conversion type errors within the UDF. A Ray Task must perform the transformation.

    @ray.remote
    def run_remotely():
        # BigQuery Read first
        import pandas as pd
        import pyarrow as pa

        def filter_batch(table: pa.Table) -> pa.Table:
            df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
            # PANDAS_TRANSFORMATIONS_HERE
            return pa.Table.from_pandas(df)

        ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
        ds.materialize()

        # You can repartition before writing to determine the number of write blocks
        ds = ds.repartition(4)
        ds.materialize()

Write data to BigQuery

Insert data into your BigQuery dataset. A Ray Task must perform the write.

    @ray.remote
    def run_remotely():
        # BigQuery Read and optional data transformation first
        dataset = DATASET
        vertex_ray.data.write_bigquery(
            ds,
            dataset=dataset
        )

Where:

  • DATASET: BigQuery dataset. The dataset must be in the format dataset.table.
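Because `ds` is local to each `run_remotely` function, the read, transform, and write steps need to share one function body to run as a single Ray Task. The following is a minimal sketch of how the three snippets in this section might be combined, using only the calls shown above. `bigquery_round_trip`, its parameter names, and the identity transformation are hypothetical, and the sketch has not been run against a live cluster.

```python
def bigquery_round_trip(source_table, destination_table, parallelism=4):
    """Read, transform, and write in one function so that ds stays in scope."""
    # Imported inside the function, as in the read snippet, so that the
    # import is resolved on the Ray worker that runs the task.
    import vertex_ray

    # Read: dataset is in the format dataset.table; query is None because a
    # dataset is provided.
    ds = vertex_ray.data.read_bigquery(
        dataset=source_table,
        parallelism=parallelism,
        query=None,
    )

    # Transform: identity placeholder; replace with a real pyarrow UDF.
    ds = ds.map_batches(lambda table: table, batch_format="pyarrow")

    # Write: the destination is also in the format dataset.table.
    vertex_ray.data.write_bigquery(ds, dataset=destination_table)


# Usage, after ray.init(...) has connected to the cluster (table names are
# placeholders):
#   remote_task = ray.remote(bigquery_round_trip)
#   ray.get(remote_task.remote("my_dataset.source", "my_dataset.destination"))
```

Keeping the function free of the `@ray.remote` decorator and wrapping it with `ray.remote(...)` at call time is a design choice made here so that the function can be defined before the cluster connection exists.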
