Funktionen mit Cloud Tasks in die Warteschlange stellen


Funktionen für Aufgabenwarteschlangen nutzen Google Cloud Tasks damit deine App zeit-, ressourcen- oder bandbreitenintensiv ist asynchron, also außerhalb Ihres Hauptanwendungsablaufs, ausgeführt werden.

Stellen Sie sich beispielsweise vor, Sie möchten Sicherungen eines großen Satzes von Images erstellen. -Dateien, die derzeit in einer API mit Ratenbegrenzung gehostet werden. Um ein Nutzer dieser API verantwortlich sind, müssen Sie deren Ratenbegrenzungen einhalten. Außerdem Diese Art von lang andauernden Jobs könnte aufgrund von Zeitüberschreitungen und Arbeitsspeicherlimits.

Um diese Komplexität zu verringern, können Sie eine Aufgabenwarteschlangenfunktion schreiben, wie scheduleTime und dispatchDeadline und gibt dann das an eine Warteschlange in Cloud Tasks übergeben. Das Cloud Tasks für eine effektive Überlastungskontrolle und Wiederholungsrichtlinien für diese Arten von Vorgängen.

Das Firebase SDK für die Interoperabilität von Cloud Functions for Firebase ab Version 3.20.1 mit Firebase Admin SDK v10.2.0 und höher, um Warteschlangenfunktionen zu unterstützen.

Die Verwendung von Aufgabenwarteschlangen-Funktionen mit Firebase kann zu Kosten für Cloud Tasks wird verarbeitet. Weitere Informationen finden Sie unter Cloud Tasks-Preise .

Funktionen für Aufgabenwarteschlangen erstellen

Gehen Sie folgendermaßen vor, um Funktionen für Aufgabenwarteschlangen zu verwenden:

  1. Mit dem Firebase SDK für Cloud Functions eine Task-Warteschlangenfunktion schreiben
  2. Testen Sie die Funktion, indem Sie sie mit einer HTTP-Anfrage auslösen.
  3. Stellen Sie die Funktion mit der Firebase-Befehlszeile bereit. Beim Bereitstellen der Aufgabe Warteschlangenfunktion zum ersten Mal verwendet, erstellt die Befehlszeile Aufgabenwarteschlange in Cloud Tasks mit Optionen (Ratenbegrenzung und Wiederholungsversuch), die im Quellcode angegeben sind.
  4. Fügen Sie der neu erstellten Aufgabenwarteschlange Aufgaben hinzu und geben Sie bei Bedarf Parameter für die Einrichtung eines Ausführungszeitplans an. Sie erreichen dies, indem Sie den Code mithilfe von Admin SDK und Bereitstellen in Cloud Functions for Firebase.

Aufgabenwarteschlangenfunktionen schreiben

Codebeispiele in diesem Abschnitt basieren auf einer App, die einen Dienst, der alle Bilder der NASA Astronomiebild des Tages. Importieren Sie zuerst die erforderlichen Module:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

onTaskDispatched verwenden oder on_task_dispatched für Aufgabenwarteschlangen-Funktionen. Beim Schreiben einer Aufgabenwarteschlangenfunktion können Sie Wiederholungs- und Ratenbegrenzungskonfigurationen festlegen.

Funktionen für Aufgabenwarteschlangen konfigurieren

Funktionen für Aufgabenwarteschlangen enthalten leistungsstarke Konfigurationseinstellungen können Sie Ratenbegrenzungen und das Wiederholungsverhalten einer Aufgabenwarteschlange genau steuern:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Jede Aufgabe in der Aufgabenwarteschlange wird automatisch bis zu fünfmal versucht. So lassen sich vorübergehende Fehler wie Netzwerkfehler oder vorübergehende Dienstunterbrechungen eines abhängigen externen Dienstes abmildern.

  • retryConfig.minBackoffSeconds=60: Jede Aufgabe wird mindestens 60 Sekunden wiederholt. abgesehen von jedem Versuch. Dadurch entsteht ein großer Puffer zwischen den damit wir die fünf Wiederholungsversuche nicht zu schnell erschöpfen.

  • rateLimits.maxConcurrentDispatch=6: Es werden maximal 6 Aufgaben in einem Schritt zu einer bestimmten Zeit. Dadurch wird ein stetiger Strom von Anfragen an die zugrunde liegende und reduziert die Anzahl aktiver Instanzen und Kaltstarts.

Funktionen der Aufgabenwarteschlange testen

In den meisten Fällen eignet sich der Cloud Functions-Emulator am besten zum Testen von Aufgabenwarteschlangenfunktionen. In der Emulator Suite-Dokumentation erfahren Sie, wie Sie Ihre App für die Emulation von Funktionen in Aufgabenwarteschlangen instrumentieren.

Darüber hinaus werden die Task-Queue-functions_sdk in der Firebase Local Emulator Suite als einfache HTTP-Funktionen bereitgestellt. Sie können eine emulierte Aufgabenfunktion testen, indem Sie eine HTTP-POST-Anfrage senden. Anfrage mit einer JSON-Datennutzlast:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Funktionen für Aufgabenwarteschlangen bereitstellen

Stellen Sie die Funktion der Aufgabenwarteschlange mithilfe der Firebase-Befehlszeile bereit:

$ firebase deploy --only functions:backupapod

Bei der ersten Bereitstellung einer Aufgabenwarteschlangenfunktion erstellt die Befehlszeile ein Aufgabenwarteschlange in Cloud Tasks mit Optionen (Ratenbegrenzung und Wiederholungsversuch) die im Quellcode angegeben sind.

Wenn beim Bereitstellen von Funktionen Berechtigungsfehler auftreten, prüfen Sie, ob der geeignete IAM-Rollen werden dem Nutzer zugewiesen, der die Bereitstellungsbefehle ausführt.

Funktionen für Aufgabenwarteschlangen in Warteschlange

Aufgabenwarteschlangen-Funktionen können in Cloud Tasks von einem vertrauenswürdigen wie Cloud Functions for Firebase mit dem Firebase Admin SDK für Node.js- oder Google Cloud-Bibliotheken für Python. Wenn Sie neu bei Admin SDK sind, lesen Sie Fügen Sie Firebase einem Server hinzu, um zu beginnen.

Bei einem typischen Ablauf wird eine neue Aufgabe erstellt, Cloud Tasks und legt die Konfiguration für die Aufgabe fest:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Der Beispielcode versucht, die Ausführung durch Zuordnen einer Verzögerung von N-ten Minuten für die N-te Aufgabe. Das entspricht etwa einer Aufgabe pro Minute. Sie können auch scheduleTime (Node.js) oder schedule_time (Python) verwenden, wenn Cloud Tasks eine Aufgabe zu einer bestimmten Zeit auslösen soll.

  • Der Beispielcode legt den maximalen Zeitraum fest, Cloud Tasks wartet zu erledigen. Cloud Tasks versucht es noch einmal die Aufgabe nach dem Wiederholungsversuch. Konfiguration der Warteschlange oder bis diese Frist erreicht ist. Im Beispiel Die Warteschlange ist so konfiguriert, dass die Aufgabe bis zu fünfmal wiederholt wird, aber die Aufgabe ist automatisch abgebrochen, wenn der gesamte Vorgang (einschließlich Wiederholungsversuche) dauert mehr als 5 Minuten.

Ziel-URI abrufen und einschließen

Aufgrund der Art und Weise, wie Cloud Tasks Authentifizierungstokens zur Authentifizierung erstellt an die zugrunde liegenden Aufgabenwarteschlangenfunktionen sendet, müssen Sie die Methode Cloud Run-URL der Funktion beim Einfügen von Aufgaben in die Warteschlange. Mi. empfehlen Ihnen, die URL für Ihre Funktion programmatisch abzurufen als unten gezeigt:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Fehlerbehebung

Cloud Tasks">Cloud Tasks-Logging aktivieren

Logs von Cloud Tasks enthalten nützliche Diagnoseinformationen, z. B.: Status der Anfrage, die einer Aufgabe zugeordnet ist. Standardmäßig sind Logs von Cloud Tasks deaktiviert, da sie möglicherweise eine große Menge an Logs für Ihr Projekt generieren. Wir empfehlen, die Fehlerbehebungsprotokolle zu aktivieren während Sie Ihre Aufgabenwarteschlangen-Funktionen aktiv entwickeln und Fehler beheben. Weitere Informationen finden Sie unter Wird eingeschaltet Logging

IAM-Berechtigungen

Möglicherweise werden PERMISSION DENIED-Fehler angezeigt, wenn Aufgaben in die Warteschlange gestellt werden oder Cloud Tasks versucht, die Funktionen der Aufgabenwarteschlange aufzurufen. Achten Sie darauf, dass die Projekt hat die folgenden IAM-Bindungen:

  • Die Identität, mit der Aufgaben in Cloud Tasks eingereiht werden, benötigt die IAM-Berechtigung cloudtasks.tasks.create.

    Im Beispiel ist das das App Engine-Standarddienstkonto.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Die Identität, die verwendet wird, um Aufgaben in Cloud Tasks in die Warteschlange zu stellen, benötigt eine Berechtigung , um das Dienstkonto zu verwenden, das mit einer Aufgabe in Cloud Tasks verknüpft ist.

    Im Beispiel ist dies das App Engine-Standarddienstkonto.

Siehe Google Cloud IAM-Dokumentation für Anweisungen zum Hinzufügen des App Engine-Standarddienstkontos als Nutzer des Standarddienstkontos App Engine.

  • Die zum Auslösen der Aufgabenwarteschlangenfunktion verwendete Identität muss Berechtigung „cloudfunctions.functions.invoke“.

    Im Beispiel ist dies das App Engine-Standarddienstkonto

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker