Append pending records

Use the JSON stream writer to append pending records.
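At a high level, the pending-type flow has four steps: create a write stream of type PENDING, append records through a JSON stream writer while tracking offsets, finalize the stream, and batch-commit it so the buffered rows become visible in the table. The condensed Java sketch below is distilled from the full sample later on this page; it assumes a destination table whose schema has a single string column named col1, and it omits the retry settings, asynchronous callback handling, and DataWriter wrapper that the full sample demonstrates.

import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import org.json.JSONArray;
import org.json.JSONObject;

public class PendingStreamSketch {
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace with your project, dataset, and table.
    TableName parentTable = TableName.of("MY_PROJECT_ID", "MY_DATASET_NAME", "MY_TABLE_NAME");
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // 1. Create a write stream of type PENDING; rows stay buffered until the stream is committed.
      WriteStream writeStream =
          client.createWriteStream(
              CreateWriteStreamRequest.newBuilder()
                  .setParent(parentTable.toString())
                  .setWriteStream(
                      WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                  .build());

      // 2. Append JSON records through a JsonStreamWriter, passing an explicit offset.
      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        JSONArray batch = new JSONArray().put(new JSONObject().put("col1", "example-row"));
        writer.append(batch, 0).get(); // Block on the response here only for brevity.
      }

      // 3. Finalize the stream so that no further appends are accepted.
      client.finalizeWriteStream(writeStream.getName());

      // 4. Commit the finalized stream to make the appended rows visible in the table.
      client.batchCommitWriteStreams(
          BatchCommitWriteStreamsRequest.newBuilder()
              .setParent(parentTable.toString())
              .addWriteStreams(writeStream.getName())
              .build());
    }
  }
}

In practice, append batches asynchronously and track the returned futures, as the full samples below do, rather than blocking on each response.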

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Java

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
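If you develop locally, one common way to create Application Default Credentials is the gcloud CLI (an assumption; any method from the authentication guide works). The client libraries in the samples below pick the credentials up automatically:

gcloud auth application-default login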

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WritePendingStream {

  public static void runWritePendingStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writePendingStream(projectId, datasetName, tableName);
  }

  public static void writePendingStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQueryWriteClient client = BigQueryWriteClient.create();
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization.
    writer.initialize(parentTable, client);

    try {
      // Write two batches of fake data to the stream, each with 10 JSON records.  Data may be
      // batched up to the maximum request size:
      // https://cloud.google.com/bigquery/quotas#write-api-limits
      long offset = 0;
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        writer.append(jsonArr, offset);
        offset += jsonArr.length();
      }
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e);
    }

    // Final cleanup for the stream.
    writer.cleanup(client);
    System.out.println("Appended records successfully.");

    // Once all streams are done, if all writes were successful, commit all of them in one request.
    // This example only has the one stream. If any streams failed, their workload may be
    // retried on a new stream, and then only the successful stream should be included in the
    // commit.
    BatchCommitWriteStreamsRequest commitRequest =
        BatchCommitWriteStreamsRequest.newBuilder()
            .setParent(parentTable.toString())
            .addWriteStreams(writer.getStreamName())
            .build();
    BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest);
    // If the response does not have a commit time, it means the commit operation failed.
    if (commitResponse.hasCommitTime() == false) {
      for (StorageError err : commitResponse.getStreamErrorsList()) {
        System.out.println(err.getErrorMessage());
      }
      throw new RuntimeException("Error committing the streams");
    }
    System.out.println("Appended and committed records successfully.");
  }

  // A simple wrapper object showing how the stateful stream writer should be used.
  private static class DataWriter {

    private JsonStreamWriter streamWriter;
    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    @GuardedBy("lock")
    private RuntimeException error = null;

    void initialize(TableName parentTable, BigQueryWriteClient client)
        throws IOException, DescriptorValidationException, InterruptedException {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();

      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();

      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.JsonStreamWriter
      streamWriter =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .setRetrySettings(retrySettings)
              .build();
    }

    public void append(JSONArray data, long offset)
        throws DescriptorValidationException, IOException, ExecutionException {
      synchronized (this.lock) {
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this), MoreExecutors.directExecutor());
      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup(BigQueryWriteClient client) {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }

      // Finalize the stream.
      FinalizeWriteStreamResponse finalizeResponse =
          client.finalizeWriteStream(streamWriter.getStreamName());
      System.out.println("Rows written: " + finalizeResponse.getRowCount());
    }

    public String getStreamName() {
      return streamWriter.getStreamName();
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;

      public AppendCompleteCallback(DataWriter parent) {
        this.parent = parent;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format(
            "Append %d success\n", response.getAppendResult().getOffset().getValue());
        done();
      }

      public void onFailure(Throwable throwable) {
        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        System.out.format("Error: %s\n", throwable.toString());
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.
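The sample below imports the @google-cloud/bigquery-storage package (see the require call at the top of the code); assuming an npm-based project, install it with npm install @google-cloud/bigquery-storage.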

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendRowsPendingStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.PendingStream;
  const writeClient = new WriterClient({projectId: projectId});
  try {
    const writeStream = await writeClient.createWriteStreamFullResponse({
      streamType,
      destinationTable,
    });
    const streamId = writeStream.name;
    console.log(`Stream created: ${streamId}`);

    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId,
    });

    const writer = new JSONWriter({
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      bool_col: true,
      bytes_col: Buffer.from('hello world'),
      float64_col: parseFloat('+123.44999694824219'),
      int64_col: 123,
      string_col: 'omg',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      bool_col: false,
    };
    rows.push(row);

    // Row 3
    row = {
      row_num: 3,
      bytes_col: Buffer.from('later, gator'),
    };
    rows.push(row);

    // Row 4
    row = {
      row_num: 4,
      float64_col: 987.6539916992188,
    };
    rows.push(row);

    // Row 5
    row = {
      row_num: 5,
      int64_col: 321,
    };
    rows.push(row);

    // Row 6
    row = {
      row_num: 6,
      string_col: 'octavia',
    };
    rows.push(row);

    // Set an offset to allow resuming this stream if the connection breaks.
    // Keep track of which requests the server has acknowledged and resume the
    // stream at the first non-acknowledged message. If the server has already
    // processed a message with that offset, it will return an ALREADY_EXISTS
    // error, which can be safely ignored.
    // The first request must always have an offset of 0.
    let offsetValue = 0;

    // Send batch.
    let pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    // Reset rows.
    rows = [];

    // Row 7
    row = {
      row_num: 7,
      date_col: new Date('2019-02-07'),
    };
    rows.push(row);

    // Row 8
    row = {
      row_num: 8,
      datetime_col: new Date('2019-02-17T11:24:00.000Z'),
    };
    rows.push(row);

    // Row 9
    row = {
      row_num: 9,
      geography_col: 'POINT(5 5)',
    };
    rows.push(row);

    // Row 10
    row = {
      row_num: 10,
      numeric_col: 123456,
      bignumeric_col: '99999999999999999999999999999.999999999',
    };
    rows.push(row);

    // Row 11
    row = {
      row_num: 11,
      time_col: '18:00:00',
    };
    rows.push(row);

    // Row 12
    row = {
      row_num: 12,
      timestamp_col: new Date('2022-01-09T03:49:46.564Z'),
    };
    rows.push(row);

    // Offset must equal the number of rows that were previously sent.
    offsetValue = 6;

    // Send batch.
    pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    rows = [];

    // Row 13
    row = {
      row_num: 13,
      int64_list: [1999, 2001],
    };
    rows.push(row);

    // Row 14
    row = {
      row_num: 14,
      struct_col: {
        sub_int_col: 99,
      },
    };
    rows.push(row);

    // Row 15
    row = {
      row_num: 15,
      struct_list: [{sub_int_col: 100}, {sub_int_col: 101}],
    };
    rows.push(row);

    // Row 16
    row = {
      row_num: 16,
      range_col: {
        start: new Date('2022-01-09T03:49:46.564Z'),
        end: new Date('2022-01-09T04:49:46.564Z'),
      },
    };
    rows.push(row);

    offsetValue = 12;

    // Send batch.
    pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);

    const response = await writeClient.batchCommitWriteStream({
      parent: destinationTable,
      writeStreams: [streamId],
    });

    console.log(response);
  } catch (err) {
    console.log(err.message, err);
  } finally {
    writeClient.close();
  }
}
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.
