Funktionen mit Cloud Tasks in die Warteschlange stellen


Aufgabenwarteschlangen nutzen Google Cloud Tasks, um zeitaufwendige, ressourcenintensive oder bandbreitenbegrenzte Aufgaben asynchron außerhalb des Hauptanwendungsablaufs auszuführen.

Angenommen, Sie möchten Sicherungen einer großen Anzahl von Bilddateien erstellen, die derzeit in einer API mit einem Ratelimit gehostet werden. Wenn Sie diese API verantwortungsvoll nutzen möchten, müssen Sie die Ratenlimits einhalten. Außerdem sind diese Arten von lang laufenden Jobs aufgrund von Zeitüberschreitungen und Arbeitsspeicherlimits anfällig für Fehler.

Um diese Komplexität zu verringern, können Sie eine Task-Queue-Funktion schreiben, die grundlegende Aufgabenoptionen wie scheduleTime und dispatchDeadline festlegt und die Funktion dann an eine Queue in Cloud Tasks weitergibt. Die Cloud Tasks-Umgebung wurde speziell entwickelt, um eine effektive Überlastungskontrolle und Wiederholungsrichtlinien für diese Arten von Vorgängen zu gewährleisten.

Das Firebase SDK für Cloud Functions for Firebase ab Version 3.20.1 ist mit Firebase Admin SDK ab Version 10.2.0 kompatibel, um Aufgabenwarteschlangenfunktionen zu unterstützen.

Die Verwendung von TaskQueue-Funktionen mit Firebase kann zu Kosten für die Cloud Tasks-Verarbeitung führen. Weitere Informationen finden Sie unter Cloud Tasks-Preise.

Funktionen für Aufgabenwarteschlangen erstellen

Gehen Sie folgendermaßen vor, um Funktionen für Aufgabenwarteschlangen zu verwenden:

  1. Eine Task-Warteschlangenfunktion mit dem Firebase SDK für Cloud Functions schreiben
  2. Testen Sie die Funktion, indem Sie sie mit einer HTTP-Anfrage auslösen.
  3. Stellen Sie die Funktion mit der Firebase-Befehlszeile bereit. Wenn Sie die Aufgabenwarteschlangenfunktion zum ersten Mal bereitstellen, erstellt die Befehlszeile in Cloud Tasks eine Aufgabenwarteschlange mit Optionen (Ratenbegrenzung und Wiederholungsversuch), die im Quellcode angegeben sind.
  4. Fügen Sie der neu erstellten Aufgabenwarteschlange Aufgaben hinzu und geben Sie bei Bedarf Parameter für die Einrichtung eines Ausführungszeitplans an. Sie können dies erreichen, indem Sie den Code mit Admin SDK schreiben und auf Cloud Functions for Firebase bereitstellen.

Funktionen für Aufgabenwarteschlangen schreiben

Codebeispiele in diesem Abschnitt basieren auf einer App, die einen Dienst einrichtet, der alle Bilder von Astronomy Picture of the Day der NASA sichert. Importieren Sie zuerst die erforderlichen Module:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Verwenden Sie onTaskDispatched oder on_task_dispatched für Funktionen der Aufgabenwarteschlange. Beim Erstellen einer Task-Warteschlangenfunktion können Sie die Wiederholungs- und Ratenbegrenzung pro Warteschlange konfigurieren.

Funktionen für Aufgabenwarteschlangen konfigurieren

Aufgabenwarteschlangen-Funktionen umfassen leistungsstarke Konfigurationseinstellungen, mit denen Sie Ratenbegrenzungen und Wiederholungsverhalten einer Aufgabenwarteschlange genau steuern können:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Jede Aufgabe in der Aufgabenwarteschlange wird automatisch bis zu fünfmal wiederholt. So lassen sich vorübergehende Fehler wie Netzwerkfehler oder vorübergehende Dienstunterbrechungen eines abhängigen externen Dienstes abmildern.

  • retryConfig.minBackoffSeconds=60: Bei jeder Aufgabe wird mindestens 60 Sekunden nach dem vorherigen Versuch ein neuer Versuch unternommen. So haben wir zwischen den einzelnen Versuchen einen großen Puffer, sodass wir die fünf Wiederholungsversuche nicht zu schnell aufbrauchen.

  • rateLimits.maxConcurrentDispatch=6: Es werden jeweils maximal 6 Aufgaben gesendet. So wird ein konstanter Strom von Anfragen an die zugrunde liegende Funktion sichergestellt und die Anzahl der aktiven Instanzen und Kaltstarts reduziert.

Funktionen der Aufgabenwarteschlange testen

In den meisten Fällen eignet sich der Cloud Functions-Emulator am besten, um Funktionen für Aufgabenwarteschlangen zu testen. In der Emulator Suite-Dokumentation erfahren Sie, wie Sie Ihre Anwendung für die Emulation von Aufgabenwarteschlangenfunktionen instrumentieren.

Darüber hinaus wird „functions_sdk“ der Aufgabenwarteschlange als einfache HTTP-Funktionen im Firebase Local Emulator Suite bereitgestellt. Sie können eine emulierte Aufgabenfunktion testen, indem Sie eine HTTP-POST-Anfrage mit einer JSON-Datennutzlast senden:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Aufgabenwarteschlangenfunktionen bereitstellen

Task-Queue-Funktion mit der Firebase-Befehlszeile bereitstellen:

$ firebase deploy --only functions:backupapod

Wenn Sie eine Aufgabenwarteschlangenfunktion zum ersten Mal bereitstellen, erstellt die Befehlszeile in Cloud Tasks eine Aufgabenwarteschlange mit Optionen (Ratenbegrenzung und Wiederholung), die im Quellcode angegeben sind.

Wenn beim Bereitstellen von Funktionen Berechtigungsfehler auftreten, prüfen Sie, ob dem Nutzer, der die Bereitstellungsbefehle ausführt, die entsprechenden IAM-Rollen zugewiesen sind.

Aufgabenwarteschlangenfunktionen in die Warteschlange stellen

Aufgabenwarteschlangenfunktionen können in Cloud Tasks aus einer vertrauenswürdigen Serverumgebung wie Cloud Functions for Firebase mithilfe der Firebase Admin SDK für Node.js oder der Google Cloud-Bibliotheken für Python in die Warteschlange gestellt werden. Wenn Sie die Admin SDK zum ersten Mal verwenden, lesen Sie Firebase zu einem Server hinzufügen.

Bei einem typischen Ablauf wird eine neue Aufgabe erstellt, in Cloud Tasks eingereiht und die Konfiguration für die Aufgabe festgelegt:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Im Beispielcode wird versucht, die Ausführung von Aufgaben zu verteilen, indem der n-ten Aufgabe eine Verzögerung von n Minuten zugewiesen wird. Dies entspricht etwa 1 Aufgabe/Minute. Sie können auch scheduleTime (Node.js) oder schedule_time (Python) verwenden, wenn Cloud Tasks eine Aufgabe zu einer bestimmten Zeit auslösen soll.

  • Im Beispielcode wird die maximale Zeit festgelegt, die Cloud Tasks auf die Ausführung einer Aufgabe warten soll. Bei Cloud Tasks wird die Aufgabe gemäß der Wiederholungskonfiguration der Warteschlange oder bis zu diesem Termin wiederholt. Im Beispiel ist die Warteschlange so konfiguriert, dass die Aufgabe bis zu fünfmal wiederholt wird. Die Aufgabe wird jedoch automatisch abgebrochen, wenn der gesamte Prozess (einschließlich Wiederholungsversuche) mehr als 5 Minuten dauert.

Ziel-URI abrufen und einfügen

Da Cloud Tasks Authentifizierungstokens zum Authentifizieren von Anfragen an die zugrunde liegenden Aufgabenwarteschlangenfunktionen erstellt, müssen Sie die Cloud Run-URL der Funktion angeben, wenn Sie Aufgaben in die Warteschlange stellen. Wir empfehlen, die URL für Ihre Funktion wie unten gezeigt programmatisch abzurufen:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Fehlerbehebung

Cloud Tasks">Cloud Tasks-Logging aktivieren

Logs von Cloud Tasks enthalten nützliche Diagnoseinformationen wie den Status der Anfrage, die mit einer Aufgabe verknüpft ist. Standardmäßig sind Logs von Cloud Tasks deaktiviert, da sie möglicherweise eine große Menge an Logs für Ihr Projekt generieren. Wir empfehlen, die Debug-Logs zu aktivieren, während Sie Ihre Task-Queue-Funktionen aktiv entwickeln und debuggen. Weitere Informationen finden Sie unter Logging aktivieren.

IAM-Berechtigungen

Möglicherweise werden PERMISSION DENIED-Fehler angezeigt, wenn Aufgaben in die Warteschlange gestellt werden oder Cloud Tasks versucht, die Funktionen der Aufgabenwarteschlange aufzurufen. Achten Sie darauf, dass Ihr Projekt die folgenden IAM-Bindungen hat:

  • Die Identität, mit der Aufgaben in Cloud Tasks eingereiht werden, benötigt die IAM-Berechtigung cloudtasks.tasks.create.

    Im Beispiel ist das das App Engine-Standarddienstkonto.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Die Identität, die zum Einreihen von Aufgaben in Cloud Tasks verwendet wird, benötigt die Berechtigung zur Verwendung des Dienstkontos, das mit einer Aufgabe in Cloud Tasks verknüpft ist.

    Im Beispiel ist das das App Engine-Standarddienstkonto.

In der Google Cloud IAM-Dokumentation wird beschrieben, wie Sie das Standarddienstkonto App Engine als Nutzer des Standarddienstkontos App Engine hinzufügen.

  • Die zum Auslösen der Aufgabenwarteschlangenfunktion verwendete Identität benötigt die Berechtigung cloudfunctions.functions.invoke.

    Im Beispiel ist dies das App Engine-Standarddienstkonto

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker