Create topic with Cloud Storage ingestion

Create a Pub/Sub topic that ingests from a Cloud Storage bucket

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  namespace pubsub = ::google::cloud::pubsub;
  namespace pubsub_admin = ::google::cloud::pubsub_admin;
  // Creates a Pub/Sub topic that ingests data from a Cloud Storage bucket.
  //
  // Parameters:
  //   client                      admin client used to issue CreateTopic.
  //   project_id, topic_id        combined into the full topic name.
  //   bucket                      Cloud Storage bucket to ingest from.
  //   input_format                one of "text", "avro" or "pubsub_avro"; any
  //                               other value prints an error and returns
  //                               without creating the topic.
  //   text_delimiter              delimiter, only used for the "text" format.
  //   match_glob                  optional object glob; ignored when empty.
  //   minimum_object_create_time  optional timestamp string; ignored when
  //                               empty, rejected when it cannot be parsed.
  //
  // Throws the returned status when CreateTopic fails with anything other than
  // kAlreadyExists.
  [](pubsub_admin::TopicAdminClient client, std::string project_id,
     std::string topic_id, std::string bucket, std::string const& input_format,
     std::string text_delimiter, std::string match_glob,
     std::string const& minimum_object_create_time) {
    google::pubsub::v1::Topic request;
    request.set_name(
        pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
    auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                               ->mutable_cloud_storage();
    cloud_storage.set_bucket(std::move(bucket));

    // Exactly one input format is selected; calling the mutable_*() accessor
    // is what marks the (possibly empty) format message as present.
    if (input_format == "text") {
      cloud_storage.mutable_text_format()->set_delimiter(
          std::move(text_delimiter));
    } else if (input_format == "avro") {
      cloud_storage.mutable_avro_format();
    } else if (input_format == "pubsub_avro") {
      cloud_storage.mutable_pubsub_avro_format();
    } else {
      std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                   "got value: "
                << input_format << std::endl;
      return;
    }

    if (!match_glob.empty()) {
      cloud_storage.set_match_glob(std::move(match_glob));
    }

    if (!minimum_object_create_time.empty()) {
      if (!google::protobuf::util::TimeUtil::FromString(
              minimum_object_create_time,
              cloud_storage.mutable_minimum_object_create_time())) {
        std::cout << "Invalid minimum object create time: "
                  << minimum_object_create_time << std::endl;
        // Stop here: mutable_minimum_object_create_time() has already been
        // called, so continuing would send a request whose timestamp filter
        // is not the one the caller asked for.
        return;
      }
    }

    auto topic = client.CreateTopic(request);
    // Note that kAlreadyExists is a possible error when the library retries.
    if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The topic already exists\n";
      return;
    }
    if (!topic) throw std::move(topic).status();

    std::cout << "The topic was successfully created: " << topic->DebugString()
              << "\n";
  }
 

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
  Google.Protobuf.WellKnownTypes 
 
 ; 
 using 
  
  Grpc.Core 
 
 ; 
 using 
  
 System 
 ; 
 public class CreateTopicWithCloudStorageIngestionSample
{
    /// <summary>
    /// Creates a Pub/Sub topic whose messages are ingested from a Cloud Storage bucket.
    /// </summary>
    /// <param name="projectId">Project that will own the topic.</param>
    /// <param name="topicId">ID of the topic to create.</param>
    /// <param name="bucket">Cloud Storage bucket to ingest from.</param>
    /// <param name="inputFormat">One of "text", "avro" or "pubsub_avro".</param>
    /// <param name="textDelimiter">Delimiter applied when <paramref name="inputFormat"/> is "text".</param>
    /// <param name="matchGlob">Optional object glob; skipped when null or empty.</param>
    /// <param name="minimumObjectCreateTime">Converted to a protobuf Timestamp and set as the minimum object create time.</param>
    /// <returns>The topic returned by the CreateTopic call.</returns>
    /// <exception cref="RpcException">
    /// Thrown with <c>StatusCode.InvalidArgument</c> when <paramref name="inputFormat"/> is not recognised.
    /// </exception>
    public Topic CreateTopicWithCloudStorageIngestion(string projectId, string topicId, string bucket, string inputFormat, string textDelimiter, string matchGlob, DateTimeOffset minimumObjectCreateTime)
    {
        var storageSource = new IngestionDataSourceSettings.Types.CloudStorage
        {
            Bucket = bucket,
            MinimumObjectCreateTime = Timestamp.FromDateTimeOffset(minimumObjectCreateTime),
        };

        // Pick exactly one input format; anything unrecognised is rejected
        // before a client is created.
        if (inputFormat == "text")
        {
            storageSource.TextFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.TextFormat
            {
                Delimiter = textDelimiter
            };
        }
        else if (inputFormat == "avro")
        {
            storageSource.AvroFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.AvroFormat();
        }
        else if (inputFormat == "pubsub_avro")
        {
            storageSource.PubsubAvroFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.PubSubAvroFormat();
        }
        else
        {
            throw new RpcException(new Status(StatusCode.InvalidArgument, $"inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: {inputFormat}"));
        }

        bool hasGlob = !string.IsNullOrEmpty(matchGlob);
        if (hasGlob)
        {
            storageSource.MatchGlob = matchGlob;
        }

        var publisherClient = PublisherServiceApiClient.Create();
        var topicToCreate = new Topic
        {
            Name = TopicName.FormatProjectTopic(projectId, topicId),
            IngestionDataSourceSettings = new IngestionDataSourceSettings
            {
                CloudStorage = storageSource
            }
        };

        var result = publisherClient.CreateTopic(topicToCreate);
        Console.WriteLine($"Topic {result.Name} created.");
        return result;
    }
}
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" 
  
 "google.golang.org/protobuf/types/known/timestamppb" 
 ) 
 func 
  
 createTopicWithCloudStorageIngestion 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
 , 
  
 bucket 
 , 
  
 matchGlob 
 , 
  
 minimumObjectCreateTime 
 , 
  
 delimiter 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 // bucket := "my-bucket" 
  
 // matchGlob := "**.txt" 
  
 // minimumObjectCreateTime := "2006-01-02T15:04:05Z" 
  
 // delimiter := "," 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 minCreateTime 
 , 
  
 err 
  
 := 
  
 time 
 . 
 Parse 
 ( 
 time 
 . 
 RFC3339 
 , 
  
 minimumObjectCreateTime 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 err 
  
 } 
  
 topicpb 
  
 := 
  
& pubsubpb 
 . 
 Topic 
 { 
  
 Name 
 : 
  
 fmt 
 . 
 Sprintf 
 ( 
 "projects/%s/topics/%s" 
 , 
  
 projectID 
 , 
  
 topicID 
 ), 
  
 IngestionDataSourceSettings 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings 
 { 
  
 Source 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings_CloudStorage_ 
 { 
  
 CloudStorage 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings_CloudStorage 
 { 
  
 Bucket 
 : 
  
 bucket 
 , 
  
 // Alternatively, can be Avro or PubSubAvro formats. See 
  
 InputFormat 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings_CloudStorage_TextFormat_ 
 { 
  
 TextFormat 
 : 
  
& pubsubpb 
 . 
 IngestionDataSourceSettings_CloudStorage_TextFormat 
 { 
  
 Delimiter 
 : 
  
& delimiter 
 , 
  
 }, 
  
 }, 
  
 MatchGlob 
 : 
  
 matchGlob 
 , 
  
 MinimumObjectCreateTime 
 : 
  
 timestamppb 
 . 
 New 
 ( 
 minCreateTime 
 ), 
  
 }, 
  
 }, 
  
 }, 
  
 } 
  
 t 
 , 
  
 err 
  
 := 
  
 client 
 . 
 TopicAdminClient 
 . 
 CreateTopic 
 ( 
 ctx 
 , 
  
 topicpb 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "CreateTopic: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Cloud storage topic created: %v\n" 
 , 
  
 t 
 ) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  import 
  
 com.google.cloud.pubsub.v1. TopicAdminClient 
 
 ; 
 import 
  
 com.google.protobuf.util. Timestamps 
 
 ; 
 import 
  
 com.google.pubsub.v1. IngestionDataSourceSettings 
 
 ; 
 import 
  
 com.google.pubsub.v1. Topic 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.text.ParseException 
 ; 
 public class CreateTopicWithCloudStorageIngestionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId,
        topicId,
        bucket,
        inputFormat,
        textDelimiter,
        matchGlob,
        minimumObjectCreateTime);
  }

  /**
   * Creates a Pub/Sub topic that ingests data from a Cloud Storage bucket and prints the fields of
   * the created topic.
   *
   * @param projectId project that will own the topic
   * @param topicId ID of the topic to create
   * @param bucket Cloud Storage bucket to ingest from
   * @param inputFormat one of "text", "avro" or "pubsub_avro"
   * @param textDelimiter delimiter used when {@code inputFormat} is "text"
   * @param matchGlob optional object glob; skipped when null or empty
   * @param minimumObjectCreateTime optional timestamp string; skipped when null or empty, and
   *     reported on stderr (the topic is still created) when it cannot be parsed
   * @throws IOException declared for failures while creating the admin client
   * @throws IllegalArgumentException if {@code inputFormat} is not recognised
   */
  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient adminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage storageSettings =
          buildCloudStorageSettings(
              bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);

      IngestionDataSourceSettings sourceSettings =
          IngestionDataSourceSettings.newBuilder().setCloudStorage(storageSettings).build();

      String fullTopicName = TopicName.of(projectId, topicId).toString();
      Topic request =
          Topic.newBuilder()
              .setName(fullTopicName)
              .setIngestionDataSourceSettings(sourceSettings)
              .build();

      Topic created = adminClient.createTopic(request);
      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + created.getAllFields());
    }
  }

  /** Assembles the Cloud Storage part of the ingestion settings from the raw string arguments. */
  private static IngestionDataSourceSettings.CloudStorage buildCloudStorageSettings(
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime) {
    IngestionDataSourceSettings.CloudStorage.Builder builder =
        IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);

    switch (inputFormat) {
      case "text":
        IngestionDataSourceSettings.CloudStorage.TextFormat textFormat =
            IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                .setDelimiter(textDelimiter)
                .build();
        builder.setTextFormat(textFormat);
        break;
      case "avro":
        builder.setAvroFormat(
            IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
        break;
      case "pubsub_avro":
        builder.setPubsubAvroFormat(
            IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
        break;
      default:
        throw new IllegalArgumentException(
            "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
    }

    if (hasText(matchGlob)) {
      builder.setMatchGlob(matchGlob);
    }

    if (hasText(minimumObjectCreateTime)) {
      try {
        builder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
      } catch (ParseException e) {
        // Report the problem and carry on without the minimum create time.
        System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
      }
    }

    return builder.build();
  }

  /** Returns true when the value is neither null nor the empty string. */
  private static boolean hasText(String value) {
    return value != null && !value.isEmpty();
  }
}
 

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  /**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Creates a Pub/Sub topic that ingests data from a Cloud Storage bucket.
 *
 * @param {string} topicNameOrId Name or ID of the topic to create.
 * @param {string} bucket Cloud Storage bucket to ingest from.
 * @param {string} inputFormat One of 'text', 'avro' or 'pubsub_avro'.
 * @param {string} textDelimiter Delimiter used when inputFormat is 'text'.
 * @param {string} matchGlob Glob pattern selecting the objects to ingest.
 * @param {string} minimumObjectCreateTime Date string understood by Date.parse.
 * @returns {Promise<void>} Resolves once the topic is created, or immediately
 *     after logging an error when an argument is invalid.
 */
async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime,
) {
  // Date.parse yields milliseconds since the epoch, or NaN if unparseable.
  const minimumDate = Date.parse(minimumObjectCreateTime);
  if (Number.isNaN(minimumDate)) {
    console.error(
      `Invalid minimumObjectCreateTime: ${minimumObjectCreateTime}`,
    );
    return;
  }

  // Split into whole seconds plus the sub-second remainder in nanoseconds.
  // Subtracting the floored seconds keeps nanos in [0, 999999999] even for
  // dates before 1970, and 1 ms is 1,000,000 ns.
  const seconds = Math.floor(minimumDate / 1000);
  const nanos = (minimumDate - seconds * 1000) * 1000000;

  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds,
          nanos,
        },
        matchGlob,
      },
    },
  };

  // Set the input format that matches the requested name.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}
 

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  /**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Creates a Pub/Sub topic that ingests data from a Cloud Storage bucket.
 *
 * @param topicNameOrId Name or ID of the topic to create.
 * @param bucket Cloud Storage bucket to ingest from.
 * @param inputFormat One of 'text', 'avro' or 'pubsub_avro'.
 * @param textDelimiter Delimiter used when inputFormat is 'text'.
 * @param matchGlob Glob pattern selecting the objects to ingest.
 * @param minimumObjectCreateTime Date string understood by Date.parse.
 * @returns Resolves once the topic is created, or immediately after logging
 *     an error when an argument is invalid.
 */
