Enqueue functions with Cloud Tasks

Task queue functions take advantage of Google Cloud Tasks to help your app run time-consuming, resource-intensive, or bandwidth-limited tasks asynchronously, outside your main application flow.

For example, imagine that you want to create backups of a large set of image files that are currently hosted on an API with a rate limit. In order to be a responsible consumer of that API, you need to respect their rate limits. Plus, this kind of long-running job could be vulnerable to failure due to timeouts and memory limits.

To mitigate this complexity, you can write a task queue function that sets basic task options like scheduleTime , and dispatchDeadline , and then hands the function off to a queue in Cloud Tasks . The Cloud Tasks environment is designed specifically to ensure effective congestion control and retry policies for these kinds of operations.

The Firebase SDK for Cloud Functions for Firebase v3.20.1 and higher interoperates with Firebase Admin SDK v10.2.0 and higher to support task queue functions.

Using task queue functions with Firebase can result in charges for Cloud Tasks processing. See Cloud Tasks pricing for more information.

Create task queue functions

To use task queue functions, follow this workflow:

  1. Write a task queue function using the Firebase SDK for Cloud Functions .
  2. Test your function by triggering it with an HTTP request.
  3. Deploy your function with the Firebase CLI. When deploying your task queue function for the first time, the CLI will create a task queue in Cloud Tasks with options (rate limiting and retry) specified in your source code.
  4. Add tasks to the newly created task queue, passing along parameters to set up an execution schedule if needed. You can achieve this by writing the code using the Admin SDK and deploying it to Cloud Functions for Firebase .

Write task queue functions

Code samples in this section are based on an app that sets up a service that backs up all images from NASA's Astronomy Picture of the Day . To get started, import the required modules:

Node.js

  // Dependencies for task queue functions. 
 const 
  
 { 
 onTaskDispatched 
 } 
  
 = 
  
 require 
 ( 
 "firebase-functions/tasks" 
 ); 
 const 
  
 { 
 onRequest 
 , 
  
 HttpsError 
 } 
  
 = 
  
 require 
 ( 
 "firebase-functions/https" 
 ); 
 const 
  
 { 
 getFunctions 
 } 
  
 = 
  
 require 
 ( 
 "firebase-admin/functions" 
 ); 
 const 
  
 { 
 logger 
 } 
  
 = 
  
 require 
 ( 
 "firebase-functions" 
 ); 
 // Dependencies for image backup. 
 const 
  
 path 
  
 = 
  
 require 
 ( 
 "path" 
 ); 
 const 
  
 { 
 initializeApp 
 } 
  
 = 
  
 require 
 ( 
 "firebase-admin/app" 
 ); 
 const 
  
 { 
 getStorage 
 } 
  
 = 
  
 require 
 ( 
 "firebase-admin/storage" 
 ); 
 const 
  
 { 
 GoogleAuth 
 } 
  
 = 
  
 require 
 ( 
 "google-auth-library" 
 ); 
  
 

Python

  # Dependencies for task queue functions. 
 from 
  
 google.cloud 
  
 import 
 tasks_v2 
 import 
  
 requests 
 from 
  
 firebase_functions.options 
  
 import 
 RetryConfig 
 , 
 RateLimits 
 , 
 SupportedRegion 
 # Dependencies for image backup. 
 from 
  
 datetime 
  
 import 
 datetime 
 , 
 timedelta 
 import 
  
 json 
 import 
  
 pathlib 
 from 
  
 urllib.parse 
  
 import 
 urlparse 
 from 
  
 firebase_admin 
  
 import 
 initialize_app 
 , 
 storage 
 , 
 functions 
 from 
  
 firebase_functions 
  
 import 
 https_fn 
 , 
 tasks_fn 
 , 
 params 
 import 
  
 google.auth 
 from 
  
 google.auth.transport.requests 
  
 import 
 AuthorizedSession  
 
 . 
 py 
 

Use onTaskDispatched or on_task_dispatched for task queue functions. When writing a task queue function you can set per-queue retry and rate- limiting configuration.

Configure task queue functions

Task queue functions come with a powerful set of configuration settings to precisely control rate limits and retry behavior of a task queue:

Node.js

  exports 
 . 
 backupapod 
  
 = 
  
 onTaskDispatched 
 ( 
  
 { 
  
 retryConfig 
 : 
  
 { 
  
 maxAttempts 
 : 
  
 5 
 , 
  
 minBackoffSeconds 
 : 
  
 60 
 , 
  
 }, 
  
 rateLimits 
 : 
  
 { 
  
 maxConcurrentDispatches 
 : 
  
 6 
 , 
  
 }, 
  
 }, 
  
 async 
  
 ( 
 req 
 ) 
  
 = 
>  
 { 
  
 

Python

  @tasks_fn 
 . 
 on_task_dispatched 
 ( 
 retry_config 
 = 
 RetryConfig 
 ( 
 max_attempts 
 = 
 5 
 , 
 min_backoff_seconds 
 = 
 60 
 ), 
 rate_limits 
 = 
 RateLimits 
 ( 
 max_concurrent_dispatches 
 = 
 10 
 )) 
 def 
  
 backupapod 
 ( 
 req 
 : 
 tasks_fn 
 . 
 CallableRequest 
 ) 
 - 
> str 
 : 
  
 """Grabs Astronomy Photo of the Day (APOD) using NASA's API.""" 
  
 
  • retryConfig.maxAttempts=5 : Each task in the task queue is automatically retried up to 5 times. This helps mitigate transient errors like network errors or temporary service disruption of a dependent, external service.

  • retryConfig.minBackoffSeconds=60 : Each task is retried at least 60 seconds apart from each attempt. This provides a large buffer between each attempt so we don't rush to exhaust the 5 retry attempts too quickly.

  • rateLimits.maxConcurrentDispatch=6 : At most 6 tasks are dispatched at a given time. This helps ensure a steady stream of requests to the underlying function and helps reduce the number of active instances and cold starts.

Test task queue functions

For most cases, the Cloud Functions emulator is the best way to test task queue functions. See the Emulator Suite documentation to learn how to instrument your app for task queue functions emulation .

Additionally, task queue functions are exposed as simple HTTP functions in the Firebase Local Emulator Suite . You can test an emulated task function by sending an HTTP POST request with a JSON data payload:

   
 # 
  
 start 
  
 the 
  
  Local 
  
 Emulator 
  
 Suite 
 
  
 firebase 
  
 emulators 
 : 
 start 
  
 # 
  
 trigger 
  
 the 
  
 emulated 
  
 task 
  
 queue 
  
 function 
  
 curl 
  
\  
 - 
 X 
  
 POST 
  
 # 
  
 An 
  
 HTTP 
  
 POST 
  
 request 
 ... 
  
 - 
 H 
  
 "content-type: application/json" 
  
 \ 
  
 # 
  
 ... 
  
 with 
  
 a 
  
 JSON 
  
 body 
  
 http 
 : 
 //localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url 
  
 - 
 d 
  
 ' 
 { 
 "data" 
 : 
  
 { 
  
 ... 
  
 some 
  
 data 
  
 ... 
 . 
  
 }} 
 ' 
  
 # 
  
 ... 
  
 with 
  
 JSON 
  
 encoded 
  
 data 
 

Deploy task queue functions

Deploy task queue function using the Firebase CLI:

 $  
firebase  
deploy  
--only  
functions:backupapod 

When deploying a task queue function for the first time, the CLI creates a task queue in Cloud Tasks with options (rate limiting and retry) specified in your source code.

If you encounter permissions errors when deploying functions, make sure that the appropriate IAM roles are assigned to the user running the deployment commands.

Enqueue task queue functions

Task queue functions can be enqueued in Cloud Tasks from a trusted server environment like Cloud Functions for Firebase using the Firebase Admin SDK for Node.js or Google Cloud libraries for Python. If you're new to the Admin SDK s, see Add Firebase to a server to get started.

A typical flow creates a new task, enqueues it in Cloud Tasks , and sets the configuration for the task:

Node.js

  exports 
 . 
 enqueuebackuptasks 
  
 = 
  
 onRequest 
 ( 
  
 async 
  
 ( 
 _request 
 , 
  
 response 
 ) 
  
 = 
>  
 { 
  
 const 
  
 queue 
  
 = 
  
 getFunctions 
 (). 
 taskQueue 
 ( 
 "backupapod" 
 ); 
  
 const 
  
 targetUri 
  
 = 
  
 await 
  
 getFunctionUrl 
 ( 
 "backupapod" 
 ); 
  
 const 
  
 enqueues 
  
 = 
  
 []; 
  
 for 
  
 ( 
 let 
  
 i 
  
 = 
  
 0 
 ; 
  
 i 
  
< = 
  
 BACKUP_COUNT 
 ; 
  
 i 
  
 += 
  
 1 
 ) 
  
 { 
  
 const 
  
 iteration 
  
 = 
  
 Math 
 . 
 floor 
 ( 
 i 
  
 / 
  
 HOURLY_BATCH_SIZE 
 ); 
  
 // Delay each batch by N * hour 
  
 const 
  
 scheduleDelaySeconds 
  
 = 
  
 iteration 
  
 * 
  
 ( 
 60 
  
 * 
  
 60 
 ); 
  
 const 
  
 backupDate 
  
 = 
  
 new 
  
 Date 
 ( 
 BACKUP_START_DATE 
 ); 
  
 backupDate 
 . 
 setDate 
 ( 
 BACKUP_START_DATE 
 . 
 getDate 
 () 
  
 + 
  
 i 
 ); 
  
 // Extract just the date portion (YYYY-MM-DD) as string. 
  
 const 
  
 date 
  
 = 
  
 backupDate 
 . 
 toISOString 
 (). 
 substring 
 ( 
 0 
 , 
  
 10 
 ); 
  
 enqueues 
 . 
 push 
 ( 
  
 queue 
 . 
 enqueue 
 ({ 
 date 
 }, 
  
 { 
  
 scheduleDelaySeconds 
 , 
  
 dispatchDeadlineSeconds 
 : 
  
 60 
  
 * 
  
 5 
 , 
  
 // 5 minutes 
  
 uri 
 : 
  
 targetUri 
 , 
  
 }), 
  
 ); 
  
 } 
  
 await 
  
 Promise 
 . 
 all 
 ( 
 enqueues 
 ); 
  
 response 
 . 
 sendStatus 
 ( 
 200 
 ); 
  
 }); 
  
 

Python

  @https_fn 
 . 
 on_request 
 () 
 def 
  
 enqueuebackuptasks 
 ( 
 _ 
 : 
 https_fn 
 . 
 Request 
 ) 
 - 
> https_fn 
 . 
 Response 
 : 
  
 """Adds backup tasks to a Cloud Tasks queue.""" 
 task_queue 
 = 
 functions 
 . 
 task_queue 
 ( 
 "backupapod" 
 ) 
 target_uri 
 = 
 get_function_url 
 ( 
 "backupapod" 
 ) 
 for 
 i 
 in 
 range 
 ( 
 BACKUP_COUNT 
 ): 
 batch 
 = 
 i 
 // 
 HOURLY_BATCH_SIZE 
 # Delay each batch by N hours 
 schedule_delay 
 = 
 timedelta 
 ( 
 hours 
 = 
 batch 
 ) 
 schedule_time 
 = 
 datetime 
 . 
 now 
 () 
 + 
 schedule_delay 
 dispatch_deadline_seconds 
 = 
 60 
 * 
 5 
 # 5 minutes 
 backup_date 
 = 
 BACKUP_START_DATE 
 + 
 timedelta 
 ( 
 days 
 = 
 i 
 ) 
 body 
 = 
 { 
 "data" 
 : 
 { 
 "date" 
 : 
 backup_date 
 . 
 isoformat 
 ()[: 
 10 
 ]}} 
 task_options 
 = 
 functions 
 . 
 TaskOptions 
 ( 
 schedule_time 
 = 
 schedule_time 
 , 
 dispatch_deadline_seconds 
 = 
 dispatch_deadline_seconds 
 , 
 uri 
 = 
 target_uri 
 ) 
 task_queue 
 . 
 enqueue 
 ( 
 body 
 , 
 task_options 
 ) 
 return 
 https_fn 
 . 
 Response 
 ( 
 status 
 = 
 200 
 , 
 response 
 = 
 f 
 "Enqueued 
 { 
 BACKUP_COUNT 
 } 
 tasks" 
 ) 
  
 
  • The sample code tries to spread out execution of tasks by associating a delay of Nth minutes for the Nth task. This translates to triggering ~ 1 task/minute. Note that you can also use scheduleTime (Node.js) or schedule_time (Python) if you want Cloud Tasks to trigger a task at a specific time.

  • The sample code sets the maximum amount of time Cloud Tasks will wait for a task to complete. Cloud Tasks will retry the task following the retry configuration of the queue or until this deadline is reached. In the sample, the queue is configured to retry the task up to 5 times, but the task is automatically cancelled if the whole process (including retry attempts) takes more than 5 minutes.

Troubleshooting

Turn on Cloud Tasks logging

Logs from Cloud Tasks contain useful diagnostic information like the status of the request associated with a task. By default, logs from Cloud Tasks are turned off due to the large volume of logs it can potentially generate on your project. We recommend you turn on the debug logs while you are actively developing and debugging your task queue functions. See Turning on logging .

IAM Permissions

You may see PERMISSION DENIED errors when enqueueing tasks or when Cloud Tasks tries to invoke your task queue functions. Ensure that your project has the following IAM bindings:

 gcloud  
projects  
add-iam-policy-binding  
 $PROJECT_ID 
  
 \ 
  
--member = 
serviceAccount: ${ 
 PROJECT_ID 
 } 
@appspot.gserviceaccount.com  
 \ 
  
--role = 
roles/cloudtasks.enqueuer 
  • The identity used to enqueue tasks to Cloud Tasks needs permission to use the service account associated with a task in Cloud Tasks .

    In the sample, this is the App Engine default service account .

See Google Cloud IAM documentation for instructions on how to add the App Engine default service account as a user of the App Engine default service account.

 gcloud  
functions  
add-iam-policy-binding  
 $FUNCTION_NAME 
  
 \ 
  
--region = 
us-central1  
 \ 
  
--member = 
serviceAccount: ${ 
 PROJECT_ID 
 } 
@appspot.gserviceaccount.com  
 \ 
  
--role = 
roles/cloudfunctions.invoker 
Design a Mobile Site
View Site in Mobile | Classic
Share by: