Funktionen mit Cloud Tasks in die Warteschlange stellen

Aufgabenwarteschlangen-Funktionen nutzen Google Cloud Tasks , damit Ihre App zeitaufwendige, ressourcenintensive oder bandbreitenbeschränkte Aufgaben asynchron außerhalb des Hauptanwendungsablaufs ausführen kann.

Angenommen, Sie möchten Back-ups einer großen Anzahl von Bilddateien erstellen, die derzeit auf einer API mit einer Ratenbegrenzung gehostet werden. Um diese API verantwortungsvoll zu nutzen, müssen Sie die Ratenbegrenzungen einhalten. Außerdem kann es bei dieser Art von lang andauernden Jobs aufgrund von Zeitüberschreitungen und Arbeitsspeicherlimits zu Fehlern kommen.

Um diese Komplexität zu verringern, können Sie eine Aufgabenwarteschlangen-Funktion schreiben, die grundlegende Aufgabenoptionen wie scheduleTime und dispatchDeadline festlegt und die Funktion dann an eine Warteschlange in Cloud Tasks übergibt. Die Cloud Tasks Umgebung wurde speziell entwickelt, um eine effektive Überlastungskontrolle und Wiederholungsrichtlinien für diese Arten von Vorgängen zu gewährleisten.

Das Firebase SDK for Cloud Functions for Firebase v3.20.1 und höher ist mit Firebase Admin SDK v10.2.0 und höher kompatibel und unterstützt Aufgabenwarteschlangen-Funktionen.

Die Verwendung von Aufgabenwarteschlangen-Funktionen mit Firebase kann zu Gebühren für die Cloud Tasks Verarbeitung führen. Weitere Informationen finden Sie unter Cloud Tasks Preise.

Aufgabenwarteschlangen-Funktionen erstellen

So verwenden Sie Aufgabenwarteschlangen-Funktionen:

  1. Schreiben Sie eine Aufgabenwarteschlangen-Funktion mit dem Firebase SDK für Cloud Functions.
  2. Testen Sie die Funktion, indem Sie sie mit einer HTTP-Anfrage auslösen.
  3. Stellen Sie die Funktion mit der Firebase CLI bereit. Wenn Sie Ihre Aufgaben warteschlangen-Funktion zum ersten Mal bereitstellen, erstellt die CLI eine Aufgabenwarteschlange in Cloud Tasks mit den in Ihrem Quellcode angegebenen Optionen (Ratenbegrenzung und Wiederholung).
  4. Fügen Sie der neu erstellten Aufgabenwarteschlange Aufgaben hinzu und übergeben Sie bei Bedarf Parameter, um einen Ausführungszeitplan einzurichten. Dazu schreiben Sie den Code mit dem Admin SDK und stellen ihn in Cloud Functions for Firebase bereit.

Aufgabenwarteschlangen-Funktionen schreiben

Die Codebeispiele in diesem Abschnitt basieren auf einer App, die einen Dienst einrichtet, der alle Bilder von „Astronomy Picture of the Day“ der NASA sichert. Importieren Sie zuerst die erforderlichen Module:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");

// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Verwenden Sie onTaskDispatched oder on_task_dispatched für Aufgabenwarteschlangen-Funktionen. Beim Schreiben einer Aufgabenwarteschlangen-Funktion können Sie die Konfiguration für Wiederholungen und Ratenbegrenzung pro Warteschlange festlegen.

Aufgabenwarteschlangen-Funktionen konfigurieren

Aufgabenwarteschlangen-Funktionen bieten eine Reihe von Konfigurationseinstellungen, mit denen Sie Ratenbegrenzungen und das Wiederholungsverhalten einer Aufgabenwarteschlange genau steuern können:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Jede Aufgabe in der Aufgabenwarteschlange wird automatisch bis zu fünf Mal wiederholt. So können Sie vorübergehende Fehler wie Netzwerkfehler oder vorübergehende Dienstunterbrechungen eines abhängigen externen Dienstes vermeiden.

  • retryConfig.minBackoffSeconds=60: Jede Aufgabe wird mindestens 60 Sekunden nach jedem Versuch wiederholt. So haben Sie zwischen den Versuchen einen großen Puffer, damit die fünf Wiederholungsversuche nicht zu schnell aufgebraucht werden.

  • rateLimits.maxConcurrentDispatch=6: Es werden maximal sechs Aufgaben gleichzeitig gesendet. So wird ein stetiger Stream von Anfragen an die zugrunde liegende Funktion gewährleistet und die Anzahl der aktiven Instanzen und Kaltstarts reduziert.

Aufgabenwarteschlangen-Funktionen testen

In den meisten Fällen ist der Cloud Functions Emulator die beste Möglichkeit, Aufgaben warteschlangen-Funktionen zu testen. In der Dokumentation zur Emulator Suite erfahren Sie, wie Sie Ihre App für die Emulation von Aufgabenwarteschlangen-Funktionen instrumentieren.

Außerdem werden Aufgabenwarteschlangen-Funktionen in der Firebase Local Emulator Suite als einfache HTTP-Funktionen verfügbar gemacht. Sie können eine emulierte Aufgabenfunktion testen, indem Sie eine HTTP-POST-Anfrage mit einer JSON-Datennutzlast senden:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Aufgabenwarteschlangen-Funktionen bereitstellen

Stellen Sie Aufgabenwarteschlangen-Funktionen mit der Firebase CLI bereit:

$ firebase deploy --only functions:backupapod

Wenn Sie eine Aufgabenwarteschlangen-Funktion zum ersten Mal bereitstellen, erstellt die CLI eine Aufgabenwarteschlange in Cloud Tasks mit den in Ihrem Quellcode angegebenen Optionen (Ratenbegrenzung und Wiederholung).

Wenn beim Bereitstellen von Funktionen Berechtigungsfehler auftreten, prüfen Sie, ob dem Nutzer, der die Bereitstellungsbefehle ausführt, die entsprechenden IAM-Rollen zugewiesen sind.

Aufgabenwarteschlangen-Funktionen in die Warteschlange stellen

Aufgabenwarteschlangen-Funktionen können in Cloud Tasks aus einer vertrauenswürdigen Serverumgebung wie Cloud Functions for Firebase mit dem Firebase Admin SDK für Node.js oder den Google Cloud-Clientbibliotheken für Python in die Warteschlange gestellt werden. Wenn Sie noch keine Erfahrung mit den Admin SDKs haben, lesen Sie den Artikel Firebase zu einem Server hinzufügen.

Ein typischer Ablauf erstellt eine neue Aufgabe, stellt sie in Cloud Tasks, und legt die Konfiguration für die Aufgabe fest:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(
            schedule_time=schedule_time,
            dispatch_deadline_seconds=dispatch_deadline_seconds,
            uri=target_uri,
        )
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Der Beispielcode versucht, die Ausführung von Aufgaben zu verteilen, indem er für die N. Aufgabe eine Verzögerung von N Minuten festlegt. Das bedeutet, dass etwa eine Aufgabe pro Minute ausgelöst wird. Sie können auch scheduleTime (Node.js) oder schedule_time (Python) verwenden, wenn Sie Cloud Tasks eine Aufgabe zu einem bestimmten Zeitpunkt auslösen möchten.

  • Der Beispielcode legt die maximale Zeit fest, Cloud Tasks die gewartet wird, bis eine Aufgabe abgeschlossen ist. Cloud Tasks wiederholt die Aufgabe gemäß der Wiederholungsk0nfiguration der Warteschlange oder bis diese Frist erreicht ist. Im Beispiel ist die Warteschlange so konfiguriert, dass die Aufgabe bis zu fünf Mal wiederholt wird. Die Aufgabe wird jedoch automatisch abgebrochen, wenn der gesamte Vorgang (einschließlich Wiederholungsversuchen) länger als fünf Minuten dauert.

Admin SDK

Fehlerbehebung

Cloud Tasks Protokollierung aktivieren

Protokolle von Cloud Tasks enthalten nützliche Diagnoseinformationen wie den Status der mit einer Aufgabe verknüpften Anfrage. Standardmäßig sind Protokolle von Cloud Tasks deaktiviert, da sie potenziell eine große Menge an Protokollen für Ihr Projekt generieren können. Wir empfehlen, die Debug-Protokolle zu aktivieren, während Sie Ihre Aufgabenwarteschlangen-Funktionen aktiv entwickeln und debuggen. Weitere Informationen finden Sie unter Protokollierung aktivieren.

IAM-Berechtigungen

Beim In-die-Warteschlange-Stellen von Aufgaben oder wenn Cloud Tasks versucht, Ihre Aufgabenwarteschlangen-Funktionen aufzurufen, können PERMISSION DENIED-Fehler auftreten. Prüfen Sie, ob Ihr Projekt die folgenden IAM-Bindungen hat:

  • Die Identität, die zum In-die-Warteschlange-Stellen von Aufgaben in Cloud Tasks verwendet wird, benötigt cloudtasks.tasks.create IAM-Berechtigung.

    Im Beispiel ist dies das App Engine Standarddienstkonto.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Die Identität, die zum In-die-Warteschlange-Stellen von Aufgaben in Cloud Tasks verwendet wird, benötigt die Berechtigung zur Verwendung des Dienstkontos, das mit einer Aufgabe in Cloud Tasks verknüpft ist.

    Im Beispiel ist dies das App Engine Standarddienstkonto.

In der Google Cloud IAM-Dokumentation finden Sie eine Anleitung zum Hinzufügen des App Engine Standarddienstkontos als Nutzer des App Engine Standarddienstkontos.

  • Die Identität, die zum Auslösen der Aufgabenwarteschlangen-Funktion verwendet wird, benötigt die Berechtigung cloudfunctions.functions.invoke.

    Im Beispiel ist dies das App Engine Standarddienstkonto.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker