package managedwriter

import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"time"

	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/bigquery/storage/managedwriter"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"github.com/GoogleCloudPlatform/golang-samples/bigquery/snippets/managedwriter/exampleproto"
	"google.golang.org/protobuf/proto"
)
// generateExampleMessages generates a slice of serialized protobuf messages using a statically defined
// and compiled protocol buffer file, and returns the binary serialized representation.
func generateExampleMessages(numMessages int) ([][]byte, error) {
	msgs := make([][]byte, numMessages)
	for i := 0; i < numMessages; i++ {
		random := rand.New(rand.NewSource(time.Now().UnixNano()))

		// Our example data embeds an array of structs, so we'll construct that first.
		sList := make([]*exampleproto.SampleStruct, 5)
		for i := 0; i < int(random.Int63n(5)+1); i++ {
			sList[i] = &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			}
		}

		m := &exampleproto.SampleData{
			BoolCol:    proto.Bool(true),
			BytesCol:   []byte("some bytes"),
			Float64Col: proto.Float64(3.14),
			Int64Col:   proto.Int64(123),
			StringCol:  proto.String("example string value"),

			// These types require special encoding/formatting to transmit.

			// DATE values are the number of days since the Unix epoch.
			DateCol: proto.Int32(int32(time.Now().UnixNano() / 86400000000000)),

			// DATETIME uses the literal format.
			DatetimeCol: proto.String("2022-01-01 12:13:14.000000"),

			// GEOGRAPHY uses Well-Known-Text (WKT) format.
			GeographyCol: proto.String("POINT(-122.350220 47.649154)"),

			// NUMERIC and BIGNUMERIC can be passed as string, or more efficiently
			// using a packed byte representation.
			NumericCol:    proto.String("99999999999999999999999999999.999999999"),
			BignumericCol: proto.String("578960446186580977117854925043439539266.34992332820282019728792003956564819967"),

			// TIME also uses the literal format.
			TimeCol: proto.String("12:13:14.000000"),

			// TIMESTAMP uses microseconds since the Unix epoch.
			TimestampCol: proto.Int64(time.Now().UnixNano() / 1000),

			// Int64List is an array of INT64 types.
			Int64List: []int64{2, 4, 6, 8},

			// This is a required field, and thus must be present.
			RowNum: proto.Int64(23),

			// StructCol is a single nested message.
			StructCol: &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			},

			// StructList is a repeated array of a nested message.
			StructList: sList,
		}
		b, err := proto.Marshal(m)
		if err != nil {
			return nil, fmt.Errorf("error generating message %d: %w", i, err)
		}
		msgs[i] = b
	}
	return msgs, nil
}
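
// decodeExampleMessage is an illustrative sketch and not part of the original sample: it
// round-trips one of the serialized messages produced by generateExampleMessages back into a
// SampleData value via proto.Unmarshal, which can be useful when debugging serialization
// issues before appending rows to a stream. The function name is hypothetical.
func decodeExampleMessage(b []byte) (*exampleproto.SampleData, error) {
	m := &exampleproto.SampleData{}
	if err := proto.Unmarshal(b, m); err != nil {
		return nil, fmt.Errorf("proto.Unmarshal: %w", err)
	}
	return m, nil
}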
// appendToPendingStream demonstrates using the managedwriter package to write some example data
// to a pending stream, and then committing it to a table.
func appendToPendingStream(w io.Writer, projectID, datasetID, tableID string) error {
	// projectID := "myproject"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()

	// Instantiate a managedwriter client to handle interactions with the service.
	client, err := managedwriter.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("managedwriter.NewClient: %w", err)
	}
	// Close the client when we exit the function.
	defer client.Close()

	// Create a new pending stream. We'll use the stream name to construct a writer.
	pendingStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_PENDING,
		},
	})
	if err != nil {
		return fmt.Errorf("CreateWriteStream: %w", err)
	}
	// We need to communicate the descriptor of the protocol buffer message we're using, which
	// is analogous to the "schema" for the message. SampleData and SampleStruct are two distinct
	// messages in the compiled proto file, so we'll use adapt.NormalizeDescriptor to unify them
	// into a single self-contained descriptor representation.
	m := &exampleproto.SampleData{}
	descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
	if err != nil {
		return fmt.Errorf("NormalizeDescriptor: %w", err)
	}
	// Instantiate a ManagedStream, which manages low level details like connection state and provides
	// additional features like a future-like callback for appends, etc. NewManagedStream can also create
	// the stream on your behalf, but in this example we're being explicit about stream creation.
	managedStream, err := client.NewManagedStream(ctx,
		managedwriter.WithStreamName(pendingStream.GetName()),
		managedwriter.WithSchemaDescriptor(descriptorProto))
	if err != nil {
		return fmt.Errorf("NewManagedStream: %w", err)
	}
	defer managedStream.Close()

	// First, we'll append a single row.
	rows, err := generateExampleMessages(1)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	// We'll keep track of the current offset in the stream with curOffset.
	var curOffset int64
	// We can append data asynchronously, so we'll check our appends at the end.
	var results []*managedwriter.AppendResult
	result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0))
	if err != nil {
		return fmt.Errorf("AppendRows first call error: %w", err)
	}
	results = append(results, result)

	// Advance our current offset.
	curOffset = curOffset + 1

	// This time, we'll append three more rows in a single request.
	rows, err = generateExampleMessages(3)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
	if err != nil {
		return fmt.Errorf("AppendRows second call error: %w", err)
	}
	results = append(results, result)

	// Advance our offset again.
	curOffset = curOffset + 3

	// Finally, we'll append two more rows.
	rows, err = generateExampleMessages(2)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
	if err != nil {
		return fmt.Errorf("AppendRows third call error: %w", err)
	}
	results = append(results, result)

	// Now, we'll check that our batch of three appends all completed successfully.
	// Monitoring the results could also be done out of band via a goroutine.
	for k, v := range results {
		// GetResult blocks until we receive a response from the API.
		recvOffset, err := v.GetResult(ctx)
		if err != nil {
			return fmt.Errorf("append %d returned error: %w", k, err)
		}
		fmt.Fprintf(w, "Successfully appended data at offset %d.\n", recvOffset)
	}
	// We're now done appending to this stream. We mark the pending stream finalized, which blocks
	// further appends.
	rowCount, err := managedStream.Finalize(ctx)
	if err != nil {
		return fmt.Errorf("error during Finalize: %w", err)
	}
	fmt.Fprintf(w, "Stream %s finalized with %d rows.\n", managedStream.StreamName(), rowCount)
	// To commit the data to the table, we need to run a batch commit. You can commit several streams
	// atomically as a group, but in this instance we'll only commit the single stream.
	req := &storagepb.BatchCommitWriteStreamsRequest{
		Parent:       managedwriter.TableParentFromStreamName(managedStream.StreamName()),
		WriteStreams: []string{managedStream.StreamName()},
	}
	resp, err := client.BatchCommitWriteStreams(ctx, req)
	if err != nil {
		return fmt.Errorf("client.BatchCommit: %w", err)
	}
	if len(resp.GetStreamErrors()) > 0 {
		return fmt.Errorf("stream errors present: %v", resp.GetStreamErrors())
	}
	fmt.Fprintf(w, "Table data committed at %s\n", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano))
	return nil
}
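
// Example invocation, shown as a sketch with hypothetical identifiers; it assumes Application
// Default Credentials are configured and that the destination table already exists with a
// schema matching the SampleData message:
//
//	var buf bytes.Buffer
//	if err := appendToPendingStream(&buf, "myproject", "mydataset", "mytable"); err != nil {
//		log.Fatal(err)
//	}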