Integrate with OpenLineage

This document explains how to integrate OpenLineage with Knowledge Catalog (formerly Dataplex Universal Catalog) to import and visualize data lineage from external systems. Knowledge Catalog acts as an OpenLineage consumer through the ProcessOpenLineageRunEvent REST API method, so you can view lineage from custom pipelines alongside the built-in lineage from Google Cloud services.

Overview

OpenLineage is an open platform for collecting and analyzing data lineage information. Using an open standard for lineage data, OpenLineage captures lineage events from data pipeline components that use an OpenLineage API to report on runs, jobs, and datasets.
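To make the run, job, and dataset model concrete, the following Python sketch assembles a minimal OpenLineage RunEvent as a plain dictionary. It uses the same top-level fields as the REST request example on this page. The function name `build_run_event` and the job and dataset names are hypothetical, and the sketch covers only the fields shown on this page rather than every optional facet.

```python
import json
import uuid
from datetime import datetime, timezone

# Schema URL copied from the REST request example on this page.
SCHEMA_URL = "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"


def build_run_event(job_namespace, job_name, inputs, outputs, producer,
                    event_type="COMPLETE"):
    """Return a minimal OpenLineage RunEvent as a plain dict.

    inputs and outputs are lists of (namespace, name) tuples.
    """
    return {
        "eventType": event_type,
        # ISO 8601 timestamp with an explicit UTC offset.
        "eventTime": datetime.now(timezone.utc).isoformat(),
        # The OpenLineage spec defines run.runId as a UUID.
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": n} for ns, n in inputs],
        "outputs": [{"namespace": ns, "name": n} for ns, n in outputs],
        "producer": producer,
        "schemaURL": SCHEMA_URL,
    }


if __name__ == "__main__":
    # Hypothetical job and dataset names, for illustration only.
    event = build_run_event(
        job_namespace="customnamespace",
        job_name="daily_orders_etl",
        inputs=[("customnamespace", "raw_orders")],
        outputs=[("customnamespace", "clean_orders")],
        producer="https://example.com/my-pipeline",
    )
    print(json.dumps(event, indent=2))
```

The REST method described on this page accepts this JSON shape as its request body.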

Through the Data Lineage API, you can import OpenLineage events to display in the Knowledge Catalog web interface alongside lineage information from Google Cloud services, such as BigQuery, Managed Service for Apache Airflow, Cloud Data Fusion, and Managed Service for Apache Spark.

To import OpenLineage events that follow the OpenLineage specification, use the ProcessOpenLineageRunEvent REST API method and map OpenLineage facets to Data Lineage API attributes.

OpenLineage integration limitations

  • Supported versions: The Data Lineage API supports OpenLineage major version 1.

  • API actions: The Data Lineage API endpoint ProcessOpenLineageRunEvent acts only as a consumer of OpenLineage messages, not a producer. The API lets you send lineage information generated by any OpenLineage-compliant tool or system into Knowledge Catalog. Some Google Cloud services, such as Managed Service for Apache Spark and Managed Airflow, include built-in OpenLineage producers that can send events to this endpoint, which automates lineage capture from those services.

  • Unsupported features: The Data Lineage API doesn't support the following:

    • Any subsequent OpenLineage release with message format changes
    • DatasetEvent
    • JobEvent

  • Message size: The maximum size of a single message is 5 MB.

  • Name length: Each fully qualified name in inputs and outputs is limited to 4000 characters.

  • Link limits: Links are grouped by event, with a maximum of 100 links per event. The maximum aggregate number of table-level links is 1000. If a message contains more than 1500 column-level links, the column-level information is skipped.

  • Graph scope: Knowledge Catalog displays a lineage graph for each job run, showing the inputs and outputs of lineage events. It doesn't support lower-level processes such as Spark stages.
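Before sending events, you might want a quick client-side check against the limits above. The following Python sketch is one way to do that. It is not an official validator and makes the following assumptions:

  • "5 MB" is read as 5,000,000 bytes.
  • The dataset `name` field stands in for the fully qualified name.
  • A missing `run` object is treated as a sign that the message is a DatasetEvent or JobEvent.

Link limits aren't checked, because this page doesn't define how links are counted. The function name `check_event` is hypothetical.

```python
import json

# Limits taken from the list above. Whether "5 MB" means 5,000,000 or
# 5 * 1024 * 1024 bytes is not stated; this sketch assumes the smaller value.
MAX_MESSAGE_BYTES = 5_000_000
MAX_NAME_CHARS = 4000


def check_event(event):
    """Return a list of likely problems with an OpenLineage event (empty if none)."""
    problems = []

    # Measure the compact JSON encoding as an estimate of the message size.
    size = len(json.dumps(event, separators=(",", ":")).encode("utf-8"))
    if size > MAX_MESSAGE_BYTES:
        problems.append(f"message is {size} bytes; limit is {MAX_MESSAGE_BYTES}")

    # Heuristic: DatasetEvent and JobEvent (both unsupported) have no "run" object.
    if "run" not in event:
        problems.append("not a RunEvent: missing 'run'")

    # Assumption: the dataset "name" stands in for the fully qualified name.
    # The API may derive a longer name, so passing here is not a guarantee.
    for side in ("inputs", "outputs"):
        for dataset in event.get(side, []):
            name = dataset.get("name", "")
            if len(name) > MAX_NAME_CHARS:
                problems.append(
                    f"{side} name has {len(name)} characters; limit is {MAX_NAME_CHARS}"
                )

    return problems
```

An empty list means that none of these checks failed. It doesn't mean the API will accept the event.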

OpenLineage facet attribute mapping

For information about OpenLineage mapping, see OpenLineage mapping.

Import an OpenLineage event

If you haven't yet set up OpenLineage, see Getting started.

To import an OpenLineage event into Knowledge Catalog, call the API method ProcessOpenLineageRunEvent.

C#

Before trying this sample, follow the C# setup instructions in the Knowledge Catalog quickstart using client libraries. For more information, see the Knowledge Catalog C# API reference documentation.

To authenticate to Knowledge Catalog, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

using Google.Cloud.DataCatalog.Lineage.V1;
using Google.Protobuf.WellKnownTypes;

public sealed partial class GeneratedLineageClientSnippets
{
    /// <summary>Snippet for ProcessOpenLineageRunEvent</summary>
    /// <remarks>
    /// This snippet has been automatically generated and should be regarded as a code template only.
    /// It will require modifications to work:
    /// - It may require correct/in-range values for request initialization.
    /// - It may require specifying regional endpoints when creating the service client as shown in
    ///   https://cloud.google.com/dotnet/docs/reference/help/client-configuration#endpoint.
    /// </remarks>
    public void ProcessOpenLineageRunEventRequestObject()
    {
        // Create client
        LineageClient lineageClient = LineageClient.Create();
        // Initialize request argument(s)
        ProcessOpenLineageRunEventRequest request = new ProcessOpenLineageRunEventRequest
        {
            Parent = "",
            OpenLineage = new Struct(),
        };
        // Make the request
        ProcessOpenLineageRunEventResponse response = lineageClient.ProcessOpenLineageRunEvent(request);
    }
}
 

Go

Before trying this sample, follow the Go setup instructions in the Knowledge Catalog quickstart using client libraries. For more information, see the Knowledge Catalog Go API reference documentation.

To authenticate to Knowledge Catalog, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

//go:build examples

package main

import (
	"context"

	lineage "cloud.google.com/go/datacatalog/lineage/apiv1"
	lineagepb "cloud.google.com/go/datacatalog/lineage/apiv1/lineagepb"
)

func main() {
	ctx := context.Background()
	// This snippet has been automatically generated and should be regarded as a code template only.
	// It will require modifications to work:
	// - It may require correct/in-range values for request initialization.
	// - It may require specifying regional endpoints when creating the service client as shown in:
	//   https://pkg.go.dev/cloud.google.com/go#hdr-Client_Options
	c, err := lineage.NewClient(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	defer c.Close()

	req := &lineagepb.ProcessOpenLineageRunEventRequest{
		// TODO: Fill request struct fields.
		// See https://pkg.go.dev/cloud.google.com/go/datacatalog/lineage/apiv1/lineagepb#ProcessOpenLineageRunEventRequest.
	}
	resp, err := c.ProcessOpenLineageRunEvent(ctx, req)
	if err != nil {
		// TODO: Handle error.
	}
	// TODO: Use resp.
	_ = resp
}
 

Java

Before trying this sample, follow the Java setup instructions in the Knowledge Catalog quickstart using client libraries. For more information, see the Knowledge Catalog Java API reference documentation.

To authenticate to Knowledge Catalog, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.cloud.datacatalog.lineage.v1.LineageClient;
import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventRequest;
import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventResponse;
import com.google.protobuf.Struct;

public class SyncProcessOpenLineageRunEvent {

  public static void main(String[] args) throws Exception {
    syncProcessOpenLineageRunEvent();
  }

  public static void syncProcessOpenLineageRunEvent() throws Exception {
    // This snippet has been automatically generated and should be regarded as a code template only.
    // It will require modifications to work:
    // - It may require correct/in-range values for request initialization.
    // - It may require specifying regional endpoints when creating the service client as shown in
    // https://cloud.google.com/java/docs/setup#configure_endpoints_for_the_client_library
    try (LineageClient lineageClient = LineageClient.create()) {
      ProcessOpenLineageRunEventRequest request =
          ProcessOpenLineageRunEventRequest.newBuilder()
              .setParent("parent-995424086")
              .setOpenLineage(Struct.newBuilder().build())
              .setRequestId("requestId693933066")
              .build();
      ProcessOpenLineageRunEventResponse response =
          lineageClient.processOpenLineageRunEvent(request);
    }
  }
}
 

Python

Before trying this sample, follow the Python setup instructions in the Knowledge Catalog quickstart using client libraries. For more information, see the Knowledge Catalog Python API reference documentation.

To authenticate to Knowledge Catalog, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
#   client as shown in:
#   https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import datacatalog_lineage_v1


def sample_process_open_lineage_run_event():
    # Create a client
    client = datacatalog_lineage_v1.LineageClient()

    # Initialize request argument(s)
    request = datacatalog_lineage_v1.ProcessOpenLineageRunEventRequest(
        parent="parent_value",
    )

    # Make the request
    response = client.process_open_lineage_run_event(request=request)

    # Handle the response
    print(response)
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Knowledge Catalog quickstart using client libraries. For more information, see the Knowledge Catalog Ruby API reference documentation.

To authenticate to Knowledge Catalog, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

require "google/cloud/data_catalog/lineage/v1"

##
# Snippet for the process_open_lineage_run_event call in the Lineage service
#
# This snippet has been automatically generated and should be regarded as a code
# template only. It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in https://cloud.google.com/ruby/docs/reference.
#
# This is an auto-generated example demonstrating basic usage of
# Google::Cloud::DataCatalog::Lineage::V1::Lineage::Client#process_open_lineage_run_event.
#
def process_open_lineage_run_event
  # Create a client object. The client can be reused for multiple calls.
  client = Google::Cloud::DataCatalog::Lineage::V1::Lineage::Client.new

  # Create a request. To set request fields, pass in keyword arguments.
  request = Google::Cloud::DataCatalog::Lineage::V1::ProcessOpenLineageRunEventRequest.new

  # Call the process_open_lineage_run_event method.
  result = client.process_open_lineage_run_event request

  # The returned object is of type Google::Cloud::DataCatalog::Lineage::V1::ProcessOpenLineageRunEventResponse.
  p result
end
 

REST

To import an OpenLineage event, use the processOpenLineageRunEvent method.

Before using any of the request data, make the following replacements:

  • PROJECT_ID: your Google Cloud project ID.
  • LOCATION_ID: the Google Cloud location, such as us-central1.

HTTP method and URL:

POST https://datalineage.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID:processOpenLineageRunEvent

Request JSON body:

{
  "eventTime": "2023-04-04T13:21:16.098Z",
  "eventType": "COMPLETE",
  "inputs": [
    {
      "name": "somename",
      "namespace": "customnamespace"
    }
  ],
  "job": {
    "name": "somename",
    "namespace": "customnamespace"
  },
  "outputs": [
    {
      "name": "somename",
      "namespace": "customnamespace"
    }
  ],
  "producer": "someproducer",
  "run": {
    "runId": "somerunid"
  },
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
}
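If you want to call this REST method without a client library, the following Python sketch uses only the standard library. It builds the POST request shown above, with the OpenLineage event as the JSON body.

The sketch assumes that you already have an OAuth 2.0 access token, for example the output of `gcloud auth print-access-token`. The function names `build_request` and `send_request` are hypothetical, and error handling and retries are left out.

```python
import json
import urllib.request

# Base URL from the HTTP method and URL shown above.
ENDPOINT = "https://datalineage.googleapis.com/v1"


def build_request(project_id, location_id, event, access_token):
    """Build, but do not send, a POST request to processOpenLineageRunEvent."""
    url = (f"{ENDPOINT}/projects/{project_id}/locations/{location_id}"
           ":processOpenLineageRunEvent")
    return urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),  # the OpenLineage event is the body
        method="POST",
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json; charset=utf-8",
        },
    )


def send_request(request):
    """Send the request and return the decoded JSON response.

    Raises urllib.error.HTTPError for non-2xx responses.
    """
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

For example, `send_request(build_request("my-project", "us-central1", event, token))` sends `event`, where `event` is a dictionary shaped like the request body above. Building and sending are separate steps, so you can inspect the request before it goes over the network.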


You should receive a JSON response similar to the following:

{
  "process": "projects/my-project/locations/us-central1/processes/my-process",
  "run": "projects/my-project/locations/us-central1/processes/my-process/runs/my-run",
  "lineageEvents": [
    "projects/my-project/locations/us-central1/processes/my-process/runs/my-run/lineageEvents/my-lineage-event"
  ]
}
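The response identifies the created resources by their full names. The following Python sketch splits the `run` name into its IDs and counts the lineage events. It assumes that run names always follow the `projects/.../locations/.../processes/.../runs/...` pattern shown in the sample response. The helper names are hypothetical.

```python
import re

# Pattern for run names such as the "run" value in the sample response.
RUN_NAME = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/processes/(?P<process>[^/]+)/runs/(?P<run>[^/]+)$"
)


def parse_run_name(name):
    """Split a run resource name into project, location, process, and run IDs."""
    match = RUN_NAME.match(name)
    if match is None:
        raise ValueError(f"unexpected run name: {name!r}")
    return match.groupdict()


def summarize_response(response):
    """Return the IDs from the run name plus the number of lineage events."""
    summary = parse_run_name(response["run"])
    summary["lineage_event_count"] = len(response.get("lineageEvents", []))
    return summary
```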

Tools for sending OpenLineage messages

To simplify sending events to the Data Lineage API, you can use various tools and libraries:

  • Google Cloud Java Producer Library: Google provides an open-source Java library that helps you construct and send OpenLineage events to the Data Lineage API. For more information, see the blog post Producer java library for Data Lineage is now open source. The library is available on GitHub and Maven.
  • OpenLineage GCP Transport: For Java-based OpenLineage producers, a dedicated GcpLineage transport is available. It simplifies integration with the Data Lineage API by minimizing the code needed to send events to it. You can configure GcpLineageTransport as the event sink for any existing OpenLineage producer, such as Airflow, Spark, or Flink. For more information and examples, see GcpLineage.

Analyze information from OpenLineage

To analyze the imported OpenLineage events, see View lineage graphs in Knowledge Catalog UI.

Stored OpenLineage facet data

The Data Lineage API doesn't store all facet data from OpenLineage messages. It stores only the following facet fields:

  • spark_version
    • openlineage-spark-version
    • spark-version
  • all spark.logicalPlan.*
  • environment-properties (custom Google Cloud lineage facet)
    • origin.sourcetype and origin.name
    • spark.app.id
    • spark.app.name
    • spark.batch.id
    • spark.batch.uuid
    • spark.cluster.name
    • spark.cluster.region
    • spark.job.id
    • spark.job.uuid
    • spark.project.id
    • spark.query.node.name
    • spark.session.id
    • spark.session.uuid
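To check in advance which Spark environment properties will be kept, you can filter them against the environment-properties keys listed above. The following Python sketch does this. It assumes that you have already extracted the properties from the facet into a flat dictionary of property names to values; this page doesn't describe how the facet nests them. The function name is hypothetical.

```python
# Keys copied from the environment-properties list above.
STORED_ENV_KEYS = frozenset({
    "origin.sourcetype", "origin.name",
    "spark.app.id", "spark.app.name",
    "spark.batch.id", "spark.batch.uuid",
    "spark.cluster.name", "spark.cluster.region",
    "spark.job.id", "spark.job.uuid",
    "spark.project.id", "spark.query.node.name",
    "spark.session.id", "spark.session.uuid",
})


def retained_env_properties(props):
    """Split a flat {key: value} dict into stored properties and dropped keys."""
    kept = {key: value for key, value in props.items() if key in STORED_ENV_KEYS}
    dropped = sorted(set(props) - set(kept))
    return kept, dropped
```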

The Data Lineage API also stores the following event fields:

  • eventTime
  • run.runId
  • job.namespace
  • job.name
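To see the event-level values from the preceding list for a given message, you can read them by their dotted paths. The following Python sketch does this. It returns `None` for any field that is missing from the event. The helper names are hypothetical.

```python
# Dotted paths copied from the list above.
STORED_PATHS = ("eventTime", "run.runId", "job.namespace", "job.name")


def get_path(obj, dotted):
    """Follow a dotted path through nested dicts; return None if any step is missing."""
    for part in dotted.split("."):
        if not isinstance(obj, dict) or part not in obj:
            return None
        obj = obj[part]
    return obj


def stored_fields(event):
    """Return the event-level fields that the list above says are stored."""
    return {path: get_path(event, path) for path in STORED_PATHS}
```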

What's next
