Pon funciones en cola con Cloud Tasks


Las funciones de la lista de tareas en cola aprovechan Google Cloud Tasks para ayudar a tu app a ejecutar tareas que consumen mucho tiempo, usan muchos recursos o tienen un ancho de banda limitado, de manera asíncrona y fuera del flujo de tu aplicación principal.

Por ejemplo, imagina que deseas crear copias de seguridad de un gran conjunto de archivos de imagen que están alojados en una API con un límite de frecuencia. A fin de ser un consumidor responsable de esa API, debes respetar tales límites. Además, este tipo de trabajo de larga duración podría ser vulnerable a fallas debido a los tiempos de espera y los límites de memoria.

Para mitigar esta complejidad, puedes escribir una función de lista de tareas en cola que establezca opciones básicas de tareas, como scheduleTime y dispatchDeadline, y, luego, entregue la función a una cola de Cloud Tasks. El entorno de Cloud Tasks está diseñado específicamente para garantizar el control eficaz de la congestión y las políticas de reintento de estos tipos de operaciones.

El SDK de Cloud Functions para Firebase 3.20.1 y las versiones posteriores funcionan en conjunto con el SDK de Firebase Admin 10.2.0 y las versiones posteriores para admitir las funciones de lista de tareas en cola.

El uso de funciones de la lista de tareas en cola con Firebase puede generar cargos por el procesamiento de Cloud Tasks. Consulta Precios de Cloud Tasks para obtener más información.

Crea funciones de lista de tareas en cola

Para usar las funciones de la lista de tareas en cola, sigue este flujo de trabajo:

  1. Escribe una función de lista de tareas en cola con el SDK de Firebase para Cloud Functions.
  2. Prueba tus funciones con Firebase Local Emulator Suite.
  3. Implementa la función con Firebase CLI. Cuando implementas tu función de lista de tareas en cola por primera vez, la CLI crea una lista de tareas en cola en Cloud Tasks con las opciones (límite de frecuencia y reintento) especificadas en el código fuente.
  4. Agrega tareas a la lista de tareas en cola nueva y pasa parámetros para configurar una programación de ejecución si es necesario. Para lograrlo, escribe el código mediante el SDK de Admin y, luego, impleméntalo en Cloud Functions para Firebase.

Escribe funciones de lista de tareas en cola

Las muestras de código de esta sección se basan en una app que configura un servicio capaz de crear una copia de seguridad de todas las imágenes de la Astronomy Picture of the Day de la NASA: Para comenzar, importa los módulos obligatorios:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
const HttpsError = functions.https.HttpsError;

Python (versión preliminar)

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Usa onTaskDispatched o on_task_dispatched para las funciones de lista de tareas en cola. Cuando escribes una función de lista de tareas en cola, puedes establecer el reintento por cola y la configuración de límite de frecuencia.

Configuración de lista de tareas en cola

Las funciones de lista de tareas en cola vienen con un conjunto potente de opciones de configuración para controlar con precisión los límites de frecuencia y el comportamiento de reintentos de una lista de tareas en cola:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python (vista previa)

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Cada tarea de la lista de tareas en cola se vuelve a intentar de manera automática hasta 5 veces, lo que ayuda a mitigar errores transitorios, como errores de red, o interrupciones temporales de un servicio externo dependiente.

  • retryConfig.minBackoffSeconds=60: Cada tarea se reintenta al menos durante 60 segundos fuera de cada intento. Esto proporciona un gran búfer entre cada intento, de manera que no nos apresuremos a agotar los 5 reintentos demasiado rápido.

  • rateLimits.maxConcurrentDispatch=6: Como máximo, se envían 6 tareas a la vez. Esto garantiza un flujo constante de solicitudes a la función subyacente y, a su vez, reduce la cantidad de instancias activas y los inicios en frío.

Prueba las funciones de lista de tareas en cola con Firebase Local Emulator Suite

Las funciones de lista de tareas en cola de Firebase Local Emulator Suite se exponen como funciones de HTTP simples. Para probar una función de tarea emulada, envía una solicitud HTTP POST con una carga útil de datos JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Implementa la función de lista de tareas en cola

Implementa la función de lista de tareas en cola con Firebase CLI:

$ firebase deploy --only functions:backupapod

Cuando implementas la función de lista de tareas en cola por primera vez, la CLI crea una lista de tareas en cola en Cloud Tasks con las opciones (límite de frecuencia y reintento) especificadas en el código fuente.

Si ves errores de permisos cuando implementes funciones, asegúrate de que se asignen los roles de IAM adecuados al usuario que ejecuta los comandos de implementación.

Coloca la función en cola

Las funciones de la lista de tareas en cola se pueden poner en cola en Cloud Tasks desde un entorno de servidor confiable, como Cloud Functions para Firebase, mediante el SDK de Firebase Admin para Node.js o las bibliotecas de Google Cloud para Python. Si no conoces los SDKs de Admin, consulta Agrega Firebase a un servidor para comenzar.

Un flujo típico crea una tarea nueva, la pone en cola en Cloud Tasks y define la configuración de la tarea:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python (vista previa)

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(
        params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1, "backupapod"
    )
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(
            http_request={
                "http_method": tasks_v2.HttpMethod.POST,
                "url": target_uri,
                "headers": {"Content-type": "application/json"},
                "body": json.dumps(body).encode(),
            },
            schedule_time=schedule_time,
        )
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(
        status=200, response=f"Enqueued {BACKUP_COUNT} tasks"
    )


  • El código de muestra intenta distribuir la ejecución de tareas asociando un retraso de n minutos a la enésima tarea. Esto se traduce en la activación de aproximadamente 1 tarea por minuto. Ten en cuenta que también puedes usar scheduleTime (Node.js) o schedule_time (Python) si quieres que Cloud Tasks active una tarea en un momento específico.

  • El código de muestra establece la cantidad máxima de tiempo que Cloud Tasks esperará para que se complete una tarea. Cloud Tasks volverá a intentar la tarea después de volver a configurar la cola o hasta que se cumpla el plazo. En la muestra, la cola se configura para reintentar la tarea hasta 5 veces, pero la tarea se cancela automáticamente si todo el proceso (incluidos los reintentos) tarda más de 5 minutos.

Recupera e incluye el URI de destino

Debido a la forma en que Cloud Tasks crea tokens de autenticación para autenticar solicitudes a las funciones subyacentes de la lista de tareas en cola, debes especificar la URL de Cloud Run de la función cuando agregues tareas a una cola. Te recomendamos que recuperes de manera programática la URL de tu función como se muestra a continuación:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python (vista previa)

def get_function_url(
    name: str, location: str = SupportedRegion.US_CENTRAL1
) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    authed_session = AuthorizedSession(credentials)
    url = (
        "https://cloudfunctions.googleapis.com/v2beta/"
        + f"projects/{project_id}/locations/{location}/functions/{name}"
    )
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url


Soluciona problemas

Activa el registro de Cloud Tasks

Los registros de Cloud Tasks contienen información de diagnóstico útil, como el estado de la solicitud asociada con una tarea. De forma predeterminada, los registros de Cloud Tasks están desactivados debido al gran volumen de registros que se puede generar en tu proyecto. Te recomendamos que actives los registros de depuración mientras desarrollas y depuras de forma activa las funciones de tu lista de tareas en cola. Consulta Activa el registro.

Permisos de IAM

Es posible que veas errores PERMISSION DENIED cuando pongas en cola las tareas o cuando Cloud Tasks intente invocar las funciones de la lista de tareas en cola. Asegúrate de que tu proyecto tenga las siguientes vinculaciones de IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

Consulta la documentación de Google Cloud IAM y obtén instrucciones para agregar la cuenta de servicio predeterminada de App Engine como usuario de ella.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker