// The bigquery_storage_quickstart application demonstrates usage of the
// BigQuery Storage read API. It demonstrates API features such as column
// projection (limiting the output to a subset of a table's columns),
// column filtering (using simple predicates to filter records on the server
// side), establishing the snapshot time (reading data from the table at a
// specific point in time), decoding Avro row blocks using the third party
// "github.com/linkedin/goavro" library, and decoding Arrow row blocks using
// the third party "github.com/apache/arrow/go" library.
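//
// A minimal example invocation (a sketch, assuming Application Default
// Credentials are configured and YOUR_PROJECT_ID is replaced with a project
// in which you can create read sessions):
//
//	go run . --project_id=YOUR_PROJECT_ID --format=arrow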
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"sort"
	"strings"
	"sync"
	"time"

	bqStorage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"github.com/apache/arrow/go/v10/arrow"
	"github.com/apache/arrow/go/v10/arrow/ipc"
	"github.com/apache/arrow/go/v10/arrow/memory"
	gax "github.com/googleapis/gax-go/v2"
	goavro "github.com/linkedin/goavro/v2"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/timestamppb"
)
// rpcOpts is used to configure the underlying gRPC client to accept large
// messages. The BigQuery Storage API may send message blocks up to 128MB
// in size.
var rpcOpts = gax.WithGRPCOptions(
	grpc.MaxCallRecvMsgSize(1024 * 1024 * 129),
)
// Available formats
const (
	AVRO_FORMAT  = "avro"
	ARROW_FORMAT = "arrow"
)
// Command-line flags.
var (
	projectID = flag.String("project_id", "",
		"Cloud Project ID, used for session creation.")
	snapshotMillis = flag.Int64("snapshot_millis", 0,
		"Snapshot time to use for reads, represented in epoch milliseconds format. Default behavior reads current data.")
	format = flag.String("format", AVRO_FORMAT,
		"Data format in which to read from the Storage API (avro or arrow). Default is avro.")
)
func main() {
	flag.Parse()
	ctx := context.Background()
	bqReadClient, err := bqStorage.NewBigQueryReadClient(ctx)
	if err != nil {
		log.Fatalf("NewBigQueryReadClient: %v", err)
	}
	defer bqReadClient.Close()

	// Verify we've been provided a parent project which will contain the read session. The
	// session may exist in a different project than the table being read.
	if *projectID == "" {
		log.Fatalf("No parent project ID specified, please supply using the --project_id flag.")
	}
	// This example uses baby name data from the public datasets.
	srcProjectID := "bigquery-public-data"
	srcDatasetID := "usa_names"
	srcTableID := "usa_1910_current"
	readTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s",
		srcProjectID,
		srcDatasetID,
		srcTableID,
	)

	// We limit the output columns to a subset of those allowed in the table,
	// and set a simple filter to only report names from the state of
	// Washington (WA).
	tableReadOptions := &storagepb.ReadSession_TableReadOptions{
		SelectedFields: []string{"name", "number", "state"},
		RowRestriction: `state = "WA"`,
	}

	dataFormat := storagepb.DataFormat_AVRO
	if *format == ARROW_FORMAT {
		dataFormat = storagepb.DataFormat_ARROW
	}
	createReadSessionRequest := &storagepb.CreateReadSessionRequest{
		Parent: fmt.Sprintf("projects/%s", *projectID),
		ReadSession: &storagepb.ReadSession{
			Table:       readTable,
			DataFormat:  dataFormat,
			ReadOptions: tableReadOptions,
		},
		MaxStreamCount: 1,
	}

	// Set a snapshot time if it's been specified.
	if *snapshotMillis > 0 {
		// Convert the epoch-milliseconds flag value into a timestamp.
		ts := timestamppb.New(time.UnixMilli(*snapshotMillis))
		if !ts.IsValid() {
			log.Fatalf("Invalid snapshot millis (%d)", *snapshotMillis)
		}
		createReadSessionRequest.ReadSession.TableModifiers = &storagepb.ReadSession_TableModifiers{
			SnapshotTime: ts,
		}
	}
	// Create the session from the request.
	session, err := bqReadClient.CreateReadSession(ctx, createReadSessionRequest, rpcOpts)
	if err != nil {
		log.Fatalf("CreateReadSession: %v", err)
	}
	fmt.Printf("Read session: %s\n", session.GetName())

	if len(session.GetStreams()) == 0 {
		log.Fatalf("no streams in session. if this was a small query result, consider writing the output to a named table instead.")
	}
	// We'll use only a single stream for reading data from the table. Because
	// of dynamic sharding, this will yield all the rows in the table. However,
	// if you wanted to fan out multiple readers you could do so by increasing
	// the MaxStreamCount.
	readStream := session.GetStreams()[0].Name

	ch := make(chan *storagepb.ReadRowsResponse)

	// Use a waitgroup to coordinate the reading and decoding goroutines.
	var wg sync.WaitGroup

	// Start the reading in one goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := processStream(ctx, bqReadClient, readStream, ch); err != nil {
			log.Fatalf("processStream failure: %v", err)
		}
		close(ch)
	}()
	// Start the decoding in another goroutine, dispatching to the Avro or
	// Arrow decoder according to the requested format.
	wg.Add(1)
	go func() {
		defer wg.Done()
		var err error
		switch *format {
		case ARROW_FORMAT:
			err = processArrow(ctx, session.GetArrowSchema().GetSerializedSchema(), ch)
		case AVRO_FORMAT:
			err = processAvro(ctx, session.GetAvroSchema().GetSchema(), ch)
		}
		if err != nil {
			log.Fatalf("error processing %s: %v", *format, err)
		}
	}()

	// Wait until both the reading and decoding goroutines complete.
	wg.Wait()
}
// printDatum prints the decoded row datum.
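// With the column selection and row filter used in this example, each printed
// line should look roughly like (values are illustrative placeholders):
//
//	name: <name>    number: <count>    state: WA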
func printDatum(d interface{}) {
	m, ok := d.(map[string]interface{})
	if !ok {
		log.Printf("failed type assertion: %v", d)
	}
	// Go's map implementation returns keys in a random ordering, so we sort
	// the keys before accessing.
	keys := make([]string, len(m))
	i := 0
	for k := range m {
		keys[i] = k
		i++
	}
	sort.Strings(keys)
	for _, key := range keys {
		fmt.Printf("%s: %-20v ", key, valueFromTypeMap(m[key]))
	}
	fmt.Println()
}
// printRecordBatch prints the arrow record batch
func printRecordBatch(record arrow.Record) error {
	out, err := record.MarshalJSON()
	if err != nil {
		return err
	}
	list := []map[string]interface{}{}
	err = json.Unmarshal(out, &list)
	if err != nil {
		return err
	}
	if len(list) == 0 {
		return nil
	}
	first := list[0]
	keys := make([]string, len(first))
	i := 0
	for k := range first {
		keys[i] = k
		i++
	}
	sort.Strings(keys)
	builder := strings.Builder{}
	for _, m := range list {
		for _, key := range keys {
			builder.WriteString(fmt.Sprintf("%s: %-20v ", key, m[key]))
		}
		builder.WriteString("\n")
	}
	fmt.Print(builder.String())
	return nil
}
// valueFromTypeMap returns the first value/key in the type map. This function
// is only suitable for simple schemas, as complex typing such as arrays and
// records necessitate a more robust implementation. See the goavro library
// and the Avro specification for more information.
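// For example, a nullable STRING column decoded by goavro typically arrives
// here as a union value shaped like map[string]interface{}{"string": "WA"},
// in which case this helper simply returns the wrapped "WA".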
func valueFromTypeMap(field interface{}) interface{} {
	m, ok := field.(map[string]interface{})
	if !ok {
		return nil
	}
	for _, v := range m {
		// Return the first key encountered.
		return v
	}
	return nil
}
// processStream reads rows from a single storage Stream, and sends the Storage Response
// data blocks to a channel. This function will retry on transient stream
// failures and bookmark progress to avoid re-reading data that's already been
// successfully transmitted.
func processStream(ctx context.Context, client *bqStorage.BigQueryReadClient, st string, ch chan<- *storagepb.ReadRowsResponse) error {
	var offset int64
	// Streams may be long-running. Rather than using a global retry for the
	// stream, implement a retry that resets once progress is made.
	retryLimit := 3
	retries := 0
	for {
		// Send the initiating request to start streaming row blocks.
		rowStream, err := client.ReadRows(ctx, &storagepb.ReadRowsRequest{
			ReadStream: st,
			Offset:     offset,
		}, rpcOpts)
		if err != nil {
			return fmt.Errorf("couldn't invoke ReadRows: %w", err)
		}

		// Process the streamed responses.
		for {
			r, err := rowStream.Recv()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				// If there is an error, check whether it is a retryable
				// error with a retry delay and sleep instead of increasing
				// retries count.
				var retryDelayDuration time.Duration
				if errorStatus, ok := status.FromError(err); ok && errorStatus.Code() == codes.ResourceExhausted {
					for _, detail := range errorStatus.Details() {
						retryInfo, ok := detail.(*errdetails.RetryInfo)
						if !ok {
							continue
						}
						retryDelay := retryInfo.GetRetryDelay()
						retryDelayDuration = time.Duration(retryDelay.Seconds)*time.Second + time.Duration(retryDelay.Nanos)*time.Nanosecond
						break
					}
				}
				if retryDelayDuration != 0 {
					log.Printf("processStream failed with a retryable error, retrying in %v", retryDelayDuration)
					time.Sleep(retryDelayDuration)
				} else {
					retries++
					if retries >= retryLimit {
						return fmt.Errorf("processStream retries exhausted: %w", err)
					}
				}
				// break the inner loop, and try to recover by starting a new streaming
				// ReadRows call at the last known good offset.
				break
			} else {
				// Reset retries after a successful response.
				retries = 0
			}

			rc := r.GetRowCount()
			if rc > 0 {
				// Bookmark our progress in case of retries and send the rowblock on the channel.
				offset = offset + rc
				// We're making progress, reset retries.
				retries = 0
				ch <- r
			}
		}
	}
}
// processArrow receives row blocks from a channel, and uses the provided Arrow
// schema to decode the blocks into individual row messages for printing. Will
// continue to run until the channel is closed or the provided context is
// cancelled.
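// Each ReadRowsResponse carries only a serialized Arrow record batch; the
// schema is delivered once at the session level, which is why this function
// prepends the serialized schema bytes to every batch before handing the
// combined buffer to the IPC reader.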
func processArrow(ctx context.Context, schema []byte, ch <-chan *storagepb.ReadRowsResponse) error {
	mem := memory.NewGoAllocator()
	buf := bytes.NewBuffer(schema)
	r, err := ipc.NewReader(buf, ipc.WithAllocator(mem))
	if err != nil {
		return err
	}
	aschema := r.Schema()
	for {
		select {
		case <-ctx.Done():
			// Context was cancelled. Stop.
			return ctx.Err()
		case rows, ok := <-ch:
			if !ok {
				// Channel closed, no further arrow messages. Stop.
				return nil
			}
			undecoded := rows.GetArrowRecordBatch().GetSerializedRecordBatch()
			if len(undecoded) > 0 {
				buf = bytes.NewBuffer(schema)
				buf.Write(undecoded)
				r, err = ipc.NewReader(buf, ipc.WithAllocator(mem), ipc.WithSchema(aschema))
				if err != nil {
					return err
				}
				for r.Next() {
					rec := r.Record()
					err = printRecordBatch(rec)
					if err != nil {
						return err
					}
				}
			}
		}
	}
}
// processAvro receives row blocks from a channel, and uses the provided Avro
// schema to decode the blocks into individual row messages for printing. Will
// continue to run until the channel is closed or the provided context is
// cancelled.
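// Each response's serialized binary rows contain multiple concatenated
// Avro-encoded records, so NativeFromBinary is called in a loop, consuming
// the buffer until no undecoded bytes remain.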
func processAvro(ctx context.Context, schema string, ch <-chan *storagepb.ReadRowsResponse) error {
	// Establish a decoder that can process blocks of messages using the
	// reference schema. All blocks share the same schema, so the decoder
	// can be long-lived.
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		return fmt.Errorf("couldn't create codec: %w", err)
	}

	for {
		select {
		case <-ctx.Done():
			// Context was cancelled. Stop.
			return ctx.Err()
		case rows, ok := <-ch:
			if !ok {
				// Channel closed, no further avro messages. Stop.
				return nil
			}
			undecoded := rows.GetAvroRows().GetSerializedBinaryRows()
			for len(undecoded) > 0 {
				datum, remainingBytes, err := codec.NativeFromBinary(undecoded)
				if err != nil {
					if err == io.EOF {
						break
					}
					return fmt.Errorf("decoding error with %d bytes remaining: %v", len(undecoded), err)
				}
				printDatum(datum)
				undecoded = remainingBytes
			}
		}
	}
}