Accoda le funzioni con Cloud Tasks


Le funzioni delle code di attività sfruttano Google Cloud Tasks per eseguire app che richiedono molto tempo, risorse o larghezza di banda limitate in modo asincrono al di fuori del flusso principale dell'applicazione.

Ad esempio, immagina di voler creare backup di un grande insieme di attualmente ospitati su un'API con un limite di frequenza. Per essere un un consumatore responsabile dell'API, devi rispettarne i limiti di frequenza. Inoltre, questo tipo di job a lunga esecuzione potrebbe essere vulnerabile a errori dovuti a timeout e limiti di memoria.

Per ridurre questa complessità, puoi scrivere una funzione di coda di attività che stabilisca per le opzioni dell'attività come scheduleTime e dispatchDeadline, poi passa il in una coda in Cloud Tasks. Cloud Tasks è stato progettato specificamente per garantire un efficace controllo della congestione e nuovi criteri per questo tipo di operazioni.

L'SDK Firebase per Cloud Functions for Firebase versione 3.20.1 e successive è interoperabile con Firebase Admin SDK versione 10.2.0 e successive per supportare le funzioni di coda di attività.

L'utilizzo delle funzioni di coda di attività con Firebase può comportare addebiti per l'elaborazioneCloud Tasks. Per ulteriori informazioni, consulta la pagina relativa ai prezzi di Cloud Tasks.

Creare funzioni di coda di attività

Per utilizzare le funzioni di coda di attività, segui questo flusso di lavoro:

  1. Scrivi una funzione di coda di attività utilizzando l'SDK Firebase per Cloud Functions.
  2. Testa la funzione attivandola con una richiesta HTTP.
  3. Esegui il deployment della funzione con l'interfaccia a riga di comando Firebase. Quando esegui il deployment dell'attività funzione di coda per la prima volta, l'interfaccia a riga di comando creerà una coda di attività in Cloud Tasks con opzioni (limitazione di frequenza e nuovo tentativo) specificate nel codice sorgente.
  4. Aggiungi attività alla coda di attività appena creata, passando i parametri per impostare una pianificazione di esecuzione, se necessario. Puoi farlo scrivendo il codice utilizzando Admin SDK e eseguendone il deployment in Cloud Functions for Firebase.

Scrittura funzioni coda di attività

Gli esempi di codice in questa sezione sono basati su un'app che imposta che esegue il backup di tutte le immagini Immagine del giorno astronomica. Per iniziare, importa i moduli richiesti:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utilizza onTaskDispatched oppure on_task_dispatched per le funzioni delle code di attività. Quando scrivi una funzione di coda di attività, puoi impostare la configurazione di ripetizione e limitazione della frequenza per coda.

Configura le funzioni della coda di attività

Le funzioni di coda di attività sono dotate di un potente insieme di impostazioni di configurazione per controllare con precisione i limiti di frequenza e il comportamento di nuovo tentativo di una coda di attività:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: ogni attività nella coda viene automaticamente nuovi tentativi per 5 volte. In questo modo è possibile ridurre gli errori temporanei come quelli di rete o l'interruzione temporanea del servizio di un servizio esterno dipendente.

  • retryConfig.minBackoffSeconds=60: viene riprovata ogni attività per almeno 60 secondi a parte ogni tentativo. Ciò fornisce un ampio margine tra un tentativo e l'altro per evitare di esaurire i 5 nuovi tentativi troppo rapidamente.

  • rateLimits.maxConcurrentDispatch=6: al massimo vengono inviate 6 attività a un in un determinato momento. Ciò contribuisce a garantire un flusso costante di richieste all'oggetto sottostante e aiuta a ridurre il numero di istanze attive e avvii a freddo.

Testare le funzioni della coda di attività

Nella maggior parte dei casi, l'emulatore Cloud Functions è il modo migliore per testare l'attività funzioni di coda. Consulta la documentazione di Emulator Suite per scoprire come utilizzare la tua app per l'emulazione delle funzioni delle code di attività.

Inoltre, le funzioni_sdk della coda di attività vengono esposte come semplici funzioni HTTP nel Firebase Local Emulator Suite. Puoi testare una funzione di task emulata inviando una richiesta POST HTTP con un payload di dati JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Esegui il deployment delle funzioni di coda di attività

Esegui il deployment della funzione coda di attività utilizzando l'interfaccia a riga di comando Firebase:

$ firebase deploy --only functions:backupapod

Quando esegui il deployment di una funzione coda di attività per la prima volta, l'interfaccia a riga di comando crea una coda di attività in Cloud Tasks con opzioni (limitazione di frequenza e nuovo tentativo) specificato nel codice sorgente.

Se si verificano errori relativi alle autorizzazioni durante il deployment delle funzioni, assicurati che i ruoli IAM appropriati vengono assegnate all'utente che esegue i comandi di deployment.

Accoda le funzioni della coda di attività

Le funzioni della coda di attività possono essere messe in coda in Cloud Tasks da un ambiente server attendibile come Cloud Functions for Firebase utilizzando Firebase Admin SDK per Node.js o le librerie Google Cloud per Python. Se non hai mai utilizzato i Admin SDK, consulta Aggiungere Firebase a un server per iniziare.

Un flusso tipico crea una nuova attività, la mette in coda Cloud Tasks e imposta la configurazione per l'attività:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Il codice campione cerca di distribuire l'esecuzione mediante l'associazione di un ritardo di ennesimo minuto per l'attività N-esimo. Ciò equivale a attivare circa 1 attività al minuto. Tieni presente che puoi anche utilizzare scheduleTime (Node.js) o schedule_time (Python) se vuoi Cloud Tasks attivare un'attività in un momento specifico.

  • Il codice di esempio imposta il tempo massimo che Cloud Tasks attenderà per il completamento di un'attività. Cloud Tasks riproverà l'attività dopo il nuovo tentativo configurazione della coda o fino al raggiungimento di questa scadenza. Nell'esempio, la coda è configurata per ripetere l'attività fino a cinque volte, ma l'attività annullato automaticamente se l'intero processo (inclusi i nuovi tentativi) richiede più di cinque minuti.

Recupera e includi l'URI target

A causa del modo in cui Cloud Tasks crea i token di autenticazione per autenticare le richieste alle funzioni di coda di attività sottostanti, devi specificare l'URL Cloud Run della funzione quando inserisci le attività in coda. Me consigliamo di recuperare in modo programmatico l'URL per la funzione come mostrato di seguito:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Risoluzione dei problemi

Cloud Tasks">Attiva il logging di Cloud Tasks

I log da Cloud Tasks contengono informazioni diagnostiche utili come della richiesta associata a un'attività. Per impostazione predefinita, i log di Cloud Tasks sono disattivati a causa dell'elevato volume di log che possono potenzialmente generare nel tuo progetto. Ti consigliamo di attivare i log di debug mentre sviluppa e debug attivamente le funzioni della coda delle attività. Consulta Attivazione in corso il logging.

Autorizzazioni IAM

Potresti vedere PERMISSION DENIED di errori quando accodi le attività o quando Cloud Tasks cerca di richiamare le funzioni della coda di attività. Assicurati che il progetto ha le seguenti associazioni IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • L'identità utilizzata per mettere in coda le attività in Cloud Tasks deve disporre dell'autorizzazione per utilizzare l'account di servizio associato a un'attività in Cloud Tasks.

    Nell'esempio, questo è l'account di servizio predefinito App Engine.

Consulta la documentazione di Google Cloud IAM per istruzioni su come aggiungere l'account di servizio predefinito App Engine in qualità di utente dell'account di servizio predefinito App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker