Read and append to objects in zonal buckets

This page describes how to read from and append to objects stored in zonal buckets, which use the Rapid storage class.

This page shows you how to perform the following operations:

  • Create and write to an appendable object.

  • Read appendable objects.

  • Pause, resume, and finalize appendable objects.

  • Read the tail end of appendable objects.

Before you use this page, you might want to read the following resources:

Make appendable writes to objects

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample uploads an appendable object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 auto 
  
 coro 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<google 
 :: 
 storage 
 :: 
 v2 
 :: 
 Object 
>  
 { 
  
 auto 
  
 [ 
 writer 
 , 
  
 token 
 ] 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 StartAppendableObjectUpload 
 ( 
  
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Appendable upload started for object " 
 << 
 object_name 
 << 
 " 
 \n 
 " 
 ; 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 "Some data 
 \n 
 " 
 ))) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Wrote initial data. 
 \n 
 " 
 ; 
  
 // Flush the buffered data to the service. This is not a terminal 
  
 // operation. The writer can be used after the flush completes. 
  
 // After a flush, the data is visible to readers. 
  
 auto 
  
 flush_status 
  
 = 
  
 co_await 
  
 writer 
 . 
 Flush 
 (); 
  
 if 
  
 ( 
 ! 
 flush_status 
 . 
 ok 
 ()) 
  
 throw 
  
 std 
 :: 
 runtime_error 
 ( 
 flush_status 
 . 
 message 
 ()); 
  
 std 
 :: 
 cout 
 << 
 "Flush completed. Persisted size is now " 
 << 
 absl 
 :: 
 get<std 
 :: 
 int64_t 
> ( 
 writer 
 . 
 PersistedState 
 ()) 
 << 
 " 
 \n 
 " 
 ; 
  
 // The writer is still open. We can write more data. 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 "Some more data 
 \n 
 " 
 ))) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Wrote more data. 
 \n 
 " 
 ; 
  
 // Finalize the upload to make it a regular object. 
  
 co_return 
  
 ( 
 co_await 
  
 writer 
 . 
 Finalize 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
 }; 
 

Go

For more information, see the Cloud Storage Go API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample uploads an appendable object:

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // createAndWriteAppendableObject creates and uploads a new appendable object in 
 // a rapid bucket. The object will not be finalized. 
 func 
  
 createAndWriteAppendableObject 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 error 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Create a Writer and write some data. 
  
 writer 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewWriter 
 
 ( 
 ctx 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "Some data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Flush the buffered data to the service. This is not a terminal 
  
 // operation. The Writer can be used after the flush completes. 
  
 // After a flush, the data is visible to readers. 
  
 size 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Flush 
 
 () 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Flush: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Flush completed. Persisted size is now %d\n" 
 , 
  
 size 
 ) 
  
 // The Writer is still open. We can write more data. 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "Some more data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Close the Writer to flush any remaining buffered data. 
  
 // The object will be unfinalized, which means another writer can 
  
 // later append to the object. 
  
 if 
  
 err 
  
 := 
  
 writer 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Uploaded object %v\n" 
 , 
  
 object 
 ) 
  
 return 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample uploads an appendable object:

  import 
  
 com.google.cloud.storage. BlobAppendableUpload 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
. AppendableUploadWriteableByteChannel 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
. CloseAction 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobInfo 
 
 ; 
 import 
  
 com.google.cloud.storage. FlushPolicy 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 com.google.common.io.ByteStreams 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.nio.channels.FileChannel 
 ; 
 import 
  
 java.nio.channels.ReadableByteChannel 
 ; 
 import 
  
 java.nio.file.Paths 
 ; 
 import 
  
 java.util.Locale 
 ; 
 /** Sample that uploads a local file to a new appendable object and finalizes it. */
 public class CreateAndWriteAppendableObject {
   /**
    * Uploads the contents of {@code filePath} to an appendable object and finalizes the object
    * when the upload channel is closed.
    *
    * @param bucketName the ID of your GCS bucket
    * @param objectName the ID of your GCS object
    * @param filePath the path to the file to upload
    * @throws IOException if reading the file or writing to the object fails
    * @throws Exception if the storage client cannot be created or closed
    */
   public static void createAndWriteAppendableObject(
       String bucketName, String objectName, String filePath) throws Exception {
     // The ID of your GCS bucket
     // String bucketName = "your-unique-bucket-name";

     // The ID of your GCS object
     // String objectName = "your-object-name";

     // The path to the file to upload
     // String filePath = "path/to/your/file";

     try (Storage storage = StorageOptions.grpc().build().getService()) {
       BlobId blobId = BlobId.of(bucketName, objectName);
       BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

       int flushSize = 64 * 1000;
       FlushPolicy.MaxFlushSizeFlushPolicy flushPolicy = FlushPolicy.maxFlushSize(flushSize);
       BlobAppendableUploadConfig config =
           BlobAppendableUploadConfig.of()
               .withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING)
               .withFlushPolicy(flushPolicy);
       BlobAppendableUpload uploadSession = storage.blobAppendableUpload(blobInfo, config);

       // Open the local file first. Resources are initialized in declaration order, so a
       // missing or unreadable file now fails before the upload channel is opened; otherwise
       // the already-open channel would be closed (triggering FINALIZE_WHEN_CLOSING) without
       // any data having been written.
       try (ReadableByteChannel readableByteChannel = FileChannel.open(Paths.get(filePath));
           AppendableUploadWriteableByteChannel channel = uploadSession.open()) {
         ByteStreams.copy(readableByteChannel, channel);
         // Since the channel is in a try-with-resources block, channel.close()
         // will be implicitly called here, which triggers the finalization.
       } catch (IOException ex) {
         throw new IOException("Failed to upload to object " + blobId.toGsUtilUri(), ex);
       }
       BlobInfo result = storage.get(blobId);
       // %n terminates the line, matching the other Java samples on this page.
       System.out.printf(
           Locale.US,
           "Object %s successfully uploaded%n",
           result.getBlobId().toGsUtilUriWithGeneration());
     }
   }
 }
 

Python

For more information, see the Cloud Storage Python API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample uploads an appendable object:

  async def storage_create_and_write_appendable_object(
      bucket_name, object_name, grpc_client=None
  ):
      """Create an appendable object in a zonal bucket and append data to it.

      bucket_name: name of the bucket that receives the object.
      object_name: name of the object to create.
      grpc_client: an existing gRPC client to reuse; intended for tests only.
          A new AsyncGrpcClient is created when this is None.
      """
      client = grpc_client if grpc_client is not None else AsyncGrpcClient()

      appendable = AsyncAppendableObjectWriter(
          client=client,
          bucket_name=bucket_name,
          object_name=object_name,
          # generation=0 throws `FailedPrecondition` if the object already exists.
          generation=0,
      )

      # Opening creates a new appendable object of size 0, ready for appends.
      await appendable.open()

      # `.append` may be called as many times as needed; each call adds its
      # data to the end of the object.
      await appendable.append(b"Some data")

      # All appends are done: close the gRPC bidirectional stream.
      await appendable.close()

      summary = f"Appended object {object_name} created of size {appendable.persisted_size} bytes."
      print(summary)
 

Read objects

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample performs a ranged read on a single object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 // Helper coroutine to count newlines returned by an AsyncReader. 
 // This helps consume the data from the read operation. 
 auto 
  
 count_newlines 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncReader 
  
 reader 
 , 
  
 gcs 
 :: 
 AsyncToken 
  
 token 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 std 
 :: 
 uint64_t 
  
 count 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 token 
 . 
 valid 
 ()) 
  
 { 
  
 auto 
  
 [ 
 payload 
 , 
  
 t 
 ] 
  
 = 
  
 ( 
 co_await 
  
 reader 
 . 
 Read 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 token 
  
 = 
  
 std 
 :: 
 move 
 ( 
 t 
 ); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 count 
  
 += 
  
 std 
 :: 
 count 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 (), 
  
 '\n' 
 ); 
  
 } 
  
 } 
  
 co_return 
  
 count 
 ; 
 }; 
 auto 
  
 coro 
  
 = 
  
 [ 
& count_newlines 
 ]( 
  
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 auto 
  
 descriptor 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 Open 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 auto 
  
 [ 
 reader 
 , 
  
 token 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 0 
 , 
  
 1024 
 ); 
  
 co_return 
  
 co_await 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 reader 
 ), 
  
 std 
 :: 
 move 
 ( 
 token 
 )); 
 }; 
 

The following sample performs a full read on a single object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 // Helper coroutine to count newlines returned by an AsyncReader. 
 // This helps consume the data from the read operation. 
 auto 
  
 count_newlines 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncReader 
  
 reader 
 , 
  
 gcs 
 :: 
 AsyncToken 
  
 token 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 std 
 :: 
 uint64_t 
  
 count 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 token 
 . 
 valid 
 ()) 
  
 { 
  
 auto 
  
 [ 
 payload 
 , 
  
 t 
 ] 
  
 = 
  
 ( 
 co_await 
  
 reader 
 . 
 Read 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 token 
  
 = 
  
 std 
 :: 
 move 
 ( 
 t 
 ); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 count 
  
 += 
  
 std 
 :: 
 count 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 (), 
  
 '\n' 
 ); 
  
 } 
  
 } 
  
 co_return 
  
 count 
 ; 
 }; 
 auto 
  
 coro 
  
 = 
  
 [ 
& count_newlines 
 ]( 
  
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 auto 
  
 descriptor 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 Open 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 auto 
  
 [ 
 reader 
 , 
  
 token 
 ] 
  
 = 
  
 descriptor 
 . 
 ReadFromOffset 
 ( 
 0 
 ); 
  
 co_return 
  
 co_await 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 reader 
 ), 
  
 std 
 :: 
 move 
 ( 
 token 
 )); 
 }; 
 

The following sample performs multiple ranged reads on a single object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 // Helper coroutine to count newlines returned by an AsyncReader. 
 auto 
  
 count_newlines 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncReader 
  
 reader 
 , 
  
 gcs 
 :: 
 AsyncToken 
  
 token 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 std 
 :: 
 uint64_t 
  
 count 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 token 
 . 
 valid 
 ()) 
  
 { 
  
 auto 
  
 [ 
 payload 
 , 
  
 t 
 ] 
  
 = 
  
 ( 
 co_await 
  
 reader 
 . 
 Read 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 token 
  
 = 
  
 std 
 :: 
 move 
 ( 
 t 
 ); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 count 
  
 += 
  
 std 
 :: 
 count 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 (), 
  
 '\n' 
 ); 
  
 } 
  
 } 
  
 co_return 
  
 count 
 ; 
 }; 
 auto 
  
 coro 
  
 = 
  
 [ 
& count_newlines 
 ]( 
  
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 auto 
  
 descriptor 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 Open 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 auto 
  
 [ 
 r1 
 , 
  
 t1 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 0 
 , 
  
 1024 
 ); 
  
 auto 
  
 [ 
 r2 
 , 
  
 t2 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 0 
 , 
  
 1024 
 ); 
  
 auto 
  
 [ 
 r3 
 , 
  
 t3 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 1024 
 , 
  
 1024 
 ); 
  
 auto 
  
 c1 
  
 = 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 r1 
 ), 
  
 std 
 :: 
 move 
 ( 
 t1 
 )); 
  
 auto 
  
 c2 
  
 = 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 r2 
 ), 
  
 std 
 :: 
 move 
 ( 
 t2 
 )); 
  
 auto 
  
 c3 
  
 = 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 r3 
 ), 
  
 std 
 :: 
 move 
 ( 
 t3 
 )); 
  
 co_return 
  
 ( 
 co_await 
  
 std 
 :: 
 move 
 ( 
 c1 
 )) 
  
 + 
  
 ( 
 co_await 
  
 std 
 :: 
 move 
 ( 
 c2 
 )) 
  
 + 
  
 ( 
 co_await 
  
 std 
 :: 
 move 
 ( 
 c3 
 )); 
 }; 
 

The following sample performs ranged reads on multiple objects (a single read per object):

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 // Helper coroutine to count newlines returned by an AsyncReader. 
 auto 
  
 count_newlines 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncReader 
  
 reader 
 , 
  
 gcs 
 :: 
 AsyncToken 
  
 token 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 std 
 :: 
 uint64_t 
  
 count 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 token 
 . 
 valid 
 ()) 
  
 { 
  
 auto 
  
 [ 
 payload 
 , 
  
 t 
 ] 
  
 = 
  
 ( 
 co_await 
  
 reader 
 . 
 Read 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 token 
  
 = 
  
 std 
 :: 
 move 
 ( 
 t 
 ); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 count 
  
 += 
  
 std 
 :: 
 count 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 (), 
  
 '\n' 
 ); 
  
 } 
  
 } 
  
 co_return 
  
 count 
 ; 
 }; 
 auto 
  
 coro 
  
 = 
  
 [ 
& count_newlines 
 ]( 
  
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name1 
 , 
  
 std 
 :: 
 string 
  
 object_name2 
 , 
  
 std 
 :: 
 string 
  
 object_name3 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<void> 
  
 { 
  
 // List of object names to read (passed as arguments) 
  
 std 
 :: 
 vector<std 
 :: 
 string 
>  
 object_names 
  
 = 
  
 { 
 object_name1 
 , 
  
 object_name2 
 , 
  
 object_name3 
 }; 
  
 std 
 :: 
 vector<google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>>  
 futures 
 ; 
  
 // Start ranged reads for all objects and collect futures 
  
 // This example opens multiple objects, not one object multiple times. 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 name 
  
 : 
  
 object_names 
 ) 
  
 { 
  
 auto 
  
 descriptor 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 Open 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 bucket_name 
 ), 
  
 name 
 )). 
 value 
 (); 
  
 auto 
  
 [ 
 reader 
 , 
  
 token 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 0 
 , 
  
 1024 
 ); 
  
 futures 
 . 
 push_back 
 ( 
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 reader 
 ), 
  
 std 
 :: 
 move 
 ( 
 token 
 ))); 
  
 } 
  
 // Process futures as they become ready and print results 
  
 while 
  
 ( 
 ! 
 futures 
 . 
 empty 
 ()) 
  
 { 
  
 bool 
  
 progress_made 
  
 = 
  
 false 
 ; 
  
 for 
  
 ( 
 std 
 :: 
 size_t 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
 < 
 futures 
 . 
 size 
 (); 
  
 ++ 
 i 
 ) 
  
 { 
  
 if 
  
 ( 
 futures 
 [ 
 i 
 ]. 
 is_ready 
 ()) 
  
 { 
  
 // Check if the future is ready 
  
 auto 
  
 count 
  
 = 
  
 futures 
 [ 
 i 
 ]. 
 get 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Object " 
 << 
 object_names 
 [ 
 i 
 ] 
 << 
 " read returned " 
 << 
 count 
 << 
 " newlines 
 \n 
 " 
 ; 
  
 futures 
 . 
 erase 
 ( 
 futures 
 . 
 begin 
 () 
  
 + 
  
 i 
 ); 
  
 object_names 
 . 
 erase 
 ( 
 object_names 
 . 
 begin 
 () 
  
 + 
  
 i 
 ); 
  
 progress_made 
  
 = 
  
 true 
 ; 
  
 break 
 ; 
  
 // Restart the loop after modifying the vectors 
  
 } 
  
 } 
  
 if 
  
 ( 
 ! 
 progress_made 
 ) 
  
 { 
  
 std 
 :: 
 this_thread 
 :: 
 sleep_for 
 ( 
  
 std 
 :: 
 chrono 
 :: 
 milliseconds 
 ( 
 10 
 )); 
  
 // Avoid busy spin 
  
 } 
  
 } 
 }; 
 

Go

For more information, see the Cloud Storage Go API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample performs a ranged read on a single object:

  import 
  
 ( 
  
 "bytes" 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // openObjectSingleRangedRead reads a single range from an object in a 
 // rapid bucket. 
 func 
  
 openObjectSingleRangedRead 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 ([] 
 byte 
 , 
  
 error 
 ) 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Read the first KiB of the file and copy into a buffer. 
  
 r 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewRangeReader 
 
 ( 
 ctx 
 , 
  
 0 
 , 
  
 1024 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewRangeReader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 r 
 . 
 Close 
 () 
  
 buf 
  
 := 
  
 new 
 ( 
 bytes 
 . 
 Buffer 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 io 
 . 
 Copy 
 ( 
 buf 
 , 
  
 r 
 ); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "copying data: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Read the first 1024 bytes of %v into a buffer\n" 
 , 
  
 object 
 ) 
  
 return 
  
 buf 
 . 
  Bytes 
 
 (), 
  
 nil 
 } 
 

The following sample performs a full read on a single object:

  import 
  
 ( 
  
 "bytes" 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // OpenObjectReadFullObject reads a full object's data from a 
 // rapid bucket. 
 func 
  
 openObjectReadFullObject 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 ([] 
 byte 
 , 
  
 error 
 ) 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Read the first KiB of the file and copy into a buffer. 
  
 r 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewReader 
 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewReader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 r 
 . 
 Close 
 () 
  
 buf 
  
 := 
  
 new 
 ( 
 bytes 
 . 
 Buffer 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 io 
 . 
 Copy 
 ( 
 buf 
 , 
  
 r 
 ); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "copying data: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Read the data of %v into a buffer\n" 
 , 
  
 object 
 ) 
  
 return 
  
 buf 
 . 
  Bytes 
 
 (), 
  
 nil 
 } 
 

The following sample performs multiple ranged reads on a single object:

  import 
  
 ( 
  
 "bytes" 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // openObjectMultipleRangedRead opens a single object using 
 // MultiRangeDownloader to download multiple ranges. 
 func 
  
 openObjectMultipleRangedRead 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 ([][] 
 byte 
 , 
  
 error 
 ) 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Create the MultiRangeDownloader, which opens a stream to the object. 
  
 mrd 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewMultiRangeDownloader 
 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewMultiRangeDownloader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Add some 1 KiB ranges to download. This call is non-blocking. The 
  
 // provided callback is invoked when the range download is complete. 
  
 startOffsets 
  
 := 
  
 [] 
 int64 
 { 
 0 
 , 
  
 1024 
 , 
  
 2048 
 } 
  
 var 
  
 dataBufs 
  
 [ 
 3 
 ] 
 bytes 
 . 
 Buffer 
  
 var 
  
 errs 
  
 [] 
 error 
  
 for 
  
 i 
 , 
  
 off 
  
 := 
  
 range 
  
 startOffsets 
  
 { 
  
 mrd 
 . 
  Add 
 
 ( 
& dataBufs 
 [ 
 i 
 ], 
  
 off 
 , 
  
 1024 
 , 
  
 func 
 ( 
 off 
 , 
  
 length 
  
 int64 
 , 
  
 err 
  
 error 
 ) 
  
 { 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 errs 
  
 = 
  
 append 
 ( 
 errs 
 , 
  
 err 
 ) 
  
 } 
  
 else 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "downloaded range at offset %v" 
 , 
  
 off 
 ) 
  
 } 
  
 }) 
  
 } 
  
 // Wait for all downloads to complete. 
  
 mrd 
 . 
 Wait 
 () 
  
 if 
  
 len 
 ( 
 errs 
 ) 
 > 
 0 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "one or more downloads failed; errors: %v" 
 , 
  
 errs 
 ) 
  
 } 
  
 if 
  
 err 
  
 := 
  
 mrd 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "MultiRangeDownloader.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Read the ranges of %v into memory\n" 
 , 
  
 object 
 ) 
  
 // Collect the byte slices 
  
 var 
  
 byteSlices 
  
 [][] 
 byte 
  
 for 
  
 _ 
 , 
  
 buf 
  
 := 
  
 range 
  
 dataBufs 
  
 { 
  
 byteSlices 
  
 = 
  
 append 
 ( 
 byteSlices 
 , 
  
 buf 
 . 
  Bytes 
 
 ()) 
  
 } 
  
 return 
  
 byteSlices 
 , 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample performs a ranged read on a single object:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. RangeSpec 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 /** Sample that reads one byte range from an object through a read session. */
 public class OpenObjectSingleRangedRead {
   /**
    * Reads up to {@code length} bytes starting at {@code offset} and prints how many bytes were
    * read.
    *
    * @param bucketName the ID of your GCS bucket, e.g. "your-unique-bucket-name"
    * @param objectName the ID of your GCS object, e.g. "your-object-name"
    * @param offset the beginning of the range, e.g. 0
    * @param length the maximum number of bytes to read from the object, e.g. 64
    * @throws Exception if the session cannot be opened within 10 seconds or the read fails
    */
   public static void openObjectSingleRangedRead(
       String bucketName, String objectName, long offset, int length) throws Exception {
     try (Storage storage = StorageOptions.grpc().build().getService()) {
       BlobId target = BlobId.of(bucketName, objectName);
       ApiFuture<BlobReadSession> pendingSession = storage.blobReadSession(target);

       // Wait at most 10 seconds for the session to open.
       try (BlobReadSession session = pendingSession.get(10, TimeUnit.SECONDS)) {
         // The range of bytes to read.
         RangeSpec range = RangeSpec.of(offset, length);
         ApiFuture<byte[]> pendingBytes =
             session.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(range));

         // Block until the read has completed.
         byte[] data = pendingBytes.get();

         String summary =
             "Successfully read "
                 + data.length
                 + " bytes from object "
                 + objectName
                 + " in bucket "
                 + bucketName;
         System.out.println(summary);
       }
     }
   }
 }
 

The following sample performs a full read on a single object:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadAsChannel 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 java.nio.ByteBuffer 
 ; 
 import 
  
 java.nio.channels.ScatteringByteChannel 
 ; 
 import 
  
 java.util.Locale 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 /** Sample that streams a whole object through a channel and reports its size. */
 public class OpenObjectReadFullObject {
   /**
    * Reads every byte of the object through a channel and prints the total number of bytes read.
    *
    * @param bucketName the ID of your GCS bucket, e.g. "your-unique-bucket-name"
    * @param objectName the ID of your GCS object to read, e.g. "your-object-name"
    * @throws Exception if the session cannot be opened within 10 seconds or the read fails
    */
   public static void openObjectReadFullObject(String bucketName, String objectName)
       throws Exception {
     try (Storage storage = StorageOptions.grpc().build().getService()) {
       BlobId blobId = BlobId.of(bucketName, objectName);
       ApiFuture<BlobReadSession> pendingSession = storage.blobReadSession(blobId);

       // Wait at most 10 seconds for the session to open.
       try (BlobReadSession session = pendingSession.get(10, TimeUnit.SECONDS)) {
         ReadAsChannel channelConfig = ReadProjectionConfigs.asChannel();

         try (ScatteringByteChannel channel = session.readAs(channelConfig)) {
           long totalBytesRead = 0;
           ByteBuffer scratch = ByteBuffer.allocate(64 * 1024);

           // read() returns -1 at end of stream. The data is only counted, so the buffer is
           // cleared for reuse after every read.
           for (int n = channel.read(scratch); n != -1; n = channel.read(scratch)) {
             totalBytesRead += n;
             scratch.clear();
           }

           System.out.printf(
               Locale.US,
               "Successfully read a total of %d bytes from object %s%n",
               totalBytesRead,
               blobId.toGsUtilUri());
         }
       }
     }
   }
 }
 

The following sample performs multiple ranged reads on a single object:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. RangeSpec 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 com.google.common.collect.ImmutableList 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public class OpenObjectMultipleRangedRead {

   /**
    * Reads two byte ranges from one object concurrently over a single read session and prints
    * how many bytes each range returned.
    *
    * @param bucketName the ID of your GCS bucket
    * @param objectName the ID of your GCS object
    * @param offset1 the beginning of range 1
    * @param length1 the maximum number of bytes to read in range 1
    * @param offset2 the beginning of range 2
    * @param length2 the maximum number of bytes to read in range 2
    * @throws Exception if the session cannot be opened, a read fails, or a wait times out
    */
   public static void openObjectMultipleRangedRead(
       String bucketName, String objectName, long offset1, int length1, long offset2, int length2)
       throws Exception {
     // The ID of your GCS bucket
     // String bucketName = "your-unique-bucket-name";

     // The ID of your GCS object
     // String objectName = "your-object-name";

     // The beginning of range 1
     // long offset1 = 0;

     // The maximum number of bytes to read in range 1
     // int length1 = 16;

     // The beginning of range 2
     // long offset2 = 16;

     // The maximum number of bytes to read in range 2
     // int length2 = 32;

     try (Storage storage = StorageOptions.grpc().build().getService()) {
       BlobId blobId = BlobId.of(bucketName, objectName);
       ApiFuture<BlobReadSession> futureBlobReadSession = storage.blobReadSession(blobId);
       RangeSpec rangeSpec1 = RangeSpec.of(offset1, length1);
       RangeSpec rangeSpec2 = RangeSpec.of(offset2, length2);

       try (BlobReadSession blobReadSession = futureBlobReadSession.get(10, TimeUnit.SECONDS)) {
         // Both reads are issued before waiting on either of them.
         ApiFuture<byte[]> future1 =
             blobReadSession.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(rangeSpec1));
         ApiFuture<byte[]> future2 =
             blobReadSession.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(rangeSpec2));

         // Bound the wait so a stalled stream cannot block the caller forever; the session
         // open above is bounded the same way.
         List<byte[]> allBytes =
             ApiFutures.allAsList(ImmutableList.of(future1, future2)).get(30, TimeUnit.SECONDS);

         byte[] bytes1 = allBytes.get(0);
         byte[] bytes2 = allBytes.get(1);

         System.out.println(
             "Successfully read "
                 + bytes1.length
                 + " bytes from range 1 and "
                 + bytes2.length
                 + " bytes from range 2.");
       }
     }
   }
 }
 

The following sample performs ranged reads on multiple objects:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. RangeSpec 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadAsFutureBytes 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 com.google.common.util.concurrent.MoreExecutors 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 OpenMultipleObjectsRangedRead 
  
 { 
  
 public 
  
 static 
  
 void 
  
 multipleObjectsSingleRangedRead 
 ( 
  
 String 
  
 bucketName 
 , 
  
 List<String> 
  
 objectNames 
 , 
  
 long 
  
 startOffset 
 , 
  
 int 
  
 length 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS objects to read 
  
 // List<String> objectName = Arrays.asList("object-1", "object-2", "object-3"); 
  
  RangeSpec 
 
  
 singleRange 
  
 = 
  
  RangeSpec 
 
 . 
 of 
 ( 
 startOffset 
 , 
  
 length 
 ); 
  
  ReadAsFutureBytes 
 
  
 rangeConfig 
  
 = 
  
  ReadProjectionConfigs 
 
 . 
  asFutureBytes 
 
 (). 
 withRangeSpec 
 ( 
 singleRange 
 ); 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
 List<ApiFuture<byte 
 [] 
>>  
 futuresToWaitOn 
  
 = 
  
 new 
  
 ArrayList 
<> (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
  
 "Initiating single ranged read [%d, %d] on %d objects...%n" 
 , 
  
 startOffset 
 , 
  
 startOffset 
  
 + 
  
 length 
  
 - 
  
 1 
 , 
  
 objectNames 
 . 
 size 
 ()); 
  
 for 
  
 ( 
 String 
  
 objectName 
  
 : 
  
 objectNames 
 ) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
 ApiFuture<BlobReadSession> 
  
 futureReadSession 
  
 = 
  
 storage 
 . 
  blobReadSession 
 
 ( 
 blobId 
 ); 
  
 ApiFuture<byte 
 [] 
>  
 readAndCloseFuture 
  
 = 
  
  ApiFutures 
 
 . 
  transformAsync 
 
 ( 
  
 futureReadSession 
 , 
  
 ( 
 BlobReadSession 
  
 session 
 ) 
  
 - 
>  
 { 
  
 ApiFuture<byte 
 [] 
>  
 readFuture 
  
 = 
  
 session 
 . 
 readAs 
 ( 
 rangeConfig 
 ); 
  
 readFuture 
 . 
 addListener 
 ( 
  
 () 
  
 - 
>  
 { 
  
 try 
  
 { 
  
 session 
 . 
 close 
 (); 
  
 } 
  
 catch 
  
 ( 
 java 
 . 
 io 
 . 
 IOException 
  
 e 
 ) 
  
 { 
  
 System 
 . 
 err 
 . 
 println 
 ( 
  
 "WARN: Background error while closing session: " 
  
 + 
  
 e 
 . 
 getMessage 
 ()); 
  
 } 
  
 }, 
  
 MoreExecutors 
 . 
 directExecutor 
 ()); 
  
 return 
  
 readFuture 
 ; 
  
 }, 
  
 MoreExecutors 
 . 
 directExecutor 
 ()); 
  
 futuresToWaitOn 
 . 
 add 
 ( 
 readAndCloseFuture 
 ); 
  
 } 
  
  ApiFutures 
 
 . 
  allAsList 
 
 ( 
 futuresToWaitOn 
 ). 
 get 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "All concurrent single-ranged read operations are complete." 
 ); 
  
 } 
  
 } 
 } 
 

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample performs a ranged read on a single object:

  async def storage_open_object_single_ranged_read(
      bucket_name, object_name, start_byte, size, grpc_client=None
  ):
      """Download one byte range of an object and print how many bytes arrived.

      grpc_client: an existing grpc_client to use, this is only for testing.
      """
      client = AsyncGrpcClient() if grpc_client is None else grpc_client
      downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)

      # The requested range lands in this buffer; callers may substitute their
      # own buffer or file-like object.
      sink = BytesIO()
      requested = [(start_byte, size, sink)]

      try:
          # The downloader always opens the object in read mode.
          await downloader.open()
          await downloader.download_ranges(requested)
      finally:
          if downloader.is_stream_open:
              await downloader.close()

      # The downloaded size can be smaller than requested when the object ends
      # before the range does.
      downloaded_size = sink.getbuffer().nbytes
      print(f"Downloaded {downloaded_size} bytes from {object_name}")
 

The following sample performs a full read on a single object:

  async def storage_open_object_read_full_object(
      bucket_name, object_name, grpc_client=None
  ):
      """Download a whole object through a multi-range downloader.

      grpc_client: an existing grpc_client to use, this is only for testing.
      """
      client = AsyncGrpcClient() if grpc_client is None else grpc_client

      # Multi-range downloader for the target object.
      downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)

      # Any buffer or file-like object would do here.
      sink = BytesIO()
      # A range of (0, 0) means "from the beginning to the end".
      whole_object = [(0, 0, sink)]

      try:
          # The downloader always opens the object in read mode.
          await downloader.open()
          await downloader.download_ranges(whole_object)
      finally:
          if downloader.is_stream_open:
              await downloader.close()

      downloaded_bytes = sink.getvalue()
      print(
          f"Downloaded all {len(downloaded_bytes)} bytes from object {object_name} in bucket {bucket_name}."
      )
 

The following sample performs multiple ranged reads on a single object:

  async def storage_open_object_multiple_ranged_read(
      bucket_name, object_name, grpc_client=None
  ):
      """Downloads multiple ranges of bytes from a single object into different buffers.

      grpc_client: an existing grpc_client to use, this is only for testing.
      """
      if grpc_client is None:
          grpc_client = AsyncGrpcClient()

      mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)

      try:
          # Open the object, mrd always opens in read mode.
          await mrd.open()

          # Specify four different buffers to download ranges into.
          buffers = [BytesIO(), BytesIO(), BytesIO(), BytesIO()]

          # Define the ranges to download. Each range is a tuple of (start_byte, size, buffer).
          # All ranges will download 10 bytes from different starting positions.
          # We choose arbitrary start bytes for this example. An object should be large enough.
          # A user can choose any start byte between 0 and `object_size`.
          # If `start_bytes` is greater than `object_size`, mrd will throw an error.
          ranges = [
              (0, 10, buffers[0]),
              (20, 10, buffers[1]),
              (40, 10, buffers[2]),
              (60, 10, buffers[3]),
          ]

          await mrd.download_ranges(ranges)

      finally:
          # Only close a stream that was actually opened, so a failure in
          # `open()` is not followed by a close attempt on an unopened stream.
          if mrd.is_stream_open:
              await mrd.close()

      # Print the downloaded content from each buffer.
      for i, output_buffer in enumerate(buffers):
          downloaded_size = output_buffer.getbuffer().nbytes
          print(
              f"Downloaded {downloaded_size} bytes into buffer {i + 1} from start byte {ranges[i][0]}: {output_buffer.getvalue()}"
          )
 

The following sample performs ranged reads on multiple objects:

  async def storage_open_multiple_objects_ranged_read(
      bucket_name, object_names, grpc_client=None
  ):
      """Download the same byte range from several objects concurrently.

      grpc_client: an existing grpc_client to use, this is only for testing.
      """
      client = AsyncGrpcClient() if grpc_client is None else grpc_client

      async def _read_prefix(object_name):
          """Download the leading range of one object and report its size."""
          downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)

          # Every object contributes its first 100 bytes.
          start_byte, size = 0, 100
          # The range lands in this buffer; a caller-provided buffer or
          # file-like object would work as well.
          sink = BytesIO()

          try:
              # The downloader always opens the object in read mode.
              await downloader.open()
              await downloader.download_ranges([(start_byte, size, sink)])
          finally:
              if downloader.is_stream_open:
                  await downloader.close()

          # Fewer bytes than requested arrive when the object is shorter than
          # the range.
          downloaded_size = sink.getbuffer().nbytes
          print(f"Downloaded {downloaded_size} bytes from {object_name}")

      await asyncio.gather(*[_read_prefix(name) for name in object_names])
 

Pause and resume an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample pauses and resumes an appendable object:

  namespace gcs = google::cloud::storage;
  // Coroutine that starts an appendable upload, closes the writer without
  // finalizing ("pause"), looks up the object's generation, resumes the upload
  // at that generation, appends more data, and finalizes the object.
  //
  // `bucket_name` and `object_name` are taken by value and reused (not moved
  // from) because they are needed again for the metadata lookup and the resume
  // call. The coroutine's result is the value produced by `Finalize()`.
  auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
                 std::string object_name)
      -> google::cloud::future<google::storage::v2::Object> {
    // Start an appendable upload and write some data.
    auto [writer, token] = (co_await client.StartAppendableObjectUpload(
                                gcs::BucketName(bucket_name), object_name))
                               .value();
    std::cout << "Appendable upload started.\n";
    // Each Write() consumes the current token and yields the one to use next.
    token = (co_await writer.Write(std::move(token),
                                   gcs::WritePayload("paused data\n")))
                .value();

    // The writer is closed, but the upload is not finalized. This "pauses" the
    // upload, as the object remains appendable.
    auto close_status = co_await writer.Close();
    if (!close_status.ok()) throw std::runtime_error(close_status.message());
    std::cout << "Upload paused.\n";

    // To resume the upload we need the object's generation. We can use the
    // regular GCS client to get the latest metadata.
    // NOTE(review): this looks like a synchronous call made from inside the
    // coroutine -- confirm that is acceptable for the calling context.
    auto regular_client = gcs::Client();
    auto metadata =
        regular_client.GetObjectMetadata(bucket_name, object_name).value();
    std::cout << "Object generation is " << metadata.generation() << "\n";

    // Now resume the upload.
    std::tie(writer, token) =
        (co_await client.ResumeAppendableObjectUpload(
             gcs::BucketName(bucket_name), object_name, metadata.generation()))
            .value();
    // PersistedState() is read as a std::int64_t here; presumably that
    // alternative holds the persisted byte offset -- TODO confirm.
    std::cout << "Upload resumed from offset "
              << absl::get<std::int64_t>(writer.PersistedState()) << "\n";

    // Append the rest of the data.
    token = (co_await writer.Write(std::move(token),
                                   gcs::WritePayload("resumed data\n")))
                .value();

    // Finalize the upload and return the object metadata.
    co_return (co_await writer.Finalize(std::move(token))).value();
  };
 

Go

For more information, see the Cloud Storage Go API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample pauses and resumes an appendable object:

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // pauseAndResumeAppendableUpload creates a new unfinalized appendable object, 
 // closes the Writer, then re-opens the object for writing using 
 // NewWriterFromAppendableObject. 
 func 
  
 pauseAndResumeAppendableUpload 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 error 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Start an appendable upload and write some data. 
  
 writer 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewWriter 
 
 ( 
 ctx 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "Some data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // The writer is closed, but the upload is not finalized. This "pauses" the 
  
 // upload, as the object remains appendable. 
  
 if 
  
 err 
  
 := 
  
 writer 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Uploaded unfinalized object %v\n" 
 , 
  
 object 
 ) 
  
 // To resume the upload we need the object's generation. We can get this 
  
 // from the previous Writer after close. 
  
 gen 
  
 := 
  
 writer 
 . 
 Attrs 
 (). 
  Generation 
 
  
 // Now resume the upload. Writer options including finalization can be 
  
 // passed on calling this constructor. 
  
 appendWriter 
 , 
  
 offset 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  Generation 
 
 ( 
 gen 
 ). 
  NewWriterFromAppendableObject 
 
 ( 
  
 ctx 
 , 
  
& storage 
 . 
  AppendableWriterOpts 
 
 { 
  
 FinalizeOnClose 
 : 
  
 true 
 , 
  
 }, 
  
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewWriterFromAppendableObject: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Resuming upload from offset %v\n" 
 , 
  
 offset 
 ) 
  
 // Append the rest of the data and close the Writer to finalize. 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 append Write 
r 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "resumed data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "appendWriter.Write: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 if 
  
 err 
  
 := 
  
 appendWriter 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Uploaded and finalized object %v\n" 
 , 
  
 object 
 ) 
  
 return 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample pauses and resumes an appendable object:

  import 
  
 com.google.cloud.storage. Blob 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
. AppendableUploadWriteableByteChannel 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
. CloseAction 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobInfo 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageChannelUtils 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 com.google.common.io.ByteStreams 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.nio.ByteBuffer 
 ; 
 import 
  
 java.nio.channels.FileChannel 
 ; 
 import 
  
 java.nio.charset.StandardCharsets 
 ; 
 import 
  
 java.nio.file.Paths 
 ; 
 import 
  
 java.util.Locale 
 ; 
 public class PauseAndResumeAppendableObjectUpload {

   /**
    * Writes an initial string to a new appendable object without finalizing it ("pause"), then
    * resumes the upload by appending the contents of a local file and finalizing the object.
    *
    * @param bucketName the ID of your GCS bucket
    * @param objectName the ID of your GCS object
    * @param filePath the path to the file whose contents are appended on resume
    * @throws IllegalStateException if the object cannot be found after either upload step
    * @throws Exception if any storage or file operation fails
    */
   public static void pauseAndResumeAppendableObjectUpload(
       String bucketName, String objectName, String filePath) throws Exception {
     // The ID of your GCS bucket
     // String bucketName = "your-unique-bucket-name";

     // The ID of your GCS object
     // String objectName = "your-object-name";

     // The path to the file to upload
     // String filePath = "path/to/your/file";

     try (Storage storage = StorageOptions.grpc().build().getService()) {
       BlobId blobId = BlobId.of(bucketName, objectName);
       BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

       // --- Step 1: Initial string write (PAUSE) ---
       // Default close action will be CLOSE_WITHOUT_FINALIZING
       BlobAppendableUploadConfig initialConfig = BlobAppendableUploadConfig.of();
       BlobAppendableUpload initialUploadSession =
           storage.blobAppendableUpload(blobInfo, initialConfig);

       try (AppendableUploadWriteableByteChannel channel = initialUploadSession.open()) {
         String initialData = "Initial data segment.\n";
         ByteBuffer buffer = ByteBuffer.wrap(initialData.getBytes(StandardCharsets.UTF_8));
         long totalBytesWritten = StorageChannelUtils.blockingEmptyTo(buffer, channel);
         channel.flush();

         System.out.printf(
             Locale.US, "Wrote %d bytes (initial string) in first segment.\n", totalBytesWritten);
       } catch (IOException ex) {
         throw new IOException("Failed initial upload to object " + blobId.toGsUtilUri(), ex);
       }

       // The lookup yields null when the object is missing; fail with a clear message
       // instead of a NullPointerException on the next line.
       Blob existingBlob = storage.get(blobId);
       if (existingBlob == null) {
         throw new IllegalStateException(
             "Object " + blobId.toGsUtilUri() + " not found after the initial upload");
       }
       long currentObjectSize = existingBlob.getSize();
       System.out.printf(
           Locale.US,
           "Initial upload paused. Currently uploaded size: %d bytes\n",
           currentObjectSize);

       // --- Step 2: Resume upload with file content and finalize ---
       // Use FINALIZE_WHEN_CLOSING to ensure the object is finalized on channel closure.
       BlobAppendableUploadConfig resumeConfig =
           BlobAppendableUploadConfig.of().withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING);
       BlobAppendableUpload resumeUploadSession =
           storage.blobAppendableUpload(existingBlob.toBuilder().build(), resumeConfig);

       try (FileChannel fileChannel = FileChannel.open(Paths.get(filePath));
           AppendableUploadWriteableByteChannel channel = resumeUploadSession.open()) {
         long bytesToAppend = fileChannel.size();
         System.out.printf(
             Locale.US,
             "Appending the entire file (%d bytes) after the initial string.\n",
             bytesToAppend);

         ByteStreams.copy(fileChannel, channel);
       }

       BlobInfo result = storage.get(blobId);
       if (result == null) {
         throw new IllegalStateException(
             "Object " + blobId.toGsUtilUri() + " not found after the resumed upload");
       }
       System.out.printf(
           Locale.US,
           "\nObject %s successfully resumed and finalized. Total size: %d bytes\n",
           result.getBlobId().toGsUtilUriWithGeneration(),
           result.getSize());
     }
   }
 }
 

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample pauses and resumes an appendable object:

  async def storage_pause_and_resume_appendable_upload(
      bucket_name, object_name, grpc_client=None
  ):
      """Demonstrates pausing and resuming an appendable object upload.

      A first writer creates the object and appends data, then is closed to
      "pause" the upload. A second writer is created with the first writer's
      generation, appends more data, and is closed.

      grpc_client: an existing grpc_client to use, this is only for testing.
      """
      if grpc_client is None:
          grpc_client = AsyncGrpcClient()

      # 1. Create the first writer, open it, and append the first part.
      writer1 = AsyncAppendableObjectWriter(
          client=grpc_client,
          bucket_name=bucket_name,
          object_name=object_name,
      )
      try:
          await writer1.open()
          await writer1.append(b"First part of the data. ")
          print(f"Appended {writer1.persisted_size} bytes with the first writer.")
      finally:
          # 2. After appending some data, close the writer to "pause" the upload.
          #  NOTE: you can pause indefinitely and still read the content uploaded so far using MRD.
          #  Closing in `finally` keeps the stream from leaking when the append
          #  fails, matching how the second writer is handled below.
          if writer1._is_stream_open:
              await writer1.close()

      print("First writer closed. Upload is 'paused'.")

      # 3. Create a new writer, passing the generation number from the previous
      #    writer. This is a precondition to ensure that the object hasn't been
      #    modified since we last accessed it.
      generation_to_resume = writer1.generation
      print(f"Generation to resume from is: {generation_to_resume}")

      writer2 = AsyncAppendableObjectWriter(
          client=grpc_client,
          bucket_name=bucket_name,
          object_name=object_name,
          generation=generation_to_resume,
      )
      # 4. Open the new writer.
      try:
          await writer2.open()

          # 5. Append some more data using the new writer.
          await writer2.append(b"Second part of the data.")
          print(f"Appended more data. Total size is now {writer2.persisted_size} bytes.")
      finally:
          # 6. Finally, close the new writer.
          if writer2._is_stream_open:
              await writer2.close()
      print("Second writer closed. Full object uploaded.")
 

Finalize an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample finalizes an appendable object:

  namespace gcs = google::cloud::storage;
  // Coroutine that starts an appendable upload, writes one payload, and
  // finalizes the object. The coroutine's result is the value produced by
  // `Finalize()`.
  //
  // `bucket_name` and `object_name` are taken by value and moved into the
  // start call; neither is used again afterwards.
  auto coro = [](gcs::AsyncClient& client, std::string bucket_name,
                 std::string object_name)
      -> google::cloud::future<google::storage::v2::Object> {
    // Start an appendable upload.
    auto [writer, token] =
        (co_await client.StartAppendableObjectUpload(
             gcs::BucketName(std::move(bucket_name)), std::move(object_name)))
            .value();
    std::cout << "Appendable upload started.\n";

    // Write some data.
    // Write() consumes the current token and yields the one to use next.
    token = (co_await writer.Write(std::move(token),
                                   gcs::WritePayload("some data to finalize\n")))
                .value();

    // Finalize the upload. This makes the object non-appendable.
    // No more data can be written to this writer.
    auto object = (co_await writer.Finalize(std::move(token))).value();
    std::cout << "Upload finalized.\n";

    co_return object;
  };
 

Go

For more information, see the Cloud Storage Go API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample finalizes an appendable object:

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // finalizeAppendableObject creates, uploads and finalizes a new object in 
 // a rapid bucket. 
 func 
  
 finalizeAppendableObject 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 error 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Create a Writer and set FinalizeOnClose so that the object will be 
  
 // finalized after the write is complete. 
  
 writer 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewWriter 
 
 ( 
 ctx 
 ) 
  
 writer 
 . 
 FinalizeOnClose 
  
 = 
  
 true 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "some data to finalize\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Close the Writer to flush any remaining buffered data and finalize 
  
 // the upload. This makes the object non-appendable. 
  
 // No more data can be written to this object. 
  
 if 
  
 err 
  
 := 
  
 writer 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Uploaded and finalized object %v\n" 
 , 
  
 object 
 ) 
  
 return 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample finalizes an appendable object:

  import 
  
 com.google.cloud.storage. Blob 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobInfo 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 public class FinalizeAppendableObjectUpload {

   /**
    * Finalizes an existing, unfinalized appendable object. Prints a message and returns without
    * doing anything when the object is not found.
    *
    * @param bucketName the ID of your GCS bucket, e.g. "your-unique-bucket-name"
    * @param objectName the ID of your GCS unfinalized appendable object, e.g. "your-object-name"
    * @throws Exception if looking up, opening or finalizing the object fails
    */
   public static void finalizeAppendableObjectUpload(String bucketName, String objectName)
       throws Exception {
     try (Storage storage = StorageOptions.grpc().build().getService()) {
       Blob current = storage.get(BlobId.of(bucketName, objectName));
       if (current == null) {
         System.out.println("Object " + objectName + " not found in bucket " + bucketName);
         return;
       }

       // Open a new upload session against the existing object's id, configured to
       // finalize when the channel is closed.
       BlobAppendableUploadConfig finalizeOnClose =
           BlobAppendableUploadConfig.of()
               .withCloseAction(BlobAppendableUploadConfig.CloseAction.FINALIZE_WHEN_CLOSING);
       BlobInfo takeoverInfo = BlobInfo.newBuilder(current.getBlobId()).build();
       BlobAppendableUpload session = storage.blobAppendableUpload(takeoverInfo, finalizeOnClose);

       try (BlobAppendableUpload.AppendableUploadWriteableByteChannel out = session.open()) {
         out.finalizeAndClose();
       }

       System.out.println(
           "Successfully finalized object " + objectName + " in bucket " + bucketName);
     }
   }
 }
 

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample finalizes an appendable object:

  async def storage_finalize_appendable_object_upload(
      bucket_name, object_name, grpc_client=None
  ):
      """Create a new appendable object, append to it once, then finalize it.

      Args:
          bucket_name: Name of the bucket that will hold the object.
          object_name: Name of the appendable object to create.
          grpc_client: An existing grpc client to reuse; this is only for
              testing. A new ``AsyncGrpcClient`` is created when omitted.
      """
      client = AsyncGrpcClient() if grpc_client is None else grpc_client

      appendable_writer = AsyncAppendableObjectWriter(
          client=client,
          bucket_name=bucket_name,
          object_name=object_name,
          # With generation=0 a `FailedPrecondition` is thrown if the object
          # already exists.
          generation=0,
      )

      # Opening creates a new, zero-length appendable object ready for appends.
      await appendable_writer.open()

      # Add some content to the object.
      await appendable_writer.append(b"Some data")

      # Finalize the object. Things to keep in mind:
      #   1. After finalization no further appends are possible.
      #   2. To leave the object unfinalized, call `writer.close` instead.
      #   3. `.finalize()` also closes the grpc-bidi stream; calling `.close`
      #      after `.finalize` may lead to undefined behavior.
      finalized_resource = await appendable_writer.finalize()

      for line in (
          f"Appendable object {object_name} created and finalized.",
          "Object Metadata:",
          finalized_resource,
      ):
          print(line)
 

Read the tail of an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample reads the tail of an appendable object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 auto 
  
 coro 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<void> 
  
 { 
  
 // This coroutine simulates a "tail -f" command on a GCS object. It 
  
 // repeatedly polls an appendable object for new content. In a real 
  
 // application, the object would be written to by a separate process. 
  
 std 
 :: 
 cout 
 << 
 "Polling for content from " 
 << 
 object_name 
 << 
 "... 
 \n\n 
 " 
 ; 
  
 // Start an appendable upload. In a real application this would be done by 
  
 // a separate process, and this application would only know the object name. 
  
 auto 
  
 [ 
 writer 
 , 
  
 token 
 ] 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 StartAppendableObjectUpload 
 ( 
  
 gcs 
 :: 
 BucketName 
 ( 
 bucket_name 
 ), 
  
 object_name 
 )) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 int64_t 
  
 bytes_read 
  
 = 
  
 0 
 ; 
  
 for 
  
 ( 
 int 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
  
 != 
  
 2 
 ; 
  
 ++ 
 i 
 ) 
  
 { 
  
 // In a real application, another process would append data here. 
  
 // We simulate this by writing to the object. 
  
 auto 
  
 content 
  
 = 
  
 "More data for tail example, iteration " 
  
 + 
  
 std 
 :: 
 to_string 
 ( 
 i 
 ) 
  
 + 
  
 " 
 \n 
 " 
 ; 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 content 
 ))) 
  
 . 
 value 
 (); 
  
 ( 
 void 
 ) 
 co_await 
  
 writer 
 . 
 Flush 
 (); 
  
 // Poll for new content by reading from the last known offset. 
  
 auto 
  
 payload 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 ReadObjectRange 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 bucket_name 
 ), 
  
 object_name 
 , 
  
 bytes_read 
 , 
  
 -1 
 )) 
  
 . 
 value 
 (); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 std::string 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 ()); 
  
 bytes_read 
  
 += 
  
 buffer 
 . 
 size 
 (); 
  
 } 
  
 // In a real application you would wait here, e.g. with a timer. 
  
 std 
 :: 
 this_thread 
 :: 
 sleep_for 
 ( 
 std 
 :: 
 chrono 
 :: 
 seconds 
 ( 
 1 
 )); 
  
 } 
 }; 
 

Go

For more information, see the Cloud Storage Go API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample reads the tail of an appendable object:

  import 
  
 ( 
  
 "bytes" 
  
 "context" 
  
 "errors" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // readAppendableObjectTail simulates a "tail -f" command on a GCS object. It 
 // repeatedly polls an appendable object for new content. In a real 
 // application, the object would be written to by a separate process. 
 func 
  
 readAppendableObjectTail 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 ([] 
 byte 
 , 
  
 error 
 ) 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // Set a context timeout. When this timeout is reached, the read stream 
  
 // will be closed, so omit this to tail indefinitely. 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 5 
 * 
 time 
 . 
 Minute 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Create a new appendable object and write some data. 
  
 writer 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
 If 
 ( 
 storage 
 . 
  Conditions 
 
 { 
 DoesNotExist 
 : 
  
 true 
 }). 
  NewWriter 
 
 ( 
 ctx 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "Some data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 if 
  
 err 
  
 := 
  
 writer 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 gen 
  
 := 
  
 writer 
 . 
 Attrs 
 (). 
  Generation 
 
  
 // Create the MultiRangeDownloader, which opens a read stream to the object. 
  
 mrd 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewMultiRangeDownloader 
 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewMultiRangeDownloader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // In a goroutine, poll the object. In this example we continue until all the 
  
 // bytes we expect to see were received, but in a real application this could 
  
 // continue to poll indefinitely until some signal is received. 
  
 var 
  
 buf 
  
 bytes 
 . 
 Buffer 
  
 var 
  
 mrdErr 
  
 error 
  
 done 
  
 := 
  
 make 
 ( 
 chan 
  
 bool 
 ) 
  
 go 
  
 func 
 () 
  
 { 
  
 var 
  
 currOff 
  
 int64 
  
 rangeDownloaded 
  
 := 
  
 make 
 ( 
 chan 
  
 bool 
 ) 
  
 for 
  
 buf 
 . 
 Len 
 () 
 < 
 100 
  
 { 
  
 // Add the current range and wait for it to be downloaded. 
  
 // Using a length of 0 will read to the current end of the object. 
  
 // The callback will give the actual number of bytes that were 
  
 // read in each iteration. 
  
 mrd 
 . 
  Add 
 
 ( 
& buf 
 , 
  
 currOff 
 , 
  
 0 
 , 
  
 func 
 ( 
 offset 
 , 
  
 length 
  
 int64 
 , 
  
 err 
  
 error 
 ) 
  
 { 
  
 // After each range is received, update 
  
 // the starting offset based on how many bytes were received. 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 mrdErr 
  
 = 
  
 err 
  
 } 
  
 currOff 
  
 += 
  
 length 
  
 rangeDownloaded 
  
< - 
  
 true 
  
 }) 
  
 // Wait for the range download to complete with a timeout of 10s. 
  
 select 
  
 { 
  
 case 
  
< - 
 rangeDownloaded 
 : 
  
 case 
  
< - 
 time 
 . 
 After 
 ( 
 10 
  
 * 
  
 time 
 . 
 Second 
 ): 
  
 mrdErr 
  
 = 
  
 mrd 
 . 
  Error 
 
 () 
  
 if 
  
 mrdErr 
  
 == 
  
 nil 
  
 { 
  
 mrdErr 
  
 = 
  
 errors 
 . 
 New 
 ( 
 "range request timed out after 10s" 
 ) 
  
 } 
  
 } 
  
 if 
  
 mrdErr 
  
 != 
  
 nil 
  
 { 
  
 break 
  
 } 
  
 time 
 . 
 Sleep 
 ( 
 1 
  
 * 
  
 time 
 . 
 Second 
 ) 
  
 } 
  
 // After exiting the loop, close MultiRangeDownloader and signal that 
  
 // all ranges have been read. 
  
 if 
  
 err 
  
 := 
  
 mrd 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 mrdErr 
  
 = 
  
 err 
  
 } 
  
 done 
  
< - 
  
 true 
  
 }() 
  
 // Meanwhile, continue to write 10 bytes at a time to the object. 
  
 // This could be done by calling NewWriterFromAppendable object repeatedly 
  
 // (as in the example) or calling Writer.Flush without closing the Writer. 
  
 for 
  
 range 
  
 9 
  
 { 
  
 appendWriter 
 , 
  
 offset 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  Generation 
 
 ( 
 gen 
 ). 
  NewWriterFromAppendableObject 
 
 ( 
 ctx 
 , 
  
 nil 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewWriterFromAppendableObject: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 append Write 
r 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "more data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "appendWriter.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 if 
  
 err 
  
 := 
  
 appendWriter 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "appendWriter.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Wrote 10 bytes at offset %v" 
 , 
  
 offset 
 ) 
  
 } 
  
 // Wait for tailing goroutine to exit. 
  
< - 
 done 
  
 if 
  
 mrdErr 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "MultiRangeDownloader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Read %v bytes from object %v" 
 , 
  
 buf 
 . 
 Len 
 (), 
  
 object 
 ) 
  
 return 
  
 buf 
 . 
  Bytes 
 
 (), 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample reads the tail of an appendable object:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
. AppendableUploadWriteableByteChannel 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobInfo 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. FlushPolicy 
 
 ; 
 import 
  
 com.google.cloud.storage. RangeSpec 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageChannelUtils 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.nio.ByteBuffer 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 ReadAppendableObjectTail 
  
 { 
  
 public 
  
 static 
  
 void 
  
 readAppendableObjectTail 
 ( 
 String 
  
 bucketName 
 , 
  
 String 
  
 objectName 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS object 
  
 // String objectName = "your-object-name"; 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
  BlobInfo 
 
  
 info 
  
 = 
  
  BlobInfo 
 
 . 
 newBuilder 
 ( 
 blobId 
 ). 
 build 
 (); 
  
 int 
  
 totalToWrite 
  
 = 
  
 64 
  
 * 
  
 1000 
 ; 
  
 // Define our flush policy to flush small increments 
  
 // This is useful for demonstration purposes, but you should use more appropriate values for 
  
 // your workload. 
  
 int 
  
 flushSize 
  
 = 
  
 totalToWrite 
  
 / 
  
 8 
 ; 
  
  FlushPolicy 
 
 . 
  MinFlushSizeFlushPolicy 
 
  
 flushPolicy 
  
 = 
  
  FlushPolicy 
 
 . 
  minFlushSize 
 
 ( 
 flushSize 
 ). 
  withMaxPendingBytes 
 
 ( 
 flushSize 
 ); 
  
  BlobAppendableUploadConfig 
 
  
 appendableUploadConfig 
  
 = 
  
  BlobAppendableUploadConfig 
 
 . 
 of 
 (). 
  withFlushPolicy 
 
 ( 
 flushPolicy 
 ); 
  
  BlobAppendableUpload 
 
  
 upload 
  
 = 
  
 storage 
 . 
  blobAppendableUpload 
 
 ( 
  
 info 
 , 
  
 appendableUploadConfig 
 , 
  
  Storage 
 
 . 
 BlobWriteOption 
 . 
 doesNotExist 
 ()); 
  
 // Create the object, we'll takeover to write for our example. 
  
 upload 
 . 
  open 
 
 (). 
  closeWithoutFinalizing 
 
 (); 
  
  BlobInfo 
 
  
 gen1 
  
 = 
  
 upload 
 . 
  getResult 
 
 (). 
 get 
 (); 
  
  BlobAppendableUpload 
 
  
 takeover 
  
 = 
  
 storage 
 . 
  blobAppendableUpload 
 
 ( 
 gen1 
 , 
  
 appendableUploadConfig 
 ); 
  
 try 
  
 ( 
  AppendableUploadWriteableByteChannel 
 
  
 channel 
  
 = 
  
 takeover 
 . 
  open 
 
 ()) 
  
 { 
  
 // Start a background thread to write some data on a periodic basis 
  
 // In reality, you're application would probably be doing thing in another scope 
  
 Thread 
  
 writeThread 
  
 = 
  
 startWriteThread 
 ( 
 totalToWrite 
 , 
  
 channel 
 , 
  
 flushPolicy 
 ); 
  
 try 
  
 ( 
  BlobReadSession 
 
  
 readSession 
  
 = 
  
 storage 
 . 
  blobReadSession 
 
 ( 
 gen1 
 . 
  getBlobId 
 
 ()). 
 get 
 ( 
 10 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 )) 
  
 { 
  
 int 
  
 zeroCnt 
  
 = 
  
 0 
 ; 
  
 long 
  
 read 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 read 
 < 
 totalToWrite 
 ) 
  
 { 
  
 if 
  
 ( 
 zeroCnt 
  
> = 
  
 30 
 && 
 ! 
 channel 
 . 
 isOpen 
 ()) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "breaking" 
 ); 
  
 break 
 ; 
  
 } 
  
 ApiFuture<byte 
 [] 
>  
 future 
  
 = 
  
 readSession 
 . 
  readAs 
 
 ( 
  
  ReadProjectionConfigs 
 
 . 
  asFutureBytes 
 
 () 
  
 . 
 withRangeSpec 
 ( 
  RangeSpec 
 
 . 
 of 
 ( 
 read 
 , 
  
 flushPolicy 
 . 
  getMinFlushSize 
 
 ()))); 
  
 byte 
 [] 
  
 bytes 
  
 = 
  
 future 
 . 
 get 
 ( 
 20 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 read 
  
 += 
  
 bytes 
 . 
 length 
 ; 
  
 long 
  
 defaultSleep 
  
 = 
  
 1_500L 
 ; 
  
 if 
  
 ( 
 bytes 
 . 
 length 
  
 == 
  
 0 
 ) 
  
 { 
  
 zeroCnt 
 ++ 
 ; 
  
 long 
  
 millis 
  
 = 
  
 defaultSleep 
  
 * 
  
 zeroCnt 
 ; 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "millis = " 
  
 + 
  
 millis 
 ); 
  
 Thread 
 . 
 sleep 
 ( 
 millis 
 ); 
  
 } 
  
 else 
  
 { 
  
 zeroCnt 
  
 = 
  
 0 
 ; 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "bytes.length = " 
  
 + 
  
 bytes 
 . 
 length 
  
 + 
  
 " read = " 
  
 + 
  
 read 
 ); 
  
 Thread 
 . 
 sleep 
 ( 
 defaultSleep 
 ); 
  
 } 
  
 } 
  
 assert 
  
 read 
  
 == 
  
 totalToWrite 
  
 : 
  
 "not enough bytes" 
 ; 
  
 } 
  
 writeThread 
 . 
 join 
 (); 
  
 } 
  
 } 
  
 } 
  
 private 
  
 static 
  
 Thread 
  
 startWriteThread 
 ( 
  
 int 
  
 totalToWrite 
 , 
  
  AppendableUploadWriteableByteChannel 
 
  
 channel 
 , 
  
  FlushPolicy 
 
 . 
  MinFlushSizeFlushPolicy 
 
  
 flushPolicy 
 ) 
  
 { 
  
 Thread 
  
 writeThread 
  
 = 
  
 new 
  
 Thread 
 ( 
  
 () 
  
 - 
>  
 { 
  
 try 
  
 { 
  
 for 
  
 ( 
 long 
  
 written 
  
 = 
  
 0 
 ; 
  
 written 
 < 
 totalToWrite 
 ; 
  
 ) 
  
 { 
  
 byte 
  
 alphaOffset 
  
 = 
  
 ( 
 byte 
 ) 
  
 ( 
 written 
  
 % 
  
 0x1a 
 ); 
  
 ByteBuffer 
  
 buf 
  
 = 
  
 ByteBuffer 
 . 
 wrap 
 ( 
 new 
  
 byte 
 [] 
  
 {( 
 byte 
 ) 
  
 ( 
 0x41 
  
 + 
  
 alphaOffset 
 )}); 
  
 int 
  
 w 
  
 = 
  
  StorageChannelUtils 
 
 . 
  blockingEmptyTo 
 
 ( 
 buf 
 , 
  
 channel 
 ); 
  
 written 
  
 += 
  
 w 
 ; 
  
 if 
  
 ( 
 written 
  
 % 
  
 flushPolicy 
 . 
  getMinFlushSize 
 
 () 
  
 == 
  
 0 
 ) 
  
 { 
  
 channel 
 . 
  flush 
 
 (); 
  
 Thread 
 . 
 sleep 
 ( 
 40 
 ); 
  
 } 
  
 } 
  
 channel 
 . 
  closeWithoutFinalizing 
 
 (); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 | 
  
 InterruptedException 
  
 e 
 ) 
  
 { 
  
 throw 
  
 new 
  
 RuntimeException 
 ( 
 e 
 ); 
  
 } 
  
 }); 
  
 writeThread 
 . 
 start 
 (); 
  
 return 
  
 writeThread 
 ; 
  
 } 
 } 
 

Python

For more information, see the Cloud Storage Python API reference documentation.

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

The following sample reads the tail of an appendable object:

  async def appender(writer: AsyncAppendableObjectWriter, duration: int):
      """Keep appending ``BYTES_TO_APPEND`` to the object until ``duration`` seconds pass.

      After every append a timestamped progress line with the running total is
      printed, and the coroutine sleeps for 0.1 seconds before the next append.

      Args:
          writer: Writer used for the appends; it is not opened or closed here.
          duration: How long to keep appending, in seconds.
      """
      print("Appender started.")
      total_appended = 0
      began_at = time.monotonic()

      # Keep going until the requested amount of time has elapsed.
      while time.monotonic() - began_at < duration:
          await writer.append(BYTES_TO_APPEND)

          # Timestamp with the last three digits of the microseconds dropped.
          stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
          total_appended += NUM_BYTES_TO_APPEND_EVERY_SECOND
          print(
              f"[{stamp}] Appended {NUM_BYTES_TO_APPEND_EVERY_SECOND} new bytes. Total appended: {total_appended} bytes."
          )
          await asyncio.sleep(0.1)

      print("Appender finished.")
 async def tailer(
     bucket_name: str, object_name: str, duration: int, client: AsyncGrpcClient
 ):
     """Follow the object for ``duration`` seconds, reading whatever was appended.

     Every 0.1 seconds a range starting at the first unread byte is requested;
     when bytes arrive a timestamped line is printed and the offset advances.

     Args:
         bucket_name: Bucket containing the object.
         object_name: Name of the object to tail.
         duration: How long to keep polling, in seconds.
         client: Client passed to the ``AsyncMultiRangeDownloader``.
     """
     print("Tailer started.")
     next_offset = 0
     began_at = time.monotonic()
     downloader = AsyncMultiRangeDownloader(client, bucket_name, object_name)
     try:
         await downloader.open()
         # Keep polling until the requested amount of time has elapsed.
         while time.monotonic() - began_at < duration:
             sink = BytesIO()
             # A download range of (start, 0) means to read from 'start' to the end.
             await downloader.download_ranges([(next_offset, 0, sink)])

             received = sink.getbuffer().nbytes
             if received > 0:
                 stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                 print(f"[{stamp}] Tailer read {received} new bytes: ")
                 next_offset += received

             # Wait 0.1 seconds before polling for new data again.
             await asyncio.sleep(0.1)
     finally:
         # Only close the stream if it is open.
         if downloader.is_stream_open:
             await downloader.close()
     print("Tailer finished.")
 # read_appendable_object_tail simulates a "tail -f" command on a GCS object. It
 # repeatedly polls an appendable object for new content. In a real
 # application, the object would be written to by a separate process.
 async def read_appendable_object_tail(
     bucket_name: str, object_name: str, duration: int, grpc_client=None
 ):
     """Create an appendable object, then append to it and tail it concurrently.

     Args:
         bucket_name: Bucket in which the object is created.
         object_name: Name of the appendable object.
         duration: Seconds for which the appender and tailer each run.
         grpc_client: An existing grpc client to use; this is only for testing.
             A new ``AsyncGrpcClient`` is created when omitted.
     """
     if grpc_client is None:
         grpc_client = AsyncGrpcClient()

     writer = AsyncAppendableObjectWriter(
         client=grpc_client,
         bucket_name=bucket_name,
         object_name=object_name,
     )
     try:
         # Step 1: opening the writer creates the empty appendable object.
         await writer.open()
         print(f"Created empty appendable object: {object_name}")

         # Step 2: schedule the appender first, then the tailer.
         tasks = [
             asyncio.create_task(appender(writer, duration)),
             asyncio.create_task(
                 tailer(bucket_name, object_name, duration, grpc_client)
             ),
         ]

         # Step 3: let both run concurrently until they complete.
         await asyncio.gather(*tasks)
     finally:
         # Close the writer only if its stream is still open.
         if writer._is_stream_open:
             await writer.close()
             print("Writer closed.")
 

What's next

Design a Mobile Site
View Site in Mobile | Classic
Share by: