Append data for a complex schema

Demonstrates how to append data with a complex schema to a table using the default stream.

Code sample

Go

Before trying this sample, follow the Go setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Go API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
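
When Application Default Credentials are configured, the client libraries pick them up automatically, so the sample below contains no credential-handling code. If you need to point the client at an explicit service account key instead, one option is to pass a client option from google.golang.org/api/option when constructing the managedwriter client. The following is a minimal sketch; the helper name and the key file path are placeholders, not part of the sample:

	import (
		"context"

		"cloud.google.com/go/bigquery/storage/managedwriter"
		"google.golang.org/api/option"
	)

	// newClientWithKeyFile builds a managedwriter client from an explicit key file.
	// Application Default Credentials remain the recommended approach and need no extra code.
	func newClientWithKeyFile(ctx context.Context, projectID string) (*managedwriter.Client, error) {
		return managedwriter.NewClient(ctx, projectID,
			option.WithCredentialsFile("/path/to/service-account-key.json"), // placeholder path
		)
	}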

import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"time"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"github.com/GoogleCloudPlatform/golang-samples/bigquery/snippets/managedwriter/exampleproto"
	"google.golang.org/protobuf/proto"
)

// generateExampleDefaultMessages generates a slice of serialized protobuf messages using a statically defined
// and compiled protocol buffer file, and returns the binary serialized representation.
func generateExampleDefaultMessages(numMessages int) ([][]byte, error) {
	msgs := make([][]byte, numMessages)
	for i := 0; i < numMessages; i++ {
		// Instantiate a new random source.
		random := rand.New(
			rand.NewSource(time.Now().UnixNano()),
		)
		// Our example data embeds an array of structs, so we'll construct that first.
		sl := make([]*exampleproto.SampleStruct, 5)
		for i := 0; i < int(random.Int63n(5)+1); i++ {
			sl[i] = &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			}
		}
		m := &exampleproto.SampleData{
			BoolCol:    proto.Bool(true),
			BytesCol:   []byte("some bytes"),
			Float64Col: proto.Float64(3.14),
			Int64Col:   proto.Int64(123),
			StringCol:  proto.String("example string value"),

			// These types require special encoding/formatting to transmit.

			// DATE values are the number of days since the Unix epoch.
			DateCol: proto.Int32(int32(time.Now().UnixNano() / 86400000000000)),

			// DATETIME uses the literal format.
			DatetimeCol: proto.String("2022-01-01 12:13:14.000000"),

			// GEOGRAPHY uses Well-Known-Text (WKT) format.
			GeographyCol: proto.String("POINT(-122.350220 47.649154)"),

			// NUMERIC and BIGNUMERIC can be passed as a string, or more efficiently
			// using a packed byte representation.
			NumericCol:    proto.String("99999999999999999999999999999.999999999"),
			BignumericCol: proto.String("578960446186580977117854925043439539266.34992332820282019728792003956564819967"),

			// TIME also uses the literal format.
			TimeCol: proto.String("12:13:14.000000"),

			// TIMESTAMP uses microseconds since the Unix epoch.
			TimestampCol: proto.Int64(time.Now().UnixNano() / 1000),

			// Int64List is an array of INT64 types.
			Int64List: []int64{2, 4, 6, 8},

			// This is a required field in the schema, and thus must be present.
			RowNum: proto.Int64(23),

			// StructCol is a single nested message.
			StructCol: &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			},

			// StructList is a repeated array of a nested message.
			StructList: sl,
		}
		b, err := proto.Marshal(m)
		if err != nil {
			return nil, fmt.Errorf("error generating message %d: %w", i, err)
		}
		msgs[i] = b
	}
	return msgs, nil
}

// appendToDefaultStream demonstrates using the managedwriter package to write some example data
// to a default stream.
func appendToDefaultStream(w io.Writer, projectID, datasetID, tableID string) error {
	// projectID := "myproject"
	// datasetID := "mydataset"
	// tableID := "mytable"

	ctx := context.Background()
	// Instantiate a managedwriter client to handle interactions with the service.
	client, err := managedwriter.NewClient(ctx, projectID,
		managedwriter.WithMultiplexing(), // Enables connection sharing.
	)
	if err != nil {
		return fmt.Errorf("managedwriter.NewClient: %w", err)
	}
	// Close the client when we exit the function.
	defer client.Close()

	// We need to communicate the descriptor of the protocol buffer message we're using, which
	// is analogous to the "schema" for the message.  Both SampleData and SampleStruct are
	// two distinct messages in the compiled proto file, so we'll use adapt.NormalizeDescriptor
	// to unify them into a single self-contained descriptor representation.
	var m *exampleproto.SampleData
	descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
	if err != nil {
		return fmt.Errorf("NormalizeDescriptor: %w", err)
	}

	// Build the formatted reference to the destination table.
	tableReference := managedwriter.TableParentFromParts(projectID, datasetID, tableID)

	// Instantiate a ManagedStream, which manages low level details like connection state and provides
	// additional features like a future-like callback for appends, etc.  Default streams are provided by
	// the system, so there's no need to create them.
	managedStream, err := client.NewManagedStream(ctx,
		managedwriter.WithType(managedwriter.DefaultStream),
		managedwriter.WithDestinationTable(tableReference),
		managedwriter.WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		return fmt.Errorf("NewManagedStream: %w", err)
	}
	// Automatically close the writer when we're done.
	defer managedStream.Close()

	// First, we'll append a single row.
	rows, err := generateExampleDefaultMessages(1)
	if err != nil {
		return fmt.Errorf("generateExampleDefaultMessages: %w", err)
	}

	// We can append data asynchronously, so we'll check our appends at the end.
	var results []*managedwriter.AppendResult
	result, err := managedStream.AppendRows(ctx, rows)
	if err != nil {
		return fmt.Errorf("AppendRows first call error: %w", err)
	}
	results = append(results, result)

	// This time, we'll append three more rows in a single request.
	rows, err = generateExampleDefaultMessages(3)
	if err != nil {
		return fmt.Errorf("generateExampleDefaultMessages: %w", err)
	}
	result, err = managedStream.AppendRows(ctx, rows)
	if err != nil {
		return fmt.Errorf("AppendRows second call error: %w", err)
	}
	results = append(results, result)

	// Finally, we'll append two more rows.
	rows, err = generateExampleDefaultMessages(2)
	if err != nil {
		return fmt.Errorf("generateExampleDefaultMessages: %w", err)
	}
	result, err = managedStream.AppendRows(ctx, rows)
	if err != nil {
		return fmt.Errorf("AppendRows third call error: %w", err)
	}
	results = append(results, result)

	// We've been collecting references to our status callbacks to allow us to append in a faster
	// asynchronous fashion.  Normally you could do this in another goroutine or similar, but for
	// this example we'll now iterate through those results and verify they were all successful.
	for k, v := range results {
		// GetResult blocks until we receive a response from the API.
		recvOffset, err := v.GetResult(ctx)
		if err != nil {
			return fmt.Errorf("append %d returned error: %w", k, err)
		}
		fmt.Fprintf(w, "Successfully appended data at offset %d.\n", recvOffset)
	}

	// This stream is a default stream, which means it doesn't require any form of finalization
	// or commit.  The rows were automatically committed to the table.
	return nil
}

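The appendToDefaultStream function above takes an io.Writer for status output plus the project, dataset, and table identifiers. A minimal sketch of how it might be invoked, assuming the sample is compiled into the same package and the destination table already exists with a matching schema; the identifiers are placeholders you would replace with your own:

	package main

	import (
		"log"
		"os"
	)

	func main() {
		// Placeholder identifiers; substitute your own project, dataset, and table.
		if err := appendToDefaultStream(os.Stdout, "my-project", "my_dataset", "my_table"); err != nil {
			log.Fatal(err)
		}
	}
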
What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.
