Pon funciones en cola con Cloud Tasks


Las funciones de la lista de tareas en cola aprovechan Google Cloud Tasks para ayudar a tu app a ejecutar tareas que consumen mucho tiempo, usan muchos recursos o tienen un ancho de banda limitado, de manera asíncrona y fuera del flujo de tu aplicación principal.

Por ejemplo, imagina que deseas crear copias de seguridad de un gran conjunto de archivos de imagen que están alojados en una API con un límite de frecuencia. A fin de ser un consumidor responsable de esa API, debes respetar tales límites. Además, este tipo de trabajo de larga duración podría ser vulnerable a fallas debido a los tiempos de espera y los límites de memoria.

Para mitigar esta complejidad, puedes escribir una función de lista de tareas en cola que establezca opciones básicas de tareas, como scheduleTime y dispatchDeadline, y, luego, entregue la función a una cola de Cloud Tasks. El entorno de Cloud Tasks está diseñado específicamente para garantizar el control eficaz de la congestión y las políticas de reintento de estos tipos de operaciones.

El SDK de Firebase para Cloud Functions for Firebase 3.20.1 y versiones posteriores funcionan en conjunto con Firebase Admin SDK 10.2.0 y versiones posteriores para admitir las funciones de lista de tareas en cola.

El uso de funciones de la lista de tareas en cola con Firebase puede generar cargos por el procesamiento de Cloud Tasks. Consulta Cloud Tasks Precios para obtener más información.

Crea funciones de lista de tareas en cola

Para usar las funciones de la lista de tareas en cola, sigue este flujo de trabajo:

  1. Escribe una función de lista de tareas en cola con el SDK de Firebase para Cloud Functions.
  2. Para probar la función, actívala con una solicitud HTTP.
  3. Implementa la función con Firebase CLI. Cuando implementas tu función de lista de tareas en cola por primera vez, la CLI crea una lista de tareas en cola en Cloud Tasks con las opciones (límite de frecuencia y reintento) especificadas en el código fuente.
  4. Agrega tareas a la lista de tareas en cola nueva y pasa parámetros para configurar una programación de ejecución si es necesario. Para lograrlo, puedes escribir el código con Admin SDK y, luego, implementarlo en Cloud Functions for Firebase.

Escribe funciones de lista de tareas en cola

Las muestras de código de esta sección se basan en una app que configura un servicio capaz de crear una copia de seguridad de todas las imágenes de la Astronomy Picture of the Day de la NASA: Para comenzar, importa los módulos obligatorios:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Usa onTaskDispatched o on_task_dispatched para las funciones de lista de tareas en cola. Cuando escribes una función de lista de tareas en cola, puedes establecer el reintento por cola y la configuración de límite de frecuencia.

Configura funciones de lista de tareas en cola

Las funciones de lista de tareas en cola vienen con un conjunto potente de opciones de configuración para controlar con precisión los límites de frecuencia y el comportamiento de reintentos de una lista de tareas en cola:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Cada tarea de la lista de tareas en cola se vuelve a intentar de manera automática hasta 5 veces, lo que ayuda a mitigar errores transitorios, como errores de red, o interrupciones temporales de un servicio externo dependiente.

  • retryConfig.minBackoffSeconds=60: Cada tarea se reintenta al menos durante 60 segundos fuera de cada intento. Esto proporciona un gran búfer entre cada intento, de manera que no nos apresuremos a agotar los 5 reintentos demasiado rápido.

  • rateLimits.maxConcurrentDispatch=6: Como máximo, se envían 6 tareas a la vez. Esto garantiza un flujo constante de solicitudes a la función subyacente y, a su vez, reduce la cantidad de instancias activas y los inicios en frío.

Prueba las funciones de lista de tareas en cola

En la mayoría de los casos, el emulador de Cloud Functions es la mejor manera de probar las funciones de lista de tareas en cola. Consulta la documentación de Emulator Suite para aprender a instrumentar tu app para emular las funciones de lista de tareas en cola.

Además, functions_sdk de la lista de tareas en cola se expone como funciones de HTTP simples en Firebase Local Emulator Suite. Para probar una función de tarea emulada, envía una solicitud HTTP POST con una carga útil de datos JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Implementa funciones de lista de tareas en cola

Implementa la función de lista de tareas en cola con Firebase CLI:

$ firebase deploy --only functions:backupapod

Cuando implementas la función de lista de tareas en cola por primera vez, la CLI crea una lista de tareas en cola en Cloud Tasks con las opciones (límite de frecuencia y reintento) especificadas en el código fuente.

Si ves errores de permisos cuando implementes funciones, asegúrate de que se asignen los roles de IAM adecuados al usuario que ejecuta los comandos de implementación.

Cómo poner funciones de lista de tareas en cola

Las funciones de lista de tareas en cola se pueden poner en cola en Cloud Tasks desde un entorno de servidor confiable, como Cloud Functions for Firebase, con Firebase Admin SDK para Node.js o las bibliotecas de Google Cloud para Python. Si no conoces los Admin SDK, consulta Agrega Firebase a un servidor para comenzar.

Un flujo típico crea una tarea nueva, la pone en cola en Cloud Tasks y establece la configuración de la tarea:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • El código de muestra intenta distribuir la ejecución de tareas asociando un retraso de n minutos a la enésima tarea. Esto se traduce en la activación de aproximadamente 1 tarea por minuto. Ten en cuenta que también puedes usar scheduleTime (Node.js) o schedule_time (Python) si quieres que Cloud Tasks active una tarea en un momento específico.

  • El código de muestra establece la cantidad máxima de tiempo que Cloud Tasks esperará para que se complete una tarea. Cloud Tasks volverá a intentar la tarea después de volver a configurar la cola o hasta que se cumpla el plazo. En la muestra, la cola se configura para reintentar la tarea hasta 5 veces, pero la tarea se cancela automáticamente si todo el proceso (incluidos los reintentos) tarda más de 5 minutos.

Recupera e incluye el URI de destino

Debido a la forma en que Cloud Tasks crea tokens de autenticación para autenticar solicitudes a las funciones subyacentes de la lista de tareas en cola, debes especificar la URL de Cloud Run de la función cuando agregues tareas a una cola. Te recomendamos que recuperes de manera programática la URL de tu función como se muestra a continuación:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Soluciona problemas

Cómo activar el registro de Cloud Tasks

Los registros de Cloud Tasks contienen información de diagnóstico útil, como el estado de la solicitud asociada con una tarea. De forma predeterminada, los registros de Cloud Tasks están desactivados debido al gran volumen de registros que se puede generar en tu proyecto. Te recomendamos que actives los registros de depuración mientras desarrollas y depuras de forma activa las funciones de tu lista de tareas en cola. Consulta Activa el registro.

Permisos de IAM

Es posible que veas errores PERMISSION DENIED cuando pongas en cola las tareas o cuando Cloud Tasks intente invocar las funciones de la lista de tareas en cola. Asegúrate de que tu proyecto tenga las siguientes vinculaciones de IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

Consulta la documentación de Google Cloud IAM y obtén instrucciones para agregar la cuenta de servicio predeterminada de App Engine como usuario de la cuenta de servicio predeterminada de App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker