Stream data using the Storage Write API

This document describes how to use the BigQuery Storage Write API to stream data into BigQuery.

In streaming scenarios, data arrives continuously and should be available for reads with minimal latency. When using the BigQuery Storage Write API for streaming workloads, consider what guarantees you need:

  • If your application only needs at-least-once semantics, then use the default stream.
  • If you need exactly-once semantics, then create one or more streams in committed type and use stream offsets to guarantee exactly-once writes.

In committed type, data written to the stream is available for query as soon as the server acknowledges the write request. The default stream also uses committed type, but does not provide exactly-once guarantees.
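For orientation, here is a minimal Python sketch of how the default stream is addressed (project, dataset, and table names are placeholders; the same client call appears in the full Python sample later in this document):

from google.cloud import bigquery_storage_v1

# The default stream always exists for a table and is addressed as "<table path>/_default".
write_client = bigquery_storage_v1.BigQueryWriteClient()
table_path = write_client.table_path("MY_PROJECT_ID", "MY_DATASET_NAME", "MY_TABLE_NAME")
default_stream_name = f"{table_path}/_default"  # committed type, at-least-once semantics

Appends sent to this stream name become available for query as soon as they are acknowledged, but a retried request can be written twice, which is why the default stream offers at-least-once rather than exactly-once semantics.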

Use the default stream for at-least-once semantics

If your application can accept the possibility of duplicate records appearing in the destination table, then we recommend using the default stream for streaming scenarios.

The following code shows how to write data to the default stream:

Java

To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteToDefaultStream {

  public static void runWriteToDefaultStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    writeToDefaultStream(projectId, datasetName, tableName);
  }

  private static ByteString buildByteString() {
    byte[] bytes = new byte[] {1, 2, 3, 4, 5};
    return ByteString.copyFrom(bytes);
  }

  // Create a JSON object that is compatible with the table schema.
  private static JSONObject buildRecord(int i, int j) {
    JSONObject record = new JSONObject();
    StringBuilder sbSuffix = new StringBuilder();
    for (int k = 0; k < j; k++) {
      sbSuffix.append(k);
    }
    record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
    ByteString byteString = buildByteString();
    record.put("test_bytes", byteString);
    record.put(
        "test_geo",
        "POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");
    return record;
  }

  public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization for the worker.
    writer.initialize(parentTable);

    // Write two batches of fake data to the stream, each with 10 JSON records.  Data may be
    // batched up to the maximum request size:
    // https://cloud.google.com/bigquery/quotas#write-api-limits
    for (int i = 0; i < 2; i++) {
      JSONArray jsonArr = new JSONArray();
      for (int j = 0; j < 10; j++) {
        JSONObject record = buildRecord(i, j);
        jsonArr.put(record);
      }

      writer.append(new AppendContext(jsonArr));
    }

    // Final cleanup for the stream during worker teardown.
    writer.cleanup();
    verifyExpectedRowCount(parentTable, 12);
    System.out.println("Appended records successfully.");
  }

  private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
      throws InterruptedException {
    String queryRowCount =
        "SELECT COUNT(*) FROM `"
            + parentTable.getProject()
            + "."
            + parentTable.getDataset()
            + "."
            + parentTable.getTable()
            + "`";
    QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableResult results = bigquery.query(queryConfig);
    int countRowsActual =
        Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
    if (countRowsActual != expectedRowCount) {
      throw new RuntimeException(
          "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
    }
  }

  private static class AppendContext {
    JSONArray data;

    AppendContext(JSONArray data) {
      this.data = data;
    }
  }

  private static class DataWriter {
    private static final int MAX_RECREATE_COUNT = 3;

    private BigQueryWriteClient client;

    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    private JsonStreamWriter streamWriter;

    @GuardedBy("lock")
    private RuntimeException error = null;

    private AtomicInteger recreateCount = new AtomicInteger(0);

    private JsonStreamWriter createStreamWriter(String tableName)
        throws DescriptorValidationException, IOException, InterruptedException {
      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();

      // Use the JSON stream writer to send records in JSON format. Specify the table name to write
      // to the default stream.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
      return JsonStreamWriter.newBuilder(tableName, client)
          .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
          .setChannelProvider(
              BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
                  .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
                  .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
                  .setKeepAliveWithoutCalls(true)
                  .setChannelsPerCpu(2)
                  .build())
          .setEnableConnectionPool(true)
          // This will allow connection pool to scale up better.
          .setFlowControlSettings(
              FlowControlSettings.newBuilder().setMaxOutstandingElementCount(100L).build())
          // If value is missing in json and there is a default value configured on bigquery
          // column, apply the default value to the missing value field.
          .setDefaultMissingValueInterpretation(
              AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
          .setRetrySettings(retrySettings)
          .build();
    }

    public void initialize(TableName parentTable)
        throws DescriptorValidationException, IOException, InterruptedException {
      // Initialize client without settings, internally within stream writer a new client will be
      // created with full settings.
      client = BigQueryWriteClient.create();

      streamWriter = createStreamWriter(parentTable.toString());
    }

    public void append(AppendContext appendContext)
        throws DescriptorValidationException, IOException, InterruptedException {
      synchronized (this.lock) {
        if (!streamWriter.isUserClosed()
            && streamWriter.isClosed()
            && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
          streamWriter = createStreamWriter(streamWriter.getStreamName());
          this.error = null;
        }
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());

      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup() {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      client.close();
      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;
      private final AppendContext appendContext;

      public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
        this.parent = parent;
        this.appendContext = appendContext;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append success\n");
        this.parent.recreateCount.set(0);
        done();
      }

      public void onFailure(Throwable throwable) {
        if (throwable instanceof AppendSerializationError) {
          AppendSerializationError ase = (AppendSerializationError) throwable;
          Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
          if (rowIndexToErrorMessage.size() > 0) {
            // Omit the faulty rows
            JSONArray dataNew = new JSONArray();
            for (int i = 0; i < appendContext.data.length(); i++) {
              if (!rowIndexToErrorMessage.containsKey(i)) {
                dataNew.put(appendContext.data.get(i));
              } else {
                // process faulty rows by placing them on a dead-letter-queue, for instance
              }
            }

            // Retry the remaining valid rows, but using a separate thread to
            // avoid potentially blocking while we are in a callback.
            if (dataNew.length() > 0) {
              try {
                this.parent.append(new AppendContext(dataNew));
              } catch (DescriptorValidationException e) {
                throw new RuntimeException(e);
              } catch (IOException e) {
                throw new RuntimeException(e);
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            }
            // Mark the existing attempt as done since we got a response for it
            done();
            return;
          }
        }

        boolean resendRequest = false;
        if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) {
          resendRequest = true;
        } else if (throwable instanceof StreamWriterClosedException) {
          if (!parent.streamWriter.isUserClosed()) {
            resendRequest = true;
          }
        }
        if (resendRequest) {
          // Retry this request.
          try {
            this.parent.append(new AppendContext(appendContext.data));
          } catch (DescriptorValidationException e) {
            throw new RuntimeException(e);
          } catch (IOException e) {
            throw new RuntimeException(e);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
          // Mark the existing attempt as done since we got a response for it
          done();
          return;
        }

        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
 

Node.js

To learn how to install and use the client library for BigQuery, see BigQuery client libraries.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendJSONRowsDefaultStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const writeClient = new WriterClient({projectId});

  try {
    const writeStream = await writeClient.getWriteStream({
      streamId: `${destinationTable}/streams/_default`,
      view: 'FULL',
    });
    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId: managedwriter.DefaultStream,
      destinationTable,
    });
    const streamId = connection.getStreamId();

    const writer = new JSONWriter({
      streamId,
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      customer_name: 'Octavia',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      customer_name: 'Turing',
    };
    rows.push(row);

    // Send batch.
    let pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    rows = [];

    // Row 3
    row = {
      row_num: 3,
      customer_name: 'Bell',
    };
    rows.push(row);

    // Send batch.
    pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}
 

Python

This example shows how to insert a record with two fields using the default stream:

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json

import sample_data_pb2

# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = ["name", "age"]


# Function to create a batch of row data to be serialized.
def create_row_data(data):
    row = sample_data_pb2.SampleData()
    for field in TABLE_COLUMNS_TO_CHECK:
        # Optional fields will be passed as null if not provided
        if field in data:
            setattr(row, field, data[field])
    return row.SerializeToString()


class BigQueryStorageWriteAppend(object):

    # The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
    def append_rows_proto2(project_id: str, dataset_id: str, table_id: str, data: dict):
        write_client = bigquery_storage_v1.BigQueryWriteClient()
        parent = write_client.table_path(project_id, dataset_id, table_id)
        stream_name = f"{parent}/_default"
        write_stream = types.WriteStream()

        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()

        # The request must contain the stream name.
        request_template.write_stream = stream_name

        # Generating the protocol buffer representation of the message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()
        sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data

        # Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
        append_rows_stream = writer.AppendRowsStream(write_client, request_template)

        # Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
        proto_rows = types.ProtoRows()
        for row in data:
            proto_rows.serialized_rows.append(create_row_data(row))

        # Appends data to the given stream.
        request = types.AppendRowsRequest()
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request.proto_rows = proto_data

        append_rows_stream.send(request)

        print(f"Rows to table: '{parent}' have been written.")


if __name__ == "__main__":
    ###### Uncomment the below block to provide additional logging capabilities ######
    # logging.basicConfig(
    #     level=logging.DEBUG,
    #     format="%(asctime)s [%(levelname)s] %(message)s",
    #     handlers=[
    #         logging.StreamHandler()
    #     ]
    # )
    ###### Uncomment the above block to provide additional logging capabilities ######
    with open("entries.json", "r") as json_file:
        data = json.load(json_file)

    # Change this to your specific BigQuery project, dataset, table details
    BigQueryStorageWriteAppend.append_rows_proto2(
        "PROJECT_ID", "DATASET_ID", "TABLE_ID", data=data
    )
 

This code example depends on the compiled protocol module sample_data_pb2.py. To create the compiled module, execute the protoc --python_out=. sample_data.proto command, where protoc is the protocol buffer compiler. The sample_data.proto file defines the format of the messages used in the Python example. To install the protoc compiler, follow the instructions in Protocol Buffers - Google's data interchange format.

Here are the contents of the sample_data.proto file:

 message SampleData {
  required string name = 1;
  required int64 age = 2;
} 

This script consumes the entries.json file, which contains sample row data to be inserted into the BigQuery table:

 {"name": "Jim", "age": 35}
{"name": "Jane", "age": 27} 

Use multiplexing

You enable multiplexing at the stream writer level for the default stream only. To enable multiplexing in Java, call the setEnableConnectionPool method when you construct a StreamWriter or JsonStreamWriter object.

After enabling the connection pool, the Java client library manages your connections in the background, scaling up connections if the existing connections are considered too busy. For automatic scaling up to be more effective, you should consider lowering the maxInflightRequests limit.

// One possible way for constructing StreamWriter
StreamWriter.newBuilder(streamName)
    .setWriterSchema(protoSchema)
    .setEnableConnectionPool(true)
    .setMaxInflightRequests(100)
    .build();

// One possible way for constructing JsonStreamWriter
JsonStreamWriter.newBuilder(tableName, bigqueryClient)
    .setEnableConnectionPool(true)
    .setMaxInflightRequests(100)
    .build();

To enable multiplexing in Go, see Connection Sharing (Multiplexing).

Use committed type for exactly-once semantics

If you need exactly-once write semantics, create a write stream in committed type. In committed type, records are available for query as soon as the client receives acknowledgement from the back end.

Committed type provides exactly-once delivery within a stream through the use of record offsets. By using record offsets, the application specifies the next append offset in each call to AppendRows. The write operation is only performed if the offset value matches the next append offset. For more information, see Manage stream offsets to achieve exactly-once semantics.

If you don't provide an offset, then records are appended to the current end of the stream. In that case, if an append request returns an error, retrying it could result in the record appearing more than once in the stream.
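As a sketch only, using the Python client types shown elsewhere in this document (the stream name and row payload below are placeholders), an offset accompanies each append request so that a retried request cannot be applied twice:

from google.cloud.bigquery_storage_v1 import types

# Sketch: attach an explicit offset to an append on a committed-type stream.
request = types.AppendRowsRequest()
request.write_stream = (
    "projects/MY_PROJECT_ID/datasets/MY_DATASET_NAME/tables/MY_TABLE_NAME/streams/MY_STREAM_ID"
)
request.offset = 0  # must equal the next append offset of the stream
# ...populate request.proto_rows with serialized rows as in the Python sample above...
# If this request is retried with the same offset, the server rejects the duplicate
# instead of appending the rows a second time.

The Java sample later in this section applies the same idea by passing an explicit offset to each streamWriter.append call.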

To use committed type, perform the following steps:

Java

  1. Call CreateWriteStream to create one or more streams in committed type.
  2. For each stream, call AppendRows in a loop to write batches of records.
  3. Call FinalizeWriteStream for each stream to release the stream. After you call this method, you cannot write any more rows to the stream. This step is optional in committed type, but helps to prevent exceeding the limit on active streams. For more information, see Limit the rate of stream creation.

Node.js

  1. Call createWriteStreamFullResponse to create one or more streams in committed type.
  2. For each stream, call appendRows in a loop to write batches of records.
  3. Call finalize for each stream to release the stream. After you call this method, you cannot write any more rows to the stream. This step is optional in committed type, but helps to prevent exceeding the limit on active streams. For more information, see Limit the rate of stream creation.

You cannot delete a stream explicitly. Streams follow the system-defined time to live (TTL):

  • A committed stream has a TTL of three days if there is no traffic on the stream.
  • A buffered stream by default has a TTL of seven days if there is no traffic on the stream.

The following code shows how to use committed type:

Java

To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteCommittedStream {

  public static void runWriteCommittedStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    writeCommittedStream(projectId, datasetName, tableName);
  }

  public static void writeCommittedStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQueryWriteClient client = BigQueryWriteClient.create();
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization.
    writer.initialize(parentTable, client);

    try {
      // Write two batches of fake data to the stream, each with 10 JSON records.  Data may be
      // batched up to the maximum request size:
      // https://cloud.google.com/bigquery/quotas#write-api-limits
      long offset = 0;
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        writer.append(jsonArr, offset);
        offset += jsonArr.length();
      }
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e);
    }

    // Final cleanup for the stream.
    writer.cleanup(client);
    System.out.println("Appended records successfully.");
  }

  // A simple wrapper object showing how the stateful stream writer should be used.
  private static class DataWriter {

    private JsonStreamWriter streamWriter;
    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    @GuardedBy("lock")
    private RuntimeException error = null;

    void initialize(TableName parentTable, BigQueryWriteClient client)
        throws IOException, DescriptorValidationException, InterruptedException {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();

      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
      streamWriter =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
              .setRetrySettings(retrySettings)
              .build();
    }

    public void append(JSONArray data, long offset)
        throws DescriptorValidationException, IOException, ExecutionException {
      synchronized (this.lock) {
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
      ApiFutures.addCallback(
          future, new DataWriter.AppendCompleteCallback(this), MoreExecutors.directExecutor());
      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup(BigQueryWriteClient client) {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }

      // Finalize the stream.
      FinalizeWriteStreamResponse finalizeResponse =
          client.finalizeWriteStream(streamWriter.getStreamName());
      System.out.println("Rows written: " + finalizeResponse.getRowCount());
    }

    public String getStreamName() {
      return streamWriter.getStreamName();
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;

      public AppendCompleteCallback(DataWriter parent) {
        this.parent = parent;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
        done();
      }

      public void onFailure(Throwable throwable) {
        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        System.out.format("Error: %s\n", throwable.toString());
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
 

Node.js

To learn how to install and use the client library for BigQuery, see BigQuery client libraries.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendJSONRowsCommittedStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.CommittedStream;
  const writeClient = new WriterClient({projectId});

  try {
    const writeStream = await writeClient.createWriteStreamFullResponse({
      streamType,
      destinationTable,
    });
    const streamId = writeStream.name;
    console.log(`Stream created: ${streamId}`);

    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId,
    });

    const writer = new JSONWriter({
      streamId,
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      customer_name: 'Octavia',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      customer_name: 'Turing',
    };
    rows.push(row);

    // Send batch.
    let pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    rows = [];

    // Row 3
    row = {
      row_num: 3,
      customer_name: 'Bell',
    };
    rows.push(row);

    // Send batch.
    pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}
 

Use the Apache Arrow format to ingest data

The following code shows how to ingest data using the Apache Arrow format. For a more detailed, end-to-end example, see the PyArrow example on GitHub.

Python

This example shows how to ingest a serialized PyArrow table using the default stream.

import pyarrow

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream


def append_rows_with_pyarrow(
    pyarrow_table: pyarrow.Table,
    project_id: str,
    dataset_id: str,
    table_id: str,
):
    bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()

    # Create request_template.
    request_template = gapic_types.AppendRowsRequest()
    request_template.write_stream = (
        f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
    )
    arrow_data = gapic_types.AppendRowsRequest.ArrowData()
    arrow_data.writer_schema.serialized_schema = (
        pyarrow_table.schema.serialize().to_pybytes()
    )
    request_template.arrow_rows = arrow_data

    # Create AppendRowsStream.
    append_rows_stream = AppendRowsStream(
        bqstorage_write_client,
        request_template,
    )

    # Create request with table data.
    request = gapic_types.AppendRowsRequest()
    request.arrow_rows.rows.serialized_record_batch = (
        pyarrow_table.to_batches()[0].serialize().to_pybytes()
    )

    # Send request.
    future = append_rows_stream.send(request)

    # Wait for result.
    future.result()