Read from Apache Iceberg to Dataflow

To read from Apache Iceberg to Dataflow, use the managed I/O connector.

Managed I/O supports the following capabilities for Apache Iceberg:

Catalogs
  • Hadoop
  • Hive
  • REST-based catalogs
  • BigQuery metastore (requires Apache Beam SDK 2.62.0 or later if not using Runner v2)
Read capabilities
  • Batch read
Write capabilities

For BigQuery tables for Apache Iceberg, use the BigQueryIO connector with the BigQuery Storage API. The table must already exist; dynamic table creation is not supported.
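
The catalog types listed above differ mainly in the properties passed under "catalog_properties". As a rough sketch, not a definitive reference, the helper below builds that map for Hadoop, Hive, and REST catalogs using only java.util. The class and method names (CatalogConfigSketch, hadoop, hive, rest, readConfig) and the sample URIs are hypothetical. The "type", "uri", and "warehouse" keys are assumed to follow the standard Apache Iceberg catalog property names, so verify them against your catalog's documentation before relying on them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that builds Iceberg catalog properties for managed I/O. */
final class CatalogConfigSketch {

  /** Hadoop catalog: a type and a warehouse location, as in the example on this page. */
  static Map<String, Object> hadoop(String warehouse) {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put("type", "hadoop");
    props.put("warehouse", warehouse);
    return props;
  }

  /** Hive catalog: assumes the metastore is reachable at the given URI. */
  static Map<String, Object> hive(String metastoreUri, String warehouse) {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put("type", "hive");
    props.put("uri", metastoreUri);
    props.put("warehouse", warehouse);
    return props;
  }

  /** REST catalog: assumes the catalog service is reachable at the given URI. */
  static Map<String, Object> rest(String endpointUri, String warehouse) {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put("type", "rest");
    props.put("uri", endpointUri);
    props.put("warehouse", warehouse);
    return props;
  }

  /** Wraps the catalog properties in the top-level read configuration. */
  static Map<String, Object> readConfig(
      String table, String catalogName, Map<String, Object> catalogProps) {
    Map<String, Object> config = new LinkedHashMap<>();
    config.put("table", table);
    config.put("catalog_name", catalogName);
    config.put("catalog_properties", catalogProps);
    return config;
  }

  public static void main(String[] args) {
    // Placeholder values; replace them with your own table, catalog, and bucket.
    Map<String, Object> config =
        readConfig("db.users", "local", hadoop("gs://example-bucket/warehouse"));
    System.out.println(config);
  }
}
```

A map built this way has the same top-level keys ("table", "catalog_name", "catalog_properties") that the read example on this page passes to the connector through withConfig.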

Dependencies

Add the following dependencies to your project:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Example

The following example reads from an Apache Iceberg table and writes the data to text files.

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ApacheIcebergRead {

  static final String CATALOG_TYPE = "hadoop";

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("Path to write the output file")
    String getOutputPath();

    void setOutputPath(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to read from")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName=$TABLE_NAME --outputPath=$OUTPUT_FILE
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config))
        .getSinglePCollection()
        // Format each record as a string with the format 'id:name'.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              return String.format("%d:%s", row.getInt64("id"), row.getString("name"));
            })))
        // Write to a text file.
        .apply(
            TextIO.write()
                .to(options.getOutputPath())
                .withNumShards(1)
                .withSuffix(".txt"));

    pipeline.run().waitUntilFinish();
  }
}
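
To make the output format concrete, the following sketch isolates the formatting step from the pipeline above so it can be run without Apache Beam. It assumes, as the example does, that the table has a 64-bit integer "id" column and a string "name" column. The class and method names (RecordFormatSketch, formatRecord) are hypothetical and not part of any Beam API.

```java
/** Hypothetical stand-in for the MapElements formatting step in the example. */
final class RecordFormatSketch {

  /** Applies the same "%d:%s" pattern that the pipeline uses for each row. */
  static String formatRecord(Long id, String name) {
    return String.format("%d:%s", id, name);
  }

  public static void main(String[] args) {
    // A fully populated record.
    System.out.println(formatRecord(1L, "Alice")); // prints 1:Alice
    // Null values are rendered as the literal text "null".
    System.out.println(formatRecord(null, null)); // prints null:null
  }
}
```

Because String.format renders null arguments as the text "null", a table with nullable columns may produce lines such as "null:null". Filtering or substituting defaults before the write step is one way to avoid that, depending on what the downstream consumer expects.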
 
