Stream data using the Storage Write API
This document describes how to use the BigQuery Storage Write API to stream data into BigQuery.
In streaming scenarios, data arrives continuously and should be available for reads with minimal latency. When using the BigQuery Storage Write API for streaming workloads, consider what guarantees you need:
- If your application only needs at-least-once semantics, then use the default stream.
- If you need exactly-once semantics, then create one or more streams in committed type and use stream offsets to guarantee exactly-once writes, as shown in the sketch below.
In committed type, data written to the stream is available for query as soon as the server acknowledges the write request. The default stream also uses committed type, but does not provide exactly-once guarantees.
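To make the offset-based pattern concrete, here is a minimal sketch of a committed-type stream with an explicit append offset. It is illustrative, not production-ready: it assumes a destination table whose schema has a STRING column named test_string, and it omits the retry and error handling that the full sample below demonstrates.

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import org.json.JSONArray;
import org.json.JSONObject;

public class CommittedStreamSketch {
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these values before running the sketch.
    TableName parentTable = TableName.of("MY_PROJECT_ID", "MY_DATASET_NAME", "MY_TABLE_NAME");

    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Create an application-owned stream in committed type.
      WriteStream stream =
          WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
      WriteStream writeStream =
          client.createWriteStream(
              CreateWriteStreamRequest.newBuilder()
                  .setParent(parentTable.toString())
                  .setWriteStream(stream)
                  .build());

      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        // Assumes the table schema contains a STRING column named test_string.
        JSONArray rows = new JSONArray().put(new JSONObject().put("test_string", "hello"));
        // Appending at an explicit offset lets the server reject an append it has already
        // persisted at that offset, which is what makes retries exactly-once.
        ApiFuture<AppendRowsResponse> future = writer.append(rows, /* offset= */ 0);
        future.get(); // Rows are available for query once the append is acknowledged.
      }
    }
  }
}
```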
Use the default stream for at-least-once semantics
If your application can accept the possibility of duplicate records appearing in the destination table, then we recommend using the default stream for streaming scenarios.
The following code shows how to write data to the default stream:
Java
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
```java
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteToDefaultStream {

  public static void runWriteToDefaultStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    writeToDefaultStream(projectId, datasetName, tableName);
  }

  private static ByteString buildByteString() {
    byte[] bytes = new byte[] {1, 2, 3, 4, 5};
    return ByteString.copyFrom(bytes);
  }

  // Create a JSON object that is compatible with the table schema.
  private static JSONObject buildRecord(int i, int j) {
    JSONObject record = new JSONObject();
    StringBuilder sbSuffix = new StringBuilder();
    for (int k = 0; k < j; k++) {
      sbSuffix.append(k);
    }
    record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
    ByteString byteString = buildByteString();
    record.put("test_bytes", byteString);
    record.put(
        "test_geo",
        "POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");
    return record;
  }

  public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization for the worker.
    writer.initialize(parentTable);

    // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
    // batched up to the maximum request size:
    // https://cloud.google.com/bigquery/quotas#write-api-limits
    for (int i = 0; i < 2; i++) {
      JSONArray jsonArr = new JSONArray();
      for (int j = 0; j < 10; j++) {
        JSONObject record = buildRecord(i, j);
        jsonArr.put(record);
      }

      writer.append(new AppendContext(jsonArr));
    }

    // Final cleanup for the stream during worker teardown.
    writer.cleanup();
    // Two batches of 10 records each were appended, so expect 20 rows.
    verifyExpectedRowCount(parentTable, 20);
    System.out.println("Appended records successfully.");
  }
  private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
      throws InterruptedException {
    String queryRowCount =
        "SELECT COUNT(*) FROM `"
            + parentTable.getProject()
            + "."
            + parentTable.getDataset()
            + "."
            + parentTable.getTable()
            + "`";
    QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableResult results = bigquery.query(queryConfig);
    int countRowsActual =
        Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
    if (countRowsActual != expectedRowCount) {
      throw new RuntimeException(
          "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
    }
  }

  private static class AppendContext {
    JSONArray data;

    AppendContext(JSONArray data) {
      this.data = data;
    }
  }

  private static class DataWriter {
    private static final int MAX_RECREATE_COUNT = 3;

    private BigQueryWriteClient client;

    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);
    private final Object lock = new Object();
    private JsonStreamWriter streamWriter;

    @GuardedBy("lock")
    private RuntimeException error = null;

    private AtomicInteger recreateCount = new AtomicInteger(0);

    private JsonStreamWriter createStreamWriter(String tableName)
        throws DescriptorValidationException, IOException, InterruptedException {
      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();
      // Use the JSON stream writer to send records in JSON format. Specify the table name to
      // write to the default stream.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
      return JsonStreamWriter.newBuilder(tableName, client)
          .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
          .setChannelProvider(
              BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
                  .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
                  .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
                  .setKeepAliveWithoutCalls(true)
                  .setChannelsPerCpu(2)
                  .build())
          .setEnableConnectionPool(true)
          // This allows the connection pool to scale up better.
          .setFlowControlSettings(
              FlowControlSettings.newBuilder().setMaxOutstandingElementCount(100L).build())
          // If a value is missing in the JSON record and the BigQuery column has a default
          // value configured, apply the default value to the missing field.
          .setDefaultMissingValueInterpretation(
              AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
          .setRetrySettings(retrySettings)
          .build();
    }

    public void initialize(TableName parentTable)
        throws DescriptorValidationException, IOException, InterruptedException {
      // Initialize the client without settings; the stream writer internally creates a new
      // client with full settings.
      client = BigQueryWriteClient.create();

      streamWriter = createStreamWriter(parentTable.toString());
    }

    public void append(AppendContext appendContext)
        throws DescriptorValidationException, IOException, InterruptedException {
      synchronized (this.lock) {
        if (!streamWriter.isUserClosed()
            && streamWriter.isClosed()
            && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
          streamWriter = createStreamWriter(streamWriter.getStreamName());
          this.error = null;
        }
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());

      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup() {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      client.close();
      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;
      private final AppendContext appendContext;

      public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
        this.parent = parent;
        this.appendContext = appendContext;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append success\n");
        this.parent.recreateCount.set(0);
        done();
      }

      public void onFailure(Throwable throwable) {
        if (throwable instanceof AppendSerializationError) {
          AppendSerializationError ase = (AppendSerializationError) throwable;
          Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
          if (rowIndexToErrorMessage.size() > 0) {
            // Omit the faulty rows.
            JSONArray dataNew = new JSONArray();
            for (int i = 0; i < appendContext.data.length(); i++) {
              if (!rowIndexToErrorMessage.containsKey(i)) {
                dataNew.put(appendContext.data.get(i));
              } else {
                // Process faulty rows by placing them on a dead-letter queue, for instance.
              }
            }

            // Retry the remaining valid rows, but using a separate thread to
            // avoid potentially blocking while we are in a callback.
            if (dataNew.length() > 0) {
              try {
                this.parent.append(new AppendContext(dataNew));
              } catch (DescriptorValidationException e) {
                throw new RuntimeException(e);
              } catch (IOException e) {
                throw new RuntimeException(e);
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            }
            // Mark the existing attempt as done since we got a response for it.
            done();
            return;
          }
        }

        boolean resendRequest = false;
        if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) {
          resendRequest = true;
        } else if (throwable instanceof StreamWriterClosedException) {
          if (!parent.streamWriter.isUserClosed()) {
            resendRequest = true;
          }
        }
        if (resendRequest) {
          // Retry this request.
          try {
            this.parent.append(new AppendContext(appendContext.data));
          } catch (DescriptorValidationException e) {
            throw new RuntimeException(e);
          } catch (IOException e) {
            throw new RuntimeException(e);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
          // Mark the existing attempt as done since we got a response for it.
          done();
          return;
        }

        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
```
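The sample has no main method; its entry point is runWriteToDefaultStream(). A hypothetical wrapper to run it standalone might look like this:

```java
public class Main {
  public static void main(String[] args) throws Exception {
    // Hypothetical runner; assumes the TODO variables in runWriteToDefaultStream()
    // have been replaced with a real project, dataset, and table.
    WriteToDefaultStream.runWriteToDefaultStream();
  }
}
```

Note the role of the Phaser in the sample: each call to append() registers one in-flight party and the callback's done() deregisters it, so cleanup() blocks in arriveAndAwaitAdvance() until every outstanding append has received a response.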