Mettre des fonctions en file d'attente avec Cloud Tasks

Les fonctions de file d'attente de tâches tirent parti de Google Cloud Tasks pour aider votre application à exécuter des tâches longues, gourmandes en ressources ou limitées en bande passante de manière asynchrone, en dehors du flux principal de votre application.

Imaginons que vous souhaitiez créer des sauvegardes d'un grand ensemble de fichiers image actuellement hébergés sur une API avec une limite de débit. Pour être un consommateur responsable de cette API, vous devez respecter ses limites de débit. De plus, ce type de tâche de longue durée peut être vulnérable aux échecs en raison des délais avant expiration et des limites de mémoire.

Pour atténuer cette complexité, vous pouvez écrire une fonction de file d'attente de tâches qui définit des options de tâche de base telles que scheduleTime, et dispatchDeadline, puis transmet la fonction à une file d'attente dans Cloud Tasks. L'Cloud Tasks environnement est conçu spécifiquement pour assurer un contrôle efficace de la congestion et des règles de nouvelle tentative pour ces types d'opérations.

Le SDK Firebase pour Cloud Functions for Firebase v3.20.1 et versions ultérieures interagit avec Firebase Admin SDK v10.2.0 et versions ultérieures pour prendre en charge les fonctions de file d'attente de tâches.

L'utilisation de fonctions de file d'attente de tâches avec Firebase peut entraîner des frais de Cloud Tasks traitement. Pour en savoir plus, consultez Cloud Tasks tarifs.

Créer des fonctions de file d'attente de tâches

Pour utiliser les fonctions de file d'attente de tâches, suivez ce workflow :

  1. Écrivez une fonction de file d'attente de tâches à l'aide du Firebase SDK pour Cloud Functions.
  2. Testez votre fonction en la déclenchant avec une requête HTTP.
  3. Déployez votre fonction avec l'interface de ligne de commande Firebase. Lorsque vous déployez votre fonction de file d'attente de tâches pour la première fois, l'interface de ligne de commande crée une file d'attente de tâches dans Cloud Tasks avec les options (limitation du débit et nouvelle tentative) spécifiées dans votre code source.
  4. Ajoutez des tâches à la file d'attente de tâches nouvellement créée, en transmettant des paramètres pour configurer une planification d'exécution si nécessaire. Pour ce faire, écrivez le code à l'aide du Admin SDK et déployez-le dans Cloud Functions for Firebase.

Écrire des fonctions de file d'attente de tâches

Les exemples de code de cette section sont basés sur une application qui configure un service qui sauvegarde toutes les images de la photo d'astronomie du jour de la NASA . Pour commencer, importez les modules requis :

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");

// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utilisez onTaskDispatched ou on_task_dispatched pour les fonctions de file d'attente de tâches. Lorsque vous écrivez une fonction de file d'attente de tâches, vous pouvez définir une configuration de nouvelle tentative et de limitation du débit par file d'attente.

Configurer les fonctions de file d'attente de tâches

Les fonctions de file d'attente de tâches sont fournies avec un ensemble puissant de paramètres de configuration permettant de contrôler précisément les limites de débit et le comportement de nouvelle tentative d'une file d'attente de tâches :

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: chaque tâche de la file d'attente de tâches fait automatiquement l'objet de cinq tentatives au maximum. Cela permet d'atténuer les erreurs temporaires telles que les erreurs réseau ou les interruptions temporaires du service d'un service externe dépendant.

  • retryConfig.minBackoffSeconds=60: chaque tâche fait l'objet d'une nouvelle tentative au moins 60 secondes après chaque tentative. Cela fournit un tampon important entre chaque tentative afin de ne pas épuiser trop rapidement les cinq tentatives.

  • rateLimits.maxConcurrentDispatch=6: au maximum six tâches sont distribuées à un moment donné. Cela permet de garantir un flux constant de requêtes vers la fonction sous-jacente et de réduire le nombre d'instances actives et de démarrages à froid.

Tester les fonctions de file d'attente de tâches

Dans la plupart des cas, l'émulateur Cloud Functions est le meilleur moyen de tester les fonctions de file d'attente de tâches. Consultez la documentation de la suite d'émulateurs pour découvrir comment instrumenter votre application pour l'émulation des fonctions de file d'attente de tâches.

De plus, les fonctions de file d'attente de tâches sont exposées en tant que fonctions HTTP simples dans le Firebase Local Emulator Suite. Vous pouvez tester une fonction de tâche émulée en envoyant une requête HTTP POST avec une charge utile de données JSON :

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Déployer des fonctions de file d'attente de tâches

Déployez la fonction de file d'attente de tâches à l'aide de l'interface de ligne de commande Firebase :

$ firebase deploy --only functions:backupapod

Lorsque vous déployez une fonction de file d'attente de tâches pour la première fois, l'interface de ligne de commande crée une file d'attente de tâches dans Cloud Tasks avec les options (limitation du débit et nouvelle tentative) spécifiées dans votre code source.

Si vous rencontrez des erreurs d'autorisation lors du déploiement de fonctions, assurez-vous que les rôles IAM appropriés sont attribués à l'utilisateur qui exécute les commandes de déploiement.

Mettre en file d'attente des fonctions de file d'attente de tâches

Les fonctions de file d'attente de tâches peuvent être mises en file d'attente dans Cloud Tasks à partir d'un environnement de serveur de confiance tel que Cloud Functions for Firebase à l'aide du Firebase Admin SDK pour Node.js ou des bibliothèques Google Cloud pour Python. Si vous n'avez jamais utilisé les Admin SDKs, consultez Ajouter Firebase à un serveur pour commencer.

Un flux typique crée une tâche, la met en file d'attente dans Cloud Tasks, et définit la configuration de la tâche :

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • L'exemple de code tente de répartir l'exécution des tâches en associant un délai de N minutes à la N-ième tâche. Cela se traduit par le déclenchement d'environ une tâche par minute. Notez que vous pouvez également utiliser scheduleTime (Node.js) ou schedule_time (Python) si vous souhaitez Cloud Tasks déclencher une tâche à un moment spécifique.

  • L'exemple de code définit la durée maximale pendant laquelle Cloud Tasks attendra la fin d'une tâche. Cloud Tasks tentera de nouveau d'exécuter la tâche en suivant la configuration de nouvelle tentative de la file d'attente ou jusqu'à ce que ce délai soit atteint. Dans l'exemple, la file d'attente est configurée pour réessayer d'exécuter la tâche jusqu'à cinq fois, mais la tâche est automatiquement annulée si l'ensemble du processus (y compris les tentatives) prend plus de cinq minutes.

Dépannage

Activer la journalisation Cloud Tasks

Les journaux de Cloud Tasks contiennent des informations de diagnostic utiles, telles que l' état de la requête associée à une tâche. Par défaut, les journaux de Cloud Tasks sont désactivés en raison du volume important de journaux qu'ils peuvent potentiellement générer dans votre projet. Nous vous recommandons d'activer les journaux de débogage lorsque vous développez et déboguez activement vos fonctions de file d'attente de tâches. Consultez Activer la journalisation.

Autorisations IAM

Des erreurs PERMISSION DENIED peuvent s'afficher lorsque vous mettez en file d'attente des tâches ou lorsque Cloud Tasks tente d'appeler vos fonctions de file d'attente de tâches. Assurez-vous que votre projet dispose des liaisons IAM suivantes :

  • L'identité utilisée pour mettre en file d'attente des tâches dans Cloud Tasks doit disposer de cloudtasks.tasks.create l'autorisation IAM.

    Dans l'exemple, il s'agit du App Engine compte de service par défaut

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • L'identité utilisée pour mettre en file d'attente des tâches dans Cloud Tasks doit être autorisée à utiliser le compte de service associé à une tâche dans Cloud Tasks.

    Dans l'exemple, il s'agit du App Engine compte de service par défaut.

Consultez la documentation Google Cloud IAM pour savoir comment ajouter le compte de service par défaut App Engine en tant qu'utilisateur du compte de service par défaut App Engine.

  • L'identité utilisée pour déclencher la fonction de file d'attente de tâches doit disposer de l'autorisation cloudfunctions.functions.invoke.

    Dans l'exemple, il s'agit du App Engine compte de service par défaut

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker