Append buffered records

Use the JSON stream writer to append records in buffered mode

Code sample

Java

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries . For more information, see the BigQuery Java API reference documentation .

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.gax.retrying. RetrySettings 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. AppendRowsResponse 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. BigQueryWriteClient 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. CreateWriteStreamRequest 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. FinalizeWriteStreamRequest 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. FlushRowsRequest 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. FlushRowsResponse 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. JsonStreamWriter 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. TableName 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. WriteStream 
 
 ; 
 import 
  
 com.google.protobuf. Descriptors 
. DescriptorValidationException 
 
 ; 
 import 
  
 com.google.protobuf. Int64Value 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.concurrent.ExecutionException 
 ; 
 import 
  
 org.json.JSONArray 
 ; 
 import 
  
 org.json.JSONObject 
 ; 
 import 
  
 org.threeten.bp. Duration 
 
 ; 
 public 
  
 class 
 WriteBufferedStream 
  
 { 
  
 public 
  
 static 
  
 void 
  
 runWriteBufferedStream 
 () 
  
 throws 
  
  DescriptorValidationException 
 
 , 
  
 InterruptedException 
 , 
  
 IOException 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "MY_PROJECT_ID" 
 ; 
  
 String 
  
 datasetName 
  
 = 
  
 "MY_DATASET_NAME" 
 ; 
  
 String 
  
 tableName 
  
 = 
  
 "MY_TABLE_NAME" 
 ; 
  
 writeBufferedStream 
 ( 
 projectId 
 , 
  
 datasetName 
 , 
  
 tableName 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 writeBufferedStream 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 datasetName 
 , 
  
 String 
  
 tableName 
 ) 
  
 throws 
  
  DescriptorValidationException 
 
 , 
  
 InterruptedException 
 , 
  
 IOException 
  
 { 
  
 try 
  
 ( 
  BigQueryWriteClient 
 
  
 client 
  
 = 
  
  BigQueryWriteClient 
 
 . 
 create 
 ()) 
  
 { 
  
 // Initialize a write stream for the specified table. 
  
 // For more information on WriteStream.Type, see: 
  
 // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html 
  
  WriteStream 
 
  
 stream 
  
 = 
  
  WriteStream 
 
 . 
 newBuilder 
 (). 
 setType 
 ( 
  WriteStream 
 
 . 
 Type 
 . 
 BUFFERED 
 ). 
 build 
 (); 
  
  TableName 
 
  
 parentTable 
  
 = 
  
  TableName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 datasetName 
 , 
  
 tableName 
 ); 
  
  CreateWriteStreamRequest 
 
  
 createWriteStreamRequest 
  
 = 
  
  CreateWriteStreamRequest 
 
 . 
 newBuilder 
 () 
  
 . 
 setParent 
 ( 
 parentTable 
 . 
  toString 
 
 ()) 
  
 . 
 setWriteStream 
 ( 
 stream 
 ) 
  
 . 
 build 
 (); 
  
  WriteStream 
 
  
 writeStream 
  
 = 
  
 client 
 . 
 createWriteStream 
 ( 
 createWriteStreamRequest 
 ); 
  
 // Configure in-stream automatic retry settings. 
  
 // Error codes that are immediately retried: 
  
 // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED 
  
 // Error codes that are retried with exponential backoff: 
  
 // * RESOURCE_EXHAUSTED 
  
  RetrySettings 
 
  
 retrySettings 
  
 = 
  
  RetrySettings 
 
 . 
 newBuilder 
 () 
  
 . 
  setInitialRetryDelay 
 
 ( 
  Duration 
 
 . 
 ofMillis 
 ( 
 500 
 )) 
  
 . 
  setRetryDelayMultiplier 
 
 ( 
 1.1 
 ) 
  
 . 
  setMaxAttempts 
 
 ( 
 5 
 ) 
  
 . 
  setMaxRetryDelay 
 
 ( 
  Duration 
 
 . 
 ofMinutes 
 ( 
 1 
 )) 
  
 . 
 build 
 (); 
  
 // Use the JSON stream writer to send records in JSON format. 
  
 // For more information about JsonStreamWriter, see: 
  
 // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.JsonStreamWriter 
  
 try 
  
 ( 
  JsonStreamWriter 
 
  
 writer 
  
 = 
  
  JsonStreamWriter 
 
 . 
 newBuilder 
 ( 
 writeStream 
 . 
  getName 
 
 (), 
  
 writeStream 
 . 
  getTableSchema 
 
 ()) 
  
 . 
 setRetrySettings 
 ( 
 retrySettings 
 ) 
  
 . 
 build 
 ()) 
  
 { 
  
 // Write two batches to the stream, each with 10 JSON records. 
  
 for 
  
 ( 
 int 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
 < 
 2 
 ; 
  
 i 
 ++ 
 ) 
  
 { 
  
 JSONArray 
  
 jsonArr 
  
 = 
  
 new 
  
 JSONArray 
 (); 
  
 for 
  
 ( 
 int 
  
 j 
  
 = 
  
 0 
 ; 
  
 j 
 < 
 10 
 ; 
  
 j 
 ++ 
 ) 
  
 { 
  
 // Create a JSON object that is compatible with the table schema. 
  
 JSONObject 
  
 record 
  
 = 
  
 new 
  
 JSONObject 
 (); 
  
 record 
 . 
 put 
 ( 
 "col1" 
 , 
  
 String 
 . 
 format 
 ( 
 "buffered-record %03d" 
 , 
  
 i 
 )); 
  
 jsonArr 
 . 
 put 
 ( 
 record 
 ); 
  
 } 
  
 ApiFuture<AppendRowsResponse> 
  
 future 
  
 = 
  
 writer 
 . 
 append 
 ( 
 jsonArr 
 ); 
  
  AppendRowsResponse 
 
  
 response 
  
 = 
  
 future 
 . 
 get 
 (); 
  
 } 
  
 // Flush the buffer. 
  
  FlushRowsRequest 
 
  
 flushRowsRequest 
  
 = 
  
  FlushRowsRequest 
 
 . 
 newBuilder 
 () 
  
 . 
 setWriteStream 
 ( 
 writeStream 
 . 
  getName 
 
 ()) 
  
 . 
 setOffset 
 ( 
  Int64Value 
 
 . 
 of 
 ( 
 10 
  
 * 
  
 2 
  
 - 
  
 1 
 )) 
  
 // Advance the cursor to the latest record. 
  
 . 
 build 
 (); 
  
  FlushRowsResponse 
 
  
 flushRowsResponse 
  
 = 
  
 client 
 . 
 flushRows 
 ( 
 flushRowsRequest 
 ); 
  
 // You can continue to write to the stream after flushing the buffer. 
  
 } 
  
 // Finalize the stream after use. 
  
  FinalizeWriteStreamRequest 
 
  
 finalizeWriteStreamRequest 
  
 = 
  
  FinalizeWriteStreamRequest 
 
 . 
 newBuilder 
 (). 
 setName 
 ( 
 writeStream 
 . 
  getName 
 
 ()). 
 build 
 (); 
  
 client 
 . 
 finalizeWriteStream 
 ( 
 finalizeWriteStreamRequest 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Appended and committed records successfully." 
 ); 
  
 } 
  
 catch 
  
 ( 
 ExecutionException 
  
 e 
 ) 
  
 { 
  
 // If the wrapped exception is a StatusRuntimeException, check the state of the operation. 
  
 // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: 
  
 // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 e 
 ); 
  
 } 
  
 } 
 } 
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

Design a Mobile Site
View Site in Mobile | Classic
Share by: