Append records using default client

Use the JSON stream writer to append records using the default client.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Java

Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutureCallback 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.api.gax.batching. FlowControlSettings 
 
 ; 
 import 
  
 com.google.api.gax.core. FixedExecutorProvider 
 
 ; 
 import 
  
 com.google.api.gax.retrying. RetrySettings 
 
 ; 
 import 
  
 com.google.cloud.bigquery. BigQuery 
 
 ; 
 import 
  
 com.google.cloud.bigquery. BigQueryOptions 
 
 ; 
 import 
  
 com.google.cloud.bigquery. QueryJobConfiguration 
 
 ; 
 import 
  
 com.google.cloud.bigquery. TableResult 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. AppendRowsRequest 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. AppendRowsResponse 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. BigQueryWriteClient 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. BigQueryWriteSettings 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. Exceptions 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. Exceptions 
.AppendSerializationError 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. Exceptions 
.MaximumRequestCallbackWaitTimeExceededException 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. Exceptions 
.StorageException 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. Exceptions 
.StreamWriterClosedException 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. JsonStreamWriter 
 
 ; 
 import 
  
 com.google.cloud.bigquery.storage.v1. TableName 
 
 ; 
 import 
  
 com.google.common.util.concurrent.MoreExecutors 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.protobuf. Descriptors 
. DescriptorValidationException 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.Map 
 ; 
 import 
  
 java.util.concurrent.Executors 
 ; 
 import 
  
 java.util.concurrent.Phaser 
 ; 
 import 
  
 java.util.concurrent.atomic.AtomicInteger 
 ; 
 import 
  
 javax.annotation.concurrent.GuardedBy 
 ; 
 import 
  
 org.json.JSONArray 
 ; 
 import 
  
 org.json.JSONObject 
 ; 
 import 
  
 org.threeten.bp. Duration 
 
 ; 
 public 
  
 class 
 WriteToDefaultStream 
  
 { 
  
 /**
  * Runs the sample with placeholder identifiers by delegating to
  * {@link #writeToDefaultStream(String, String, String)}.
  *
  * @throws DescriptorValidationException propagated from {@code writeToDefaultStream}
  * @throws InterruptedException propagated from {@code writeToDefaultStream}
  * @throws IOException propagated from {@code writeToDefaultStream}
  */
 public static void runWriteToDefaultStream()
     throws DescriptorValidationException, InterruptedException, IOException {
   // TODO(developer): Replace these variables before running the sample.
   final String projectId = "MY_PROJECT_ID";
   final String datasetName = "MY_DATASET_NAME";
   final String tableName = "MY_TABLE_NAME";

   writeToDefaultStream(projectId, datasetName, tableName);
 }
  
 /**
  * Builds the fixed five-byte payload (values 1 through 5) stored in each record's
  * {@code test_bytes} field.
  *
  * @return a {@code ByteString} copied from the bytes {@code {1, 2, 3, 4, 5}}
  */
 private static ByteString buildByteString() {
   return ByteString.copyFrom(new byte[] {1, 2, 3, 4, 5});
 }
  
 /**
  * Create a JSON object that is compatible with the table schema.
  *
  * <p>The record carries three fields: {@code test_string}, {@code test_bytes} and
  * {@code test_geo}. The string value grows with {@code j}, because its suffix is the
  * concatenation of the integers {@code 0 .. j - 1}.
  *
  * @param i first index embedded in {@code test_string} (the batch number at the call site)
  * @param j second index embedded in {@code test_string}; also the number of integers
  *     concatenated into the suffix
  * @return the populated record
  */
 private static JSONObject buildRecord(int i, int j) {
   // Suffix is "", "0", "01", "012", ... depending on j.
   StringBuilder suffix = new StringBuilder();
   int next = 0;
   while (next < j) {
     suffix.append(next);
     next++;
   }
   String label = String.format("record %03d-%03d %s", i, j, suffix.toString());

   return new JSONObject()
       .put("test_string", label)
       .put("test_bytes", buildByteString())
       .put(
           "test_geo",
           "POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");
 }
  
 /**
  * Appends two batches of ten generated JSON records to the given table through a
  * {@code DataWriter}, then checks the resulting row count with a query.
  *
  * @param projectId project that owns the destination table
  * @param datasetName dataset that contains the destination table
  * @param tableName destination table
  * @throws DescriptorValidationException propagated from writer initialization or append
  * @throws InterruptedException propagated from the writer or from the verification query
  * @throws IOException propagated from writer initialization or append
  */
 public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
     throws DescriptorValidationException, InterruptedException, IOException {
   final TableName parentTable = TableName.of(projectId, datasetName, tableName);

   // One time initialization for the worker.
   final DataWriter writer = new DataWriter();
   writer.initialize(parentTable);

   // Write two batches of fake data to the stream, each with 10 JSON records.  Data may be
   // batched up to the maximum request size:
   // https://cloud.google.com/bigquery/quotas#write-api-limits
   final int batchCount = 2;
   final int rowsPerBatch = 10;
   for (int batch = 0; batch < batchCount; batch++) {
     JSONArray rows = new JSONArray();
     for (int row = 0; row < rowsPerBatch; row++) {
       rows.put(buildRecord(batch, row));
     }
     writer.append(new AppendContext(rows));
   }

   // Final cleanup for the stream during worker teardown.
   writer.cleanup();

   // NOTE(review): 20 rows are submitted but 12 are expected. Presumably the destination
   // table rejects the longer test_string values and the failure callback drops those
   // rows -- confirm against the table definition used with this sample.
   verifyExpectedRowCount(parentTable, 12);
   System.out.println("Appended records successfully.");
 }
  
 /**
  * Runs {@code SELECT COUNT(*)} against the table and fails if the count differs from the
  * expected value.
  *
  * @param parentTable table whose rows are counted
  * @param expectedRowCount number of rows the table is expected to contain
  * @throws InterruptedException propagated from the query call
  * @throws RuntimeException if the actual row count differs from {@code expectedRowCount}
  */
 private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
     throws InterruptedException {
   String qualifiedTable =
       parentTable.getProject() + "." + parentTable.getDataset() + "." + parentTable.getTable();
   String countSql = "SELECT COUNT(*) FROM `" + qualifiedTable + "`";

   QueryJobConfiguration countQuery = QueryJobConfiguration.newBuilder(countSql).build();
   BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
   TableResult countResult = bigquery.query(countQuery);

   // The count is read from the column named "f0_" of the first result row.
   String rawCount = countResult.getValues().iterator().next().get("f0_").getStringValue();
   int countRowsActual = Integer.parseInt(rawCount);

   if (countRowsActual == expectedRowCount) {
     return;
   }
   throw new RuntimeException(
       "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
 }
  
 /** Holds the rows of one append request so that a failed request can be inspected or resent. */
 private static class AppendContext {

   // Rows submitted in this request.
   JSONArray data;

   /**
    * @param rows rows to append in a single request
    */
   AppendContext(JSONArray rows) {
     data = rows;
   }
 }
  
 private 
  
 static 
  
 class 
 DataWriter 
  
 { 
  
 private 
  
 static 
  
 final 
  
 int 
  
 MAX_RECREATE_COUNT 
  
 = 
  
 3 
 ; 
  
 private 
  
  BigQueryWriteClient 
 
  
 client 
 ; 
  
 // Track the number of in-flight requests to wait for all responses before shutting down. 
  
 private 
  
 final 
  
 Phaser 
  
 inflightRequestCount 
  
 = 
  
 new 
  
 Phaser 
 ( 
 1 
 ); 
  
 private 
  
 final 
  
 Object 
  
 lock 
  
 = 
  
 new 
  
 Object 
 (); 
  
 private 
  
  JsonStreamWriter 
 
  
 streamWriter 
 ; 
  
 @GuardedBy 
 ( 
 "lock" 
 ) 
  
 private 
  
 RuntimeException 
  
 error 
  
 = 
  
 null 
 ; 
  
 private 
  
 AtomicInteger 
  
 recreateCount 
  
 = 
  
 new 
  
 AtomicInteger 
 ( 
 0 
 ); 
  
 private 
  
  JsonStreamWriter 
 
  
 createStreamWriter 
 ( 
 String 
  
 tableName 
 ) 
  
 throws 
  
  DescriptorValidationException 
 
 , 
  
 IOException 
 , 
  
 InterruptedException 
  
 { 
  
 // Configure in-stream automatic retry settings. 
  
 // Error codes that are immediately retried: 
  
 // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED 
  
 // Error codes that are retried with exponential backoff: 
  
 // * RESOURCE_EXHAUSTED 
  
  RetrySettings 
 
  
 retrySettings 
  
 = 
  
  RetrySettings 
 
 . 
 newBuilder 
 () 
  
 . 
  setInitialRetryDelay 
 
 ( 
  Duration 
 
 . 
 ofMillis 
 ( 
 500 
 )) 
  
 . 
  setRetryDelayMultiplier 
 
 ( 
 1.1 
 ) 
  
 . 
  setMaxAttempts 
 
 ( 
 5 
 ) 
  
 . 
  setMaxRetryDelay 
 
 ( 
  Duration 
 
 . 
 ofMinutes 
 ( 
 1 
 )) 
  
 . 
 build 
 (); 
  
 // Use the JSON stream writer to send records in JSON format. Specify the table name to write 
  
 // to the default stream. 
  
 // For more information about JsonStreamWriter, see: 
  
 // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html 
  
 return 
  
  JsonStreamWriter 
 
 . 
 newBuilder 
 ( 
 tableName 
 , 
  
 client 
 ) 
  
 . 
 setExecutorProvider 
 ( 
  FixedExecutorProvider 
 
 . 
 create 
 ( 
 Executors 
 . 
 newScheduledThreadPool 
 ( 
 100 
 ))) 
  
 . 
 setChannelProvider 
 ( 
  
  BigQueryWriteSettings 
 
 . 
 defaultGrpcTransportProviderBuilder 
 () 
  
 . 
 setKeepAliveTime 
 ( 
 org 
 . 
 threeten 
 . 
 bp 
 . 
 Duration 
 . 
 ofMinutes 
 ( 
 1 
 )) 
  
 . 
 setKeepAliveTimeout 
 ( 
 org 
 . 
 threeten 
 . 
 bp 
 . 
 Duration 
 . 
 ofMinutes 
 ( 
 1 
 )) 
  
 . 
 setKeepAliveWithoutCalls 
 ( 
 true 
 ) 
  
 . 
 setChannelsPerCpu 
 ( 
 2 
 ) 
  
 . 
 build 
 ()) 
  
 . 
 setEnableConnectionPool 
 ( 
 true 
 ) 
  
 // This will allow connection pool to scale up better. 
  
 . 
 setFlowControlSettings 
 ( 
  
  FlowControlSettings 
 
 . 
 newBuilder 
 (). 
 setMaxOutstandingElementCount 
 ( 
 100L 
 ). 
 build 
 ()) 
  
 // If value is missing in json and there is a default value configured on bigquery 
  
 // column, apply the default value to the missing value field. 
  
 . 
 setDefaultMissingValueInterpretation 
 ( 
  
  AppendRowsRequest 
 
 . 
 MissingValueInterpretation 
 . 
 DEFAULT_VALUE 
 ) 
  
 . 
 setRetrySettings 
 ( 
 retrySettings 
 ) 
  
 . 
 build 
 (); 
  
 } 
  
 public 
  
 void 
  
 initialize 
 ( 
  TableName 
 
  
 parentTable 
 ) 
  
 throws 
  
  DescriptorValidationException 
 
 , 
  
 IOException 
 , 
  
 InterruptedException 
  
 { 
  
 // Initialize client without settings, internally within stream writer a new client will be 
  
 // created with full settings. 
  
 client 
  
 = 
  
  BigQueryWriteClient 
 
 . 
 create 
 (); 
  
 streamWriter 
  
 = 
  
 createStreamWriter 
 ( 
 parentTable 
 . 
  toString 
 
 ()); 
  
 } 
  
 public 
  
 void 
  
 append 
 ( 
 AppendContext 
  
 appendContext 
 ) 
  
 throws 
  
  DescriptorValidationException 
 
 , 
  
 IOException 
 , 
  
 InterruptedException 
  
 { 
  
 synchronized 
  
 ( 
 this 
 . 
 lock 
 ) 
  
 { 
  
 if 
  
 ( 
 ! 
 streamWriter 
 . 
  isUserClosed 
 
 () 
 && 
 streamWriter 
 . 
  isClosed 
 
 () 
 && 
 recreateCount 
 . 
 getAndIncrement 
 () 
 < 
 MAX_RECREATE_COUNT 
 ) 
  
 { 
  
 streamWriter 
  
 = 
  
 createStreamWriter 
 ( 
 streamWriter 
 . 
  getStreamName 
 
 ()); 
  
 this 
 . 
 error 
  
 = 
  
 null 
 ; 
  
 } 
  
 // If earlier appends have failed, we need to reset before continuing. 
  
 if 
  
 ( 
 this 
 . 
 error 
  
 != 
  
 null 
 ) 
  
 { 
  
 throw 
  
 this 
 . 
 error 
 ; 
  
 } 
  
 } 
  
 // Append asynchronously for increased throughput. 
  
 ApiFuture<AppendRowsResponse> 
  
 future 
  
 = 
  
 streamWriter 
 . 
  append 
 
 ( 
  append 
 
Context . 
 data 
 ); 
  
  ApiFutures 
 
 . 
  addCallback 
 
 ( 
  
 future 
 , 
  
 new 
  
 AppendCompleteCallback 
 ( 
 this 
 , 
  
 appendContext 
 ), 
  
 MoreExecutors 
 . 
 directExecutor 
 ()); 
  
 // Increase the count of in-flight requests. 
  
 inflightRequestCount 
 . 
 register 
 (); 
  
 } 
  
 public 
  
 void 
  
 cleanup 
 () 
  
 { 
  
 // Wait for all in-flight requests to complete. 
  
 inflightRequestCount 
 . 
 arriveAndAwaitAdvance 
 (); 
  
 client 
 . 
  close 
 
 (); 
  
 // Close the connection to the server. 
  
 streamWriter 
 . 
  close 
 
 (); 
  
 // Verify that no error occurred in the stream. 
  
 synchronized 
  
 ( 
 this 
 . 
 lock 
 ) 
  
 { 
  
 if 
  
 ( 
 this 
 . 
 error 
  
 != 
  
 null 
 ) 
  
 { 
  
 throw 
  
 this 
 . 
 error 
 ; 
  
 } 
  
 } 
  
 } 
  
 static 
  
 class 
 AppendCompleteCallback 
  
 implements 
  
 ApiFutureCallback<AppendRowsResponse> 
  
 { 
  
 private 
  
 final 
  
 DataWriter 
  
 parent 
 ; 
  
 private 
  
 final 
  
 AppendContext 
  
 appendContext 
 ; 
  
 public 
  
 AppendCompleteCallback 
 ( 
 DataWriter 
  
 parent 
 , 
  
 AppendContext 
  
 appendContext 
 ) 
  
 { 
  
 this 
 . 
 parent 
  
 = 
  
 parent 
 ; 
  
 this 
 . 
 appendContext 
  
 = 
  
 appendContext 
 ; 
  
 } 
  
 public 
  
 void 
  
 onSuccess 
 ( 
  AppendRowsResponse 
 
  
 response 
 ) 
  
 { 
  
 System 
 . 
 out 
 . 
 format 
 ( 
 "Append success\n" 
 ); 
  
 this 
 . 
 parent 
 . 
 recreateCount 
 . 
 set 
 ( 
 0 
 ); 
  
 done 
 (); 
  
 } 
  
 public 
  
 void 
  
 onFailure 
 ( 
 Throwable 
  
 throwable 
 ) 
  
 { 
  
 if 
  
 ( 
 throwable 
  
 instanceof 
  
 AppendSerializationError 
 ) 
  
 { 
  
 AppendSerializationError 
  
 ase 
  
 = 
  
 ( 
 AppendSerializationError 
 ) 
  
 throwable 
 ; 
  
 Map<Integer 
 , 
  
 String 
>  
 rowIndexToErrorMessage 
  
 = 
  
 ase 
 . 
 getRowIndexToErrorMessage 
 (); 
  
 if 
  
 ( 
 rowIndexToErrorMessage 
 . 
 size 
 () 
 > 
 0 
 ) 
  
 { 
  
 // Omit the faulty rows 
  
 JSONArray 
  
 dataNew 
  
 = 
  
 new 
  
 JSONArray 
 (); 
  
 for 
  
 ( 
 int 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
 < 
 appendContext 
 . 
 data 
 . 
 length 
 (); 
  
 i 
 ++ 
 ) 
  
 { 
  
 if 
  
 ( 
 ! 
 rowIndexToErrorMessage 
 . 
 containsKey 
 ( 
 i 
 )) 
  
 { 
  
 dataNew 
 . 
 put 
 ( 
 appendContext 
 . 
 data 
 . 
 get 
 ( 
 i 
 )); 
  
 } 
  
 else 
  
 { 
  
 // process faulty rows by placing them on a dead-letter-queue, for instance 
  
 } 
  
 } 
  
 // Retry the remaining valid rows, but using a separate thread to 
  
 // avoid potentially blocking while we are in a callback. 
  
 if 
  
 ( 
 dataNew 
 . 
 length 
 () 
 > 
 0 
 ) 
  
 { 
  
 try 
  
 { 
  
 this 
 . 
 parent 
 . 
 append 
 ( 
 new 
  
 AppendContext 
 ( 
 dataNew 
 )); 
  
 } 
  
 catch 
  
 ( 
  DescriptorValidationException 
 
  
 e 
 ) 
  
 { 
  
 throw 
  
 new 
  
 RuntimeException 
 ( 
 e 
 ); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 e 
 ) 
  
 { 
  
 throw 
  
 new 
  
 RuntimeException 
 ( 
 e 
 ); 
  
 } 
  
 catch 
  
 ( 
 InterruptedException 
  
 e 
 ) 
  
 { 
  
 throw 
  
 new 
  
 RuntimeException 
 ( 
 e 
 ); 
  
 } 
  
 } 
  
 // Mark the existing attempt as done since we got a response for it 
  
 done 
 (); 
  
 return 
 ; 
  
 } 
  
 } 
  
 boolean 
  
 resendRequest 
  
 = 
  
 false 
 ; 
  
 if 
  
 ( 
 throwable 
  
 instanceof 
  
 MaximumRequestCallbackWaitTimeExceededException 
 ) 
  
 { 
  
 resendRequest 
  
 = 
  
 true 
 ; 
  
 } 
  
 else 
  
 if 
  
 ( 
 throwable 
  
 instanceof 
  
 StreamWriterClosedException 
 ) 
  
 { 
  
 if 
  
 ( 
 ! 
 parent 
 . 
 streamWriter 
 . 
 isUserClosed 
 ()) 
  
 { 
  
 resendRequest 
  
 = 
  
 true 
 ; 
  
 } 
  
 } 
  
 if 
  
 ( 
 resendRequest 
 ) 
  
 { 
  
 // Retry this request. 
  
 try 
  
 { 
  
 this 
 . 
 parent 
 . 
 append 
 ( 
 new 
  
 AppendContext 
 ( 
 appendContext 
 . 
 data 
 )); 
  
 } 
  
 catch 
  
 ( 
  DescriptorValidationException 
 
  
 e 
 ) 
  
 { 
  
 throw 
  
 new 
  
 RuntimeException 
 ( 
 e 
 ); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 e 
 ) 
  
 { 
  
 throw 
  
 new 
  
 RuntimeException 
 ( 
 e 
 ); 
  
 } 
  
 catch 
  
 ( 
 InterruptedException 
  
 e 
 ) 
  
 { 
  
 throw 
  
 new 
  
 RuntimeException 
 ( 
 e 
 ); 
  
 } 
  
 // Mark the existing attempt as done since we got a response for it 
  
 done 
 (); 
  
 return 
 ; 
  
 } 
  
 synchronized 
  
 ( 
 this 
 . 
 parent 
 . 
 lock 
 ) 
  
 { 
  
 if 
  
 ( 
 this 
 . 
 parent 
 . 
 error 
  
 == 
  
 null 
 ) 
  
 { 
  
 StorageException 
  
 storageException 
  
 = 
  
  Exceptions 
 
 . 
 toStorageException 
 ( 
 throwable 
 ); 
  
 this 
 . 
 parent 
 . 
 error 
  
 = 
  
 ( 
 storageException 
  
 != 
  
 null 
 ) 
  
 ? 
  
 storageException 
  
 : 
  
 new 
  
 RuntimeException 
 ( 
 throwable 
 ); 
  
 } 
  
 } 
  
 done 
 (); 
  
 } 
  
 private 
  
 void 
  
 done 
 () 
  
 { 
  
 // Reduce the count of in-flight requests. 
  
 this 
 . 
 parent 
 . 
 inflightRequestCount 
 . 
 arriveAndDeregister 
 (); 
  
 } 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

  // Storage Write API helpers: `adapt` converts a table schema to a proto descriptor and
  // `managedwriter` provides the writer client and the JSON writer used below.
  const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
  const {WriterClient, JSONWriter} = managedwriter;
 /**
  * Appends three JSON rows, sent as two batches, to the default write stream of a table
  * and logs the write results. Errors are logged, and the writer client is closed in all
  * cases.
  *
  * Reads `projectId`, `datasetId` and `tableId` from the enclosing scope; they are not
  * declared in this function (see the TODO below).
  */
 async function appendJSONRowsDefaultStream() {
   /**
    * TODO(developer): Uncomment the following lines before running the sample.
    */
   // projectId = 'my_project';
   // datasetId = 'my_dataset';
   // tableId = 'my_table';

   const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
   const writeClient = new WriterClient({projectId});

   try {
     // Fetch the default stream with the FULL view so that its table schema is included.
     const writeStream = await writeClient.getWriteStream({
       streamId: `${destinationTable}/streams/_default`,
       view: 'FULL',
     });
     const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
       writeStream.tableSchema,
       'root',
     );

     const connection = await writeClient.createStreamConnection({
       streamId: managedwriter.DefaultStream,
       destinationTable,
     });
     const streamId = connection.getStreamId();

     const writer = new JSONWriter({
       streamId,
       connection,
       protoDescriptor,
     });

     let rows = [];
     const pendingWrites = [];

     // Row 1
     let row = {
       row_num: 1,
       customer_name: 'Octavia',
     };
     rows.push(row);

     // Row 2
     row = {
       row_num: 2,
       customer_name: 'Turing',
     };
     rows.push(row);

     // Send batch.
     let pw = writer.appendRows(rows);
     pendingWrites.push(pw);

     rows = [];

     // Row 3
     row = {
       row_num: 3,
       customer_name: 'Bell',
     };
     rows.push(row);

     // Send batch.
     pw = writer.appendRows(rows);
     pendingWrites.push(pw);

     // Wait for both batches before reporting.
     const results = await Promise.all(
       pendingWrites.map(pending => pending.getResult()),
     );
     console.log('Write results:', results);
   } catch (err) {
     console.log(err);
   } finally {
     writeClient.close();
   }
 }
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Design a Mobile Site
View Site in Mobile | Classic
Share by: