The Bigtable to Vertex AI Vector Search files on Cloud Storage template creates a batch pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in JSON format. Use this template to export vector embeddings for Vertex AI Vector Search.
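Each line in the output files is one JSON record in the format that Vector Search ingests. With hypothetical values, a record looks like the following; only id and embedding are always present, and the other fields appear when you configure the corresponding columns:

{"id": "doc-1", "embedding": [0.18, 0.37, 0.95], "crowding_tag": "news", "restricts": [{"namespace": "genre", "allow": ["sports"]}], "numeric_restricts": [{"namespace": "year", "value_int": 2023}]}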
Pipeline requirements
- The Bigtable table must exist.
- The output Cloud Storage bucket must exist before you run the pipeline.
Template parameters
Required parameters
- bigtableProjectId: The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.
- bigtableInstanceId: The ID of the Bigtable instance that contains the table.
- bigtableTableId: The ID of the Bigtable table to read from.
- outputDirectory: The Cloud Storage path where the output JSON files are stored. For example, gs://your-bucket/your-path/.
- idColumn: The fully qualified column name where the ID is stored. In the format cf:col or _key.
- embeddingColumn: The fully qualified column name where the embeddings are stored. In the format cf:col or _key.
Optional parameters
- filenamePrefix: The prefix of the JSON filename. For example: table1-. If no value is provided, defaults to part.
- crowdingTagColumn: The fully qualified column name where the crowding tag is stored. In the format cf:col or _key.
- embeddingByteSize: The byte size of each entry in the embeddings array. For float, use the value 4. For double, use the value 8. Defaults to 4.
- allowRestrictsMappings: The comma-separated, fully qualified column names for the columns to use as the allow restricts, with their aliases. In the format cf:col->alias (see the example after this list).
- denyRestrictsMappings: The comma-separated, fully qualified column names for the columns to use as the deny restricts, with their aliases. In the format cf:col->alias.
- intNumericRestrictsMappings: The comma-separated, fully qualified column names of the columns to use as integer numeric_restricts, with their aliases. In the format cf:col->alias.
- floatNumericRestrictsMappings: The comma-separated, fully qualified column names of the columns to use as float (4 bytes) numeric_restricts, with their aliases. In the format cf:col->alias.
- doubleNumericRestrictsMappings: The comma-separated, fully qualified column names of the columns to use as double (8 bytes) numeric_restricts, with their aliases. In the format cf:col->alias.
- bigtableAppProfileId: The ID of the Bigtable app profile to use for the export. Defaults to default.
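For example, with hypothetical column family, column, and alias names, a job that uses the row key as the ID, reads float embeddings from cf:embedding, and maps one allow restrict and one integer numeric restrict could set the following parameter values:

idColumn=_key
embeddingColumn=cf:embedding
allowRestrictsMappings=cf:genre->genre
intNumericRestrictsMappings=cf:year->year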
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Cloud Bigtable to Vector Embeddings template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud CLI
In your shell or terminal, run the template:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_Vector_Embeddings \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       bigtableProjectId=BIGTABLE_PROJECT_ID,\
       bigtableInstanceId=BIGTABLE_INSTANCE_ID,\
       bigtableTableId=BIGTABLE_TABLE_ID,\
       outputDirectory=OUTPUT_DIRECTORY,\
       filenamePrefix=FILENAME_PREFIX,\
       idColumn=ID_COLUMN,\
       embeddingColumn=EMBEDDING_COLUMN
Replace the following:
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- BIGTABLE_PROJECT_ID: the project ID
- BIGTABLE_INSTANCE_ID: the instance ID
- BIGTABLE_TABLE_ID: the table ID
- OUTPUT_DIRECTORY: the Cloud Storage path for the output JSON files, for example gs://your-bucket/your-path/
- FILENAME_PREFIX: the JSON file prefix
- ID_COLUMN: the ID column
- EMBEDDING_COLUMN: the embeddings column
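For example, a fully substituted command might look like the following. The project, instance, table, and bucket names are hypothetical placeholders:

gcloud dataflow jobs run bigtable-to-vector-embeddings \
    --gcs-location=gs://dataflow-templates-us-central1/latest/Cloud_Bigtable_to_Vector_Embeddings \
    --project=my-project \
    --region=us-central1 \
    --parameters \
       bigtableProjectId=my-project,\
       bigtableInstanceId=my-instance,\
       bigtableTableId=my-table,\
       outputDirectory=gs://my-bucket/embeddings/,\
       filenamePrefix=table1-,\
       idColumn=_key,\
       embeddingColumn=cf:embedding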
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_Vector_Embeddings
{
  "jobName": "JOB_NAME",
  "parameters": {
    "bigtableProjectId": "BIGTABLE_PROJECT_ID",
    "bigtableInstanceId": "BIGTABLE_INSTANCE_ID",
    "bigtableTableId": "BIGTABLE_TABLE_ID",
    "outputDirectory": "OUTPUT_DIRECTORY",
    "filenamePrefix": "FILENAME_PREFIX",
    "idColumn": "ID_COLUMN",
    "embeddingColumn": "EMBEDDING_COLUMN"
  },
  "environment": { "maxWorkers": "10" }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- BIGTABLE_PROJECT_ID: the project ID
- BIGTABLE_INSTANCE_ID: the instance ID
- BIGTABLE_TABLE_ID: the table ID
- OUTPUT_DIRECTORY: the Cloud Storage path for the output JSON files, for example gs://your-bucket/your-path/
- FILENAME_PREFIX: the JSON file prefix
- ID_COLUMN: the ID column
- EMBEDDING_COLUMN: the embeddings column
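For example, you can send the request with curl, using the gcloud CLI to supply an access token. The project, region, and parameter values below are hypothetical:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
          "jobName": "bigtable-to-vector-embeddings",
          "parameters": {
            "bigtableProjectId": "my-project",
            "bigtableInstanceId": "my-instance",
            "bigtableTableId": "my-table",
            "outputDirectory": "gs://my-bucket/embeddings/",
            "idColumn": "_key",
            "embeddingColumn": "cf:embedding"
          }
        }' \
    "https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/templates:launch?gcsPath=gs://dataflow-templates-us-central1/latest/Cloud_Bigtable_to_Vector_Embeddings"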
Template source code
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
import com.google.cloud.teleport.bigtable.BigtableToVectorEmbeddings.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.gson.stream.JsonWriter;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to JSON files in GCS,
 * specifically for Vector Embedding purposes. Currently, filtering on Cloud Bigtable table is not
 * supported.
 *
 * <p>Check out <a href=
 * "https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_Vector_Embeddings.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_Vector_Embeddings",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to Vector Embeddings",
    description =
        "The Bigtable to Vector Embedding template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in JSON format, for vector embeddings",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-vector-embeddings",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToVectorEmbeddings {
  private static final Logger LOG = LoggerFactory.getLogger(BigtableToVectorEmbeddings.class);

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to read from.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Cloud Storage directory for storing JSON files",
        helpText = "The Cloud Storage path where the output JSON files are stored.",
        example = "gs://your-bucket/your-path/")
    @Required
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "JSON file prefix",
        helpText =
            "The prefix of the JSON filename. For example: `table1-`. If no value is provided, defaults to `part`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Text(
        order = 6,
        description = "ID column",
        helpText =
            "The fully qualified column name where the ID is stored. In the format `cf:col` or `_key`.")
    ValueProvider<String> getIdColumn();

    @SuppressWarnings("unused")
    void setIdColumn(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 7,
        description = "Embedding column",
        helpText =
            "The fully qualified column name where the embeddings are stored. In the format `cf:col` or `_key`.")
    ValueProvider<String> getEmbeddingColumn();

    @SuppressWarnings("unused")
    void setEmbeddingColumn(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 8,
        optional = true,
        description = "Crowding tag column",
        helpText =
            "The fully qualified column name where the crowding tag is stored. In the format `cf:col` or `_key`.")
    ValueProvider<String> getCrowdingTagColumn();

    @SuppressWarnings("unused")
    void setCrowdingTagColumn(ValueProvider<String> value);

    @TemplateParameter.Integer(
        order = 9,
        optional = true,
        description = "The byte size of the embeddings array. Can be 4 or 8.",
        helpText =
            "The byte size of each entry in the embeddings array. For float, use the value `4`. For double, use the value `8`. Defaults to `4`.")
    @Default.Integer(4)
    ValueProvider<Integer> getEmbeddingByteSize();

    @SuppressWarnings("unused")
    void setEmbeddingByteSize(ValueProvider<Integer> value);

    @TemplateParameter.Text(
        order = 10,
        optional = true,
        description = "Allow restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names for the columns to use as the allow restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getAllowRestrictsMappings();

    @SuppressWarnings("unused")
    void setAllowRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 11,
        optional = true,
        description = "Deny restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names for the columns to use as the deny restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getDenyRestrictsMappings();

    @SuppressWarnings("unused")
    void setDenyRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 12,
        optional = true,
        description = "Integer numeric restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names of the columns to use as integer numeric_restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getIntNumericRestrictsMappings();

    @SuppressWarnings("unused")
    void setIntNumericRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 13,
        optional = true,
        description = "Float numeric restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names of the columns to use as float (4 bytes) numeric_restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getFloatNumericRestrictsMappings();

    @SuppressWarnings("unused")
    void setFloatNumericRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Double numeric restricts mappings",
        helpText =
            "The comma-separated, fully qualified column names of the columns to use as double (8 bytes) numeric_restricts, with their aliases. In the format `cf:col->alias`.")
    ValueProvider<String> getDoubleNumericRestrictsMappings();

    @SuppressWarnings("unused")
    void setDoubleNumericRestrictsMappings(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 15,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        optional = true,
        description = "App Profile ID",
        helpText = "The ID of the Cloud Bigtable app profile to be used for the export")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> value);
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to JSON files in GCS in JSON format,
   * for use of Vertex Vector Search.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
    LOG.info("Completed pipeline setup");
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId())
            .withRowFilter(RowFilter.newBuilder().setCellsPerColumnLimitFilter(1).build());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    // Concatenating cloud storage folder with file prefix to get complete path
    ValueProvider<String> filePathPrefix =
        DualInputNestedValueProvider.of(
            options.getOutputDirectory(),
            options.getFilenamePrefix(),
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                return FileSystems.matchNewResource(input.getX(), true)
                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
                    .toString();
              }
            });

    pipeline
        .apply("Read from Bigtable", read)
        .apply(
            "Transform to JSON",
            MapElements.via(
                new BigtableToVectorEmbeddingsFn(
                    options.getIdColumn(),
                    options.getEmbeddingColumn(),
                    options.getEmbeddingByteSize(),
                    options.getCrowdingTagColumn(),
                    options.getAllowRestrictsMappings(),
                    options.getDenyRestrictsMappings(),
                    options.getIntNumericRestrictsMappings(),
                    options.getFloatNumericRestrictsMappings(),
                    options.getDoubleNumericRestrictsMappings())))
        .apply("Write to storage", TextIO.write().to(filePathPrefix).withSuffix(".json"));

    return pipeline.run();
  }

  /** Translates Bigtable {@link Row} to Vector Embeddings JSON. */
  static class BigtableToVectorEmbeddingsFn extends SimpleFunction<Row, String> {
    private static final String ID_KEY = "id";
    private static final String EMBEDDING_KEY = "embedding";
    private static final String RESTRICTS_KEY = "restricts";
    private static final String NUMERIC_RESTRICTS_KEY = "numeric_restricts";
    private static final String CROWDING_TAG_KEY = "crowding_tag";
    private static final String NAMESPACE_KEY = "namespace";
    private static final String ALLOW_KEY = "allow";
    private static final String DENY_KEY = "deny";
    private static final String VALUE_INT_KEY = "value_int";
    private static final String VALUE_FLOAT_KEY = "value_float";
    private static final String VALUE_DOUBLE_KEY = "value_double";

    private String idColumn;
    private String embeddingsColumn;
    private Integer embeddingByteSize;
    private String crowdingTagColumn;
    private Map<String, String> allowRestricts;
    private Map<String, String> denyRestricts;
    private Map<String, String> intNumericRestricts;
    private Map<String, String> floatNumericRestricts;
    private Map<String, String> doubleNumericRestricts;

    private ValueProvider<Integer> embeddingByteSizeProvider;
    private ValueProvider<String> idColumnProvider;
    private ValueProvider<String> embeddingsColumnProvider;
    private ValueProvider<String> crowdingTagColumnProvider;
    private ValueProvider<String> allowRestrictsMappingsProvider;
    private ValueProvider<String> denyRestrictsMappingsProvider;
    private ValueProvider<String> intNumericRestrictsMappingsProvider;
    private ValueProvider<String> floatNumericRestrictsMappingsProvider;
    private ValueProvider<String> doubleNumericRestrictsMappingsProvider;

    public BigtableToVectorEmbeddingsFn(
        ValueProvider<String> idColumnProvider,
        ValueProvider<String> embeddingsColumnProvider,
        ValueProvider<Integer> embeddingByteSizeProvider,
        ValueProvider<String> crowdingTagColumnProvider,
        ValueProvider<String> allowRestrictsMappingsProvider,
        ValueProvider<String> denyRestrictsMappingsProvider,
        ValueProvider<String> intNumericRestrictsMappingsProvider,
        ValueProvider<String> floatNumericRestrictsMappingsProvider,
        ValueProvider<String> doubleNumericRestrictsMappingsProvider) {
      this.idColumnProvider = idColumnProvider;
      this.embeddingsColumnProvider = embeddingsColumnProvider;
      this.embeddingByteSizeProvider = embeddingByteSizeProvider;
      this.crowdingTagColumnProvider = crowdingTagColumnProvider;
      this.allowRestrictsMappingsProvider = allowRestrictsMappingsProvider;
      this.denyRestrictsMappingsProvider = denyRestrictsMappingsProvider;
      this.intNumericRestrictsMappingsProvider = intNumericRestrictsMappingsProvider;
      this.floatNumericRestrictsMappingsProvider = floatNumericRestrictsMappingsProvider;
      this.doubleNumericRestrictsMappingsProvider = doubleNumericRestrictsMappingsProvider;
    }

    @Override
    public String apply(Row row) {
      this.embeddingByteSize = this.embeddingByteSizeProvider.get();
      if (this.embeddingByteSize != 4 && this.embeddingByteSize != 8) {
        throw new RuntimeException("embeddingByteSize can be either 4 or 8");
      }
      this.idColumn = this.idColumnProvider.get();
      this.embeddingsColumn = this.embeddingsColumnProvider.get();
      this.crowdingTagColumn = this.crowdingTagColumnProvider.get();
      this.allowRestricts =
          Optional.ofNullable(this.allowRestricts)
              .orElse(extractColumnsAliases(this.allowRestrictsMappingsProvider));
      this.denyRestricts =
          Optional.ofNullable(this.denyRestricts)
              .orElse(extractColumnsAliases(this.denyRestrictsMappingsProvider));
      this.intNumericRestricts =
          Optional.ofNullable(this.intNumericRestricts)
              .orElse(extractColumnsAliases(this.intNumericRestrictsMappingsProvider));
      this.floatNumericRestricts =
          Optional.ofNullable(this.floatNumericRestricts)
              .orElse(extractColumnsAliases(this.floatNumericRestrictsMappingsProvider));
      this.doubleNumericRestricts =
          Optional.ofNullable(this.doubleNumericRestricts)
              .orElse(extractColumnsAliases(this.doubleNumericRestrictsMappingsProvider));

      StringWriter stringWriter = new StringWriter();
      JsonWriter jsonWriter = new JsonWriter(stringWriter);

      VectorEmbeddings vectorEmbeddings = buildObject(row);
      try {
        serialize(jsonWriter, vectorEmbeddings);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      return stringWriter.toString();
    }

    private void serialize(JsonWriter jsonWriter, VectorEmbeddings vectorEmbeddings)
        throws IOException {
      jsonWriter.beginObject();
      // Required fields.
      jsonWriter.name(ID_KEY).value(vectorEmbeddings.id);
      jsonWriter.name(EMBEDDING_KEY);
      jsonWriter.beginArray();
      if (this.embeddingByteSize == 4) {
        for (Float f : vectorEmbeddings.floatEmbeddings) {
          jsonWriter.value(f);
        }
      } else if (this.embeddingByteSize == 8) {
        for (Double d : vectorEmbeddings.doubleEmbeddings) {
          jsonWriter.value(d);
        }
      }
      jsonWriter.endArray();
      // Optional fields.
      if (vectorEmbeddings.crowdingTag != "") {
        jsonWriter.name(CROWDING_TAG_KEY).value(vectorEmbeddings.crowdingTag);
      }
      if (vectorEmbeddings.restricts != null && !vectorEmbeddings.restricts.isEmpty()) {
        jsonWriter.name(RESTRICTS_KEY);
        jsonWriter.beginArray();
        for (Restrict r : vectorEmbeddings.restricts) {
          jsonWriter.beginObject();
          jsonWriter.name(NAMESPACE_KEY).value(r.namespace);
          if (r.allow != null && !r.allow.isEmpty()) {
            jsonWriter.name(ALLOW_KEY);
            jsonWriter.beginArray();
            for (String a : r.allow) {
              jsonWriter.value(a);
            }
            jsonWriter.endArray();
          } else if (r.deny != null && !r.deny.isEmpty()) {
            jsonWriter.name(DENY_KEY);
            jsonWriter.beginArray();
            for (String d : r.deny) {
              jsonWriter.value(d);
            }
            jsonWriter.endArray();
          }
          jsonWriter.endObject();
        }
        jsonWriter.endArray();
      }
      if (vectorEmbeddings.numericRestricts != null
          && !vectorEmbeddings.numericRestricts.isEmpty()) {
        jsonWriter.name(NUMERIC_RESTRICTS_KEY);
        jsonWriter.beginArray();
        for (NumericRestrict numericRestrict : vectorEmbeddings.numericRestricts) {
          jsonWriter.beginObject();
          jsonWriter.name(NAMESPACE_KEY).value(numericRestrict.namespace);
          switch (numericRestrict.type) {
            case INT:
              jsonWriter.name(VALUE_INT_KEY).value(numericRestrict.valueInt);
              break;
            case FLOAT:
              jsonWriter.name(VALUE_FLOAT_KEY).value(numericRestrict.valueFloat);
              break;
            case DOUBLE:
              jsonWriter.name(VALUE_DOUBLE_KEY).value(numericRestrict.valueDouble);
              break;
          }
          jsonWriter.endObject();
        }
        jsonWriter.endArray();
      }
      jsonWriter.endObject();
    }

    private VectorEmbeddings buildObject(Row row) {
      VectorEmbeddings vectorEmbeddings = new VectorEmbeddings();
      maybeAddToObject(vectorEmbeddings, "_key", row.getKey());
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          for (Cell cell : column.getCellsList()) {
            maybeAddToObject(
                vectorEmbeddings,
                familyName + ":" + column.getQualifier().toStringUtf8(),
                cell.getValue());
          }
        }
      }
      // Assert fields
      if (StringUtils.isEmpty(vectorEmbeddings.id)) {
        throw new RuntimeException(
            String.format(
                "'%s' value is missing for row '%s'", ID_KEY, row.getKey().toStringUtf8()));
      }
      if (this.embeddingByteSize == 4
          && (vectorEmbeddings.floatEmbeddings == null
              || vectorEmbeddings.floatEmbeddings.isEmpty())) {
        throw new RuntimeException(
            String.format(
                "'%s' value is missing for row '%s'", EMBEDDING_KEY, row.getKey().toStringUtf8()));
      }
      if (this.embeddingByteSize == 8
          && (vectorEmbeddings.doubleEmbeddings == null
              || vectorEmbeddings.doubleEmbeddings.isEmpty())) {
        throw new RuntimeException(
            String.format(
                "'%s' value is missing for row '%s'", EMBEDDING_KEY, row.getKey().toStringUtf8()));
      }
      return vectorEmbeddings;
    }

    private void maybeAddToObject(
        VectorEmbeddings vectorEmbeddings, String columnQualifier, ByteString value) {
      if (columnQualifier.equals(this.idColumn)) {
        vectorEmbeddings.id = value.toStringUtf8();
      } else if (columnQualifier.equals(this.crowdingTagColumn)) {
        vectorEmbeddings.crowdingTag = value.toStringUtf8();
      } else if (columnQualifier.equals(this.embeddingsColumn)) {
        vectorEmbeddings.floatEmbeddings = new ArrayList<Float>();
        vectorEmbeddings.doubleEmbeddings = new ArrayList<Double>();
        byte[] bytes = value.toByteArray();
        for (int i = 0; i < bytes.length; i += embeddingByteSize) {
          if (embeddingByteSize == 4) {
            vectorEmbeddings.floatEmbeddings.add(Bytes.toFloat(bytes, i));
          } else if (embeddingByteSize == 8) {
            vectorEmbeddings.doubleEmbeddings.add(Bytes.toDouble(bytes, i));
          }
        }
      } else if (this.allowRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addRestrict(
            Restrict.allowRestrict(allowRestricts.get(columnQualifier), value));
      } else if (this.denyRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addRestrict(
            Restrict.denyRestrict(denyRestricts.get(columnQualifier), value));
      } else if (this.intNumericRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addNumericRestrict(
            NumericRestrict.intValue(intNumericRestricts.get(columnQualifier), value));
      } else if (this.floatNumericRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addNumericRestrict(
            NumericRestrict.floatValue(floatNumericRestricts.get(columnQualifier), value));
      } else if (this.doubleNumericRestricts.containsKey(columnQualifier)) {
        vectorEmbeddings.addNumericRestrict(
            NumericRestrict.doubleValue(doubleNumericRestricts.get(columnQualifier), value));
      }
    }

    private Map<String, String> extractColumnsAliases(ValueProvider<String> restricts) {
      Map<String, String> columnsWithAliases = new HashMap<>();
      if (StringUtils.isBlank(restricts.get())) {
        return columnsWithAliases;
      }
      String[] columnsList = restricts.get().split(",");

      for (String columnsWithAlias : columnsList) {
        String[] columnWithAlias = columnsWithAlias.split("->");
        if (columnWithAlias.length == 2) {
          columnsWithAliases.put(columnWithAlias[0], columnWithAlias[1]);
        }
      }
      return columnsWithAliases;
    }
  }
}

// Data model classes.
class Restrict {
  String namespace;
  List<String> allow;
  List<String> deny;

  static Restrict allowRestrict(String namespace, ByteString value) {
    Restrict restrict = new Restrict();
    restrict.namespace = namespace;
    restrict.allow = new ArrayList<String>();
    restrict.allow.add(value.toStringUtf8());
    return restrict;
  }

  static Restrict denyRestrict(String namespace, ByteString value) {
    Restrict restrict = new Restrict();
    restrict.namespace = namespace;
    restrict.deny = new ArrayList<String>();
    restrict.deny.add(value.toStringUtf8());
    return restrict;
  }
}

class NumericRestrict {
  enum Type {
    INT,
    FLOAT,
    DOUBLE
  }

  String namespace;
  Type type;
  Integer valueInt;
  Float valueFloat;
  Double valueDouble;

  static NumericRestrict intValue(String namespace, ByteString value) {
    NumericRestrict restrict = new NumericRestrict();
    restrict.namespace = namespace;
    restrict.valueInt = Bytes.toInt(value.toByteArray());
    restrict.type = Type.INT;
    return restrict;
  }

  static NumericRestrict floatValue(String namespace, ByteString value) {
    NumericRestrict restrict = new NumericRestrict();
    restrict.namespace = namespace;
    restrict.valueFloat = Bytes.toFloat(value.toByteArray());
    restrict.type = Type.FLOAT;
    return restrict;
  }

  static NumericRestrict doubleValue(String namespace, ByteString value) {
    NumericRestrict restrict = new NumericRestrict();
    restrict.namespace = namespace;
    restrict.valueDouble = Bytes.toDouble(value.toByteArray());
    restrict.type = Type.DOUBLE;
    return restrict;
  }
}

class VectorEmbeddings {
  String id;
  String crowdingTag;
  List<Float> floatEmbeddings;
  List<Double> doubleEmbeddings;
  List<Restrict> restricts;
  List<NumericRestrict> numericRestricts;

  void addRestrict(Restrict restrict) {
    if (this.restricts == null) {
      this.restricts = new ArrayList<Restrict>();
    }
    restricts.add(restrict);
  }

  void addNumericRestrict(NumericRestrict numericRestrict) {
    if (this.numericRestricts == null) {
      this.numericRestricts = new ArrayList<NumericRestrict>();
    }
    numericRestricts.add(numericRestrict);
  }
}
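In maybeAddToObject, the embedding cell is decoded with Bytes.toFloat(bytes, i) and Bytes.toDouble(bytes, i), which read consecutive big-endian IEEE-754 values. As a minimal sketch, assuming you control the writer that populates the table, an encoder that produces the matching byte layout could look like the following; the class and method names are hypothetical:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper that packs an embedding into the byte layout the template decodes. */
public class EmbeddingEncoder {
  /** Writes each float as 4 big-endian bytes, matching Bytes.toFloat(bytes, i). */
  public static byte[] encodeFloats(float[] embedding) {
    ByteBuffer buffer = ByteBuffer.allocate(embedding.length * 4).order(ByteOrder.BIG_ENDIAN);
    for (float f : embedding) {
      buffer.putFloat(f);
    }
    return buffer.array();
  }

  /** Writes each double as 8 big-endian bytes, for jobs that set embeddingByteSize to 8. */
  public static byte[] encodeDoubles(double[] embedding) {
    ByteBuffer buffer = ByteBuffer.allocate(embedding.length * 8).order(ByteOrder.BIG_ENDIAN);
    for (double d : embedding) {
      buffer.putDouble(d);
    }
    return buffer.array();
  }
}

Writing the returned bytes to the cell named by embeddingColumn lets the template reconstruct the original values when it exports the row.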
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.