Use the JSON stream writer to append records to a write stream created in pending mode.
Code sample
Java
Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
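If you want to confirm that Application Default Credentials can be resolved before running the sample, a minimal check along the following lines can help. This is only a sketch: the class name is a placeholder, and it assumes the google-auth-library dependency that the BigQuery client libraries already pull in transitively.

import com.google.auth.oauth2.GoogleCredentials;
import java.io.IOException;

public class CheckAdc {
  public static void main(String[] args) {
    try {
      // Resolves credentials from GOOGLE_APPLICATION_CREDENTIALS, the gcloud ADC file,
      // or the Compute Engine metadata server, in that order.
      GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
      System.out.println(
          "Application Default Credentials found: " + credentials.getClass().getSimpleName());
    } catch (IOException e) {
      System.out.println("Application Default Credentials are not configured: " + e.getMessage());
    }
  }
}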
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WritePendingStream {

  public static void runWritePendingStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writePendingStream(projectId, datasetName, tableName);
  }

  public static void writePendingStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQueryWriteClient client = BigQueryWriteClient.create();
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization.
    writer.initialize(parentTable, client);

    try {
      // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
      // batched up to the maximum request size:
      // https://cloud.google.com/bigquery/quotas#write-api-limits
      long offset = 0;
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        writer.append(jsonArr, offset);
        offset += jsonArr.length();
      }
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e);
    }

    // Final cleanup for the stream.
    writer.cleanup(client);
    System.out.println("Appended records successfully.");

    // Once all streams are done, if all writes were successful, commit all of them in one request.
    // This example only has the one stream. If any streams failed, their workload may be
    // retried on a new stream, and then only the successful stream should be included in the
    // commit.
    BatchCommitWriteStreamsRequest commitRequest =
        BatchCommitWriteStreamsRequest.newBuilder()
            .setParent(parentTable.toString())
            .addWriteStreams(writer.getStreamName())
            .build();
    BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest);
    // If the response does not have a commit time, it means the commit operation failed.
    if (commitResponse.hasCommitTime() == false) {
      for (StorageError err : commitResponse.getStreamErrorsList()) {
        System.out.println(err.getErrorMessage());
      }
      throw new RuntimeException("Error committing the streams");
    }
    System.out.println("Appended and committed records successfully.");
  }

  // A simple wrapper object showing how the stateful stream writer should be used.
  private static class DataWriter {

    private JsonStreamWriter streamWriter;
    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    @GuardedBy("lock")
    private RuntimeException error = null;

    void initialize(TableName parentTable, BigQueryWriteClient client)
        throws IOException, DescriptorValidationException, InterruptedException {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();

      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();

      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.JsonStreamWriter
      streamWriter =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .setRetrySettings(retrySettings)
              .build();
    }

    public void append(JSONArray data, long offset)
        throws DescriptorValidationException, IOException, ExecutionException {
      synchronized (this.lock) {
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this), MoreExecutors.directExecutor());
      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup(BigQueryWriteClient client) {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }

      // Finalize the stream.
      FinalizeWriteStreamResponse finalizeResponse =
          client.finalizeWriteStream(streamWriter.getStreamName());
      System.out.println("Rows written: " + finalizeResponse.getRowCount());
    }

    public String getStreamName() {
      return streamWriter.getStreamName();
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;

      public AppendCompleteCallback(DataWriter parent) {
        this.parent = parent;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format(
            "Append %d success\n", response.getAppendResult().getOffset().getValue());
        done();
      }

      public void onFailure(Throwable throwable) {
        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        System.out.format("Error: %s\n", throwable.toString());
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
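The sample defines runWritePendingStream but no entry point. A minimal sketch for invoking it could look like the following; the wrapper class name and the broad throws clause are assumptions for illustration, not part of the original sample.

public class WritePendingStreamMain {
  public static void main(String[] args) throws Exception {
    // Runs the pending-stream sample with the placeholder project, dataset, and table IDs
    // defined in runWritePendingStream.
    WritePendingStream.runWritePendingStream();
  }
}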
Node.js
Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendRowsPendingStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.PendingStream;
  const writeClient = new WriterClient({projectId: projectId});

  try {
    const writeStream = await writeClient.createWriteStreamFullResponse({
      streamType,
      destinationTable,
    });
    const streamId = writeStream.name;
    console.log(`Stream created: ${streamId}`);

    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId,
    });

    const writer = new JSONWriter({
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      bool_col: true,
      bytes_col: Buffer.from('hello world'),
      float64_col: parseFloat('+123.44999694824219'),
      int64_col: 123,
      string_col: 'omg',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      bool_col: false,
    };
    rows.push(row);

    // Row 3
    row = {
      row_num: 3,
      bytes_col: Buffer.from('later, gator'),
    };
    rows.push(row);

    // Row 4
    row = {
      row_num: 4,
      float64_col: 987.6539916992188,
    };
    rows.push(row);

    // Row 5
    row = {
      row_num: 5,
      int64_col: 321,
    };
    rows.push(row);

    // Row 6
    row = {
      row_num: 6,
      string_col: 'octavia',
    };
    rows.push(row);

    // Set an offset to allow resuming this stream if the connection breaks.
    // Keep track of which requests the server has acknowledged and resume the
    // stream at the first non-acknowledged message. If the server has already
    // processed a message with that offset, it will return an ALREADY_EXISTS
    // error, which can be safely ignored.
    // The first request must always have an offset of 0.
    let offsetValue = 0;

    // Send batch.
    let pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    // Reset rows.
    rows = [];

    // Row 7
    row = {
      row_num: 7,
      date_col: new Date('2019-02-07'),
    };
    rows.push(row);

    // Row 8
    row = {
      row_num: 8,
      datetime_col: new Date('2019-02-17T11:24:00.000Z'),
    };
    rows.push(row);

    // Row 9
    row = {
      row_num: 9,
      geography_col: 'POINT(5 5)',
    };
    rows.push(row);

    // Row 10
    row = {
      row_num: 10,
      numeric_col: 123456,
      bignumeric_col: '99999999999999999999999999999.999999999',
    };
    rows.push(row);

    // Row 11
    row = {
      row_num: 11,
      time_col: '18:00:00',
    };
    rows.push(row);

    // Row 12
    row = {
      row_num: 12,
      timestamp_col: new Date('2022-01-09T03:49:46.564Z'),
    };
    rows.push(row);

    // Offset must equal the number of rows that were previously sent.
    offsetValue = 6;

    // Send batch.
    pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    rows = [];

    // Row 13
    row = {
      row_num: 13,
      int64_list: [1999, 2001],
    };
    rows.push(row);

    // Row 14
    row = {
      row_num: 14,
      struct_col: {
        sub_int_col: 99,
      },
    };
    rows.push(row);

    // Row 15
    row = {
      row_num: 15,
      struct_list: [{sub_int_col: 100}, {sub_int_col: 101}],
    };
    rows.push(row);

    // Row 16
    row = {
      row_num: 16,
      range_col: {
        start: new Date('2022-01-09T03:49:46.564Z'),
        end: new Date('2022-01-09T04:49:46.564Z'),
      },
    };
    rows.push(row);

    offsetValue = 12;

    // Send batch.
    pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);

    const response = await writeClient.batchCommitWriteStream({
      parent: destinationTable,
      writeStreams: [streamId],
    });

    console.log(response);
  } catch (err) {
    console.log(err.message, err);
  } finally {
    writeClient.close();
  }
}
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.