This page shows you how to write, deploy, and trigger a pipeline run using an Event-Driven Cloud Function with a Cloud Pub/Sub trigger. Follow these steps:
- Define an ML pipeline using the Kubeflow Pipelines (KFP) SDK and compile it into a YAML file.
- Upload the compiled pipeline definition to a Cloud Storage bucket.
- Use Cloud Run functions to create, configure, and deploy a function that's triggered by a new or existing Pub/Sub topic.
Define and compile a pipeline
Using the Kubeflow Pipelines SDK, build a scheduled pipeline and compile it into a YAML file.
Sample hello-world-scheduled-pipeline:

from kfp import compiler
from kfp import dsl

# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str

# A simple pipeline that contains a single hello_world task
@dsl.pipeline(name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    # Pass component inputs by keyword
    hello_world_task = hello_world(message=greet_name)

# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(
    pipeline_func=hello_world_scheduled_pipeline,
    package_path='hello_world_scheduled_pipeline.yaml')

Upload the compiled pipeline YAML to a Cloud Storage bucket
- Open the Cloud Storage browser in the Google Cloud console.
- Click the Cloud Storage bucket you created when you configured your project.
- Using either an existing folder or a new folder, upload your compiled pipeline YAML (in this example, hello_world_scheduled_pipeline.yaml) to the selected folder.
- Click the uploaded YAML file to access the details. Copy the gsutil URI for later use.
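
If you prefer to script the upload instead of using the console, the steps above can be sketched with the google-cloud-storage client library. This is a minimal sketch and not part of the original walkthrough: the helper names gcs_uri and upload_pipeline_yaml, the bucket name, and the object path are hypothetical, and it assumes the library is installed and application default credentials are configured.

```python
def gcs_uri(bucket_name: str, object_path: str) -> str:
    """Build the gs:// URI for an object (same form as the gsutil URI in the console)."""
    return f"gs://{bucket_name}/{object_path}"


def upload_pipeline_yaml(bucket_name: str, local_path: str, object_path: str) -> str:
    """Upload a compiled pipeline YAML file and return its gs:// URI.

    Assumption: google-cloud-storage is installed and credentials are available.
    The import is done lazily so that gcs_uri can be used without the library.
    """
    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket(bucket_name).blob(object_path)
    blob.upload_from_filename(local_path)
    return gcs_uri(bucket_name, object_path)
```

For example, calling upload_pipeline_yaml('my-bucket', 'hello_world_scheduled_pipeline.yaml', 'pipelines/hello_world_scheduled_pipeline.yaml') with a hypothetical bucket named my-bucket would return a URI of the same form as the one you copy from the console.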
Create a Cloud Run function with a Pub/Sub trigger
- Visit the Cloud Run functions page in the console.
- Click the Create function button.
- In the Basics section, give your function a name (for example, my-scheduled-pipeline-function).
- In the Trigger section, select Cloud Pub/Sub as the Trigger type.
- In the Select a Cloud Pub/Sub topic list, click Create a topic.
- In the Create a topic box, give your new topic a name (for example, my-scheduled-pipeline-topic), and select Create topic.
- Leave all other fields as default and click Save to save the Trigger section configuration.
- Leave all other fields as default and click Next to proceed to the Code section.
- Under Runtime, select Python 3.7.
- In Entrypoint, input "subscribe" (the example code entry point function name).
- Under Source code, select Inline Editor if it's not already selected.
- In the main.py file, add the following code:

    import base64
    import json
    from google.cloud import aiplatform

    PROJECT_ID = 'your-project-id'                      # <---CHANGE THIS
    REGION = 'your-region'                              # <---CHANGE THIS
    PIPELINE_ROOT = 'your-cloud-storage-pipeline-root'  # <---CHANGE THIS

    def subscribe(event, context):
        """Triggered from a message on a Cloud Pub/Sub topic.
        Args:
            event (dict): Event payload.
            context (google.cloud.functions.Context): Metadata for the event.
        """
        # decode the event payload string
        payload_message = base64.b64decode(event['data']).decode('utf-8')
        # parse payload string into JSON object
        payload_json = json.loads(payload_message)
        # trigger pipeline run with payload
        trigger_pipeline_run(payload_json)

    def trigger_pipeline_run(payload_json):
        """Triggers a pipeline run
        Args:
            payload_json: expected in the following format:
              {
                "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                "parameter_values": {
                  "greet_name": "<any-greet-string>"
                }
              }
        """
        pipeline_spec_uri = payload_json['pipeline_spec_uri']
        parameter_values = payload_json['parameter_values']

        # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name='hello-world-pipeline-cloud-function-invocation',
            template_path=pipeline_spec_uri,
            pipeline_root=PIPELINE_ROOT,
            enable_caching=False,
            parameter_values=parameter_values
        )

        # Submit the PipelineJob
        job.submit()

  Replace the following:
  - PROJECT_ID: The Google Cloud project that this pipeline runs in.
  - REGION: The region that this pipeline runs in.
  - PIPELINE_ROOT: Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored in the pipeline root.
- In the requirements.txt file, replace the contents with the following package requirements:

    google-api-python-client>=1.7.8,<2
    google-cloud-aiplatform

- Click Deploy to deploy the function.
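
Once the function is deployed, publishing a message to the topic triggers a pipeline run. The following is a minimal sketch of building that message, assuming the payload format documented in the trigger_pipeline_run docstring. The helper names build_payload, as_background_event, and publish_trigger are hypothetical. publish_trigger assumes the google-cloud-pubsub library is installed and credentials are configured, and as_background_event only imitates locally the base64-encoded data field that subscribe decodes.

```python
import base64
import json


def build_payload(pipeline_spec_uri: str, greet_name: str) -> bytes:
    """Serialize the JSON body that trigger_pipeline_run expects."""
    payload = {
        "pipeline_spec_uri": pipeline_spec_uri,
        "parameter_values": {"greet_name": greet_name},
    }
    return json.dumps(payload).encode("utf-8")


def as_background_event(data: bytes) -> dict:
    """Imitate the event dict passed to subscribe: 'data' holds base64 text."""
    return {"data": base64.b64encode(data).decode("utf-8")}


def publish_trigger(project_id: str, topic_id: str, data: bytes) -> str:
    """Publish the payload to the topic and return the published message ID.

    Assumption: google-cloud-pubsub is installed; imported lazily so the
    helpers above work without it.
    """
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic_path, data)
    return future.result()
```

For example, publish_trigger('your-project-id', 'my-scheduled-pipeline-topic', build_payload(uri, 'world')), where uri is the gsutil URI you copied earlier, should cause the function to submit a run with greet_name set to 'world'. Decoding as_background_event(...)['data'] with base64 and json, as subscribe does, lets you check the payload shape locally before publishing.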
What's next
- Learn more about Google Cloud Pub/Sub.
- Visualize and analyze pipeline results.
- Learn how to create triggers in Cloud Run from Pub/Sub events.
- To view code samples for using Pub/Sub, refer to the Google Cloud sample browser.

