Use the JSON stream writer to append records to a table through the default stream.
Code sample
Java
Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
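As a minimal sketch of what this means in practice (assuming Application Default Credentials are already configured, for example with `gcloud auth application-default login`; the `AdcSmokeTest` class name is illustrative and not part of the sample below), creating a client picks the credentials up from the environment with no explicit wiring:

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;

public class AdcSmokeTest {
  public static void main(String[] args) throws Exception {
    // BigQueryWriteClient.create() resolves Application Default Credentials
    // from the environment automatically; no credential objects are passed in.
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      System.out.println("Client created with Application Default Credentials.");
    }
  }
}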
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;
public class WriteToDefaultStream {

  public static void runWriteToDefaultStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    writeToDefaultStream(projectId, datasetName, tableName);
  }

  private static ByteString buildByteString() {
    byte[] bytes = new byte[] {1, 2, 3, 4, 5};
    return ByteString.copyFrom(bytes);
  }
  // Create a JSON object that is compatible with the table schema.
  private static JSONObject buildRecord(int i, int j) {
    JSONObject record = new JSONObject();
    StringBuilder sbSuffix = new StringBuilder();
    for (int k = 0; k < j; k++) {
      sbSuffix.append(k);
    }
    record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
    ByteString byteString = buildByteString();
    record.put("test_bytes", byteString);
    record.put(
        "test_geo",
        "POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");
    return record;
  }
  public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization for the worker.
    writer.initialize(parentTable);

    // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
    // batched up to the maximum request size:
    // https://cloud.google.com/bigquery/quotas#write-api-limits
    for (int i = 0; i < 2; i++) {
      JSONArray jsonArr = new JSONArray();
      for (int j = 0; j < 10; j++) {
        JSONObject record = buildRecord(i, j);
        jsonArr.put(record);
      }

      writer.append(new AppendContext(jsonArr));
    }

    // Final cleanup for the stream during worker teardown.
    writer.cleanup();
    // Two batches of 10 records each were appended, so expect 20 rows.
    verifyExpectedRowCount(parentTable, 20);
    System.out.println("Appended records successfully.");
  }
  private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
      throws InterruptedException {
    String queryRowCount =
        "SELECT COUNT(*) FROM `"
            + parentTable.getProject()
            + "."
            + parentTable.getDataset()
            + "."
            + parentTable.getTable()
            + "`";
    QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableResult results = bigquery.query(queryConfig);
    int countRowsActual =
        Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
    if (countRowsActual != expectedRowCount) {
      throw new RuntimeException(
          "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
    }
  }
  private static class AppendContext {

    JSONArray data;

    AppendContext(JSONArray data) {
      this.data = data;
    }
  }
  private static class DataWriter {

    private static final int MAX_RECREATE_COUNT = 3;

    private BigQueryWriteClient client;

    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);
    private final Object lock = new Object();
    private JsonStreamWriter streamWriter;

    @GuardedBy("lock")
    private RuntimeException error = null;

    private AtomicInteger recreateCount = new AtomicInteger(0);
    private JsonStreamWriter createStreamWriter(String tableName)
        throws DescriptorValidationException, IOException, InterruptedException {
      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();

      // Use the JSON stream writer to send records in JSON format. Specify the table name to
      // write to the default stream.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
      return JsonStreamWriter.newBuilder(tableName, client)
          .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
          .setChannelProvider(
              BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
                  .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
                  .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
                  .setKeepAliveWithoutCalls(true)
                  .setChannelsPerCpu(2)
                  .build())
          .setEnableConnectionPool(true)
          // This allows the connection pool to scale up better.
          .setFlowControlSettings(
              FlowControlSettings.newBuilder().setMaxOutstandingElementCount(100L).build())
          // If a value is missing in the JSON and the BigQuery column has a default value
          // configured, apply the default value to the missing field.
          .setDefaultMissingValueInterpretation(
              AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
          .setRetrySettings(retrySettings)
          .build();
    }
    public void initialize(TableName parentTable)
        throws DescriptorValidationException, IOException, InterruptedException {
      // Initialize the client without settings; internally the stream writer creates a new
      // client with full settings.
      client = BigQueryWriteClient.create();

      streamWriter = createStreamWriter(parentTable.toString());
    }
    public void append(AppendContext appendContext)
        throws DescriptorValidationException, IOException, InterruptedException {
      synchronized (this.lock) {
        if (!streamWriter.isUserClosed()
            && streamWriter.isClosed()
            && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
          streamWriter = createStreamWriter(streamWriter.getStreamName());
          this.error = null;
        }
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());

      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }
    public void cleanup() {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      client.close();
      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }
    }
    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;
      private final AppendContext appendContext;

      public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
        this.parent = parent;
        this.appendContext = appendContext;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append success\n");
        this.parent.recreateCount.set(0);
        done();
      }

      public void onFailure(Throwable throwable) {
        if (throwable instanceof AppendSerializationError) {
          AppendSerializationError ase = (AppendSerializationError) throwable;
          Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
          if (rowIndexToErrorMessage.size() > 0) {
            // Omit the faulty rows.
            JSONArray dataNew = new JSONArray();
            for (int i = 0; i < appendContext.data.length(); i++) {
              if (!rowIndexToErrorMessage.containsKey(i)) {
                dataNew.put(appendContext.data.get(i));
              } else {
                // Process faulty rows by placing them on a dead-letter queue, for instance.
              }
            }

            // Retry the remaining valid rows, but using a separate thread to
            // avoid potentially blocking while we are in a callback.
            if (dataNew.length() > 0) {
              try {
                this.parent.append(new AppendContext(dataNew));
              } catch (DescriptorValidationException e) {
                throw new RuntimeException(e);
              } catch (IOException e) {
                throw new RuntimeException(e);
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            }
            // Mark the existing attempt as done since we got a response for it.
            done();
            return;
          }
        }

        boolean resendRequest = false;
        if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) {
          resendRequest = true;
        } else if (throwable instanceof StreamWriterClosedException) {
          if (!parent.streamWriter.isUserClosed()) {
            resendRequest = true;
          }
        }
        if (resendRequest) {
          // Retry this request.
          try {
            this.parent.append(new AppendContext(appendContext.data));
          } catch (DescriptorValidationException e) {
            throw new RuntimeException(e);
          } catch (IOException e) {
            throw new RuntimeException(e);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
          // Mark the existing attempt as done since we got a response for it.
          done();
          return;
        }

        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
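The sample above defines runWriteToDefaultStream() but no main method. A minimal, hypothetical launcher (the RunSample class name is arbitrary and not part of the published snippet) could look like:

// Hypothetical launcher class; replace the placeholder IDs inside
// runWriteToDefaultStream before running.
public class RunSample {
  public static void main(String[] args) throws Exception {
    WriteToDefaultStream.runWriteToDefaultStream();
  }
}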
Node.js
Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendJSONRowsDefaultStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const writeClient = new WriterClient({projectId});

  try {
    const writeStream = await writeClient.getWriteStream({
      streamId: `${destinationTable}/streams/_default`,
      view: 'FULL',
    });
    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId: managedwriter.DefaultStream,
      destinationTable,
    });
    const streamId = connection.getStreamId();

    const writer = new JSONWriter({
      streamId,
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      customer_name: 'Octavia',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      customer_name: 'Turing',
    };
    rows.push(row);

    // Send batch.
    let pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    rows = [];

    // Row 3
    row = {
      row_num: 3,
      customer_name: 'Bell',
    };
    rows.push(row);

    // Send batch.
    pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}
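The function reads projectId, datasetId, and tableId from the surrounding scope (see the TODO comment at the top). A minimal, hypothetical invocation, with placeholder ID values, might be:

// Hypothetical invocation; the ID values are placeholders.
const projectId = 'my_project';
const datasetId = 'my_dataset';
const tableId = 'my_table';

appendJSONRowsDefaultStream().catch(console.error);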
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.