This Colab demonstrates how to generate embeddings from data and ingest them into AlloyDB, Google Cloud's fully managed, PostgreSQL-compatible database service. We'll use Apache Beam and Dataflow for scalable data processing.
The goal of this notebook is to make it easy for users to get started with generating embeddings at scale using Apache Beam and storing them in AlloyDB. We focus on building efficient ingestion pipelines that can handle various data sources and embedding models.
Example: Furniture Product Catalog
We'll work with a sample e-commerce dataset representing a furniture product catalog. Each product has:
Structured fields: id, name, category, price
Detailed text descriptions: longer text describing the product's features
Additional metadata: material, dimensions
Pipeline Overview
We will build a pipeline to:
Read product data
Convert unstructured product data to the Chunk[1] type
Generate Embeddings: Use a pre-trained Hugging Face model (via MLTransform) to create vector embeddings
Write to AlloyDB: Store the embeddings in an AlloyDB vector database
- Chunk is the structured input for generating and ingesting embeddings.
- chunk.content.text is the field that is embedded.
- Converting to Chunk does not mean breaking data into smaller pieces; it simply organizes your data in a standard format for the embedding pipeline.
- Chunk allows data to flow seamlessly through embedding pipelines.
[1]: Chunk represents an embeddable unit of input. It specifies which fields should be embedded and which fields should be treated as metadata. Converting to Chunk does not necessarily mean breaking your text into smaller pieces - it's primarily about structuring your data for the embedding pipeline. For very long texts that exceed the embedding model's maximum input size, you can optionally use LangChain TextSplitters to break the text into smaller Chunks.
Execution Environments
This notebook demonstrates two execution environments:
DirectRunner (Local Execution): All examples in this notebook run on DirectRunner by default, which executes the pipeline locally. This is ideal for development, testing, and processing small datasets.
DataflowRunner (Distributed Execution): The Run on Dataflow section demonstrates how to execute the same pipeline on Google Cloud Dataflow for scalable, distributed processing. This is recommended for production workloads and large datasets.
All examples in this notebook can be adapted to run on Dataflow by following the pattern shown in the "Run on Dataflow" section.
Setup and Prerequisites
This example requires:
An AlloyDB instance with pgvector extension and PUBLIC IP enabled
Apache Beam 2.64.0 or later
Install Packages and Dependencies
First, let's install the Python packages required for the embedding and ingestion pipeline:
# Apache Beam with GCP support
pip install 'apache_beam[gcp]>=2.64.0' --quiet

# Hugging Face sentence-transformers for embedding models
pip install sentence-transformers --quiet
Next, let's install google-cloud-alloydb-connector to help set up our test database.
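The install cell itself isn't shown above; a minimal version might look like this (the pg8000 extra provides the database driver used by the helpers below):

pip install sqlalchemy "google-cloud-alloydb-connector[pg8000]" --quiet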
# @title SQLAlchemy + AlloyDB Connector helpers for creating tables and verifying data
import sqlalchemy
from sqlalchemy import text  # Import text construct explicitly
from sqlalchemy.exc import SQLAlchemyError  # Import specific exception type
from google.cloud.alloydb.connector import Connector


def get_alloydb_engine(instance_uri: str, user: str, password: str, db: str, **connect_kwargs) -> sqlalchemy.engine.Engine:
    """Creates a SQLAlchemy engine configured for AlloyDB."""
    connector = Connector()
    connect_kwargs.setdefault('ip_type', 'PUBLIC')

    def get_conn() -> sqlalchemy.engine.base.Connection:
        conn = connector.connect(
            instance_uri,
            "pg8000",
            user=user,
            password=password,
            db=db,
            **connect_kwargs  # Pass additional options like ip_type='PUBLIC' if needed
        )
        return conn

    # Create the SQLAlchemy engine using the connection function
    engine = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=get_conn,
    )
    engine.pool.dispose = lambda: connector.close()
    return engine


def setup_alloydb_table_sqlalchemy(instance_uri: str, database: str, table_name: str, table_schema: str, user: str, password: str, **connect_kwargs):
    """Set up AlloyDB table with vector extension and proper schema using SQLAlchemy.

    Args:
        instance_uri: AlloyDB instance URI (e.g., projects/.../locations/.../clusters/.../instances/...)
        database: Database name
        table_name: Name of the table to create.
        table_schema: SQL string defining the table columns (e.g., "id SERIAL PRIMARY KEY, embedding VECTOR(768)")
        user: Database user
        password: Database password
        connect_kwargs: Additional keyword arguments passed to connector.connect() (e.g., ip_type="PUBLIC")
    """
    engine = None
    try:
        engine = get_alloydb_engine(instance_uri, user, password, database, **connect_kwargs)
        # Use a connection from the pool
        with engine.connect() as connection:
            # Use execution options for autocommit for DDL statements
            # Alternatively, execute outside an explicit transaction block (begin())
            with connection.execution_options(isolation_level="AUTOCOMMIT"):
                print("Connected to AlloyDB successfully via SQLAlchemy!")

                # Create pgvector extension if it doesn't exist
                print("Creating pgvector extension...")
                connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))

                # Drop the table if it exists
                print(f"Dropping table {table_name} if exists...")
                # Use f-string for table name (generally okay for DDL if source is trusted)
                connection.execute(text(f"DROP TABLE IF EXISTS {table_name};"))

                # Create the table
                print(f"Creating table {table_name}...")
                # Use f-string for table name and schema (validate input if necessary)
                create_sql = f"""
                    CREATE TABLE IF NOT EXISTS {table_name} (
                        {table_schema}
                    );
                """
                connection.execute(text(create_sql))

                # Optional: Commit if not using autocommit (SQLAlchemy >= 2.0 often commits implicitly)
                # connection.commit()  # Usually not needed with autocommit or implicit commit behavior

                print("Setup completed successfully using SQLAlchemy!")

    except SQLAlchemyError as e:
        print(f"An SQLAlchemy error occurred during setup: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during setup: {e}")
    finally:
        if engine:
            engine.dispose()  # Close connection pool and connector


def test_alloydb_connection_sqlalchemy(instance_uri: str, database: str, table_name: str, user: str, password: str, **connect_kwargs):
    """Test the AlloyDB connection and verify table/extension using SQLAlchemy.

    Args:
        instance_uri: AlloyDB instance URI
        database: Database name
        table_name: Name of the table to check.
        user: Database user
        password: Database password
        connect_kwargs: Additional keyword arguments passed to connector.connect()
    """
    engine = None
    try:
        engine = get_alloydb_engine(instance_uri, user, password, database, **connect_kwargs)
        with engine.connect() as connection:
            print("Testing connection...")
            # Simple query to confirm connection
            connection.execute(text("SELECT 1"))
            print("✓ Connection successful")

            # Check if table exists using information_schema
            # Use bind parameters (:tname) for safety, even though it's a table name here
            table_exists_query = text("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'public' AND table_name = :tname
                );
            """)
            # .scalar() fetches the first column of the first row
            table_exists = connection.execute(table_exists_query, {"tname": table_name}).scalar()

            if table_exists:
                print(f"✓ '{table_name}' table exists")

                # Check if vector extension is installed
                ext_exists_query = text("""
                    SELECT EXISTS (
                        SELECT FROM pg_extension WHERE extname = 'vector'
                    );
                """)
                vector_installed = connection.execute(ext_exists_query).scalar()
                if vector_installed:
                    print("✓ pgvector extension is installed")
                else:
                    print("✗ pgvector extension is NOT installed")
            else:
                print(f"✗ '{table_name}' table does NOT exist")

    except SQLAlchemyError as e:
        print(f"Connection test failed (SQLAlchemy error): {e}")
    except Exception as e:
        print(f"Connection test failed (Unexpected error): {e}")
    finally:
        if engine:
            engine.dispose()


def verify_embeddings_sqlalchemy(instance_uri: str, database: str, table_name: str, user: str, password: str, **connect_kwargs):
    """Connect to AlloyDB using SQLAlchemy and print all rows from the table."""
    engine = None
    try:
        engine = get_alloydb_engine(instance_uri, user, password, database, **connect_kwargs)
        with engine.connect() as connection:
            # Use f-string for table name in SELECT (ensure table_name is controlled)
            select_query = text(f"SELECT * FROM {table_name};")
            result = connection.execute(select_query)

            # Get column names from the result keys
            columns = result.keys()
            # Fetch all rows as mapping objects (dict-like)
            rows = result.mappings().all()

            print(f"\nFound {len(rows)} products in '{table_name}':")
            print("-" * 80)
            if not rows:
                print("Table is empty.")
                print("-" * 80)
            else:
                # Print each row
                for row in rows:
                    for col in columns:
                        print(f"{col}: {row[col]}")
                    print("-" * 80)

    except SQLAlchemyError as e:
        print(f"Failed to verify embeddings (SQLAlchemy error): {e}")
        # You might want to check specifically for ProgrammingError if the table doesn't exist
    except Exception as e:
        print(f"Failed to verify embeddings (Unexpected error): {e}")
    finally:
        if engine:
            engine.dispose()
Create Sample Product Catalog Data
We'll create a typical e-commerce catalog where you might want to:
Generate embeddings for product text
Store vectors alongside product data
Enable vector similarity features
Example product:
{"id":"desk-001","name":"Modern Minimalist Desk","description":"Sleek minimalist desk with clean lines and a spacious work surface. ""Features cable management system and sturdy steel frame. ""Perfect for contemporary home offices and workspaces.","category":"Desks","price":399.99,"material":"Engineered Wood, Steel","dimensions":"60W x 30D x 29H inches"}
Create sample data
PRODUCTS_DATA=[{"id":"desk-001","name":"Modern Minimalist Desk","description":"Sleek minimalist desk with clean lines and a spacious work surface. ""Features cable management system and sturdy steel frame. ""Perfect for contemporary home offices and workspaces.","category":"Desks","price":399.99,"material":"Engineered Wood, Steel","dimensions":"60W x 30D x 29H inches"},{"id":"chair-001","name":"Ergonomic Mesh Office Chair","description":"Premium ergonomic office chair with breathable mesh back, ""adjustable lumbar support, and 4D armrests. Features synchronized ""tilt mechanism and memory foam seat cushion. Ideal for long work hours.","category":"Office Chairs","price":299.99,"material":"Mesh, Metal, Premium Foam","dimensions":"26W x 26D x 48H inches"},{"id":"sofa-001","name":"Contemporary Sectional Sofa","description":"Modern L-shaped sectional with chaise lounge. Upholstered in premium ""performance fabric. Features deep seats, plush cushions, and solid ""wood legs. Perfect for modern living rooms.","category":"Sofas","price":1299.99,"material":"Performance Fabric, Solid Wood","dimensions":"112W x 65D x 34H inches"},{"id":"table-001","name":"Rustic Dining Table","description":"Farmhouse-style dining table with solid wood construction. ""Features distressed finish and trestle base. Seats 6-8 people ""comfortably. Perfect for family gatherings.","category":"Dining Tables","price":899.99,"material":"Solid Pine Wood","dimensions":"72W x 42D x 30H inches"},{"id":"bed-001","name":"Platform Storage Bed","description":"Modern queen platform bed with integrated storage drawers. ""Features upholstered headboard and durable wood slat support. ""No box spring needed. Perfect for maximizing bedroom space.","category":"Beds","price":799.99,"material":"Engineered Wood, Linen Fabric","dimensions":"65W x 86D x 48H inches"}]print(f"""✓ Created PRODUCTS_DATA with{len(PRODUCTS_DATA)}records""")
Importing Pipeline Components
We import the following for configuring our embedding ingestion pipeline:
Chunk, the structured input for generating and ingesting embeddings
AlloyDBConnectionConfig for configuring database connection information
AlloyDBVectorWriterConfig for configuring write behavior like schema mapping and conflict resolution
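The import cell itself isn't shown above; based on the classes used throughout this notebook, it looks roughly like this:

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.rag.types import Chunk, Content
from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform
from apache_beam.ml.rag.ingestion.alloydb import (
    AlloyDBVectorWriterConfig,
    AlloyDBConnectionConfig,
    AlloyDBLanguageConnectorConfig,
)
from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings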
Quick Start: Basic Vector Ingestion
This section shows the simplest way to generate embeddings and store them in AlloyDB.
Create table with default schema
Before running the pipeline, we need a table to store our embeddings:
table_name="default_product_embeddings"table_schema=f"""id VARCHAR PRIMARY KEY,embedding VECTOR(384) NOT NULL,content text,metadata JSONB"""setup_alloydb_table_sqlalchemy(INSTANCE_URI,DB_NAME,table_name,table_schema,DB_USER,DB_PASSWORD)test_alloydb_connection_sqlalchemy(INSTANCE_URI,DB_NAME,table_name,DB_USER,DB_PASSWORD)
Configure Pipeline Components
Now define the components that control the pipeline behavior:
Map products to Chunks
Our data is ingested as product dictionaries
Embedding generation and ingestion operate on Chunks
We convert each product dictionary to a Chunk to configure what text to embed and what to treat as metadata
from typing import Dict, Any

# The create_chunk function converts our product dictionaries to Chunks.
# This doesn't split the text - it simply structures it in the format
# expected by the embedding pipeline components.
def create_chunk(product: Dict[str, Any]) -> Chunk:
    """Convert a product dictionary into a Chunk object.

    The pipeline components (MLTransform, VectorDatabaseWriteTransform)
    work with Chunk objects. This function:
    1. Extracts text we want to embed
    2. Preserves product data as metadata
    3. Creates a Chunk in the expected format

    Args:
        product: Dictionary containing product information

    Returns:
        Chunk: A Chunk object ready for embedding
    """
    return Chunk(
        content=Content(
            text=f"{product['name']}: {product['description']}"
        ),  # The text that will be embedded
        id=product['id'],  # Use product ID as chunk ID
        metadata=product,  # Store all product info in metadata
    )
Generate embeddings with HuggingFace
We use a local pre-trained Hugging Face model to create vector embeddings from the product descriptions.
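The cell that defines the huggingface_embedder used in the pipeline below isn't shown; a minimal version, based on the model used elsewhere in this notebook, would be:

from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings

# Local sentence-transformers model; produces 384-dimensional vectors,
# matching the VECTOR(384) column created above.
huggingface_embedder = HuggingfaceTextEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)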
The default AlloyDBVectorWriterConfig maps Chunk fields to database columns as:
Database Column | Chunk Field | Description
id | chunk.id | Unique identifier
embedding | chunk.embedding.dense_embedding | Vector representation
content | chunk.content.text | Text that was embedded
metadata | chunk.metadata | Additional data as JSONB
# Configure the language connector so we can connect securely
language_connector_config = AlloyDBLanguageConnectorConfig(
    database_name=DB_NAME,
    instance_name=INSTANCE_URI,
    ip_type="PUBLIC"
)

# Configure the AlloyDBConnectionConfig with language connector
connection_config = AlloyDBConnectionConfig.with_language_connector(
    connector_options=language_connector_config,
    username=DB_USER,
    password=DB_PASSWORD
)

alloydb_writer_config = AlloyDBVectorWriterConfig(
    connection_config=connection_config,
    table_name=table_name
)
Assemble and Run Pipeline
Now we can create our pipeline that:
Takes our product data
Converts each product to a Chunk
Generates embeddings for each Chunk
Stores everything in AlloyDB
import tempfile

# Executing on DirectRunner (local execution)
with beam.Pipeline() as p:
    _ = (
        p
        | 'Create Products' >> beam.Create(PRODUCTS_DATA)
        | 'Convert to Chunks' >> beam.Map(create_chunk)
        | 'Generate Embeddings' >> MLTransform(
            write_artifact_location=tempfile.mkdtemp()
        ).with_transform(huggingface_embedder)
        | 'Write to AlloyDB' >> VectorDatabaseWriteTransform(alloydb_writer_config)
    )
Verify Embeddings
Let's check what was written to our AlloyDB table:
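The verification cell itself isn't shown above; it can simply reuse the verify_embeddings_sqlalchemy helper defined earlier:

verify_embeddings_sqlalchemy(
    instance_uri=INSTANCE_URI,
    database=DB_NAME,
    table_name=table_name,
    user=DB_USER,
    password=DB_PASSWORD
)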
In this quick start, we saw how to:
Convert product data to the Chunk format expected by embedding pipelines
Generate embeddings using a HuggingFace model
Configure and run a basic embedding ingestion pipeline
Store embeddings and metadata in AlloyDB
This basic pattern forms the foundation for all the advanced use cases covered in the following sections.
Quick Start: Run on Dataflow
This section demonstrates how to launch the Quick Start embedding pipeline on Google Cloud Dataflow from this Colab. While the previous examples used DirectRunner for local execution, Dataflow provides a fully managed, distributed execution environment that is:
Scalable: Automatically scales to handle large datasets
Fault-tolerant: Handles worker failures and ensures exactly-once processing
Fully managed: No need to provision or manage infrastructure
For more in-depth documentation on packaging your pipeline into a Python file and launching a Dataflow job from the command line, see Create a Dataflow pipeline using Python.
Create the AlloyDB table with default schema
Before running the pipeline, we need a table to store our embeddings:
table_name="default_dataflow_product_embeddings"table_schema=f"""id VARCHAR PRIMARY KEY,embedding VECTOR(384) NOT NULL,content text,metadata JSONB"""setup_alloydb_table_sqlalchemy(INSTANCE_URI,DB_NAME,table_name,table_schema,DB_USER,DB_PASSWORD)test_alloydb_connection_sqlalchemy(INSTANCE_URI,DB_NAME,table_name,DB_USER,DB_PASSWORD)
Save our pipeline to a Python file
To launch our pipeline job on Dataflow, we:
Add command-line arguments for passing pipeline options like AlloyDB credentials
Save our pipeline code to a local file basic_ingestion_pipeline.py
file_content="""import apache_beam as beamfrom apache_beam.options.pipeline_options import PipelineOptionsimport argparseimport tempfilefrom apache_beam.ml.transforms.base import MLTransformfrom apache_beam.ml.rag.types import Chunk, Contentfrom apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransformfrom apache_beam.ml.rag.ingestion.alloydb import AlloyDBVectorWriterConfig, AlloyDBConnectionConfig, AlloyDBLanguageConnectorConfigfrom apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddingsfrom apache_beam.options.pipeline_options import SetupOptionsPRODUCTS_DATA = [{"id": "desk-001","name": "Modern Minimalist Desk","description": "Sleek minimalist desk with clean lines and a spacious work surface. ""Features cable management system and sturdy steel frame. ""Perfect for contemporary home offices and workspaces.","category": "Desks","price": 399.99,"material": "Engineered Wood, Steel","dimensions": "60W x 30D x 29H inches"},{"id": "chair-001","name": "Ergonomic Mesh Office Chair","description": "Premium ergonomic office chair with breathable mesh back, ""adjustable lumbar support, and 4D armrests. Features synchronized ""tilt mechanism and memory foam seat cushion. Ideal for long work hours.","category": "Office Chairs","price": 299.99,"material": "Mesh, Metal, Premium Foam","dimensions": "26W x 26D x 48H inches"}]def run(argv=None):parser = argparse.ArgumentParser()parser.add_argument('--instance_uri',required=True,help='AlloyDB instance uri')parser.add_argument('--alloydb_database',default='postgres',help='AlloyDB database name')parser.add_argument('--alloydb_table',required=True,help='AlloyDB table name')parser.add_argument('--alloydb_username',required=True,help='AlloyDB user name')parser.add_argument('--alloydb_password',required=True,help='AlloyDB password')known_args, pipeline_args = parser.parse_known_args(argv)pipeline_options = PipelineOptions(pipeline_args)pipeline_options.view_as(SetupOptions).save_main_session = Truewith beam.Pipeline(options=pipeline_options) as p:_ = (p| 'Create Products' >> beam.Create(PRODUCTS_DATA)| 'Convert to Chunks' >> beam.Map(lambda product: Chunk(content=Content(text=f"{product['name']}:{product['description']}"), # The text that will be embeddedid=product['id'], # Use product ID as chunk IDmetadata=product, # Store all product info in metadata))| 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp()).with_transform(HuggingfaceTextEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"))| 'Write to AlloyDB' >> VectorDatabaseWriteTransform(AlloyDBVectorWriterConfig(connection_config=AlloyDBConnectionConfig.with_language_connector(AlloyDBLanguageConnectorConfig(database_name=known_args.alloydb_database, instance_name=known_args.instance_uri),username=known_args.alloydb_username,password=known_args.alloydb_password),table_name=known_args.alloydb_table)))if __name__ == '__main__':run()"""withopen("basic_ingestion_pipeline.py","w")asf:f.write(file_content)
Authenticate with Google Cloud
To launch a pipeline on Google Cloud, authenticate this notebook. Replace <PROJECT_ID> with your Google Cloud project ID. You also need:
A Cloud Storage bucket for staging Dataflow files. Replace <BUCKET_NAME> with the name of a valid Cloud Storage bucket. Don't include a gs:// prefix or trailing slashes.
Optionally, the Google Cloud region that you want to run Dataflow in. Replace <REGION> with the desired location.
An AlloyDB private IP address that the Dataflow worker VMs can reach. There are multiple ways to connect to your AlloyDB instance from Dataflow, including connecting over a VPC network with Private Services Access, as configured below.
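The authentication cell isn't shown here; it mirrors the one used in the Vertex AI section later in this notebook:

# Replace <PROJECT_ID> with a valid Google Cloud project ID.
PROJECT_ID = ''  # @param {type:'string'}

from google.colab import auth
auth.authenticate_user(project_id=PROJECT_ID)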
import os

BUCKET_NAME = ''  # @param {type:'string'}
REGION = 'us-central1'  # @param {type:'string'}

os.environ['BUCKET_NAME'] = BUCKET_NAME
os.environ['REGION'] = REGION

# Save AlloyDB credentials to environment variables
os.environ['INSTANCE_URI'] = INSTANCE_URI
os.environ['DATABASE_NAME'] = DB_NAME
os.environ['ALLOYDB_USER'] = DB_USER
os.environ['ALLOYDB_PASSWORD'] = DB_PASSWORD

NETWORK = 'default'  # @param {type:'string'}
SUBNETWORK = ''  # @param {type:'string'}
os.environ['NETWORK'] = NETWORK
os.environ['SUBNETWORK'] = SUBNETWORK
Provide additional Python dependencies to be installed on worker VMs
We make use of the Hugging Face sentence-transformers package to generate embeddings. Since this package is not installed on worker VMs by default, we create a requirements.txt file with the additional dependencies to be installed on the workers.
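The cells that write requirements.txt and launch the job aren't shown above; the following is a minimal sketch. The flags mirror the argparse arguments defined in basic_ingestion_pipeline.py; the <PROJECT_ID> placeholder, the staging and temp paths, and the table name default_dataflow_product_embeddings are assumptions to adapt to your environment.

echo "sentence-transformers" > requirements.txt

python basic_ingestion_pipeline.py \
  --runner DataflowRunner \
  --project <PROJECT_ID> \
  --region $REGION \
  --temp_location gs://$BUCKET_NAME/temp \
  --staging_location gs://$BUCKET_NAME/staging \
  --requirements_file ./requirements.txt \
  --instance_uri $INSTANCE_URI \
  --alloydb_database $DATABASE_NAME \
  --alloydb_table default_dataflow_product_embeddings \
  --alloydb_username $ALLOYDB_USER \
  --alloydb_password $ALLOYDB_PASSWORD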
Custom Schema with Column Mapping
In this example, we'll create a custom schema that:
Uses different column names
Maps metadata to individual columns
Uses functions to transform values
ColumnSpec and ColumnSpecsBuilder
ColumnSpec specifies how to map data to a database column. For example:
ColumnSpec(column_name="price",# Database columnpython_type=float,# Python Type for the valuevalue_fn=lambdac:c.metadata['price'],# Extract price from Chunk metadata to get actual valuesql_typecast="::decimal"# Optional SQL cast)
creates an INSERT statement like:
INSERT INTO table (price) VALUES (?::decimal)
where the ? placeholder is populated with the value from our ingested data.
ColumnSpecsBuilder provides a builder and convenience methods to create these ColumnSpecs:
Core field mappings:
with_id_spec() => insert chunk.id as text into the "id" column
with_embedding_spec() => insert chunk.embedding.dense_embedding into the "embedding" column
with_content_spec() => insert chunk.content.text into the "content" column
Custom columns (via add_custom_column_spec):
model_name => function that returns the model name: "all-MiniLM-L6-v2"
created_at => function that returns the current timestamp cast to a SQL timestamp
from apache_beam.ml.rag.ingestion.alloydb import ColumnSpec
from apache_beam.ml.rag.ingestion.alloydb import ColumnSpecsBuilder
from datetime import datetime

column_specs = (
    ColumnSpecsBuilder()
    # Write chunk.id to a column named "product_id".
    .with_id_spec(column_name='product_id')
    # Write chunk.embedding.dense_embedding to a column named "vector_embedding".
    .with_embedding_spec(column_name='vector_embedding')
    # Write chunk.content.text to a column named "description".
    .with_content_spec(column_name='description')
    # Write chunk.metadata['name'] to a column named "product_name".
    .add_metadata_field(field='name', column_name='product_name', python_type=str)
    # Write chunk.metadata['price'] to a column named "price".
    .add_metadata_field(field='price', column_name='price', python_type=float)
    # Write chunk.metadata['category'] to a column named "category".
    .add_metadata_field(field='category', column_name='category', python_type=str)
    # Write a custom field using value_fn to a column named "display_text" using
    # the ColumnSpec.text convenience method.
    .add_custom_column_spec(
        ColumnSpec.text(
            column_name='display_text',
            value_fn=lambda chunk: f"{chunk.metadata['name']} - ${chunk.metadata['price']:.2f}"
        )
    )
    # Store the model used to generate the embedding using the ColumnSpec constructor.
    .add_custom_column_spec(
        ColumnSpec(
            column_name='model_name',
            python_type=str,
            value_fn=lambda _: "all-MiniLM-L6-v2"
        )
    )
    .add_custom_column_spec(
        ColumnSpec(
            column_name='created_at',
            python_type=str,
            value_fn=lambda _: datetime.now().isoformat(),
            sql_typecast="::timestamp"
        )
    )
    .build()
)
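The pipeline below writes to a table whose columns match these specs. The original table-creation cell isn't shown; here is a minimal sketch that reuses the helpers defined earlier (the table name custom_product_embeddings and the column types are assumptions):

table_name = "custom_product_embeddings"
table_schema = """
    product_id VARCHAR PRIMARY KEY,
    vector_embedding VECTOR(384) NOT NULL,
    description TEXT,
    product_name VARCHAR,
    price DECIMAL,
    category VARCHAR,
    display_text VARCHAR,
    model_name VARCHAR,
    created_at TIMESTAMP
"""

setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, table_schema, DB_USER, DB_PASSWORD)
test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, DB_USER, DB_PASSWORD)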
Assemble and Run Pipeline
Now we can create our pipeline that will:
Take our product data
Convert each product to a Chunk
Generate embeddings for each Chunk
Store everything in AlloyDB with our custom schema configuration
import tempfile  # For storing MLTransform artifacts

# Executing on DirectRunner (local execution)
with beam.Pipeline() as p:
    _ = (
        p
        | 'Create Products' >> beam.Create(PRODUCTS_DATA)
        | 'Convert to Chunks' >> beam.Map(lambda product: Chunk(
            content=Content(text=f"{product['name']}: {product['description']}"),  # The text that will be embedded
            id=product['id'],  # Use product ID as chunk ID
            metadata=product,  # Store all product info in metadata
        ))
        | 'Generate Embeddings' >> MLTransform(
            write_artifact_location=tempfile.mkdtemp()
        ).with_transform(HuggingfaceTextEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"))
        | 'Write to AlloyDB' >> VectorDatabaseWriteTransform(
            AlloyDBVectorWriterConfig(
                connection_config=AlloyDBConnectionConfig.with_language_connector(
                    connector_options=AlloyDBLanguageConnectorConfig(
                        database_name=DB_NAME,
                        instance_name=INSTANCE_URI,
                        ip_type="PUBLIC"
                    ),
                    username=DB_USER,
                    password=DB_PASSWORD
                ),
                table_name=table_name,
                column_specs=column_specs
            )
        )
    )
Verify the Written Embeddings
Let's check what was written to our AlloyDB table:
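The verification cell isn't shown; as before, it can reuse the helper defined at the start of the notebook:

verify_embeddings_sqlalchemy(
    instance_uri=INSTANCE_URI,
    database=DB_NAME,
    table_name=table_name,
    user=DB_USER,
    password=DB_PASSWORD
)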
Update Embeddings and Metadata with Conflict Resolution
This section demonstrates how to handle periodic updates to product descriptions and their embeddings using the default schema. We'll show how embeddings and metadata get updated when product descriptions change.
Create table with desired schema
Let's use the same default schema as in Quick Start:
table_name="mutable_product_embeddings"table_schema=f"""id VARCHAR PRIMARY KEY,embedding VECTOR(384) NOT NULL,content text,metadata JSONB,created_at TIMESTAMP NOT NULL DEFAULT NOW()"""setup_alloydb_table_sqlalchemy(INSTANCE_URI,DB_NAME,table_name,table_schema,DB_USER,DB_PASSWORD)test_alloydb_connection_sqlalchemy(INSTANCE_URI,DB_NAME,table_name,DB_USER,DB_PASSWORD)
Sample Data: Day 1 vs Day 2
PRODUCTS_DATA_DAY1 = [
    {
        "id": "desk-001",
        "name": "Modern Minimalist Desk",
        "description": "Sleek minimalist desk with clean lines and a spacious work surface. "
                       "Features cable management system and sturdy steel frame.",
        "category": "Desks",
        "price": 399.99,
        "update_timestamp": "2024-02-18"
    }
]

PRODUCTS_DATA_DAY2 = [
    {
        "id": "desk-001",  # Same ID as Day 1
        "name": "Modern Minimalist Desk",
        "description": "Updated: Sleek minimalist desk with built-in wireless charging. "
                       "Features cable management system, sturdy steel frame, and Qi charging pad. "
                       "Perfect for modern tech-enabled workspaces.",
        "category": "Smart Desks",  # Category changed
        "price": 449.99,  # Price increased
        "update_timestamp": "2024-02-19"
    }
]
Configure Pipeline Components
Writer with Conflict Resolution
from apache_beam.ml.rag.ingestion.alloydb import (
    AlloyDBVectorWriterConfig,
    AlloyDBConnectionConfig,
    ConflictResolution
)

# Define how to handle conflicts - update all fields when ID matches
conflict_resolution = ConflictResolution(
    on_conflict_fields="id",  # Identify records by ID
    action="UPDATE",          # Update existing records
    update_fields=["embedding", "content", "metadata"]
)

# Create writer config with conflict resolution
alloydb_writer_config = AlloyDBVectorWriterConfig(
    connection_config=AlloyDBConnectionConfig.with_language_connector(
        connector_options=AlloyDBLanguageConnectorConfig(
            database_name=DB_NAME,
            instance_name=INSTANCE_URI,
            ip_type="PUBLIC"
        ),
        username=DB_USER,
        password=DB_PASSWORD
    ),
    table_name=table_name,
    conflict_resolution=conflict_resolution,
)
# Executing on DirectRunner (local execution)
with beam.Pipeline() as p:
    _ = (
        p
        | 'Create Day 1 Products' >> beam.Create(PRODUCTS_DATA_DAY1)
        | 'Convert Day 1 to Chunks' >> beam.Map(lambda product: Chunk(
            content=Content(text=f"{product['name']}: {product['description']}"),  # The text that will be embedded
            id=product['id'],  # Use product ID as chunk ID
            metadata=product,  # Store all product info in metadata
        ))
        | 'Generate Day 1 Embeddings' >> MLTransform(
            write_artifact_location=tempfile.mkdtemp()
        ).with_transform(HuggingfaceTextEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"))
        | 'Write Day 1 to AlloyDB' >> VectorDatabaseWriteTransform(alloydb_writer_config)
    )
Verify Initial Data
print("\nAfter Day 1 ingestion:")verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,database=DB_NAME,table_name=table_name,user=DB_USER,password=DB_PASSWORD)
Run Day 2 Pipeline
Now let's process our updated product data:
# Executing on DirectRunner (local execution)
with beam.Pipeline() as p:
    _ = (
        p
        | 'Create Day 2 Products' >> beam.Create(PRODUCTS_DATA_DAY2)
        | 'Convert Day 2 to Chunks' >> beam.Map(lambda product: Chunk(
            content=Content(text=f"{product['name']}: {product['description']}"),  # The text that will be embedded
            id=product['id'],  # Use product ID as chunk ID
            metadata=product,  # Store all product info in metadata
        ))
        | 'Generate Day 2 Embeddings' >> MLTransform(
            write_artifact_location=tempfile.mkdtemp()
        ).with_transform(HuggingfaceTextEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"))
        | 'Write Day 2 to AlloyDB' >> VectorDatabaseWriteTransform(alloydb_writer_config)
    )
Verify Updated Data
print("\nAfter Day 2 ingestion:")verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI,database=DB_NAME,table_name=table_name,user=DB_USER,password=DB_PASSWORD)
What Changed?
Key points to notice:
The embedding vector changed because the product description was updated
The metadata JSONB field contains the updated category, price, and timestamp
The content field reflects the new description
The original ID remained the same
This pattern allows you to:
Update embeddings when source text changes
Maintain referential integrity with consistent IDs
Track changes through the metadata field
Handle conflicts gracefully using AlloyDB's conflict resolution
Adding Embeddings to Existing Database Records
This section demonstrates how to:
Read existing product data from a database
Generate embeddings for that data
Write the embeddings back to the database
table_name="existing_products"table_schema="""id VARCHAR PRIMARY KEY,title VARCHAR NOT NULL,description TEXT,price DECIMAL,embedding VECTOR(384)"""
Postgres helpers for inserting initial records
import sqlalchemy
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from google.cloud.alloydb.connector import Connector


def setup_initial_data_sqlalchemy(instance_uri: str, database: str, table_name: str, table_schema: str, user: str, password: str, **connect_kwargs):
    """Set up table and insert sample product data using SQLAlchemy.

    (Revised to handle potential connection closing issue after DDL)

    Args:
        instance_uri: AlloyDB instance URI
        database: Database name
        table_name: Name of the table to create and populate.
        table_schema: SQL string defining the table columns.
        user: Database user
        password: Database password
        connect_kwargs: Additional keyword arguments for connector.connect().
    """
    engine = None
    try:
        engine = get_alloydb_engine(instance_uri, user, password, database, **connect_kwargs)
        # Use a single connection for both DDL and DML
        with engine.connect() as connection:
            print("Connected to AlloyDB successfully via SQLAlchemy!")

            # === DDL Operations (Relying on implicit autocommit for DDL) ===
            # Execute DDL directly on the connection outside an explicit transaction.
            # SQLAlchemy + Postgres drivers usually handle this correctly.
            print("Ensuring pgvector extension exists...")
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))

            print(f"Dropping table {table_name} if exists...")
            connection.execute(text(f"DROP TABLE IF EXISTS {table_name};"))

            print(f"Creating table {table_name}...")
            create_sql = f"CREATE TABLE {table_name} ({table_schema});"
            connection.execute(text(create_sql))
            print(f"Table {table_name} created.")

            # === DML Operations (Runs in default transaction started by connect()) ===
            sample_products_dicts = [
                # (Sample data dictionaries as defined in the previous version)
                {"id": "lamp-001", "title": "Artisan Table Lamp", "description": "Hand-crafted ceramic...", "price": 129.99},
                {"id": "mirror-001", "title": "Floating Wall Mirror", "description": "Modern circular mirror...", "price": 199.99},
                {"id": "vase-001", "title": "Contemporary Ceramic Vase", "description": "Minimalist vase...", "price": 79.99}
                # Add embedding data if needed
            ]

            insert_sql = text(f"""
                INSERT INTO {table_name} (id, title, description, price)
                VALUES (:id, :title, :description, :price)
            """)  # Add other columns if needed

            print(f"Inserting sample data into {table_name}...")
            # Execute DML within the connection's transaction
            connection.execute(insert_sql, sample_products_dicts)

            # Commit the transaction containing the INSERTs
            print("Committing transaction...")
            connection.commit()
            print("✓ Sample products inserted successfully")

        print("Initial data setup completed successfully using SQLAlchemy!")

    except SQLAlchemyError as e:
        print(f"An SQLAlchemy error occurred during initial data setup: {e}")
        # Note: If an error occurs *before* commit, the transaction is usually
        # rolled back automatically when the 'with engine.connect()' block exits.
    except Exception as e:
        print(f"An unexpected error occurred during initial data setup: {e}")
    finally:
        if engine:
            print("Disposing engine pool...")
            engine.dispose()
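The call that actually seeds the table isn't shown; a minimal invocation, assuming the connection variables defined earlier, would be:

setup_initial_data_sqlalchemy(
    INSTANCE_URI,
    DB_NAME,
    table_name,
    table_schema,
    DB_USER,
    DB_PASSWORD
)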
Now let's create a pipeline to read the existing data, generate embeddings, and write back:
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.ml.rag.ingestion.alloydb import ColumnSpecsBuilder

# Configure database writer
alloydb_writer_config = AlloyDBVectorWriterConfig(
    connection_config=AlloyDBConnectionConfig.with_language_connector(
        connector_options=AlloyDBLanguageConnectorConfig(
            database_name=DB_NAME,
            instance_name=INSTANCE_URI,
            ip_type="PUBLIC"
        ),
        username=DB_USER,
        password=DB_PASSWORD
    ),
    table_name=table_name,
    column_specs=(
        ColumnSpecsBuilder()
        .with_id_spec()
        .with_embedding_spec()
        # Add a placeholder value for the title column, because it has a
        # NOT NULL constraint. Insert with conflict resolution statements in
        # Postgres require all NOT NULL fields to have a value, even if the
        # value will not be updated (the original title is preserved).
        .add_custom_column_spec(ColumnSpec.text("title", value_fn=lambda x: ""))
        .build()
    ),
    conflict_resolution=ConflictResolution(
        on_conflict_fields="id",
        action="UPDATE",
        update_fields=["embedding"]  # Update the embedding field
    )
)

# Create and run pipeline on DirectRunner (local execution)
with beam.Pipeline() as p:
    # Read existing products
    rows = (
        p
        | "Read Products" >> ReadFromJdbc(
            table_name=table_name,
            driver_class_name="org.postgresql.Driver",
            jdbc_url=AlloyDBLanguageConnectorConfig(
                database_name=DB_NAME,
                instance_name=INSTANCE_URI,
                ip_type="PUBLIC"
            ).to_jdbc_url(),
            username=DB_USER,
            password=DB_PASSWORD,
            query=f"SELECT id, title, description FROM {table_name}",
            classpath=[
                "org.postgresql:postgresql:42.2.16",
                "com.google.cloud:alloydb-jdbc-connector:1.2.0"
            ]
        )
    )

    # Generate and write embeddings
    _ = (
        rows
        | "Convert to Chunks" >> beam.Map(lambda row: Chunk(
            id=row.id,
            content=Content(text=f"{row.title}: {row.description}")
        ))
        | "Generate Embeddings" >> MLTransform(
            write_artifact_location=tempfile.mkdtemp()
        ).with_transform(HuggingfaceTextEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"))
        | "Write Back to AlloyDB" >> VectorDatabaseWriteTransform(alloydb_writer_config)
    )
We started with a table containing product data but no embeddings
Read the existing records using ReadFromJdbc
Converted rows to Chunks, combining title and description for embedding
Generated embeddings using our model
Wrote back to the same table, updating only the embedding field
Preserved all other fields (price, etc.)
This pattern is useful when:
You have an existing product database
You want to add embeddings without disrupting current data
You need to maintain existing schema and relationships
Generate Embeddings with VertexAI Text Embeddings
This section demonstrates how to use the Vertex AI text embeddings API to generate text embeddings backed by Google's large generative AI models.
Vertex AI models are subject to Rate Limits and Quotas, and Dataflow automatically retries throttled requests with exponential backoff.
To use the Vertex AI API, we authenticate with Google Cloud.
# Replace <PROJECT_ID> with a valid Google Cloud project ID.
PROJECT_ID = ''  # @param {type:'string'}

from google.colab import auth
auth.authenticate_user(project_id=PROJECT_ID)
Create AlloyDB table with default schema
First we create a table to store our embeddings:
table_name="vertex_product_embeddings"table_schema=f"""id VARCHAR PRIMARY KEY,embedding VECTOR(768) NOT NULL,content text,metadata JSONB"""setup_alloydb_table_sqlalchemy(INSTANCE_URI,DB_NAME,table_name,table_schema,DB_USER,DB_PASSWORD)test_alloydb_connection_sqlalchemy(INSTANCE_URI,DB_NAME,table_name,DB_USER,DB_PASSWORD)
Configure Embedding Handler
Import the VertexAITextEmbeddings handler and specify the desired textembedding-gecko model.
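The cell that defines vertexai_embedder isn't shown; a minimal sketch, assuming the Vertex AI handler lives in apache_beam.ml.rag.embeddings.vertex_ai alongside the Hugging Face handler used earlier, and assuming a textembedding-gecko model version that returns 768-dimensional vectors (matching the VECTOR(768) column above):

from apache_beam.ml.rag.embeddings.vertex_ai import VertexAITextEmbeddings

# The exact model version is an example; any textembedding-gecko variant
# with 768-dimensional output matches the table schema created above.
vertexai_embedder = VertexAITextEmbeddings(
    model_name="textembedding-gecko@003"
)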
import tempfile

# Executing on DirectRunner (local execution)
with beam.Pipeline() as p:
    _ = (
        p
        | 'Create Products' >> beam.Create(PRODUCTS_DATA)
        | 'Convert to Chunks' >> beam.Map(lambda product: Chunk(
            content=Content(text=f"{product['name']}: {product['description']}"),  # The text that will be embedded
            id=product['id'],  # Use product ID as chunk ID
            metadata=product,  # Store all product info in metadata
        ))
        | 'Generate Embeddings' >> MLTransform(
            write_artifact_location=tempfile.mkdtemp()
        ).with_transform(vertexai_embedder)
        | 'Write to AlloyDB' >> VectorDatabaseWriteTransform(
            AlloyDBVectorWriterConfig(
                connection_config=AlloyDBConnectionConfig.with_language_connector(
                    connector_options=AlloyDBLanguageConnectorConfig(
                        database_name=DB_NAME,
                        instance_name=INSTANCE_URI,
                        ip_type="PUBLIC"
                    ),
                    username=DB_USER,
                    password=DB_PASSWORD
                ),
                table_name=table_name
            )
        )
    )
Update Embeddings in Real Time as Information Changes
This section demonstrates how to build a real-time embedding pipeline that continuously processes product updates and maintains fresh embeddings in AlloyDB. This approach is ideal for data that changes frequently.
This example runs on Dataflow because streaming with DirectRunner and writing via JDBC is not supported.
Authenticate with Google Cloud
To use Pub/Sub, we authenticate with Google Cloud.
# Replace <PROJECT_ID> with a valid Google Cloud project ID.
PROJECT_ID = ''  # @param {type:'string'}

from google.colab import auth
auth.authenticate_user(project_id=PROJECT_ID)
Setting Up PubSub Resources
First, let's set up the necessary PubSub topics and subscriptions:
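The cells that create the Pub/Sub topic and the destination table aren't shown above; the following is a minimal sketch. The topic name product-updates and the table name streaming_product_embeddings are assumptions - adjust them for your environment. The pipeline below reads the topic directly, so an explicit subscription is not strictly required.

from google.cloud import pubsub_v1

TOPIC = 'product-updates'  # Assumed topic name; referenced by the pipeline and publisher below

publisher_client = pubsub_v1.PublisherClient()
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC)
try:
    publisher_client.create_topic(request={"name": topic_path})
    print(f"Created topic: {topic_path}")
except Exception as e:
    print(f"Topic not created (it may already exist): {e}")

# Destination table for the streaming writes, reusing the default 384-dimension schema.
table_name = "streaming_product_embeddings"  # Assumed table name
table_schema = """
    id VARCHAR PRIMARY KEY,
    embedding VECTOR(384) NOT NULL,
    content text,
    metadata JSONB
"""
setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, table_schema, DB_USER, DB_PASSWORD)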
For the Dataflow job, you also need:
A Cloud Storage bucket for staging Dataflow files. Replace <BUCKET_NAME> with the name of a valid Cloud Storage bucket. Don't include a gs:// prefix or trailing slashes.
Optionally, the Google Cloud region that you want to run Dataflow in. Replace <REGION> with the desired location.
An AlloyDB private IP address that the Dataflow worker VMs can reach. There are multiple ways to connect to your AlloyDB instance from Dataflow, including connecting over a VPC network with Private Services Access, as configured below.
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions, GoogleCloudOptions, WorkerOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

# Provide required pipeline options for the Dataflow Runner.
options.view_as(StandardOptions).runner = "DataflowRunner"

# Set the Google Cloud region that you want to run Dataflow in.
REGION = 'us-central1'  # @param {type:'string'}
options.view_as(GoogleCloudOptions).region = REGION

# The VPC network to run your Dataflow job in.
# Should be the same as the AlloyDB network if using Private services access.
NETWORK = 'default'  # @param {type:'string'}
options.view_as(WorkerOptions).network = NETWORK

# The VPC subnetwork to run your Dataflow job in.
# Should be the same as the AlloyDB network if using Private services access.
SUBNETWORK = ''  # @param {type:'string'}
options.view_as(WorkerOptions).subnetwork = f"regions/{REGION}/subnetworks/{SUBNETWORK}"

options.view_as(SetupOptions).pickle_library = "cloudpickle"
options.view_as(GoogleCloudOptions).project = PROJECT_ID

BUCKET_NAME = ''  # @param {type:'string'}
dataflow_gcs_location = "gs://%s/dataflow" % BUCKET_NAME

# The Dataflow staging location. This location is used to stage the Dataflow pipeline and the SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# The Dataflow temp location. This location is used to store temporary files or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

import random
options.view_as(GoogleCloudOptions).job_name = f"alloydb-streaming-embedding-ingest{random.randint(0, 1000)}"

# options.view_as(SetupOptions).save_main_session = True
options.view_as(SetupOptions).requirements_file = "./requirements.txt"
Provide additional Python dependencies to be installed on worker VMs
We make use of the Hugging Face sentence-transformers package to generate embeddings. Since this package is not installed on worker VMs by default, we create a requirements.txt file with the additional dependencies to be installed on the workers.
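The cell that writes requirements.txt (referenced by SetupOptions.requirements_file above) isn't shown; a minimal version:

# Dataflow installs these packages on each worker at startup.
with open("./requirements.txt", "w") as f:
    f.write("sentence-transformers\n")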
The streaming pipeline has the following stages:
Source: Reads product update messages from the Pub/Sub topic
Windowing: Groups messages into 10-second windows for batch processing
Transformation: Converts JSON messages to Chunk objects for embedding
ML Processing: Generates embeddings using HuggingFace models
Sink: Writes results to AlloyDB with conflict resolution
import apache_beam as beam
import tempfile
import json
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.rag.types import Chunk, Content
from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform
from apache_beam.ml.rag.ingestion.alloydb import AlloyDBVectorWriterConfig, AlloyDBConnectionConfig, AlloyDBLanguageConnectorConfig, ConflictResolution
from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings
from apache_beam.transforms.window import FixedWindows


def parse_message(message):
    # Parse a message containing product data.
    product_json = json.loads(message.decode('utf-8'))
    return Chunk(
        content=Content(
            text=f"{product_json.get('name', '')}: {product_json.get('description', '')}"
        ),
        id=product_json.get('id', ''),
        metadata=product_json
    )


pipeline = beam.Pipeline(options=options)

# Streaming pipeline
_ = (
    pipeline
    | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=f"projects/{PROJECT_ID}/topics/{TOPIC}")
    | "Window" >> beam.WindowInto(FixedWindows(10))
    | "Parse Messages" >> beam.Map(parse_message)
    | "Generate Embeddings" >> MLTransform(
        write_artifact_location=tempfile.mkdtemp()
    ).with_transform(HuggingfaceTextEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"))
    | "Write to AlloyDB" >> VectorDatabaseWriteTransform(
        AlloyDBVectorWriterConfig(
            connection_config=AlloyDBConnectionConfig.with_language_connector(
                connector_options=AlloyDBLanguageConnectorConfig(
                    database_name=DB_NAME,
                    instance_name=INSTANCE_URI
                ),
                username=DB_USER,
                password=DB_PASSWORD
            ),
            table_name=table_name,
            conflict_resolution=ConflictResolution(
                on_conflict_fields="id",
                action="UPDATE",
                update_fields=["embedding", "content", "metadata"]
            )
        )
    )
)
Create Publisher Subprocess
The publisher simulates real-time product updates by:
Publishing sample product data to the PubSub topic every 5 seconds
Modifying prices and descriptions to represent changes
Adding timestamps to track update times
Running for 25 minutes in the background while our pipeline processes the data
Define PubSub publisher function
import threading
import time
import json
import logging
from google.cloud import pubsub_v1
import datetime
import os
import sys

log_file = os.path.join(os.getcwd(), "publisher_log.txt")
print(f"Log file will be created at: {log_file}")


def publisher_function(project_id, topic):
    """Function that publishes sample product updates to a PubSub topic.

    This function runs in a separate thread and continuously publishes
    messages to simulate real-time product updates.
    """
    time.sleep(300)
    thread_id = threading.current_thread().ident
    process_log_file = os.path.join(os.getcwd(), f"publisher_{thread_id}.log")
    file_handler = logging.FileHandler(process_log_file)
    file_handler.setFormatter(logging.Formatter('%(asctime)s - ThreadID: %(thread)d - %(levelname)s - %(message)s'))
    logger = logging.getLogger(f"worker.{thread_id}")
    logger.setLevel(logging.INFO)
    logger.addHandler(file_handler)

    logger.info(f"Publisher thread started with ID: {thread_id}")
    file_handler.flush()

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic)

    logger.info("Starting to publish messages...")
    file_handler.flush()

    for i in range(300):
        message_index = i % len(PRODUCTS_DATA)
        message = PRODUCTS_DATA[message_index].copy()
        dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))
        message["price"] = round(message["price"] * dynamic_factor, 2)
        message["description"] = f"PRICE UPDATE (factor: {dynamic_factor:.3f}): " + message["description"]
        message["published_at"] = datetime.datetime.now().isoformat()

        data = json.dumps(message).encode('utf-8')
        publish_future = publisher.publish(topic_path, data)
        try:
            logger.info(f"Publishing message {message}")
            file_handler.flush()
            message_id = publish_future.result()
            logger.info(f"Published message {i + 1}: {message['id']} (Message ID: {message_id})")
            file_handler.flush()
        except Exception as e:
            logger.error(f"Error publishing message: {e}")
            file_handler.flush()
        time.sleep(5)

    logger.info("Finished publishing all messages.")
    file_handler.flush()
Start publishing to Pub/Sub in the background
# Launch publisher in a separate thread
print("Starting publisher thread in 5 minutes...")
publisher_thread = threading.Thread(
    target=publisher_function,
    args=(PROJECT_ID, TOPIC),
    daemon=True
)
publisher_thread.start()
print(f"Publisher thread started with ID: {publisher_thread.ident}")
print(f"Publisher thread logging to file: publisher_{publisher_thread.ident}.log")
Run Pipeline on Dataflow
We launch the pipeline to run remotely on Dataflow. Once the job is launched, you can monitor its progress in the Google Cloud Console:
Open the Dataflow Jobs page and click on the job named "alloydb-streaming-embedding-ingest"
View detailed execution graphs, logs, and metrics
What to Expect
After running this pipeline, you should see:
Continuous updates to product embeddings in the AlloyDB table
Price and description changes reflected in the metadata
New embeddings generated for updated product descriptions
Timestamps showing when each record was last modified
# Run pipeline
pipeline.run().wait_until_finish()
Verify data
# Verify the results
print("\nAfter embedding generation:")
verify_embeddings_sqlalchemy(
    instance_uri=INSTANCE_URI,
    database=DB_NAME,
    table_name=table_name,
    user=DB_USER,
    password=DB_PASSWORD
)
Except as otherwise noted, the content of this page is licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-05-14 UTC.
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],["Last updated 2025-05-14 UTC."],[],[],null,[]]