async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string,
) {
  // Date.parse yields milliseconds since the epoch, or NaN if unparseable.
  const minimumDate = Date.parse(minimumObjectCreateTime);
  if (Number.isNaN(minimumDate)) {
    console.error(
      `Invalid minimumObjectCreateTime: ${minimumObjectCreateTime}`,
    );
    return;
  }

  // Split into whole seconds plus the sub-second remainder in nanoseconds.
  // Subtracting the floored seconds keeps nanos in [0, 999999999] even for
  // dates before 1970, and 1 ms is 1,000,000 ns.
  const seconds = Math.floor(minimumDate / 1000);
  const nanos = (minimumDate - seconds * 1000) * 1000000;

  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds,
          nanos,
        },
        matchGlob,
      },
    },
  };

  // Set the input format that matches the requested name.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}
 

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  use Google\Cloud\PubSub\PubSubClient; 
 use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\AvroFormat; 
 use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\PubSubAvroFormat; 
 use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\TextFormat; 
 use Google\Protobuf\Timestamp; 
 /**
 * Creates a Pub/Sub topic that ingests data from a Cloud Storage bucket.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $bucket  Cloud Storage bucket to ingest from.
 * @param string $inputFormat  Format of the Cloud Storage data: text, avro, or pubsub_avro.
 * @param string $minimumObjectCreatedTime  Date/time string; only objects created at or after it are ingested.
 * @param string $textDelimiter  Delimiter applied to text format input.
 * @param string $matchGlob  Glob pattern selecting the objects to ingest. When empty, no pattern is sent.
 *
 * @throws \InvalidArgumentException When $inputFormat is not one of the supported values.
 */
function create_topic_with_cloud_storage_ingestion(
    string $projectId,
    string $topicName,
    string $bucket,
    string $inputFormat,
    string $minimumObjectCreatedTime,
    string $textDelimiter = '',
    string $matchGlob = ''
): void {
    // Convert the textual time into a protobuf Timestamp: whole seconds plus
    // the microsecond part ('u') scaled to nanoseconds.
    $createdAfter = new \DateTimeImmutable($minimumObjectCreatedTime);
    $createTime = new Timestamp();
    $createTime->setSeconds($createdAfter->getTimestamp());
    $createTime->setNanos($createdAfter->format('u') * 1000);

    // Resolve the format message; unsupported names are rejected here.
    $formatSettings = match($inputFormat) {
        'text' => new TextFormat(['delimiter' => $textDelimiter]),
        'avro' => new AvroFormat(),
        'pubsub_avro' => new PubSubAvroFormat(),
        default => throw new \InvalidArgumentException(
            'inputFormat must be in (\'text\', \'avro\', \'pubsub_avro\'); got value: ' . $inputFormat
        )
    };

    // The format is stored under "<inputFormat>_format", e.g. "text_format".
    $storageSettings = [
        'bucket' => $bucket,
        'minimum_object_create_time' => $createTime,
        $inputFormat . '_format' => $formatSettings,
    ];
    if (!empty($matchGlob)) {
        $storageSettings['match_glob'] = $matchGlob;
    }

    $client = new PubSubClient(['projectId' => $projectId]);
    $createdTopic = $client->createTopic($topicName, [
        'ingestionDataSourceSettings' => [
            'cloud_storage' => $storageSettings
        ]
    ]);

    printf('Topic created: %s' . PHP_EOL, $createdTopic->name());
}
 

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 from 
  
 google.protobuf 
  
 import 
 timestamp_pb2 
 from 
  
 google.pubsub_v1.types 
  
 import 
  Topic 
 
 from 
  
 google.pubsub_v1.types 
  
 import 
  IngestionDataSourceSettings 
 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # topic_id = "your-topic-id" 
 # bucket = "your-bucket" 
 # input_format = "text"  (can be one of "text", "avro", "pubsub_avro") 
 # text_delimiter = "\n" 
 # match_glob = "**.txt" 
 # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ" 
 publisher 
 = 
 pubsub_v1 
 . 
  PublisherClient 
 
 () 
 topic_path 
 = 
 publisher 
 . 
 topic_path 
 ( 
 project_id 
 , 
 topic_id 
 ) 
 cloud_storage_settings 
 = 
  IngestionDataSourceSettings 
 
 . 
  CloudStorage 
 
 ( 
 bucket 
 = 
 bucket 
 , 
 ) 
 if 
 input_format 
 == 
 "text" 
 : 
 cloud_storage_settings 
 . 
 text_format 
 = 
 ( 
  IngestionDataSourceSettings 
 
 . 
  CloudStorage 
 
 . 
  TextFormat 
 
 ( 
 delimiter 
 = 
 text_delimiter 
 ) 
 ) 
 elif 
 input_format 
 == 
 "avro" 
 : 
 cloud_storage_settings 
 . 
 avro_format 
 = 
 ( 
  IngestionDataSourceSettings 
 
 . 
  CloudStorage 
 
 . 
  AvroFormat 
 
 () 
 ) 
 elif 
 input_format 
 == 
 "pubsub_avro" 
 : 
 cloud_storage_settings 
 . 
 pubsub_avro_format 
 = 
 ( 
  IngestionDataSourceSettings 
 
 . 
  CloudStorage 
 
 . 
  PubSubAvroFormat 
 
 () 
 ) 
 else 
 : 
 print 
 ( 
 "Invalid input_format: " 
 + 
 input_format 
 + 
 "; must be in ('text', 'avro', 'pubsub_avro')" 
 ) 
 return 
 if 
 match_glob 
 : 
 cloud_storage_settings 
 . 
 match_glob 
 = 
 match_glob 
 if 
 minimum_object_create_time 
 : 
 try 
 : 
 minimum_object_create_time_timestamp 
 = 
 timestamp_pb2 
 . 
  Timestamp 
 
 () 
 minimum_object_create_time_timestamp 
 . 
 FromJsonString 
 ( 
 minimum_object_create_time 
 ) 
 cloud_storage_settings 
 . 
 minimum_object_create_time 
 = 
 ( 
 minimum_object_create_time_timestamp 
 ) 
 except 
 ValueError 
 : 
 print 
 ( 
 "Invalid minimum_object_create_time: " 
 + 
 minimum_object_create_time 
 ) 
 return 
 request 
 = 
 Topic 
 ( 
 name 
 = 
 topic_path 
 , 
 ingestion_data_source_settings 
 = 
 IngestionDataSourceSettings 
 ( 
 cloud_storage 
 = 
 cloud_storage_settings 
 , 
 ), 
 ) 
 topic 
 = 
 publisher 
 . 
 create_topic 
 ( 
 request 
 = 
 request 
 ) 
 print 
 ( 
 f 
 "Created topic: 
 { 
 topic 
 . 
 name 
 } 
 with Cloud Storage Ingestion Settings" 
 ) 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

  # topic_id = "your-topic-id"
# bucket = "your-bucket-id"
# input_format = "text"
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = Google::Protobuf::Timestamp.new

pubsub = Google::Cloud::Pubsub.new
topic_admin = pubsub.topic_admin

# Short local handle for the CloudStorage settings class; the nested format
# classes (TextFormat, AvroFormat, PubSubAvroFormat) are reached through it.
cloud_storage = Google::Cloud::PubSub::V1::IngestionDataSourceSettings::CloudStorage

settings = cloud_storage.new \
  bucket: bucket,
  minimum_object_create_time: minimum_object_create_time

# Select exactly one input format; any other value raises.
case input_format
when "text"
  settings.text_format = cloud_storage::TextFormat.new \
    delimiter: text_delimiter
when "avro"
  settings.avro_format = cloud_storage::AvroFormat.new
when "pubsub_avro"
  settings.pubsub_avro_format = cloud_storage::PubSubAvroFormat.new
else
  # The two literals are concatenated, so the separating space has to be
  # part of one of them.
  raise "input_format must be in ('text', 'avro', 'pubsub_avro'); " \
        "got value: #{input_format}"
end

# The glob is optional: skip it when it is nil or empty.
unless match_glob.nil? || match_glob.empty?
  settings.match_glob = match_glob
end

topic = topic_admin.create_topic name: pubsub.topic_path(topic_id),
                                 ingestion_data_source_settings: {
                                   cloud_storage: settings
                                 }

puts "Topic with Cloud Storage Ingestion #{topic.name} created."
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.

Create a Mobile Website
View Site in Mobile | Classic
Share by: