Read and append to objects in zonal buckets

You can read from and append data to objects in zonal buckets, which are made available through Rapid Bucket . Zonal buckets use the Rapid storage class , a high-performance storage class that's optimized for I/O-intensive workloads. Appendable objects let you incrementally add data to a file without needing to rewrite the entire object, helping you more effectively manage continuously flowing data.

This page describes how to read and make appends to objects stored in zonal buckets. It shows you how to perform the following operations:

  • Create and write to an appendable object.

  • Read appendable objects.

  • Pause, resume, and finalize appendable objects.

  • Read the tail end of appendable objects.

Before you use this page, you might want to read the following resources:

Make appendable writes to objects

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample uploads an appendable object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 auto 
  
 coro 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<google 
 :: 
 storage 
 :: 
 v2 
 :: 
 Object 
>  
 { 
  
 auto 
  
 [ 
 writer 
 , 
  
 token 
 ] 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 StartAppendableObjectUpload 
 ( 
  
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Appendable upload started for object " 
 << 
 object_name 
 << 
 " 
 \n 
 " 
 ; 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 "Some data 
 \n 
 " 
 ))) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Wrote initial data. 
 \n 
 " 
 ; 
  
 // Flush the buffered data to the service. This is not a terminal 
  
 // operation. The writer can be used after the flush completes. 
  
 // After a flush, the data is visible to readers. 
  
 auto 
  
 flush_status 
  
 = 
  
 co_await 
  
 writer 
 . 
 Flush 
 (); 
  
 if 
  
 ( 
 ! 
 flush_status 
 . 
 ok 
 ()) 
  
 throw 
  
 std 
 :: 
 runtime_error 
 ( 
 flush_status 
 . 
 message 
 ()); 
  
 std 
 :: 
 cout 
 << 
 "Flush completed. Persisted size is now " 
 << 
 absl 
 :: 
 get<std 
 :: 
 int64_t 
> ( 
 writer 
 . 
 PersistedState 
 ()) 
 << 
 " 
 \n 
 " 
 ; 
  
 // The writer is still open. We can write more data. 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 "Some more data 
 \n 
 " 
 ))) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Wrote more data. 
 \n 
 " 
 ; 
  
 // Finalize the upload to make it a regular object. 
  
 co_return 
  
 ( 
 co_await 
  
 writer 
 . 
 Finalize 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
 }; 
 

Go

For more information, see the Cloud Storage Go API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample uploads an appendable object:

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // createAndWriteAppendableObject creates and uploads a new appendable object in 
 // a rapid bucket. The object will not be finalized. 
 func 
  
 createAndWriteAppendableObject 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 error 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Create a Writer and write some data. 
  
 writer 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewWriter 
 
 ( 
 ctx 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "Some data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Flush the buffered data to the service. This is not a terminal 
  
 // operation. The Writer can be used after the flush completes. 
  
 // After a flush, the data is visible to readers. 
  
 size 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Flush 
 
 () 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Flush: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Flush completed. Persisted size is now %d\n" 
 , 
  
 size 
 ) 
  
 // The Writer is still open. We can write more data. 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "Some more data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Close the Writer to flush any remaining buffered data. 
  
 // The object will be unfinalized, which means another writer can 
  
 // later append to the object. 
  
 if 
  
 err 
  
 := 
  
 writer 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Uploaded object %v\n" 
 , 
  
 object 
 ) 
  
 return 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample uploads an appendable object:

  import 
  
 com.google.cloud.storage. BlobAppendableUpload 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
. AppendableUploadWriteableByteChannel 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
. CloseAction 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobInfo 
 
 ; 
 import 
  
 com.google.cloud.storage. FlushPolicy 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 com.google.common.io.ByteStreams 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.nio.channels.FileChannel 
 ; 
 import 
  
 java.nio.channels.ReadableByteChannel 
 ; 
 import 
  
 java.nio.file.Paths 
 ; 
 import 
  
 java.util.Locale 
 ; 
 public 
  
 class 
 CreateAndWriteAppendableObject 
  
 { 
  
 public 
  
 static 
  
 void 
  
 createAndWriteAppendableObject 
 ( 
  
 String 
  
 bucketName 
 , 
  
 String 
  
 objectName 
 , 
  
 String 
  
 filePath 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS object 
  
 // String objectName = "your-object-name"; 
  
 // The path to the file to upload 
  
 // String filePath = "path/to/your/file"; 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
  BlobInfo 
 
  
 blobInfo 
  
 = 
  
  BlobInfo 
 
 . 
 newBuilder 
 ( 
 blobId 
 ). 
 build 
 (); 
  
 int 
  
 flushSize 
  
 = 
  
 64 
  
 * 
  
 1000 
 ; 
  
  FlushPolicy 
 
 . 
  MaxFlushSizeFlushPolicy 
 
  
 flushPolicy 
  
 = 
  
  FlushPolicy 
 
 . 
  maxFlushSize 
 
 ( 
 flushSize 
 ); 
  
  BlobAppendableUploadConfig 
 
  
 config 
  
 = 
  
  BlobAppendableUploadConfig 
 
 . 
 of 
 () 
  
 . 
  withCloseAction 
 
 ( 
  CloseAction 
 
 . 
 FINALIZE_WHEN_CLOSING 
 ) 
  
 . 
  withFlushPolicy 
 
 ( 
 flushPolicy 
 ); 
  
  BlobAppendableUpload 
 
  
 uploadSession 
  
 = 
  
 storage 
 . 
  blobAppendableUpload 
 
 ( 
 blobInfo 
 , 
  
 config 
 ); 
  
 try 
  
 ( 
  AppendableUploadWriteableByteChannel 
 
  
 channel 
  
 = 
  
 uploadSession 
 . 
  open 
 
 (); 
  
 ReadableByteChannel 
  
 readableByteChannel 
  
 = 
  
 FileChannel 
 . 
 open 
 ( 
 Paths 
 . 
 get 
 ( 
 filePath 
 ))) 
  
 { 
  
 ByteStreams 
 . 
 copy 
 ( 
 readableByteChannel 
 , 
  
 channel 
 ); 
  
 // Since the channel is in a try-with-resources block, channel.close() 
  
 // will be implicitly called here, which triggers the finalization. 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 ex 
 ) 
  
 { 
  
 throw 
  
 new 
  
 IOException 
 ( 
 "Failed to upload to object " 
  
 + 
  
 blobId 
 . 
  toGsUtilUri 
 
 (), 
  
 ex 
 ); 
  
 } 
  
  BlobInfo 
 
  
 result 
  
 = 
  
 storage 
 . 
 get 
 ( 
 blobId 
 ); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
  
 Locale 
 . 
 US 
 , 
  
 "Object %s successfully uploaded" 
 , 
  
  result 
 
 . 
  getBlobId 
 
 (). 
  toGsUtilUriWithGeneration 
 
 ()); 
  
 } 
  
 } 
 } 
 

Python

For more information, see the Cloud Storage Python API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample uploads an appendable object:

  async 
 def 
  
 storage_create_and_write_appendable_object 
 ( 
 bucket_name 
 , 
 object_name 
 , 
 grpc_client 
 = 
 None 
 ): 
  
 """Uploads an appendable object to zonal bucket. 
 grpc_client: an existing grpc_client to use, this is only for testing. 
 """ 
 if 
 grpc_client 
 is 
 None 
 : 
 grpc_client 
 = 
 AsyncGrpcClient 
 () 
 writer 
 = 
 AsyncAppendableObjectWriter 
 ( 
 client 
 = 
 grpc_client 
 , 
 bucket_name 
 = 
 bucket_name 
 , 
 object_name 
 = 
 object_name 
 , 
 generation 
 = 
 0 
 , 
 # throws `FailedPrecondition` if object already exists. 
 ) 
 # This creates a new appendable object of size 0 and opens it for appending. 
 await 
 writer 
 . 
 open 
 () 
 # appends data to the object 
 # you can perform `.append` multiple times as needed. Data will be appended 
 # to the end of the object. 
 await 
 writer 
 . 
 append 
 ( 
 b 
 "Some data" 
 ) 
 # Once all appends are done, close the gRPC bidirectional stream. 
 await 
 writer 
 . 
 close 
 () 
 print 
 ( 
 f 
 "Appended object 
 { 
 object_name 
 } 
 created of size 
 { 
 writer 
 . 
 persisted_size 
 } 
 bytes." 
 ) 
 

Read objects

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample performs a ranged read on a single object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 // Helper coroutine to count newlines returned by an AsyncReader. 
 // This helps consume the data from the read operation. 
 auto 
  
 count_newlines 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncReader 
  
 reader 
 , 
  
 gcs 
 :: 
 AsyncToken 
  
 token 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 std 
 :: 
 uint64_t 
  
 count 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 token 
 . 
 valid 
 ()) 
  
 { 
  
 auto 
  
 [ 
 payload 
 , 
  
 t 
 ] 
  
 = 
  
 ( 
 co_await 
  
 reader 
 . 
 Read 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 token 
  
 = 
  
 std 
 :: 
 move 
 ( 
 t 
 ); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 count 
  
 += 
  
 std 
 :: 
 count 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 (), 
  
 '\n' 
 ); 
  
 } 
  
 } 
  
 co_return 
  
 count 
 ; 
 }; 
 auto 
  
 coro 
  
 = 
  
 [ 
& count_newlines 
 ]( 
  
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 auto 
  
 descriptor 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 Open 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 auto 
  
 [ 
 reader 
 , 
  
 token 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 0 
 , 
  
 1024 
 ); 
  
 co_return 
  
 co_await 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 reader 
 ), 
  
 std 
 :: 
 move 
 ( 
 token 
 )); 
 }; 
 

The following sample performs a full read on a single object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 // Helper coroutine to count newlines returned by an AsyncReader. 
 // This helps consume the data from the read operation. 
 auto 
  
 count_newlines 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncReader 
  
 reader 
 , 
  
 gcs 
 :: 
 AsyncToken 
  
 token 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 std 
 :: 
 uint64_t 
  
 count 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 token 
 . 
 valid 
 ()) 
  
 { 
  
 auto 
  
 [ 
 payload 
 , 
  
 t 
 ] 
  
 = 
  
 ( 
 co_await 
  
 reader 
 . 
 Read 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 token 
  
 = 
  
 std 
 :: 
 move 
 ( 
 t 
 ); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 count 
  
 += 
  
 std 
 :: 
 count 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 (), 
  
 '\n' 
 ); 
  
 } 
  
 } 
  
 co_return 
  
 count 
 ; 
 }; 
 auto 
  
 coro 
  
 = 
  
 [ 
& count_newlines 
 ]( 
  
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 auto 
  
 descriptor 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 Open 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 auto 
  
 [ 
 reader 
 , 
  
 token 
 ] 
  
 = 
  
 descriptor 
 . 
 ReadFromOffset 
 ( 
 0 
 ); 
  
 co_return 
  
 co_await 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 reader 
 ), 
  
 std 
 :: 
 move 
 ( 
 token 
 )); 
 }; 
 

The following sample performs ranged reads on a single object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 // Helper coroutine to count newlines returned by an AsyncReader. 
 auto 
  
 count_newlines 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncReader 
  
 reader 
 , 
  
 gcs 
 :: 
 AsyncToken 
  
 token 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 std 
 :: 
 uint64_t 
  
 count 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 token 
 . 
 valid 
 ()) 
  
 { 
  
 auto 
  
 [ 
 payload 
 , 
  
 t 
 ] 
  
 = 
  
 ( 
 co_await 
  
 reader 
 . 
 Read 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 token 
  
 = 
  
 std 
 :: 
 move 
 ( 
 t 
 ); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 count 
  
 += 
  
 std 
 :: 
 count 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 (), 
  
 '\n' 
 ); 
  
 } 
  
 } 
  
 co_return 
  
 count 
 ; 
 }; 
 auto 
  
 coro 
  
 = 
  
 [ 
& count_newlines 
 ]( 
  
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 auto 
  
 descriptor 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 Open 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 auto 
  
 [ 
 r1 
 , 
  
 t1 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 0 
 , 
  
 1024 
 ); 
  
 auto 
  
 [ 
 r2 
 , 
  
 t2 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 0 
 , 
  
 1024 
 ); 
  
 auto 
  
 [ 
 r3 
 , 
  
 t3 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 1024 
 , 
  
 1024 
 ); 
  
 auto 
  
 c1 
  
 = 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 r1 
 ), 
  
 std 
 :: 
 move 
 ( 
 t1 
 )); 
  
 auto 
  
 c2 
  
 = 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 r2 
 ), 
  
 std 
 :: 
 move 
 ( 
 t2 
 )); 
  
 auto 
  
 c3 
  
 = 
  
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 r3 
 ), 
  
 std 
 :: 
 move 
 ( 
 t3 
 )); 
  
 co_return 
  
 ( 
 co_await 
  
 std 
 :: 
 move 
 ( 
 c1 
 )) 
  
 + 
  
 ( 
 co_await 
  
 std 
 :: 
 move 
 ( 
 c2 
 )) 
  
 + 
  
 ( 
 co_await 
  
 std 
 :: 
 move 
 ( 
 c3 
 )); 
 }; 
 

The following sample performs ranged reads on multiple objects (a single read per object):

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 // Helper coroutine to count newlines returned by an AsyncReader. 
 auto 
  
 count_newlines 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncReader 
  
 reader 
 , 
  
 gcs 
 :: 
 AsyncToken 
  
 token 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>  
 { 
  
 std 
 :: 
 uint64_t 
  
 count 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 token 
 . 
 valid 
 ()) 
  
 { 
  
 auto 
  
 [ 
 payload 
 , 
  
 t 
 ] 
  
 = 
  
 ( 
 co_await 
  
 reader 
 . 
 Read 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 token 
  
 = 
  
 std 
 :: 
 move 
 ( 
 t 
 ); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 count 
  
 += 
  
 std 
 :: 
 count 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 (), 
  
 '\n' 
 ); 
  
 } 
  
 } 
  
 co_return 
  
 count 
 ; 
 }; 
 auto 
  
 coro 
  
 = 
  
 [ 
& count_newlines 
 ]( 
  
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name1 
 , 
  
 std 
 :: 
 string 
  
 object_name2 
 , 
  
 std 
 :: 
 string 
  
 object_name3 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<void> 
  
 { 
  
 // List of object names to read (passed as arguments) 
  
 std 
 :: 
 vector<std 
 :: 
 string 
>  
 object_names 
  
 = 
  
 { 
 object_name1 
 , 
  
 object_name2 
 , 
  
 object_name3 
 }; 
  
 std 
 :: 
 vector<google 
 :: 
 cloud 
 :: 
 future<std 
 :: 
 uint64_t 
>>  
 futures 
 ; 
  
 // Start ranged reads for all objects and collect futures 
  
 // This example opens multiple objects, not one object multiple times. 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 name 
  
 : 
  
 object_names 
 ) 
  
 { 
  
 auto 
  
 descriptor 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 Open 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 bucket_name 
 ), 
  
 name 
 )). 
 value 
 (); 
  
 auto 
  
 [ 
 reader 
 , 
  
 token 
 ] 
  
 = 
  
 descriptor 
 . 
 Read 
 ( 
 0 
 , 
  
 1024 
 ); 
  
 futures 
 . 
 push_back 
 ( 
 count_newlines 
 ( 
 std 
 :: 
 move 
 ( 
 reader 
 ), 
  
 std 
 :: 
 move 
 ( 
 token 
 ))); 
  
 } 
  
 // Process futures as they become ready and print results 
  
 while 
  
 ( 
 ! 
 futures 
 . 
 empty 
 ()) 
  
 { 
  
 bool 
  
 progress_made 
  
 = 
  
 false 
 ; 
  
 for 
  
 ( 
 std 
 :: 
 size_t 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
 < 
 futures 
 . 
 size 
 (); 
  
 ++ 
 i 
 ) 
  
 { 
  
 if 
  
 ( 
 futures 
 [ 
 i 
 ]. 
 is_ready 
 ()) 
  
 { 
  
 // Check if the future is ready 
  
 auto 
  
 count 
  
 = 
  
 futures 
 [ 
 i 
 ]. 
 get 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Object " 
 << 
 object_names 
 [ 
 i 
 ] 
 << 
 " read returned " 
 << 
 count 
 << 
 " newlines 
 \n 
 " 
 ; 
  
 futures 
 . 
 erase 
 ( 
 futures 
 . 
 begin 
 () 
  
 + 
  
 i 
 ); 
  
 object_names 
 . 
 erase 
 ( 
 object_names 
 . 
 begin 
 () 
  
 + 
  
 i 
 ); 
  
 progress_made 
  
 = 
  
 true 
 ; 
  
 break 
 ; 
  
 // Restart the loop after modifying the vectors 
  
 } 
  
 } 
  
 if 
  
 ( 
 ! 
 progress_made 
 ) 
  
 { 
  
 std 
 :: 
 this_thread 
 :: 
 sleep_for 
 ( 
  
 std 
 :: 
 chrono 
 :: 
 milliseconds 
 ( 
 10 
 )); 
  
 // Avoid busy spin 
  
 } 
  
 } 
 }; 
 

Go

For more information, see the Cloud Storage Go API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample performs a ranged read on a single object:

  import 
  
 ( 
  
 "bytes" 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // openObjectSingleRangedRead reads a single range from an object in a 
 // rapid bucket. 
 func 
  
 openObjectSingleRangedRead 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 ([] 
 byte 
 , 
  
 error 
 ) 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Read the first KiB of the file and copy into a buffer. 
  
 r 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewRangeReader 
 
 ( 
 ctx 
 , 
  
 0 
 , 
  
 1024 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewRangeReader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 r 
 . 
 Close 
 () 
  
 buf 
  
 := 
  
 new 
 ( 
 bytes 
 . 
 Buffer 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 io 
 . 
 Copy 
 ( 
 buf 
 , 
  
 r 
 ); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "copying data: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Read the first 1024 bytes of %v into a buffer\n" 
 , 
  
 object 
 ) 
  
 return 
  
 buf 
 . 
  Bytes 
 
 (), 
  
 nil 
 } 
 

The following sample performs a full read on a single object:

  import 
  
 ( 
  
 "bytes" 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // OpenObjectReadFullObject reads a full object's data from a 
 // rapid bucket. 
 func 
  
 openObjectReadFullObject 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 ([] 
 byte 
 , 
  
 error 
 ) 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Read the first KiB of the file and copy into a buffer. 
  
 r 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewReader 
 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewReader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 r 
 . 
 Close 
 () 
  
 buf 
  
 := 
  
 new 
 ( 
 bytes 
 . 
 Buffer 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 io 
 . 
 Copy 
 ( 
 buf 
 , 
  
 r 
 ); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "copying data: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Read the data of %v into a buffer\n" 
 , 
  
 object 
 ) 
  
 return 
  
 buf 
 . 
  Bytes 
 
 (), 
  
 nil 
 } 
 

The following sample performs ranged reads on a single object:

  import 
  
 ( 
  
 "bytes" 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // openObjectMultipleRangedRead opens a single object using 
 // MultiRangeDownloader to download multiple ranges. 
 func 
  
 openObjectMultipleRangedRead 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 ([][] 
 byte 
 , 
  
 error 
 ) 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Create the MultiRangeDownloader, which opens a stream to the object. 
  
 mrd 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewMultiRangeDownloader 
 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewMultiRangeDownloader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Add some 1 KiB ranges to download. This call is non-blocking. The 
  
 // provided callback is invoked when the range download is complete. 
  
 startOffsets 
  
 := 
  
 [] 
 int64 
 { 
 0 
 , 
  
 1024 
 , 
  
 2048 
 } 
  
 var 
  
 dataBufs 
  
 [ 
 3 
 ] 
 bytes 
 . 
 Buffer 
  
 var 
  
 errs 
  
 [] 
 error 
  
 for 
  
 i 
 , 
  
 off 
  
 := 
  
 range 
  
 startOffsets 
  
 { 
  
 mrd 
 . 
  Add 
 
 ( 
& dataBufs 
 [ 
 i 
 ], 
  
 off 
 , 
  
 1024 
 , 
  
 func 
 ( 
 off 
 , 
  
 length 
  
 int64 
 , 
  
 err 
  
 error 
 ) 
  
 { 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 errs 
  
 = 
  
 append 
 ( 
 errs 
 , 
  
 err 
 ) 
  
 } 
  
 else 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "downloaded range at offset %v" 
 , 
  
 off 
 ) 
  
 } 
  
 }) 
  
 } 
  
 // Wait for all downloads to complete. 
  
 mrd 
 . 
 Wait 
 () 
  
 if 
  
 len 
 ( 
 errs 
 ) 
 > 
 0 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "one or more downloads failed; errors: %v" 
 , 
  
 errs 
 ) 
  
 } 
  
 if 
  
 err 
  
 := 
  
 mrd 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "MultiRangeDownloader.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Read the ranges of %v into memory\n" 
 , 
  
 object 
 ) 
  
 // Collect the byte slices 
  
 var 
  
 byteSlices 
  
 [][] 
 byte 
  
 for 
  
 _ 
 , 
  
 buf 
  
 := 
  
 range 
  
 dataBufs 
  
 { 
  
 byteSlices 
  
 = 
  
 append 
 ( 
 byteSlices 
 , 
  
 buf 
 . 
  Bytes 
 
 ()) 
  
 } 
  
 return 
  
 byteSlices 
 , 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample performs a ranged read on a single object:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. RangeSpec 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 OpenObjectSingleRangedRead 
  
 { 
  
 public 
  
 static 
  
 void 
  
 openObjectSingleRangedRead 
 ( 
  
 String 
  
 bucketName 
 , 
  
 String 
  
 objectName 
 , 
  
 long 
  
 offset 
 , 
  
 int 
  
 length 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS object 
  
 // String objectName = "your-object-name"; 
  
 // The beginning of the range 
  
 // long offset = 0 
  
 // The maximum number of bytes to read from the object. 
  
 // int length = 64; 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
 ApiFuture<BlobReadSession> 
  
 futureBlobReadSession 
  
 = 
  
 storage 
 . 
  blobReadSession 
 
 ( 
 blobId 
 ); 
  
 try 
  
 ( 
  BlobReadSession 
 
  
 blobReadSession 
  
 = 
  
 futureBlobReadSession 
 . 
 get 
 ( 
 10 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 )) 
  
 { 
  
 // Define the range of bytes to read. 
  
  RangeSpec 
 
  
 rangeSpec 
  
 = 
  
  RangeSpec 
 
 . 
 of 
 ( 
 offset 
 , 
  
 length 
 ); 
  
 ApiFuture<byte 
 [] 
>  
 future 
  
 = 
  
  blobReadSession 
 
 . 
  readAs 
 
 ( 
  ReadProjectionConfigs 
 
 . 
  asFutureBytes 
 
 (). 
 withRangeSpec 
 ( 
 rangeSpec 
 )); 
  
 // Wait for the read to complete. 
  
 byte 
 [] 
  
 bytes 
  
 = 
  
 future 
 . 
 get 
 (); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
  
 "Successfully read " 
  
 + 
  
 bytes 
 . 
 length 
  
 + 
  
 " bytes from object " 
  
 + 
  
 objectName 
  
 + 
  
 " in bucket " 
  
 + 
  
 bucketName 
 ); 
  
 } 
  
 } 
  
 } 
 } 
 

The following sample performs a full read on a single object:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadAsChannel 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 java.nio.ByteBuffer 
 ; 
 import 
  
 java.nio.channels.ScatteringByteChannel 
 ; 
 import 
  
 java.util.Locale 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 OpenObjectReadFullObject 
  
 { 
  
 public 
  
 static 
  
 void 
  
 openObjectReadFullObject 
 ( 
 String 
  
 bucketName 
 , 
  
 String 
  
 objectName 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS object to read 
  
 // String objectName = "your-object-name"; 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
 ApiFuture<BlobReadSession> 
  
 futureBlobReadSession 
  
 = 
  
 storage 
 . 
  blobReadSession 
 
 ( 
 blobId 
 ); 
  
 try 
  
 ( 
  BlobReadSession 
 
  
 blobReadSession 
  
 = 
  
 futureBlobReadSession 
 . 
 get 
 ( 
 10 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 )) 
  
 { 
  
  ReadAsChannel 
 
  
 readAsChannelConfig 
  
 = 
  
  ReadProjectionConfigs 
 
 . 
  asChannel 
 
 (); 
  
 try 
  
 ( 
 ScatteringByteChannel 
  
 channel 
  
 = 
  
  blobReadSession 
 
 . 
  readAs 
 
 ( 
 readAsChannelConfig 
 )) 
  
 { 
  
 long 
  
 totalBytesRead 
  
 = 
  
 0 
 ; 
  
 ByteBuffer 
  
 buffer 
  
 = 
  
 ByteBuffer 
 . 
 allocate 
 ( 
 64 
  
 * 
  
 1024 
 ); 
  
 int 
  
 bytesRead 
 ; 
  
 while 
  
 (( 
 bytesRead 
  
 = 
  
 channel 
 . 
 read 
 ( 
 buffer 
 )) 
  
 != 
  
 - 
 1 
 ) 
  
 { 
  
 totalBytesRead 
  
 += 
  
 bytesRead 
 ; 
  
 buffer 
 . 
 clear 
 (); 
  
 } 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
  
 Locale 
 . 
 US 
 , 
  
 "Successfully read a total of %d bytes from object %s%n" 
 , 
  
 totalBytesRead 
 , 
  
 blobId 
 . 
  toGsUtilUri 
 
 ()); 
  
 } 
  
 } 
  
 } 
  
 } 
 } 
 

The following sample performs ranged reads on a single object:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. RangeSpec 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 com.google.common.collect.ImmutableList 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 OpenObjectMultipleRangedRead 
  
 { 
  
 public 
  
 static 
  
 void 
  
 openObjectMultipleRangedRead 
 ( 
  
 String 
  
 bucketName 
 , 
  
 String 
  
 objectName 
 , 
  
 long 
  
 offset1 
 , 
  
 int 
  
 length1 
 , 
  
 long 
  
 offset2 
 , 
  
 int 
  
 length2 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS object 
  
 // String objectName = "your-object-name"; 
  
 // The beginning of the range 1 
  
 // long offset = 0 
  
 // The maximum number of bytes to read in range 1 
  
 // int length = 16; 
  
 // The beginning of the range 2 
  
 // long offset = 16 
  
 // The maximum number of bytes to read in range 2 
  
 // int length = 32; 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
 ApiFuture<BlobReadSession> 
  
 futureBlobReadSession 
  
 = 
  
 storage 
 . 
  blobReadSession 
 
 ( 
 blobId 
 ); 
  
  RangeSpec 
 
  
 rangeSpec1 
  
 = 
  
  RangeSpec 
 
 . 
 of 
 ( 
 offset1 
 , 
  
 length1 
 ); 
  
  RangeSpec 
 
  
 rangeSpec2 
  
 = 
  
  RangeSpec 
 
 . 
 of 
 ( 
 offset2 
 , 
  
 length2 
 ); 
  
 try 
  
 ( 
  BlobReadSession 
 
  
 blobReadSession 
  
 = 
  
 futureBlobReadSession 
 . 
 get 
 ( 
 10 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 )) 
  
 { 
  
 ApiFuture<byte 
 [] 
>  
 future1 
  
 = 
  
  blobReadSession 
 
 . 
  readAs 
 
 ( 
  ReadProjectionConfigs 
 
 . 
  asFutureBytes 
 
 (). 
 withRangeSpec 
 ( 
 rangeSpec1 
 )); 
  
 ApiFuture<byte 
 [] 
>  
 future2 
  
 = 
  
  blobReadSession 
 
 . 
  readAs 
 
 ( 
  ReadProjectionConfigs 
 
 . 
  asFutureBytes 
 
 (). 
 withRangeSpec 
 ( 
 rangeSpec2 
 )); 
  
 List<byte 
 [] 
>  
 allBytes 
  
 = 
  
  ApiFutures 
 
 . 
  allAsList 
 
 ( 
 ImmutableList 
 . 
 of 
 ( 
 future1 
 , 
  
 future2 
 )). 
 get 
 (); 
  
 byte 
 [] 
  
 bytes1 
  
 = 
  
 allBytes 
 . 
 get 
 ( 
 0 
 ); 
  
 byte 
 [] 
  
 bytes2 
  
 = 
  
 allBytes 
 . 
 get 
 ( 
 1 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
  
 "Successfully read " 
  
 + 
  
 bytes1 
 . 
 length 
  
 + 
  
 " bytes from range 1 and " 
  
 + 
  
 bytes2 
 . 
 length 
  
 + 
  
 " bytes from range 2." 
 ); 
  
 } 
  
 } 
  
 } 
 } 
 

The following sample performs ranged reads on multiple objects:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.api.core. ApiFutures 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. RangeSpec 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadAsFutureBytes 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 com.google.common.util.concurrent.MoreExecutors 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.List 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 OpenMultipleObjectsRangedRead 
  
 { 
  
 public 
  
 static 
  
 void 
  
 multipleObjectsSingleRangedRead 
 ( 
  
 String 
  
 bucketName 
 , 
  
 List<String> 
  
 objectNames 
 , 
  
 long 
  
 startOffset 
 , 
  
 int 
  
 length 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS objects to read 
  
 // List<String> objectName = Arrays.asList("object-1", "object-2", "object-3"); 
  
  RangeSpec 
 
  
 singleRange 
  
 = 
  
  RangeSpec 
 
 . 
 of 
 ( 
 startOffset 
 , 
  
 length 
 ); 
  
  ReadAsFutureBytes 
 
  
 rangeConfig 
  
 = 
  
  ReadProjectionConfigs 
 
 . 
  asFutureBytes 
 
 (). 
 withRangeSpec 
 ( 
 singleRange 
 ); 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
 List<ApiFuture<byte 
 [] 
>>  
 futuresToWaitOn 
  
 = 
  
 new 
  
 ArrayList 
<> (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
  
 "Initiating single ranged read [%d, %d] on %d objects...%n" 
 , 
  
 startOffset 
 , 
  
 startOffset 
  
 + 
  
 length 
  
 - 
  
 1 
 , 
  
 objectNames 
 . 
 size 
 ()); 
  
 for 
  
 ( 
 String 
  
 objectName 
  
 : 
  
 objectNames 
 ) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
 ApiFuture<BlobReadSession> 
  
 futureReadSession 
  
 = 
  
 storage 
 . 
  blobReadSession 
 
 ( 
 blobId 
 ); 
  
 ApiFuture<byte 
 [] 
>  
 readAndCloseFuture 
  
 = 
  
  ApiFutures 
 
 . 
  transformAsync 
 
 ( 
  
 futureReadSession 
 , 
  
 ( 
 BlobReadSession 
  
 session 
 ) 
  
 - 
>  
 { 
  
 ApiFuture<byte 
 [] 
>  
 readFuture 
  
 = 
  
 session 
 . 
 readAs 
 ( 
 rangeConfig 
 ); 
  
 readFuture 
 . 
 addListener 
 ( 
  
 () 
  
 - 
>  
 { 
  
 try 
  
 { 
  
 session 
 . 
 close 
 (); 
  
 } 
  
 catch 
  
 ( 
 java 
 . 
 io 
 . 
 IOException 
  
 e 
 ) 
  
 { 
  
 System 
 . 
 err 
 . 
 println 
 ( 
  
 "WARN: Background error while closing session: " 
  
 + 
  
 e 
 . 
 getMessage 
 ()); 
  
 } 
  
 }, 
  
 MoreExecutors 
 . 
 directExecutor 
 ()); 
  
 return 
  
 readFuture 
 ; 
  
 }, 
  
 MoreExecutors 
 . 
 directExecutor 
 ()); 
  
 futuresToWaitOn 
 . 
 add 
 ( 
 readAndCloseFuture 
 ); 
  
 } 
  
  ApiFutures 
 
 . 
  allAsList 
 
 ( 
 futuresToWaitOn 
 ). 
 get 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "All concurrent single-ranged read operations are complete." 
 ); 
  
 } 
  
 } 
 } 
 

Python

For more information, see the Cloud Storage Python API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample performs a ranged read on a single object:

  async 
 def 
  
 storage_open_object_single_ranged_read 
 ( 
 bucket_name 
 , 
 object_name 
 , 
 start_byte 
 , 
 size 
 , 
 grpc_client 
 = 
 None 
 ): 
  
 """Downloads a range of bytes from an object. 
 grpc_client: an existing grpc_client to use, this is only for testing. 
 """ 
 if 
 grpc_client 
 is 
 None 
 : 
 grpc_client 
 = 
 AsyncGrpcClient 
 () 
 mrd 
 = 
 AsyncMultiRangeDownloader 
 ( 
 grpc_client 
 , 
 bucket_name 
 , 
 object_name 
 ) 
 try 
 : 
 # Open the object, mrd always opens in read mode. 
 await 
 mrd 
 . 
 open 
 () 
 # requested range will be downloaded into this buffer, user may provide 
 # their own buffer or file-like object. 
 output_buffer 
 = 
 BytesIO 
 () 
 await 
 mrd 
 . 
 download_ranges 
 ([( 
 start_byte 
 , 
 size 
 , 
 output_buffer 
 )]) 
 finally 
 : 
 if 
 mrd 
 . 
 is_stream_open 
 : 
 await 
 mrd 
 . 
 close 
 () 
 # Downloaded size can differ from requested size if object is smaller. 
 # mrd will download at most up to the end of the object. 
 downloaded_size 
 = 
 output_buffer 
 . 
 getbuffer 
 () 
 . 
 nbytes 
 print 
 ( 
 f 
 "Downloaded 
 { 
 downloaded_size 
 } 
 bytes from 
 { 
 object_name 
 } 
 " 
 ) 
 

The following sample performs a full read on a single object:

  async 
 def 
  
 storage_open_object_read_full_object 
 ( 
 bucket_name 
 , 
 object_name 
 , 
 grpc_client 
 = 
 None 
 ): 
  
 """Downloads the entire content of an object using a multi-range downloader. 
 grpc_client: an existing grpc_client to use, this is only for testing. 
 """ 
 if 
 grpc_client 
 is 
 None 
 : 
 grpc_client 
 = 
 AsyncGrpcClient 
 () 
 # mrd = Multi-Range-Downloader 
 mrd 
 = 
 AsyncMultiRangeDownloader 
 ( 
 grpc_client 
 , 
 bucket_name 
 , 
 object_name 
 ) 
 try 
 : 
 # Open the object, mrd always opens in read mode. 
 await 
 mrd 
 . 
 open 
 () 
 # This could be any buffer or file-like object. 
 output_buffer 
 = 
 BytesIO 
 () 
 # A download range of (0, 0) means to read from the beginning to the end. 
 await 
 mrd 
 . 
 download_ranges 
 ([( 
 0 
 , 
 0 
 , 
 output_buffer 
 )]) 
 finally 
 : 
 if 
 mrd 
 . 
 is_stream_open 
 : 
 await 
 mrd 
 . 
 close 
 () 
 downloaded_bytes 
 = 
 output_buffer 
 . 
 getvalue 
 () 
 print 
 ( 
 f 
 "Downloaded all 
 { 
 len 
 ( 
 downloaded_bytes 
 ) 
 } 
 bytes from object 
 { 
 object_name 
 } 
 in bucket 
 { 
 bucket_name 
 } 
 ." 
 ) 
 

The following sample performs ranged reads on a single object:

  async 
 def 
  
 storage_open_object_multiple_ranged_read 
 ( 
 bucket_name 
 , 
 object_name 
 , 
 grpc_client 
 = 
 None 
 ): 
  
 """Downloads multiple ranges of bytes from a single object into different buffers. 
 grpc_client: an existing grpc_client to use, this is only for testing. 
 """ 
 if 
 grpc_client 
 is 
 None 
 : 
 grpc_client 
 = 
 AsyncGrpcClient 
 () 
 mrd 
 = 
 AsyncMultiRangeDownloader 
 ( 
 grpc_client 
 , 
 bucket_name 
 , 
 object_name 
 ) 
 try 
 : 
 # Open the object, mrd always opens in read mode. 
 await 
 mrd 
 . 
 open 
 () 
 # Specify four different buffers to download ranges into. 
 buffers 
 = 
 [ 
 BytesIO 
 (), 
 BytesIO 
 (), 
 BytesIO 
 (), 
 BytesIO 
 ()] 
 # Define the ranges to download. Each range is a tuple of (start_byte, size, buffer). 
 # All ranges will download 10 bytes from different starting positions. 
 # We choose arbitrary start bytes for this example. An object should be large enough. 
 # A user can choose any start byte between 0 and `object_size`. 
 # If `start_bytes` is greater than `object_size`, mrd will throw an error. 
 ranges 
 = 
 [ 
 ( 
 0 
 , 
 10 
 , 
 buffers 
 [ 
 0 
 ]), 
 ( 
 20 
 , 
 10 
 , 
 buffers 
 [ 
 1 
 ]), 
 ( 
 40 
 , 
 10 
 , 
 buffers 
 [ 
 2 
 ]), 
 ( 
 60 
 , 
 10 
 , 
 buffers 
 [ 
 3 
 ]), 
 ] 
 await 
 mrd 
 . 
 download_ranges 
 ( 
 ranges 
 ) 
 finally 
 : 
 await 
 mrd 
 . 
 close 
 () 
 # Print the downloaded content from each buffer. 
 for 
 i 
 , 
 output_buffer 
 in 
 enumerate 
 ( 
 buffers 
 ): 
 downloaded_size 
 = 
 output_buffer 
 . 
 getbuffer 
 () 
 . 
 nbytes 
 print 
 ( 
 f 
 "Downloaded 
 { 
 downloaded_size 
 } 
 bytes into buffer 
 { 
 i 
  
 + 
  
 1 
 } 
 from start byte 
 { 
 ranges 
 [ 
 i 
 ][ 
 0 
 ] 
 } 
 : 
 { 
 output_buffer 
 . 
 getvalue 
 () 
 } 
 " 
 ) 
 

The following sample performs ranged reads on multiple objects:

  async 
 def 
  
 storage_open_multiple_objects_ranged_read 
 ( 
 bucket_name 
 , 
 object_names 
 , 
 grpc_client 
 = 
 None 
 ): 
  
 """Downloads a range of bytes from multiple objects concurrently. 
 grpc_client: an existing grpc_client to use, this is only for testing. 
 """ 
 if 
 grpc_client 
 is 
 None 
 : 
 grpc_client 
 = 
 AsyncGrpcClient 
 () 
 async 
 def 
  
 _download_range 
 ( 
 object_name 
 ): 
  
 """Helper coroutine to download a range from a single object.""" 
 mrd 
 = 
 AsyncMultiRangeDownloader 
 ( 
 grpc_client 
 , 
 bucket_name 
 , 
 object_name 
 ) 
 try 
 : 
 # Open the object, mrd always opens in read mode. 
 await 
 mrd 
 . 
 open 
 () 
 # Each object downloads the first 100 bytes. 
 start_byte 
 = 
 0 
 size 
 = 
 100 
 # requested range will be downloaded into this buffer, user may provide 
 # their own buffer or file-like object. 
 output_buffer 
 = 
 BytesIO 
 () 
 await 
 mrd 
 . 
 download_ranges 
 ([( 
 start_byte 
 , 
 size 
 , 
 output_buffer 
 )]) 
 finally 
 : 
 if 
 mrd 
 . 
 is_stream_open 
 : 
 await 
 mrd 
 . 
 close 
 () 
 # Downloaded size can differ from requested size if object is smaller. 
 # mrd will download at most up to the end of the object. 
 downloaded_size 
 = 
 output_buffer 
 . 
 getbuffer 
 () 
 . 
 nbytes 
 print 
 ( 
 f 
 "Downloaded 
 { 
 downloaded_size 
 } 
 bytes from 
 { 
 object_name 
 } 
 " 
 ) 
 download_tasks 
 = 
 [ 
 _download_range 
 ( 
 name 
 ) 
 for 
 name 
 in 
 object_names 
 ] 
 await 
 asyncio 
 . 
 gather 
 ( 
 * 
 download_tasks 
 ) 
 

Pause and resume an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample pauses and resumes an appendable object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 auto 
  
 coro 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<google 
 :: 
 storage 
 :: 
 v2 
 :: 
 Object 
>  
 { 
  
 // Start an appendable upload and write some data. 
  
 auto 
  
 [ 
 writer 
 , 
  
 token 
 ] 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 StartAppendableObjectUpload 
 ( 
  
 gcs 
 :: 
 BucketName 
 ( 
 bucket_name 
 ), 
  
 object_name 
 )) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Appendable upload started. 
 \n 
 " 
 ; 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 "paused data 
 \n 
 " 
 ))) 
  
 . 
 value 
 (); 
  
 // The writer is closed, but the upload is not finalized. This "pauses" the 
  
 // upload, as the object remains appendable. 
  
 auto 
  
 close_status 
  
 = 
  
 co_await 
  
 writer 
 . 
 Close 
 (); 
  
 if 
  
 ( 
 ! 
 close_status 
 . 
 ok 
 ()) 
  
 throw 
  
 std 
 :: 
 runtime_error 
 ( 
 close_status 
 . 
 message 
 ()); 
  
 std 
 :: 
 cout 
 << 
 "Upload paused. 
 \n 
 " 
 ; 
  
 // To resume the upload we need the object's generation. We can use the 
  
 // regular GCS client to get the latest metadata. 
  
 auto 
  
 regular_client 
  
 = 
  
 gcs 
 :: 
 Client 
 (); 
  
 auto 
  
 metadata 
  
 = 
  
 regular_client 
 . 
 GetObjectMetadata 
 ( 
 bucket_name 
 , 
  
 object_name 
 ). 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Object generation is " 
 << 
 metadata 
 . 
 generation 
 () 
 << 
 " 
 \n 
 " 
 ; 
  
 // Now resume the upload. 
  
 std 
 :: 
 tie 
 ( 
 writer 
 , 
  
 token 
 ) 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 ResumeAppendableObjectUpload 
 ( 
  
 gcs 
 :: 
 BucketName 
 ( 
 bucket_name 
 ), 
  
 object_name 
 , 
  
 metadata 
 . 
 generation 
 ())) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Upload resumed from offset " 
 << 
 absl 
 :: 
 get<std 
 :: 
 int64_t 
> ( 
 writer 
 . 
 PersistedState 
 ()) 
 << 
 " 
 \n 
 " 
 ; 
  
 // Append the rest of the data. 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 "resumed data 
 \n 
 " 
 ))) 
  
 . 
 value 
 (); 
  
 // Finalize the upload and return the object metadata. 
  
 co_return 
  
 ( 
 co_await 
  
 writer 
 . 
 Finalize 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
 }; 
 

Go

For more information, see the Cloud Storage Go API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample pauses and resumes an appendable object:

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // pauseAndResumeAppendableUpload creates a new unfinalized appendable object, 
 // closes the Writer, then re-opens the object for writing using 
 // NewWriterFromAppendableObject. 
 func 
  
 pauseAndResumeAppendableUpload 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 error 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Start an appendable upload and write some data. 
  
 writer 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewWriter 
 
 ( 
 ctx 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "Some data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // The writer is closed, but the upload is not finalized. This "pauses" the 
  
 // upload, as the object remains appendable. 
  
 if 
  
 err 
  
 := 
  
 writer 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Uploaded unfinalized object %v\n" 
 , 
  
 object 
 ) 
  
 // To resume the upload we need the object's generation. We can get this 
  
 // from the previous Writer after close. 
  
 gen 
  
 := 
  
 writer 
 . 
 Attrs 
 (). 
  Generation 
 
  
 // Now resume the upload. Writer options including finalization can be 
  
 // passed on calling this constructor. 
  
 appendWriter 
 , 
  
 offset 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  Generation 
 
 ( 
 gen 
 ). 
  NewWriterFromAppendableObject 
 
 ( 
  
 ctx 
 , 
  
& storage 
 . 
  AppendableWriterOpts 
 
 { 
  
 FinalizeOnClose 
 : 
  
 true 
 , 
  
 }, 
  
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewWriterFromAppendableObject: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Resuming upload from offset %v\n" 
 , 
  
 offset 
 ) 
  
 // Append the rest of the data and close the Writer to finalize. 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 append Write 
r 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "resumed data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "appendWriter.Write: %v" 
 , 
  
 err 
 ) 
  
 } 
  
 if 
  
 err 
  
 := 
  
 appendWriter 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Uploaded and finalized object %v\n" 
 , 
  
 object 
 ) 
  
 return 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample pauses and resumes an appendable object:

  import 
  
 com.google.cloud.storage. Blob 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
. AppendableUploadWriteableByteChannel 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
. CloseAction 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobInfo 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageChannelUtils 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 com.google.common.io.ByteStreams 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.nio.ByteBuffer 
 ; 
 import 
  
 java.nio.channels.FileChannel 
 ; 
 import 
  
 java.nio.charset.StandardCharsets 
 ; 
 import 
  
 java.nio.file.Paths 
 ; 
 import 
  
 java.util.Locale 
 ; 
 public 
  
 class 
 PauseAndResumeAppendableObjectUpload 
  
 { 
  
 public 
  
 static 
  
 void 
  
 pauseAndResumeAppendableObjectUpload 
 ( 
  
 String 
  
 bucketName 
 , 
  
 String 
  
 objectName 
 , 
  
 String 
  
 filePath 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS object 
  
 // String objectName = "your-object-name"; 
  
 // The path to the file to upload 
  
 // String filePath = "path/to/your/file"; 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
  BlobInfo 
 
  
 blobInfo 
  
 = 
  
  BlobInfo 
 
 . 
 newBuilder 
 ( 
 blobId 
 ). 
 build 
 (); 
  
 // --- Step 1: Initial string write (PAUSE) --- 
  
 // Default close action will be CLOSE_WITHOUT_FINALIZING 
  
  BlobAppendableUploadConfig 
 
  
 initialConfig 
  
 = 
  
  BlobAppendableUploadConfig 
 
 . 
 of 
 (); 
  
  BlobAppendableUpload 
 
  
 initialUploadSession 
  
 = 
  
 storage 
 . 
  blobAppendableUpload 
 
 ( 
 blobInfo 
 , 
  
 initialConfig 
 ); 
  
 try 
  
 ( 
  AppendableUploadWriteableByteChannel 
 
  
 channel 
  
 = 
  
 initialUploadSession 
 . 
  open 
 
 ()) 
  
 { 
  
 String 
  
 initialData 
  
 = 
  
 "Initial data segment.\n" 
 ; 
  
 ByteBuffer 
  
 buffer 
  
 = 
  
 ByteBuffer 
 . 
 wrap 
 ( 
 initialData 
 . 
 getBytes 
 ( 
 StandardCharsets 
 . 
 UTF_8 
 )); 
  
 long 
  
 totalBytesWritten 
  
 = 
  
  StorageChannelUtils 
 
 . 
  blockingEmptyTo 
 
 ( 
 buffer 
 , 
  
 channel 
 ); 
  
 channel 
 . 
  flush 
 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
  
 Locale 
 . 
 US 
 , 
  
 "Wrote %d bytes (initial string) in first segment.\n" 
 , 
  
 totalBytesWritten 
 ); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 ex 
 ) 
  
 { 
  
 throw 
  
 new 
  
 IOException 
 ( 
 "Failed initial upload to object " 
  
 + 
  
 blobId 
 . 
  toGsUtilUri 
 
 (), 
  
 ex 
 ); 
  
 } 
  
  Blob 
 
  
 existingBlob 
  
 = 
  
 storage 
 . 
 get 
 ( 
 blobId 
 ); 
  
 long 
  
 currentObjectSize 
  
 = 
  
 existingBlob 
 . 
  getSize 
 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
  
 Locale 
 . 
 US 
 , 
  
 "Initial upload paused. Currently uploaded size: %d bytes\n" 
 , 
  
 currentObjectSize 
 ); 
  
 // --- Step 2: Resume upload with file content and finalize --- 
  
 // Use FINALIZE_WHEN_CLOSING to ensure the object is finalized on channel closure. 
  
  BlobAppendableUploadConfig 
 
  
 resumeConfig 
  
 = 
  
  BlobAppendableUploadConfig 
 
 . 
 of 
 (). 
  withCloseAction 
 
 ( 
  CloseAction 
 
 . 
 FINALIZE_WHEN_CLOSING 
 ); 
  
  BlobAppendableUpload 
 
  
 resumeUploadSession 
  
 = 
  
 storage 
 . 
  blobAppendableUpload 
 
 ( 
 existingBlob 
 . 
  toBuilder 
 
 (). 
 build 
 (), 
  
 resumeConfig 
 ); 
  
 try 
  
 ( 
 FileChannel 
  
 fileChannel 
  
 = 
  
 FileChannel 
 . 
 open 
 ( 
 Paths 
 . 
 get 
 ( 
 filePath 
 )); 
  
  AppendableUploadWriteableByteChannel 
 
  
 channel 
  
 = 
  
 resumeUploadSession 
 . 
  open 
 
 ()) 
  
 { 
  
 long 
  
 bytesToAppend 
  
 = 
  
 fileChannel 
 . 
 size 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
  
 Locale 
 . 
 US 
 , 
  
 "Appending the entire file (%d bytes) after the initial string.\n" 
 , 
  
 bytesToAppend 
 ); 
  
 ByteStreams 
 . 
 copy 
 ( 
 fileChannel 
 , 
  
 channel 
 ); 
  
 } 
  
  BlobInfo 
 
  
 result 
  
 = 
  
 storage 
 . 
 get 
 ( 
 blobId 
 ); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
  
 Locale 
 . 
 US 
 , 
  
 "\nObject %s successfully resumed and finalized. Total size: %d bytes\n" 
 , 
  
  result 
 
 . 
  getBlobId 
 
 (). 
  toGsUtilUriWithGeneration 
 
 (), 
  
  result 
 
 . 
  getSize 
 
 ()); 
  
 } 
  
 } 
 } 
 

Python

For more information, see the Cloud Storage Python API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample pauses and resumes an appendable object:

  async 
 def 
  
 storage_pause_and_resume_appendable_upload 
 ( 
 bucket_name 
 , 
 object_name 
 , 
 grpc_client 
 = 
 None 
 ): 
  
 """Demonstrates pausing and resuming an appendable object upload. 
 grpc_client: an existing grpc_client to use, this is only for testing. 
 """ 
 if 
 grpc_client 
 is 
 None 
 : 
 grpc_client 
 = 
 AsyncGrpcClient 
 () 
 writer1 
 = 
 AsyncAppendableObjectWriter 
 ( 
 client 
 = 
 grpc_client 
 , 
 bucket_name 
 = 
 bucket_name 
 , 
 object_name 
 = 
 object_name 
 , 
 ) 
 await 
 writer1 
 . 
 open 
 () 
 await 
 writer1 
 . 
 append 
 ( 
 b 
 "First part of the data. " 
 ) 
 print 
 ( 
 f 
 "Appended 
 { 
 writer1 
 . 
 persisted_size 
 } 
 bytes with the first writer." 
 ) 
 # 2. After appending some data, close the writer to "pause" the upload. 
 #  NOTE: you can pause indefinitely and still read the conetent uploaded so far using MRD. 
 await 
 writer1 
 . 
 close 
 () 
 print 
 ( 
 "First writer closed. Upload is 'paused'." 
 ) 
 # 3. Create a new writer, passing the generation number from the previous 
 #    writer. This is a precondition to ensure that the object hasn't been 
 #    modified since we last accessed it. 
 generation_to_resume 
 = 
 writer1 
 . 
 generation 
 print 
 ( 
 f 
 "Generation to resume from is: 
 { 
 generation_to_resume 
 } 
 " 
 ) 
 writer2 
 = 
 AsyncAppendableObjectWriter 
 ( 
 client 
 = 
 grpc_client 
 , 
 bucket_name 
 = 
 bucket_name 
 , 
 object_name 
 = 
 object_name 
 , 
 generation 
 = 
 generation_to_resume 
 , 
 ) 
 # 4. Open the new writer. 
 try 
 : 
 await 
 writer2 
 . 
 open 
 () 
 # 5. Append some more data using the new writer. 
 await 
 writer2 
 . 
 append 
 ( 
 b 
 "Second part of the data." 
 ) 
 print 
 ( 
 f 
 "Appended more data. Total size is now 
 { 
 writer2 
 . 
 persisted_size 
 } 
 bytes." 
 ) 
 finally 
 : 
 # 6. Finally, close the new writer. 
 if 
 writer2 
 . 
 _is_stream_open 
 : 
 await 
 writer2 
 . 
 close 
 () 
 print 
 ( 
 "Second writer closed. Full object uploaded." 
 ) 
 

Finalize an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample finalizes an appendable object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 auto 
  
 coro 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<google 
 :: 
 storage 
 :: 
 v2 
 :: 
 Object 
>  
 { 
  
 // Start an appendable upload. 
  
 auto 
  
 [ 
 writer 
 , 
  
 token 
 ] 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 StartAppendableObjectUpload 
 ( 
  
 gcs 
 :: 
 BucketName 
 ( 
 std 
 :: 
 move 
 ( 
 bucket_name 
 )), 
  
 std 
 :: 
 move 
 ( 
 object_name 
 ))) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Appendable upload started. 
 \n 
 " 
 ; 
  
 // Write some data. 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 "some data to finalize 
 \n 
 " 
 ))) 
  
 . 
 value 
 (); 
  
 // Finalize the upload. This makes the object non-appendable. 
  
 // No more data can be written to this writer. 
  
 auto 
  
 object 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Finalize 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ))). 
 value 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Upload finalized. 
 \n 
 " 
 ; 
  
 co_return 
  
 object 
 ; 
 }; 
 

Go

For more information, see the Cloud Storage Go API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample finalizes an appendable object:

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // finalizeAppendableObject creates, uploads and finalizes a new object in 
 // a rapid bucket. 
 func 
  
 finalizeAppendableObject 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 error 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 time 
 . 
 Second 
 * 
 10 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Create a Writer and set FinalizeOnClose so that the object will be 
  
 // finalized after the write is complete. 
  
 writer 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewWriter 
 
 ( 
 ctx 
 ) 
  
 writer 
 . 
 FinalizeOnClose 
  
 = 
  
 true 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "some data to finalize\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Close the Writer to flush any remaining buffered data and finalize 
  
 // the upload. This makes the object non-appendable. 
  
 // No more data can be written to this object. 
  
 if 
  
 err 
  
 := 
  
 writer 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Uploaded and finalized object %v\n" 
 , 
  
 object 
 ) 
  
 return 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample finalizes an appendable object:

  import 
  
 com.google.cloud.storage. Blob 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobInfo 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 public 
  
 class 
 FinalizeAppendableObjectUpload 
  
 { 
  
 public 
  
 static 
  
 void 
  
 finalizeAppendableObjectUpload 
 ( 
 String 
  
 bucketName 
 , 
  
 String 
  
 objectName 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS unfinalized appendable object 
  
 // String objectName = "your-object-name"; 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
  Blob 
 
  
 existingBlob 
  
 = 
  
 storage 
 . 
 get 
 ( 
 blobId 
 ); 
  
 if 
  
 ( 
 existingBlob 
  
 == 
  
 null 
 ) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Object " 
  
 + 
  
 objectName 
  
 + 
  
 " not found in bucket " 
  
 + 
  
 bucketName 
 ); 
  
 return 
 ; 
  
 } 
  
  BlobInfo 
 
  
 blobInfoForTakeover 
  
 = 
  
  BlobInfo 
 
 . 
 newBuilder 
 ( 
 existingBlob 
 . 
  getBlobId 
 
 ()). 
 build 
 (); 
  
  BlobAppendableUpload 
 
  
 finalizingSession 
  
 = 
  
 storage 
 . 
  blobAppendableUpload 
 
 ( 
  
 blobInfoForTakeover 
 , 
  
  BlobAppendableUploadConfig 
 
 . 
 of 
 () 
  
 . 
  withCloseAction 
 
 ( 
  BlobAppendableUploadConfig 
 
 . 
 CloseAction 
 . 
 FINALIZE_WHEN_CLOSING 
 )); 
  
 try 
  
 ( 
  BlobAppendableUpload 
 
 . 
  AppendableUploadWriteableByteChannel 
 
  
 channel 
  
 = 
  
 finalizingSession 
 . 
  open 
 
 ()) 
  
 { 
  
 channel 
 . 
  finalizeAndClose 
 
 (); 
  
 } 
  
 System 
 . 
 out 
 . 
 println 
 ( 
  
 "Successfully finalized object " 
  
 + 
  
 objectName 
  
 + 
  
 " in bucket " 
  
 + 
  
 bucketName 
 ); 
  
 } 
  
 } 
 } 
 

Python

For more information, see the Cloud Storage Python API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample finalizes an appendable object:

  async 
 def 
  
 storage_finalize_appendable_object_upload 
 ( 
 bucket_name 
 , 
 object_name 
 , 
 grpc_client 
 = 
 None 
 ): 
  
 """Creates, writes to, and finalizes an appendable object. 
 grpc_client: an existing grpc_client to use, this is only for testing. 
 """ 
 if 
 grpc_client 
 is 
 None 
 : 
 grpc_client 
 = 
 AsyncGrpcClient 
 () 
 writer 
 = 
 AsyncAppendableObjectWriter 
 ( 
 client 
 = 
 grpc_client 
 , 
 bucket_name 
 = 
 bucket_name 
 , 
 object_name 
 = 
 object_name 
 , 
 generation 
 = 
 0 
 , 
 # throws `FailedPrecondition` if object already exists. 
 ) 
 # This creates a new appendable object of size 0 and opens it for appending. 
 await 
 writer 
 . 
 open 
 () 
 # Appends data to the object. 
 await 
 writer 
 . 
 append 
 ( 
 b 
 "Some data" 
 ) 
 # finalize the appendable object, 
 # NOTE: 
 # 1. once finalized no more appends can be done to the object. 
 # 2. If you don't want to finalize, you can simply call `writer.close` 
 # 3. calling `.finalize()` also closes the grpc-bidi stream, calling 
 #   `.close` after `.finalize` may lead to undefined behavior. 
 object_resource 
 = 
 await 
 writer 
 . 
 finalize 
 () 
 print 
 ( 
 f 
 "Appendable object 
 { 
 object_name 
 } 
 created and finalized." 
 ) 
 print 
 ( 
 "Object Metadata:" 
 ) 
 print 
 ( 
 object_resource 
 ) 
 

Read the tail of an object

Client libraries

C++

For more information, see the Cloud Storage C++ API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample reads the tail of an appendable object:

  namespace 
  
 gcs 
  
 = 
  
 google 
 :: 
 cloud 
 :: 
 storage 
 ; 
 auto 
  
 coro 
  
 = 
  
 []( 
 gcs 
 :: 
 AsyncClient 
&  
 client 
 , 
  
 std 
 :: 
 string 
  
 bucket_name 
 , 
  
 std 
 :: 
 string 
  
 object_name 
 ) 
  
 - 
>  
 google 
 :: 
 cloud 
 :: 
 future<void> 
  
 { 
  
 // This coroutine simulates a "tail -f" command on a GCS object. It 
  
 // repeatedly polls an appendable object for new content. In a real 
  
 // application, the object would be written to by a separate process. 
  
 std 
 :: 
 cout 
 << 
 "Polling for content from " 
 << 
 object_name 
 << 
 "... 
 \n\n 
 " 
 ; 
  
 // Start an appendable upload. In a real application this would be done by 
  
 // a separate process, and this application would only know the object name. 
  
 auto 
  
 [ 
 writer 
 , 
  
 token 
 ] 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 StartAppendableObjectUpload 
 ( 
  
 gcs 
 :: 
 BucketName 
 ( 
 bucket_name 
 ), 
  
 object_name 
 )) 
  
 . 
 value 
 (); 
  
 std 
 :: 
 int64_t 
  
 bytes_read 
  
 = 
  
 0 
 ; 
  
 for 
  
 ( 
 int 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
  
 != 
  
 2 
 ; 
  
 ++ 
 i 
 ) 
  
 { 
  
 // In a real application, another process would append data here. 
  
 // We simulate this by writing to the object. 
  
 auto 
  
 content 
  
 = 
  
 "More data for tail example, iteration " 
  
 + 
  
 std 
 :: 
 to_string 
 ( 
 i 
 ) 
  
 + 
  
 " 
 \n 
 " 
 ; 
  
 token 
  
 = 
  
 ( 
 co_await 
  
 writer 
 . 
 Write 
 ( 
 std 
 :: 
 move 
 ( 
 token 
 ), 
  
 gcs 
 :: 
 WritePayload 
 ( 
 content 
 ))) 
  
 . 
 value 
 (); 
  
 ( 
 void 
 ) 
 co_await 
  
 writer 
 . 
 Flush 
 (); 
  
 // Poll for new content by reading from the last known offset. 
  
 auto 
  
 payload 
  
 = 
  
 ( 
 co_await 
  
 client 
 . 
 ReadObjectRange 
 ( 
 gcs 
 :: 
 BucketName 
 ( 
 bucket_name 
 ), 
  
 object_name 
 , 
  
 bytes_read 
 , 
  
 -1 
 )) 
  
 . 
 value 
 (); 
  
 for 
  
 ( 
 auto 
  
 const 
&  
 buffer 
  
 : 
  
 payload 
 . 
 contents 
 ()) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 std::string 
 ( 
 buffer 
 . 
 begin 
 (), 
  
 buffer 
 . 
 end 
 ()); 
  
 bytes_read 
  
 += 
  
 buffer 
 . 
 size 
 (); 
  
 } 
  
 // In a real application you would wait here, e.g. with a timer. 
  
 std 
 :: 
 this_thread 
 :: 
 sleep_for 
 ( 
 std 
 :: 
 chrono 
 :: 
 seconds 
 ( 
 1 
 )); 
  
 } 
 }; 
 

Go

For more information, see the Cloud Storage Go API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample reads the tail of an appendable object:

  import 
  
 ( 
  
 "bytes" 
  
 "context" 
  
 "errors" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/storage" 
  
 "cloud.google.com/go/storage/experimental" 
 ) 
 // readAppendableObjectTail simulates a "tail -f" command on a GCS object. It 
 // repeatedly polls an appendable object for new content. In a real 
 // application, the object would be written to by a separate process. 
 func 
  
 readAppendableObjectTail 
 ( 
 w 
  
 io 
 . 
  Writer 
 
 , 
  
 bucket 
 , 
  
 object 
  
 string 
 ) 
  
 ([] 
 byte 
 , 
  
 error 
 ) 
  
 { 
  
 // bucket := "bucket-name" 
  
 // object := "object-name" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 storage 
 . 
  NewGRPCClient 
 
 ( 
 ctx 
 , 
  
 experimental 
 . 
  WithZonalBucketAPIs 
 
 ()) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "storage.NewGRPCClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // Set a context timeout. When this timeout is reached, the read stream 
  
 // will be closed, so omit this to tail indefinitely. 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 5 
 * 
 time 
 . 
 Minute 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Create a new appendable object and write some data. 
  
 writer 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
 If 
 ( 
 storage 
 . 
  Conditions 
 
 { 
 DoesNotExist 
 : 
  
 true 
 }). 
  NewWriter 
 
 ( 
 ctx 
 ) 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 writer 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "Some data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 if 
  
 err 
  
 := 
  
 writer 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "Writer.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 gen 
  
 := 
  
 writer 
 . 
 Attrs 
 (). 
  Generation 
 
  
 // Create the MultiRangeDownloader, which opens a read stream to the object. 
  
 mrd 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  NewMultiRangeDownloader 
 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewMultiRangeDownloader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // In a goroutine, poll the object. In this example we continue until all the 
  
 // bytes we expect to see were received, but in a real application this could 
  
 // continue to poll indefinitely until some signal is received. 
  
 var 
  
 buf 
  
 bytes 
 . 
 Buffer 
  
 var 
  
 mrdErr 
  
 error 
  
 done 
  
 := 
  
 make 
 ( 
 chan 
  
 bool 
 ) 
  
 go 
  
 func 
 () 
  
 { 
  
 var 
  
 currOff 
  
 int64 
  
 rangeDownloaded 
  
 := 
  
 make 
 ( 
 chan 
  
 bool 
 ) 
  
 for 
  
 buf 
 . 
 Len 
 () 
 < 
 100 
  
 { 
  
 // Add the current range and wait for it to be downloaded. 
  
 // Using a length of 0 will read to the current end of the object. 
  
 // The callback will give the actual number of bytes that were 
  
 // read in each iteration. 
  
 mrd 
 . 
  Add 
 
 ( 
& buf 
 , 
  
 currOff 
 , 
  
 0 
 , 
  
 func 
 ( 
 offset 
 , 
  
 length 
  
 int64 
 , 
  
 err 
  
 error 
 ) 
  
 { 
  
 // After each range is received, update 
  
 // the starting offset based on how many bytes were received. 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 mrdErr 
  
 = 
  
 err 
  
 } 
  
 currOff 
  
 += 
  
 length 
  
 rangeDownloaded 
  
< - 
  
 true 
  
 }) 
  
 // Wait for the range download to complete with a timeout of 10s. 
  
 select 
  
 { 
  
 case 
  
< - 
 rangeDownloaded 
 : 
  
 case 
  
< - 
 time 
 . 
 After 
 ( 
 10 
  
 * 
  
 time 
 . 
 Second 
 ): 
  
 mrdErr 
  
 = 
  
 mrd 
 . 
  Error 
 
 () 
  
 if 
  
 mrdErr 
  
 == 
  
 nil 
  
 { 
  
 mrdErr 
  
 = 
  
 errors 
 . 
 New 
 ( 
 "range request timed out after 10s" 
 ) 
  
 } 
  
 } 
  
 if 
  
 mrdErr 
  
 != 
  
 nil 
  
 { 
  
 break 
  
 } 
  
 time 
 . 
 Sleep 
 ( 
 1 
  
 * 
  
 time 
 . 
 Second 
 ) 
  
 } 
  
 // After exiting the loop, close MultiRangeDownloader and signal that 
  
 // all ranges have been read. 
  
 if 
  
 err 
  
 := 
  
 mrd 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 mrdErr 
  
 = 
  
 err 
  
 } 
  
 done 
  
< - 
  
 true 
  
 }() 
  
 // Meanwhile, continue to write 10 bytes at a time to the object. 
  
 // This could be done by calling NewWriterFromAppendable object repeatedly 
  
 // (as in the example) or calling Writer.Flush without closing the Writer. 
  
 for 
  
 range 
  
 9 
  
 { 
  
 appendWriter 
 , 
  
 offset 
 , 
  
 err 
  
 := 
  
 client 
 . 
  Bucket 
 
 ( 
 bucket 
 ). 
  Object 
 
 ( 
 object 
 ). 
  Generation 
 
 ( 
 gen 
 ). 
  NewWriterFromAppendableObject 
 
 ( 
 ctx 
 , 
  
 nil 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "NewWriterFromAppendableObject: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 if 
  
 _ 
 , 
  
 err 
  
 := 
  
 append Write 
r 
 . 
  Write 
 
 ([] 
 byte 
 ( 
 "more data\n" 
 )); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "appendWriter.Write: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 if 
  
 err 
  
 := 
  
 appendWriter 
 . 
 Close 
 (); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "appendWriter.Close: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Wrote 10 bytes at offset %v" 
 , 
  
 offset 
 ) 
  
 } 
  
 // Wait for tailing goroutine to exit. 
  
< - 
 done 
  
 if 
  
 mrdErr 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 nil 
 , 
  
 fmt 
 . 
 Errorf 
 ( 
 "MultiRangeDownloader: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Read %v bytes from object %v" 
 , 
  
 buf 
 . 
 Len 
 (), 
  
 object 
 ) 
  
 return 
  
 buf 
 . 
  Bytes 
 
 (), 
  
 nil 
 } 
 

Java

For more information, see the Cloud Storage Java API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample reads the tail of an appendable object:

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUpload 
. AppendableUploadWriteableByteChannel 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobAppendableUploadConfig 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobId 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobInfo 
 
 ; 
 import 
  
 com.google.cloud.storage. BlobReadSession 
 
 ; 
 import 
  
 com.google.cloud.storage. FlushPolicy 
 
 ; 
 import 
  
 com.google.cloud.storage. RangeSpec 
 
 ; 
 import 
  
 com.google.cloud.storage. ReadProjectionConfigs 
 
 ; 
 import 
  
 com.google.cloud.storage. Storage 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageChannelUtils 
 
 ; 
 import 
  
 com.google.cloud.storage. StorageOptions 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.nio.ByteBuffer 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 ReadAppendableObjectTail 
  
 { 
  
 public 
  
 static 
  
 void 
  
 readAppendableObjectTail 
 ( 
 String 
  
 bucketName 
 , 
  
 String 
  
 objectName 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // The ID of your GCS bucket 
  
 // String bucketName = "your-unique-bucket-name"; 
  
 // The ID of your GCS object 
  
 // String objectName = "your-object-name"; 
  
 try 
  
 ( 
  Storage 
 
  
 storage 
  
 = 
  
  StorageOptions 
 
 . 
  grpc 
 
 (). 
 build 
 (). 
  getService 
 
 ()) 
  
 { 
  
  BlobId 
 
  
 blobId 
  
 = 
  
  BlobId 
 
 . 
 of 
 ( 
 bucketName 
 , 
  
 objectName 
 ); 
  
  BlobInfo 
 
  
 info 
  
 = 
  
  BlobInfo 
 
 . 
 newBuilder 
 ( 
 blobId 
 ). 
 build 
 (); 
  
 int 
  
 totalToWrite 
  
 = 
  
 64 
  
 * 
  
 1000 
 ; 
  
 // Define our flush policy to flush small increments 
  
 // This is useful for demonstration purposes, but you should use more appropriate values for 
  
 // your workload. 
  
 int 
  
 flushSize 
  
 = 
  
 totalToWrite 
  
 / 
  
 8 
 ; 
  
  FlushPolicy 
 
 . 
  MinFlushSizeFlushPolicy 
 
  
 flushPolicy 
  
 = 
  
  FlushPolicy 
 
 . 
  minFlushSize 
 
 ( 
 flushSize 
 ). 
  withMaxPendingBytes 
 
 ( 
 flushSize 
 ); 
  
  BlobAppendableUploadConfig 
 
  
 appendableUploadConfig 
  
 = 
  
  BlobAppendableUploadConfig 
 
 . 
 of 
 (). 
  withFlushPolicy 
 
 ( 
 flushPolicy 
 ); 
  
  BlobAppendableUpload 
 
  
 upload 
  
 = 
  
 storage 
 . 
  blobAppendableUpload 
 
 ( 
  
 info 
 , 
  
 appendableUploadConfig 
 , 
  
  Storage 
 
 . 
 BlobWriteOption 
 . 
 doesNotExist 
 ()); 
  
 // Create the object, we'll takeover to write for our example. 
  
 upload 
 . 
  open 
 
 (). 
  closeWithoutFinalizing 
 
 (); 
  
  BlobInfo 
 
  
 gen1 
  
 = 
  
 upload 
 . 
  getResult 
 
 (). 
 get 
 (); 
  
  BlobAppendableUpload 
 
  
 takeover 
  
 = 
  
 storage 
 . 
  blobAppendableUpload 
 
 ( 
 gen1 
 , 
  
 appendableUploadConfig 
 ); 
  
 try 
  
 ( 
  AppendableUploadWriteableByteChannel 
 
  
 channel 
  
 = 
  
 takeover 
 . 
  open 
 
 ()) 
  
 { 
  
 // Start a background thread to write some data on a periodic basis 
  
 // In reality, you're application would probably be doing thing in another scope 
  
 Thread 
  
 writeThread 
  
 = 
  
 startWriteThread 
 ( 
 totalToWrite 
 , 
  
 channel 
 , 
  
 flushPolicy 
 ); 
  
 try 
  
 ( 
  BlobReadSession 
 
  
 readSession 
  
 = 
  
 storage 
 . 
  blobReadSession 
 
 ( 
 gen1 
 . 
  getBlobId 
 
 ()). 
 get 
 ( 
 10 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 )) 
  
 { 
  
 int 
  
 zeroCnt 
  
 = 
  
 0 
 ; 
  
 long 
  
 read 
  
 = 
  
 0 
 ; 
  
 while 
  
 ( 
 read 
 < 
 totalToWrite 
 ) 
  
 { 
  
 if 
  
 ( 
 zeroCnt 
  
> = 
  
 30 
 && 
 ! 
 channel 
 . 
 isOpen 
 ()) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "breaking" 
 ); 
  
 break 
 ; 
  
 } 
  
 ApiFuture<byte 
 [] 
>  
 future 
  
 = 
  
 readSession 
 . 
  readAs 
 
 ( 
  
  ReadProjectionConfigs 
 
 . 
  asFutureBytes 
 
 () 
  
 . 
 withRangeSpec 
 ( 
  RangeSpec 
 
 . 
 of 
 ( 
 read 
 , 
  
 flushPolicy 
 . 
  getMinFlushSize 
 
 ()))); 
  
 byte 
 [] 
  
 bytes 
  
 = 
  
 future 
 . 
 get 
 ( 
 20 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 read 
  
 += 
  
 bytes 
 . 
 length 
 ; 
  
 long 
  
 defaultSleep 
  
 = 
  
 1_500L 
 ; 
  
 if 
  
 ( 
 bytes 
 . 
 length 
  
 == 
  
 0 
 ) 
  
 { 
  
 zeroCnt 
 ++ 
 ; 
  
 long 
  
 millis 
  
 = 
  
 defaultSleep 
  
 * 
  
 zeroCnt 
 ; 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "millis = " 
  
 + 
  
 millis 
 ); 
  
 Thread 
 . 
 sleep 
 ( 
 millis 
 ); 
  
 } 
  
 else 
  
 { 
  
 zeroCnt 
  
 = 
  
 0 
 ; 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "bytes.length = " 
  
 + 
  
 bytes 
 . 
 length 
  
 + 
  
 " read = " 
  
 + 
  
 read 
 ); 
  
 Thread 
 . 
 sleep 
 ( 
 defaultSleep 
 ); 
  
 } 
  
 } 
  
 assert 
  
 read 
  
 == 
  
 totalToWrite 
  
 : 
  
 "not enough bytes" 
 ; 
  
 } 
  
 writeThread 
 . 
 join 
 (); 
  
 } 
  
 } 
  
 } 
  
 private 
  
 static 
  
 Thread 
  
 startWriteThread 
 ( 
  
 int 
  
 totalToWrite 
 , 
  
  AppendableUploadWriteableByteChannel 
 
  
 channel 
 , 
  
  FlushPolicy 
 
 . 
  MinFlushSizeFlushPolicy 
 
  
 flushPolicy 
 ) 
  
 { 
  
 Thread 
  
 writeThread 
  
 = 
  
 new 
  
 Thread 
 ( 
  
 () 
  
 - 
>  
 { 
  
 try 
  
 { 
  
 for 
  
 ( 
 long 
  
 written 
  
 = 
  
 0 
 ; 
  
 written 
 < 
 totalToWrite 
 ; 
  
 ) 
  
 { 
  
 byte 
  
 alphaOffset 
  
 = 
  
 ( 
 byte 
 ) 
  
 ( 
 written 
  
 % 
  
 0x1a 
 ); 
  
 ByteBuffer 
  
 buf 
  
 = 
  
 ByteBuffer 
 . 
 wrap 
 ( 
 new 
  
 byte 
 [] 
  
 {( 
 byte 
 ) 
  
 ( 
 0x41 
  
 + 
  
 alphaOffset 
 )}); 
  
 int 
  
 w 
  
 = 
  
  StorageChannelUtils 
 
 . 
  blockingEmptyTo 
 
 ( 
 buf 
 , 
  
 channel 
 ); 
  
 written 
  
 += 
  
 w 
 ; 
  
 if 
  
 ( 
 written 
  
 % 
  
 flushPolicy 
 . 
  getMinFlushSize 
 
 () 
  
 == 
  
 0 
 ) 
  
 { 
  
 channel 
 . 
  flush 
 
 (); 
  
 Thread 
 . 
 sleep 
 ( 
 40 
 ); 
  
 } 
  
 } 
  
 channel 
 . 
  closeWithoutFinalizing 
 
 (); 
  
 } 
  
 catch 
  
 ( 
 IOException 
  
 | 
  
 InterruptedException 
  
 e 
 ) 
  
 { 
  
 throw 
  
 new 
  
 RuntimeException 
 ( 
 e 
 ); 
  
 } 
  
 }); 
  
 writeThread 
 . 
 start 
 (); 
  
 return 
  
 writeThread 
 ; 
  
 } 
 } 
 

Python

For more information, see the Cloud Storage Python API reference documentation .

To authenticate to Cloud Storage, set up Application Default Credentials. For more information, see Set up authentication for client libraries .

The following sample reads the tail of an appendable object:

  async 
 def 
  
 appender 
 ( 
 writer 
 : 
 AsyncAppendableObjectWriter 
 , 
 duration 
 : 
 int 
 ): 
  
 """Appends 10 bytes to the object every second for a given duration.""" 
 print 
 ( 
 "Appender started." 
 ) 
 bytes_appended 
 = 
 0 
 start_time 
 = 
 time 
 . 
 monotonic 
 () 
 # Run the appender for the specified duration. 
 while 
 time 
 . 
 monotonic 
 () 
 - 
 start_time 
< duration 
 : 
 await 
 writer 
 . 
 append 
 ( 
 BYTES_TO_APPEND 
 ) 
 now 
 = 
 datetime 
 . 
 now 
 () 
 . 
 strftime 
 ( 
 "%Y-%m- 
 %d 
 %H:%M:%S. 
 %f 
 " 
 )[: 
 - 
 3 
 ] 
 bytes_appended 
 += 
 NUM_BYTES_TO_APPEND_EVERY_SECOND 
 print 
 ( 
 f 
 "[ 
 { 
 now 
 } 
 ] Appended 
 { 
 NUM_BYTES_TO_APPEND_EVERY_SECOND 
 } 
 new bytes. Total appended: 
 { 
 bytes_appended 
 } 
 bytes." 
 ) 
 await 
 asyncio 
 . 
 sleep 
 ( 
 0.1 
 ) 
 print 
 ( 
 "Appender finished." 
 ) 
 async 
 def 
  
 tailer 
 ( 
 bucket_name 
 : 
 str 
 , 
 object_name 
 : 
 str 
 , 
 duration 
 : 
 int 
 , 
 client 
 : 
 AsyncGrpcClient 
 ): 
  
 """Tails the object by reading new data as it is appended.""" 
 print 
 ( 
 "Tailer started." 
 ) 
 start_byte 
 = 
 0 
 start_time 
 = 
 time 
 . 
 monotonic 
 () 
 mrd 
 = 
 AsyncMultiRangeDownloader 
 ( 
 client 
 , 
 bucket_name 
 , 
 object_name 
 ) 
 try 
 : 
 await 
 mrd 
 . 
 open 
 () 
 # Run the tailer for the specified duration. 
 while 
 time 
 . 
 monotonic 
 () 
 - 
 start_time 
< duration 
 : 
 output_buffer 
 = 
 BytesIO 
 () 
 # A download range of (start, 0) means to read from 'start' to the end. 
 await 
 mrd 
 . 
 download_ranges 
 ([( 
 start_byte 
 , 
 0 
 , 
 output_buffer 
 )]) 
 bytes_downloaded 
 = 
 output_buffer 
 . 
 getbuffer 
 () 
 . 
 nbytes 
 if 
 bytes_downloaded 
> 0 
 : 
 now 
 = 
 datetime 
 . 
 now 
 () 
 . 
 strftime 
 ( 
 "%Y-%m- 
 %d 
 %H:%M:%S. 
 %f 
 " 
 )[: 
 - 
 3 
 ] 
 print 
 ( 
 f 
 "[ 
 { 
 now 
 } 
 ] Tailer read 
 { 
 bytes_downloaded 
 } 
 new bytes: " 
 ) 
 start_byte 
 += 
 bytes_downloaded 
 await 
 asyncio 
 . 
 sleep 
 ( 
 0.1 
 ) 
 # Poll for new data every 0.1 seconds. 
 finally 
 : 
 if 
 mrd 
 . 
 is_stream_open 
 : 
 await 
 mrd 
 . 
 close 
 () 
 print 
 ( 
 "Tailer finished." 
 ) 
 # read_appendable_object_tail simulates a "tail -f" command on a GCS object. It 
 # repeatedly polls an appendable object for new content. In a real 
 # application, the object would be written to by a separate process. 
 async 
 def 
  
 read_appendable_object_tail 
 ( 
 bucket_name 
 : 
 str 
 , 
 object_name 
 : 
 str 
 , 
 duration 
 : 
 int 
 , 
 grpc_client 
 = 
 None 
 ): 
  
 """Main function to create an appendable object and run tasks. 
 grpc_client: an existing grpc_client to use, this is only for testing. 
 """ 
 if 
 grpc_client 
 is 
 None 
 : 
 grpc_client 
 = 
 AsyncGrpcClient 
 () 
 writer 
 = 
 AsyncAppendableObjectWriter 
 ( 
 client 
 = 
 grpc_client 
 , 
 bucket_name 
 = 
 bucket_name 
 , 
 object_name 
 = 
 object_name 
 , 
 ) 
 # 1. Create an empty appendable object. 
 try 
 : 
 # 1. Create an empty appendable object. 
 await 
 writer 
 . 
 open 
 () 
 print 
 ( 
 f 
 "Created empty appendable object: 
 { 
 object_name 
 } 
 " 
 ) 
 # 2. Create the appender and tailer coroutines. 
 appender_task 
 = 
 asyncio 
 . 
 create_task 
 ( 
 appender 
 ( 
 writer 
 , 
 duration 
 )) 
 tailer_task 
 = 
 asyncio 
 . 
 create_task 
 ( 
 tailer 
 ( 
 bucket_name 
 , 
 object_name 
 , 
 duration 
 , 
 grpc_client 
 ) 
 ) 
 # 3. Execute the coroutines concurrently. 
 await 
 asyncio 
 . 
 gather 
 ( 
 appender_task 
 , 
 tailer_task 
 ) 
 finally 
 : 
 if 
 writer 
 . 
 _is_stream_open 
 : 
 await 
 writer 
 . 
 close 
 () 
 print 
 ( 
 "Writer closed." 
 ) 
 

What's next

Create a Mobile Website
View Site in Mobile | Classic
Share by: