使用 Cloud Tasks 将函数加入队列


任务队列 (task queue) 函数充分利用 Google Cloud Tasks 来帮助您的应用在主应用流之外异步运行耗时、资源密集型或带宽受限的任务。

例如,假设您想要为当前通过某个 API 托管的大量图片文件创建备份,但该 API 设有速率限制。为了负责任地使用该 API,您需要遵循其速率限制。此外,由于超时和内存限制,这种长时间运行的作业可能很容易发生错误。

为了降低这种复杂性,您可以编写一个任务队列函数来设置 scheduleTimedispatchDeadline 等基本任务选项,然后将该函数传递到 Cloud Tasks 中的某个队列。Cloud Tasks 环境经过特别设计,可确保针对此类操作实现有效的拥塞控制和重试政策。

Firebase SDK for Cloud Functions for Firebase v3.20.1 及更高版本可与 Firebase Admin SDK v10.2.0 及更高版本进行交互,以支持任务队列函数。

在 Firebase 中使用任务队列函数可能会产生 Cloud Tasks 处理费用。如需了解详情,请参阅 Cloud Tasks 价格

创建任务队列函数

如需使用任务队列函数,请按以下流程操作:

  1. 使用 Firebase SDK for Cloud Functions 编写任务队列函数。
  2. 使用 Firebase Local Emulator Suite 在本地测试您的函数。
  3. 使用 Firebase CLI 部署您的函数。首次部署任务队列函数时,CLI 会在 Cloud Tasks 中创建一个采用源代码中指定的选项(速率限制和重试)的任务队列。
  4. 将任务添加到新建的任务队列中,并传递参数以设置执行时间(如果需要)。您可以使用 Admin SDK 编写代码并将其部署到 Cloud Functions for Firebase 来实现此目的。

编写任务队列函数

本部分中的代码示例基于一个应用,该应用设置了一项服务,可备份美国国家航空航天局 (NASA) 的每日一天文图中的所有图片。首先,导入所需的模块:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
const HttpsError = functions.https.HttpsError;

Python(预览版)

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

对于任务队列函数,请使用 onTaskDispatchedon_task_dispatched。在编写任务队列函数时,您可以设置每个队列的重试和速率限制配置。

任务队列配置

任务队列函数附带一组强大的配置,可用于精确控制任务队列的速率限制和重试行为:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python(预览版)

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5:任务队列中的每个任务最多自动重试 5 次。这有助于减少暂时性错误,例如网络错误或所依赖的外部服务临时中断。

  • retryConfig.minBackoffSeconds=60:每个任务的重试间隔时间至少为 60 秒。这样可以在上一次和下一次尝试之间留出较长的缓冲时间,避免过早用尽 5 次重试机会。

  • rateLimits.maxConcurrentDispatch=6:同时最多调度 6 个任务。这有助于确保稳定地向底层函数发送请求,同时有助于减少活跃实例数和冷启动次数。

使用 Firebase Local Emulator Suite 测试任务队列函数

Firebase Local Emulator Suite 中的任务队列函数以简单的 HTTP 函数的形式公开。您可以通过发送包含 json 数据载荷的 HTTP POST 请求来测试模拟任务函数:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

部署任务队列函数

使用 Firebase CLI 部署任务队列函数:

$ firebase deploy --only functions:backupapod

首次部署任务队列函数时,CLI 会在 Cloud Tasks 中创建一个采用源代码中指定的选项(速率限制和重试)的任务队列。

如果在部署函数时遇到权限错误,请确保已将适当的 IAM 角色分配给运行部署命令的用户。

将函数加入队列

可以使用 Node.js 版 Firebase Admin SDK 或 Python 版 Google Cloud 库在 Cloud Tasks 中将任务队列函数从受信任的服务器环境(如 Cloud Functions for Firebase)加入队列。如果您刚开始接触 Admin SDK,请先参阅将 Firebase 添加到服务器

在典型流程中,系统通常会创建新任务,在 Cloud Tasks 中将其加入队列,并选择该任务的配置:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python(预览版)

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(
        params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1, "backupapod"
    )
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(
            http_request={
                "http_method": tasks_v2.HttpMethod.POST,
                "url": target_uri,
                "headers": {"Content-type": "application/json"},
                "body": json.dumps(body).encode(),
            },
            schedule_time=schedule_time,
        )
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(
        status=200, response=f"Enqueued {BACKUP_COUNT} tasks"
    )


  • 此示例代码会尝试将第 N 个任务推迟在第 N 分钟执行,从而分散执行任务。这意味着,每分钟大约会触发 1 个任务。请注意,如果您希望 Cloud Tasks 在特定时间触发任务,也可以使用 scheduleTime (Node.js) 或 schedule_time (Python)。

  • 示例代码会设置 Cloud Tasks 等待任务完成的最长时间。Cloud Tasks 会根据队列的重试配置或此截止期限来重试任务。在该示例中,队列配置为最多重试任务 5 次,但如果整个过程(包括重试过程)超过 5 分钟,则任务会自动取消。

检索并包含目标 URI

由于 Cloud Tasks 是通过创建身份验证令牌来对向底层任务队列函数发出的请求进行身份验证的,您必须在将任务加入队列时指定该函数的 Cloud Run 网址。我们建议您通过编程方式检索函数的网址,如下所示:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python(预览版)

def get_function_url(
    name: str, location: str = SupportedRegion.US_CENTRAL1
) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    authed_session = AuthorizedSession(credentials)
    url = (
        "https://cloudfunctions.googleapis.com/v2beta/"
        + f"projects/{project_id}/locations/{location}/functions/{name}"
    )
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url


问题排查

开启 Cloud Tasks 日志记录

Cloud Tasks 中的日志包含有用的诊断信息,例如与任务关联的请求的状态。默认情况下,Cloud Tasks 日志处于关闭状态,因为项目可能会生成大量日志。我们建议您在开发和调试任务队列函数时开启调试日志。请参阅开启日志记录

IAM 权限

将任务加入队列或在 Cloud Tasks 尝试调用任务队列函数时,您可能会看到 PERMISSION DENIED 错误。请确保您的项目具有以下 IAM 绑定:

  • 用于将任务加入 Cloud Tasks 队列的身份需要 cloudtasks.tasks.create IAM 权限。

    在本示例中,该身份为 App Engine 默认服务账号

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • 用于将任务加入 Cloud Tasks 队列的身份需要相应权限,才能使用与 Cloud Tasks 中的任务关联的服务账号。

    在本示例中,该身份为 App Engine 默认服务账号

请参阅 Google Cloud IAM 文档,了解如何将 App Engine 默认服务账号添加为该账号本身的用户。

  • 用于触发任务队列函数的身份需要 cloudfunctions.functions.invoke 权限。

    在本示例中,该身份为 App Engine 默认服务账号

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker