Read from databases to Dataflow

To read from relational databases into Dataflow, use the Managed I/O connector.

Dependencies

Add the following dependencies to your project:

Java

 <dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-managed</artifactId>
   <version>${beam.version}</version>
 </dependency>

 <dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-io-jdbc</artifactId>
   <version>${beam.version}</version>
 </dependency>

Example

The following example reads from a PostgreSQL database and writes the data to text files. While this example uses PostgreSQL, configuring other supported databases is similar.

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import com.google.common.collect.ImmutableMap;
  import java.util.Objects;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.PipelineResult;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.managed.Managed;
  import org.apache.beam.sdk.options.Description;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.values.TypeDescriptors;
 public 
  
 class 
 PostgresRead 
  
 { 
  
 public 
  
 interface 
 Options 
  
 extends 
  
 PipelineOptions 
  
 { 
  
 @Description 
 ( 
 "The JDBC URL of the PostgreSQL database to read from." 
 ) 
  
 String 
  
 getJdbcUrl 
 (); 
  
 void 
  
 setJdbcUrl 
 ( 
 String 
  
 value 
 ); 
  
 @Description 
 ( 
 "The PostgresSQL table to read from." 
 ) 
  
 String 
  
 getTable 
 (); 
  
 void 
  
 setTable 
 ( 
 String 
  
 value 
 ); 
  
 @Description 
 ( 
 "The username for the PostgreSQL database." 
 ) 
  
 String 
  
 getUsername 
 (); 
  
 void 
  
 setUsername 
 ( 
 String 
  
 value 
 ); 
  
 @Description 
 ( 
 "The password for the PostgreSQL database." 
 ) 
  
 String 
  
 getPassword 
 (); 
  
 void 
  
 setPassword 
 ( 
 String 
  
 value 
 ); 
  
 @Description 
 ( 
  
 "The path to write the output file. Can be a local file path, " 
  
 + 
  
 "a GCS path, or a path to any other supported file systems." 
 ) 
  
 String 
  
 getOutputPath 
 (); 
  
 void 
  
 setOutputPath 
 ( 
 String 
  
 value 
 ); 
  
 } 
  
 public 
  
 static 
  
 PipelineResult 
 . 
 State 
  
 main 
 ( 
 String 
 [] 
  
 args 
 ) 
  
 { 
  
 // Parse the pipeline options passed into the application. Example: 
  
 //   --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE 
  
 //   --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE 
  
 // For more information, see 
  
 // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options 
  
 var 
  
 options 
  
 = 
  
 PipelineOptionsFactory 
 . 
 fromArgs 
 ( 
 args 
 ). 
 withValidation 
 (). 
 as 
 ( 
 Options 
 . 
 class 
 ); 
  
 Pipeline 
  
 pipeline 
  
 = 
  
 createPipeline 
 ( 
 options 
 ); 
  
 return 
  
 pipeline 
 . 
 run 
 (). 
 waitUntilFinish 
 (); 
  
 } 
  
 public 
  
 static 
  
 Pipeline 
  
 createPipeline 
 ( 
 Options 
  
 options 
 ) 
  
 { 
  
 // Create configuration parameters for the Managed I/O transform. 
  
 ImmutableMap<String 
 , 
  
 Object 
>  
 config 
  
 = 
  
 ImmutableMap 
 . 
< String 
 , 
  
 Object>builder 
 () 
  
 . 
 put 
 ( 
 "jdbc_url" 
 , 
  
 options 
 . 
 getJdbcUrl 
 ()) 
  
 . 
 put 
 ( 
 "location" 
 , 
  
 options 
 . 
 getTable 
 ()) 
  
 . 
 put 
 ( 
 "username" 
 , 
  
 options 
 . 
 getUsername 
 ()) 
  
 . 
 put 
 ( 
 "password" 
 , 
  
 options 
 . 
 getPassword 
 ()) 
  
 . 
 build 
 (); 
  
 // Build the pipeline. 
  
 var 
  
 pipeline 
  
 = 
  
 Pipeline 
 . 
 create 
 ( 
 options 
 ); 
  
 pipeline 
  
 // Read data from a Postgres database using Managed I/O. 
  
 . 
 apply 
 ( 
 Managed 
 . 
 read 
 ( 
 Managed 
 . 
 POSTGRES 
 ). 
 withConfig 
 ( 
 config 
 )) 
  
 . 
 getSinglePCollection 
 () 
  
 // Convert each row to a string. 
  
 . 
 apply 
 ( 
  
 MapElements 
 . 
 into 
 ( 
 TypeDescriptors 
 . 
 strings 
 ()) 
  
 . 
 via 
 (( 
 row 
  
 - 
>  
 String 
 . 
 format 
 ( 
 "%d,%s" 
 , 
  
 row 
 . 
 getInt32 
 ( 
 "id" 
 ), 
  
 row 
 . 
 getString 
 ( 
 "name" 
 ))))) 
  
 // Write strings to a text file. 
  
 . 
 apply 
 ( 
 TextIO 
 . 
 write 
 (). 
 to 
 ( 
 options 
 . 
 getOutputPath 
 ()). 
 withSuffix 
 ( 
 ".txt" 
 ). 
 withNumShards 
 ( 
 1 
 )); 
  
 return 
  
 pipeline 
 ; 
  
 } 
 } 
 

What's next

Create a Mobile Website
View Site in Mobile | Classic
Share by: