This sample demonstrates how to use protocol buffers with the BigQuery Storage Write API to write data into a BigQuery table.
Code sample
Node.js
Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, Writer} = managedwriter;
const sample_data_pb = require('./sample_data_pb.js');
const {SampleData} = sample_data_pb;
const protobufjs = require('protobufjs');
require('protobufjs/ext/descriptor');

async function appendRowsProto2() {
  /**
   * If you make updates to the sample_data.proto protocol buffers definition,
   * run:
   *   pbjs sample_data.proto -t static-module -w commonjs -o sample_data.js
   *   pbjs sample_data.proto -t json --keep-case -o sample_data.json
   * from the /samples directory to generate the sample_data module.
   */

  // So that BigQuery knows how to parse the serialized_rows, create a
  // protocol buffer representation of your message descriptor.
  const root = protobufjs.loadSync('./sample_data.json');
  const descriptor = root.lookupType('SampleData').toDescriptor('proto2');
  const protoDescriptor = adapt.normalizeDescriptor(descriptor).toJSON();

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.PendingStream;
  const writeClient = new WriterClient({projectId});

  try {
    const streamId = await writeClient.createWriteStream({
      streamType,
      destinationTable,
    });
    console.log(`Stream created: ${streamId}`);

    const connection = await writeClient.createStreamConnection({
      streamId,
    });

    const writer = new Writer({
      connection,
      protoDescriptor,
    });

    let serializedRows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      rowNum: 1,
      boolCol: true,
      bytesCol: Buffer.from('hello world'),
      float64Col: parseFloat('+123.45'),
      int64Col: 123,
      stringCol: 'omg',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 2
    row = {
      rowNum: 2,
      boolCol: false,
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 3
    row = {
      rowNum: 3,
      bytesCol: Buffer.from('later, gator'),
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 4
    row = {
      rowNum: 4,
      float64Col: 987.654,
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 5
    row = {
      rowNum: 5,
      int64Col: 321,
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 6
    row = {
      rowNum: 6,
      stringCol: 'octavia',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Set an offset to allow resuming this stream if the connection breaks.
    // Keep track of which requests the server has acknowledged and resume the
    // stream at the first non-acknowledged message. If the server has already
    // processed a message with that offset, it will return an ALREADY_EXISTS
    // error, which can be safely ignored.
    // The first request must always have an offset of 0.
    let offsetValue = 0;

    // Send batch.
    let pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    // Reset rows.
    serializedRows = [];

    // Row 7
    const days = new Date('2019-02-07').getTime() / (1000 * 60 * 60 * 24);
    row = {
      rowNum: 7,
      dateCol: days, // The value is the number of days since the Unix epoch (1970-01-01).
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 8
    row = {
      rowNum: 8,
      datetimeCol: '2019-02-17 11:24:00.000',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 9
    row = {
      rowNum: 9,
      geographyCol: 'POINT(5 5)',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 10
    row = {
      rowNum: 10,
      numericCol: '123456',
      bignumericCol: '99999999999999999999999999999.999999999',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 11
    row = {
      rowNum: 11,
      timeCol: '18:00:00',
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 12
    const timestamp = new Date('2022-01-09T03:49:46.564Z').getTime();
    row = {
      rowNum: 12,
      timestampCol: timestamp * 1000, // The value is given in microseconds since the Unix epoch (1970-01-01).
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Offset must equal the number of rows that were previously sent.
    offsetValue = 6;

    // Send batch.
    pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    serializedRows = [];

    // Row 13
    row = {
      rowNum: 13,
      int64List: [1999, 2001],
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 14
    row = {
      rowNum: 14,
      structCol: {
        subIntCol: 99,
      },
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 15
    row = {
      rowNum: 15,
      structList: [{subIntCol: 100}, {subIntCol: 101}],
    };
    serializedRows.push(SampleData.encode(row).finish());

    // Row 16
    const timestampStart = new Date('2022-01-09T03:49:46.564Z').getTime();
    const timestampEnd = new Date('2022-01-09T04:49:46.564Z').getTime();
    row = {
      rowNum: 16,
      rangeCol: {
        start: timestampStart * 1000,
        end: timestampEnd * 1000,
      },
    };
    serializedRows.push(SampleData.encode(row).finish());

    offsetValue = 12;

    // Send batch.
    pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult())
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);

    const response = await writeClient.batchCommitWriteStream({
      parent: destinationTable,
      writeStreams: [streamId],
    });
    console.log(response);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}
Python
Before trying this sample, follow the Python setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Python API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
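As a quick check before running the sample (this snippet is not part of the original code), the Python client libraries resolve Application Default Credentials automatically. The following minimal sketch, assuming the google-auth package that ships as a dependency of the client library, prints which project the resolved credentials are associated with:

import google.auth

# Resolve Application Default Credentials the same way the client libraries do.
# This is only a sanity check; BigQueryWriteClient() picks up ADC on its own.
credentials, detected_project = google.auth.default()
print(f"Application Default Credentials resolved for project: {detected_project}")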
"""
This code sample demonstrates using the low-level generated client for Python.
"""
import
datetime
import
decimal
from
google.protobuf
import
descriptor_pb2
from
google.cloud
import
bigquery_storage_v1
from
google.cloud.bigquery_storage_v1
import
types
,
writer
# If you make updates to the sample_data.proto protocol buffers definition,
# run:
#
# protoc --python_out=. sample_data.proto
#
# from the samples/snippets directory to generate the sample_data_pb2 module.
from
.
import
sample_data_pb2
def
append_rows_proto2
(
project_id
:
str
,
dataset_id
:
str
,
table_id
:
str
):
"""Create a write stream, write some sample data, and commit the stream."""
write_client
=
bigquery_storage_v1
.
BigQueryWriteClient
()
parent
=
write_client
.
table_path
(
project_id
,
dataset_id
,
table_id
)
write_stream
=
types
.
WriteStream
()
# When creating the stream, choose the type. Use the PENDING type to wait
# until the stream is committed before it is visible. See:
# https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
write_stream
.
type_
=
types
.
WriteStream
.
Type
.
PENDING
write_stream
=
write_client
.
create_write_stream
(
parent
=
parent
,
write_stream
=
write_stream
)
stream_name
=
write_stream
.
name
# Create a template with fields needed for the first request.
request_template
=
types
.
AppendRowsRequest
()
# The initial request must contain the stream name.
request_template
.
write_stream
=
stream_name
# So that BigQuery knows how to parse the serialized_rows, generate a
# protocol buffer representation of your message descriptor.
proto_schema
=
types
.
ProtoSchema
()
proto_descriptor
=
descriptor_pb2
.
DescriptorProto
()
sample_data_pb2
.
SampleData
.
DESCRIPTOR
.
CopyToProto
(
proto_descriptor
)
proto_schema
.
proto_descriptor
=
proto_descriptor
proto_data
=
types
.
AppendRowsRequest
.
ProtoData
()
proto_data
.
writer_schema
=
proto_schema
request_template
.
proto_rows
=
proto_data
# Some stream types support an unbounded number of requests. Construct an
# AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream
=
writer
.
AppendRowsStream
(
write_client
,
request_template
)
# Create a batch of row data by appending proto2 serialized bytes to the
# serialized_rows repeated field.
proto_rows
=
types
.
ProtoRows
()
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
1
row
.
bool_col
=
True
row
.
bytes_col
=
b
"Hello, World!"
row
.
float64_col
=
float
(
"+inf"
)
row
.
int64_col
=
123
row
.
string_col
=
"Howdy!"
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
2
row
.
bool_col
=
False
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
3
row
.
bytes_col
=
b
"See you later!"
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
4
row
.
float64_col
=
1000000.125
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
5
row
.
int64_col
=
67000
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
6
row
.
string_col
=
"Auf Wiedersehen!"
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
# Set an offset to allow resuming this stream if the connection breaks.
# Keep track of which requests the server has acknowledged and resume the
# stream at the first non-acknowledged message. If the server has already
# processed a message with that offset, it will return an ALREADY_EXISTS
# error, which can be safely ignored.
#
# The first request must always have an offset of 0.
request
=
types
.
AppendRowsRequest
()
request
.
offset
=
0
proto_data
=
types
.
AppendRowsRequest
.
ProtoData
()
proto_data
.
rows
=
proto_rows
request
.
proto_rows
=
proto_data
response_future_1
=
append_rows_stream
.
send
(
request
)
# Create a batch of rows containing scalar values that don't directly
# correspond to a protocol buffers scalar type. See the documentation for
# the expected data formats:
# https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
proto_rows
=
types
.
ProtoRows
()
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
7
date_value
=
datetime
.
date
(
2021
,
8
,
12
)
epoch_value
=
datetime
.
date
(
1970
,
1
,
1
)
delta
=
date_value
-
epoch_value
row
.
date_col
=
delta
.
days
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
8
datetime_value
=
datetime
.
datetime
(
2021
,
8
,
12
,
9
,
46
,
23
,
987456
)
row
.
datetime_col
=
datetime_value
.
strftime
(
"%Y-%m-
%d
%H:%M:%S.
%f
"
)
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
9
row
.
geography_col
=
"POINT(-122.347222 47.651111)"
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
10
numeric_value
=
decimal
.
Decimal
(
"1.23456789101112e+6"
)
row
.
numeric_col
=
str
(
numeric_value
)
bignumeric_value
=
decimal
.
Decimal
(
"-1.234567891011121314151617181920e+16"
)
row
.
bignumeric_col
=
str
(
bignumeric_value
)
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
11
time_value
=
datetime
.
time
(
11
,
7
,
48
,
123456
)
row
.
time_col
=
time_value
.
strftime
(
"%H:%M:%S.
%f
"
)
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
12
timestamp_value
=
datetime
.
datetime
(
2021
,
8
,
12
,
16
,
11
,
22
,
987654
,
tzinfo
=
datetime
.
timezone
.
utc
)
epoch_value
=
datetime
.
datetime
(
1970
,
1
,
1
,
tzinfo
=
datetime
.
timezone
.
utc
)
delta
=
timestamp_value
-
epoch_value
row
.
timestamp_col
=
int
(
delta
.
total_seconds
())
*
1000000
+
int
(
delta
.
microseconds
)
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
# Since this is the second request, you only need to include the row data.
# The name of the stream and protocol buffers DESCRIPTOR is only needed in
# the first request.
request
=
types
.
AppendRowsRequest
()
proto_data
=
types
.
AppendRowsRequest
.
ProtoData
()
proto_data
.
rows
=
proto_rows
request
.
proto_rows
=
proto_data
# Offset must equal the number of rows that were previously sent.
request
.
offset
=
6
response_future_2
=
append_rows_stream
.
send
(
request
)
# Create a batch of rows with STRUCT and ARRAY BigQuery data types. In
# protocol buffers, these correspond to nested messages and repeated
# fields, respectively.
proto_rows
=
types
.
ProtoRows
()
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
13
row
.
int64_list
.
append
(
1
)
row
.
int64_list
.
append
(
2
)
row
.
int64_list
.
append
(
3
)
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
14
row
.
struct_col
.
sub_int_col
=
7
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
15
sub_message
=
sample_data_pb2
.
SampleData
.
SampleStruct
()
sub_message
.
sub_int_col
=
-
1
row
.
struct_list
.
append
(
sub_message
)
sub_message
=
sample_data_pb2
.
SampleData
.
SampleStruct
()
sub_message
.
sub_int_col
=
-
2
row
.
struct_list
.
append
(
sub_message
)
sub_message
=
sample_data_pb2
.
SampleData
.
SampleStruct
()
sub_message
.
sub_int_col
=
-
3
row
.
struct_list
.
append
(
sub_message
)
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
row
=
sample_data_pb2
.
SampleData
()
row
.
row_num
=
16
date_value
=
datetime
.
date
(
2021
,
8
,
8
)
epoch_value
=
datetime
.
date
(
1970
,
1
,
1
)
delta
=
date_value
-
epoch_value
row
.
range_date
.
start
=
delta
.
days
proto_rows
.
serialized_rows
.
append
(
row
.
SerializeToString
())
request
=
types
.
AppendRowsRequest
()
request
.
offset
=
12
proto_data
=
types
.
AppendRowsRequest
.
ProtoData
()
proto_data
.
rows
=
proto_rows
request
.
proto_rows
=
proto_data
# For each request sent, a message is expected in the responses iterable.
# This sample sends 3 requests, therefore expect exactly 3 responses.
response_future_3
=
append_rows_stream
.
send
(
request
)
# All three requests are in-flight, wait for them to finish being processed
# before finalizing the stream.
print
(
response_future_1
.
result
())
print
(
response_future_2
.
result
())
print
(
response_future_3
.
result
())
# Shutdown background threads and close the streaming connection.
append_rows_stream
.
close
()
# A PENDING type stream must be "finalized" before being committed. No new
# records can be written to the stream after this method has been called.
write_client
.
finalize_write_stream
(
name
=
write_stream
.
name
)
# Commit the stream you created earlier.
batch_commit_write_streams_request
=
types
.
BatchCommitWriteStreamsRequest
()
batch_commit_write_streams_request
.
parent
=
parent
batch_commit_write_streams_request
.
write_streams
=
[
write_stream
.
name
]
write_client
.
batch_commit_write_streams
(
batch_commit_write_streams_request
)
print
(
f
"Writes to stream: '
{
write_stream
.
name
}
' have been committed."
)
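The sample defines append_rows_proto2 but does not show an invocation. A minimal, hypothetical call might look like the following; the IDs are placeholders, and the destination table must already exist with a schema that matches the fields in sample_data.proto:

# Hypothetical invocation (not part of the original sample); replace the
# placeholder IDs with your own project, dataset, and table.
append_rows_proto2(
    project_id="my_project",
    dataset_id="my_dataset",
    table_id="my_table",
)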
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.