Fonctions de mise en file d'attente avec Cloud Tasks


Les fonctions de file d'attente de tâches tirent parti de Google Cloud Tasks pour aider votre application à exécuter de manière asynchrone des tâches chronophages, gourmandes en ressources ou limitées en bande passante, en dehors de votre flux d'application principal.

Par exemple, imaginez que vous souhaitiez créer des sauvegardes d'un grand nombre de fichiers image actuellement hébergés sur une API avec une limite de débit. Afin d'être un consommateur responsable de cette API, vous devez respecter leurs limites de débit. De plus, ce type de travail de longue durée pourrait être vulnérable aux échecs en raison de délais d'attente et de limites de mémoire.

Pour atténuer cette complexité, vous pouvez écrire une fonction de file d'attente de tâches qui définit les options de tâche de base telles que scheduleTime et dispatchDeadline , puis transmettre la fonction à une file d'attente dans Cloud Tasks. L'environnement Cloud Tasks est spécifiquement conçu pour garantir un contrôle efficace de la congestion et des politiques de nouvelle tentative pour ce type d'opérations.

Le SDK Firebase pour Cloud Functions pour Firebase v3.20.1 et versions ultérieures interagit avec le SDK Firebase Admin v10.2.0 et versions ultérieures pour prendre en charge les fonctions de file d'attente des tâches.

L'utilisation des fonctions de file d'attente des tâches avec Firebase peut entraîner des frais pour le traitement des tâches Cloud. Consultez la tarification de Cloud Tasks pour plus d’informations.

Créer des fonctions de file d'attente de tâches

Pour utiliser les fonctions de file d'attente de tâches, suivez ce workflow :

  1. Écrivez une fonction de file d'attente de tâches à l'aide du SDK Firebase pour Cloud Functions.
  2. Testez votre fonction en la déclenchant avec une requête HTTP.
  3. Déployez votre fonction avec la CLI Firebase. Lors du premier déploiement de votre fonction de file d'attente de tâches, la CLI créera une file d'attente de tâches dans Cloud Tasks avec des options (limitation de débit et nouvelle tentative) spécifiées dans votre code source.
  4. Ajoutez des tâches à la file d'attente des tâches nouvellement créée, en transmettant des paramètres pour configurer un calendrier d'exécution si nécessaire. Vous pouvez y parvenir en écrivant le code à l'aide du SDK Admin et en le déployant sur Cloud Functions pour Firebase.

Écrire des fonctions de file d'attente de tâches

Les exemples de code de cette section sont basés sur une application qui configure un service qui sauvegarde toutes les images de Astronomy Picture of the Day de la NASA. Pour commencer, importez les modules requis :

Noeud.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utilisez onTaskDispatched ou on_task_dispatched pour les fonctions de file d'attente de tâches. Lors de l'écriture d'une fonction de file d'attente de tâches, vous pouvez définir une nouvelle tentative par file d'attente et une configuration de limitation de débit.

Configurer les fonctions de file d'attente des tâches

Les fonctions de file d'attente de tâches sont accompagnées d'un ensemble puissant de paramètres de configuration permettant de contrôler avec précision les limites de débit et le comportement des nouvelles tentatives d'une file d'attente de tâches :

Noeud.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : Chaque tâche de la file d'attente des tâches est automatiquement réessayée jusqu'à 5 fois. Cela permet d’atténuer les erreurs passagères telles que les erreurs réseau ou l’interruption temporaire d’un service externe dépendant.

  • retryConfig.minBackoffSeconds=60 : chaque tâche est réessayée à au moins 60 secondes d'intervalle entre chaque tentative. Cela fournit un grand tampon entre chaque tentative afin que nous ne nous précipitions pas pour épuiser trop rapidement les 5 nouvelles tentatives.

  • rateLimits.maxConcurrentDispatch=6 : Au maximum 6 tâches sont distribuées à un moment donné. Cela permet de garantir un flux constant de requêtes vers la fonction sous-jacente et de réduire le nombre d'instances actives et de démarrages à froid.

Fonctions de file d’attente de tâches de test

Les fonctions de file d'attente de tâches de Firebase Local Emulator Suite sont exposées sous forme de simples fonctions HTTP. Vous pouvez tester une fonction de tâche émulée en envoyant une requête HTTP POST avec une charge utile de données JSON :

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Déployer les fonctions de file d'attente de tâches

Déployez la fonction de file d'attente des tâches à l'aide de la CLI Firebase :

$ firebase deploy --only functions:backupapod

Lors du premier déploiement d'une fonction de file d'attente de tâches, la CLI crée une file d'attente de tâches dans Cloud Tasks avec des options (limitation du débit et nouvelle tentative) spécifiées dans votre code source.

Si vous rencontrez des erreurs d'autorisations lors du déploiement de fonctions, assurez-vous que les rôles IAM appropriés sont attribués à l'utilisateur exécutant les commandes de déploiement.

Mettre en file d'attente les fonctions de file d'attente des tâches

Les fonctions de file d'attente de tâches peuvent être mises en file d'attente dans Cloud Tasks à partir d'un environnement de serveur de confiance tel que Cloud Functions pour Firebase à l'aide du SDK d'administration Firebase pour Node.js ou des bibliothèques Google Cloud pour Python. Si vous débutez avec les SDK d'administration, consultez Ajouter Firebase à un serveur pour commencer.

Un flux typique crée une nouvelle tâche, la met en file d'attente dans Cloud Tasks et définit la configuration de la tâche :

Noeud.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • L'exemple de code tente d'étaler l'exécution des tâches en associant un délai de Nième minutes à la Nième tâche. Cela se traduit par le déclenchement d’environ 1 tâche/minute. Notez que vous pouvez également utiliser scheduleTime (Node.js) ou schedule_time (Python) si vous souhaitez que Cloud Tasks déclenche une tâche à une heure précise.

  • L'exemple de code définit la durée maximale pendant laquelle Cloud Tasks attendra la fin d'une tâche. Cloud Tasks réessayera la tâche après la configuration des nouvelles tentatives de la file d'attente ou jusqu'à ce que cette date limite soit atteinte. Dans l'exemple, la file d'attente est configurée pour réessayer la tâche jusqu'à 5 fois, mais la tâche est automatiquement annulée si l'ensemble du processus (y compris les nouvelles tentatives) prend plus de 5 minutes.

Récupérer et inclure l'URI cible

En raison de la manière dont Cloud Tasks crée des jetons d'authentification pour authentifier les requêtes adressées aux fonctions de file d'attente des tâches sous-jacentes, vous devez spécifier l'URL Cloud Run de la fonction lors de la mise en file d'attente des tâches. Nous vous recommandons de récupérer par programme l'URL de votre fonction, comme illustré ci-dessous :

Noeud.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Dépannage

Activer la journalisation Cloud Tasks

Les journaux de Cloud Tasks contiennent des informations de diagnostic utiles telles que l'état de la demande associée à une tâche. Par défaut, les journaux de Cloud Tasks sont désactivés en raison du grand volume de journaux qu'il peut potentiellement générer sur votre projet. Nous vous recommandons d'activer les journaux de débogage pendant que vous développez et déboguez activement vos fonctions de file d'attente de tâches. Voir Activer la journalisation .

Autorisations IAM

Vous pouvez voir des erreurs PERMISSION DENIED lors de la mise en file d'attente des tâches ou lorsque Cloud Tasks tente d'appeler vos fonctions de file d'attente de tâches. Assurez-vous que votre projet dispose des liaisons IAM suivantes :

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • L'identité utilisée pour mettre les tâches en file d'attente dans Cloud Tasks doit être autorisée à utiliser le compte de service associé à une tâche dans Cloud Tasks.

    Dans l'exemple, il s'agit du compte de service App Engine par défaut .

Consultez la documentation Google Cloud IAM pour savoir comment ajouter le compte de service par défaut App Engine en tant qu'utilisateur du compte de service par défaut App Engine.

  • L'identité utilisée pour déclencher la fonction de file d'attente des tâches nécessite l'autorisation cloudfunctions.functions.invoke .

    Dans l'exemple, il s'agit du compte de service App Engine par défaut.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker