Mettre des fonctions en file d'attente avec Cloud Tasks


Les fonctions de file d'attente de tâches exploitent Google Cloud Tasks pour permettre à votre application d'exécuter une application qui demande beaucoup de temps, de ressources ou de bande passante les tâches de manière asynchrone, en dehors du flux principal de votre application.

Par exemple, imaginez que vous souhaitiez créer des sauvegardes pour un vaste ensemble d'images qui sont actuellement hébergés sur une API avec une limite de débit. Pour devenir responsable de l'API, vous devez respecter ses limites de débit. De plus, ce type de job de longue durée peut être vulnérable aux échecs en raison des délais avant expiration et les limites de mémoire.

Pour atténuer cette complexité, vous pouvez écrire une fonction de file d'attente de tâches qui définit des options de tâche telles que scheduleTime et dispatchDeadline, puis transmet au dans une file d'attente dans Cloud Tasks. Cloud Tasks est conçu spécifiquement pour garantir un contrôle efficace des embouteillages stratégies de nouvelle tentative pour ce type d'opérations.

Le SDK Firebase pour Cloud Functions for Firebase version 3.20.1 ou ultérieure fonctionne avec Firebase Admin SDK v10.2.0 ou version ultérieure pour prendre en charge les fonctions de file d'attente de tâches.

L'utilisation de fonctions de file d'attente de tâches avec Firebase peut entraîner des frais de traitement Cloud Tasks. Voir Tarifs de Cloud Tasks pour en savoir plus.

Créer des fonctions de file d'attente de tâches

Pour utiliser les fonctions de file d'attente de tâches, procédez comme suit:

  1. Écrivez une fonction de file d'attente de tâches à l'aide du SDK Firebase pour Cloud Functions.
  2. Testez votre fonction en la déclenchant avec une requête HTTP.
  3. Déployez votre fonction avec la CLI Firebase. Lors du déploiement de votre tâche file d'attente pour la première fois, la CLI crée file d'attente de tâches dans Cloud Tasks avec des options (limitation du débit et nouvelle tentative) spécifiées dans votre code source.
  4. Ajoutez des tâches à la file d'attente de tâches que vous venez de créer, en transmettant des paramètres pour configurer un calendrier d'exécution si nécessaire. Pour ce faire, vous pouvez écrire le code en utilisant Admin SDK et en le déployant dans Cloud Functions for Firebase.

Écrire des fonctions de file d'attente des tâches

Les exemples de code présentés dans cette section sont basés sur une application qui définit un service qui sauvegarde toutes les images Image Astronomie du jour. Pour commencer, importez les modules requis:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utiliser onTaskDispatched ou on_task_dispatched pour les fonctions de file d'attente de tâches. Lorsque vous écrivez une fonction de file d'attente de tâches, vous pouvez définir la configuration de la nouvelle tentative et de la limitation de débit par file d'attente.

Configurer les fonctions de la file d'attente de tâches

Les fonctions de file d'attente de tâches sont fournies avec un ensemble performant de paramètres de configuration pour contrôler avec précision les limites de débit et le comportement de nouvelle tentative d'une file d'attente de tâches:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : chaque tâche de la file d'attente de tâches est automatiquement réessayée jusqu'à cinq fois. Cela permet de limiter les erreurs temporaires, telles que les erreurs ou l'interruption temporaire d'un service externe dépendant.

  • retryConfig.minBackoffSeconds=60: chaque tâche est relancée pendant au moins 60 secondes à l'exception de chaque tentative. Ainsi, vous disposez d'un espace de stockage important entre chaque tentative. Nous ne nous précipitons donc pas pour épuiser trop rapidement les 5 tentatives de relance.

  • rateLimits.maxConcurrentDispatch=6: six tâches au maximum sont envoyées à la fois à un moment donné. Cela permet de garantir un flux constant de requêtes au niveau et permet de réduire le nombre d'instances actives et de démarrages à froid.

Tester les fonctions de la file d'attente de tâches

Dans la plupart des cas, l'émulateur Cloud Functions est le meilleur moyen de tester les tâches les fonctions de file d'attente. Consultez la documentation de la suite d'émulateurs pour savoir comment : instrumenter votre application pour l'émulation des fonctions de file d'attente de tâches.

De plus, les fonctions_sdk de la file d'attente de tâches sont exposées de manière simple Fonctions HTTP dans le Firebase Local Emulator Suite. Vous pouvez tester une fonction de tâche émulée en envoyant une requête HTTP POST avec une charge utile de données JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Déployer des fonctions de file d'attente de tâches

Déployez la fonction de file d'attente de tâches à l'aide de la CLI Firebase:

$ firebase deploy --only functions:backupapod

Lors du déploiement d'une fonction de file d'attente de tâches pour la première fois, la CLI crée un file d'attente de tâches dans Cloud Tasks avec des options (limitation du débit et nouvelle tentative) spécifié dans votre code source.

Si vous rencontrez des erreurs d'autorisation lors du déploiement de fonctions, assurez-vous que les rôles IAM appropriés sont attribués à l'utilisateur qui exécute les commandes de déploiement.

Fonctions de file d'attente de tâches

Les fonctions de file d'attente de tâches peuvent être mises en file d'attente dans Cloud Tasks à partir d'un environnement serveur approuvé tel que Cloud Functions for Firebase à l'aide de Firebase Admin SDK pour Node.js ou des bibliothèques Google Cloud pour Python. Si vous ne connaissez pas les Admin SDK, consultez Pour commencer, ajoutez Firebase à un serveur.

Un flux type crée une tâche et la met en file d'attente Cloud Tasks, puis définit la configuration de la tâche:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • L'exemple de code tente de répartir l'exécution des tâches en associant un délai de N minutes à la N ième tâche. Ce se traduit par un déclenchement d'environ 1 tâche/minute. Notez que vous pouvez également utiliser scheduleTime (Node.js) ou schedule_time (Python) si vous le souhaitez Cloud Tasks pour déclencher une tâche à un moment précis.

  • L'exemple de code définit la durée maximale Cloud Tasks va patienter pour une tâche. Cloud Tasks va réessayer la tâche après la requête la configuration de la file d'attente ou jusqu'à ce que ce délai soit atteint. Dans l'exemple, la file d'attente est configurée pour retenter la tâche jusqu'à 5 fois, mais la tâche est sont automatiquement annulées si l'ensemble du processus (y compris les nouvelles tentatives) prend plus de cinq minutes.

Récupérer et inclure l'URI cible

En raison de la façon dont Cloud Tasks crée des jetons d'authentification pour s'authentifier aux fonctions de file d'attente de tâches sous-jacentes, vous devez spécifier URL Cloud Run de la fonction lors de la mise en file d'attente des tâches. Mer nous vous recommandons de récupérer l'URL de votre fonction de manière programmatique, comme indiqué ci-dessous:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Dépannage

Cloud Tasks">Activer la journalisation Cloud Tasks

Les journaux de Cloud Tasks contiennent des informations de diagnostic utiles telles que le état de la requête associée à une tâche. Par défaut, les journaux de Cloud Tasks sont désactivés en raison du grand volume de journaux qu'ils peuvent potentiellement générer sur votre projet. Nous vous recommandons d'activer les journaux de débogage pendant que vous développez et déboguez activement vos fonctions de file d'attente de tâches. Voir L'activation Logging.

Autorisations IAM

Des erreurs PERMISSION DENIED peuvent s'afficher lors de la mise en file d'attente de tâches ou lorsque Cloud Tasks tente d'appeler vos fonctions de file d'attente de tâches. Vérifiez que vos comporte les liaisons IAM suivantes:

  • L'identité utilisée pour mettre les tâches en file d'attente dans les besoins de Cloud Tasks Autorisation IAM cloudtasks.tasks.create.

    Dans l'exemple, il s'agit du compte de service par défaut App Engine.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • L'identité utilisée pour mettre des tâches en file d'attente dans Cloud Tasks a besoin d'une autorisation pour utiliser le compte de service associé à une tâche dans Cloud Tasks.

    Dans l'exemple, il s'agit du compte de service par défaut App Engine.

Consultez la documentation Google Cloud IAM pour savoir comment ajouter le compte de service par défaut App Engine en tant qu'utilisateur du compte de service par défaut App Engine.

  • L'identité utilisée pour déclencher la fonction de file d'attente de tâches a besoin Autorisation cloudfunctions.functions.invoke.

    Dans l'exemple, il s'agit du compte de service par défaut App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker