Функции постановки в очередь с помощью Cloud Tasks

Функции очереди задач используют возможности Google Cloud Tasks , чтобы помочь вашему приложению выполнять ресурсоемкие, требующие больших затрат времени или ограниченные по пропускной способности задачи асинхронно, вне основного потока работы приложения.

Например, представьте, что вы хотите создать резервные копии большого набора файлов изображений, которые в настоящее время размещены на API с ограничением скорости запросов. Чтобы быть ответственным пользователем этого API, вам необходимо соблюдать его ограничения. Кроме того, подобная длительная задача может быть уязвима для сбоев из-за таймаутов и ограничений памяти.

Чтобы уменьшить эту сложность, можно написать функцию очереди задач, которая устанавливает основные параметры задачи, такие как scheduleTime и dispatchDeadline , а затем передает эту функцию в очередь в Cloud Tasks . Среда Cloud Tasks специально разработана для обеспечения эффективного управления перегрузкой и политик повторных попыток для таких операций.

Firebase SDK для Cloud Functions for Firebase версии 3.20.1 и выше взаимодействует с Firebase Admin SDK версии 10.2.0 и выше для поддержки функций очереди задач.

Использование функций очереди задач с Firebase может привести к взиманию платы за обработку Cloud Tasks . Дополнительную информацию см. в разделе «Цены Cloud Tasks .

Создание функций очереди задач

Для использования функций очереди задач следуйте этому алгоритму:

  1. Напишите функцию очереди задач, используя Firebase SDK для Cloud Functions .
  2. Протестируйте свою функцию, запустив её с помощью HTTP-запроса.
  3. Разверните свою функцию с помощью Firebase CLI. При первом развертывании функции очереди задач CLI создаст очередь задач в Cloud Tasks с параметрами (ограничение скорости и повторные попытки), указанными в вашем исходном коде.
  4. Добавьте задачи в созданную очередь задач, передавая параметры для настройки расписания выполнения, если это необходимо. Этого можно добиться, написав код с использованием Admin SDK и развернув его в Cloud Functions for Firebase .

Напишите функции для управления очередью задач.

Примеры кода в этом разделе основаны на приложении, которое настраивает службу резервного копирования всех изображений из проекта NASA «Астрономическая фотография дня» . Для начала импортируйте необходимые модули:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");

// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Для функций управления очередью задач используйте onTaskDispatched или on_task_dispatched . При написании функции управления очередью задач можно задать параметры повторных попыток для каждой очереди и ограничение скорости запросов.

Настройка функций очереди задач

Функции очереди задач обладают мощным набором параметров конфигурации для точного управления ограничениями скорости и поведением повторных попыток очереди задач:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : Каждая задача в очереди задач автоматически повторяется до 5 раз. Это помогает снизить вероятность временных ошибок, таких как сетевые ошибки или временное нарушение работы зависимой внешней службы.

  • retryConfig.minBackoffSeconds=60 : Каждая задача повторяется с интервалом не менее 60 секунд. Это обеспечивает большой буфер между попытками, чтобы мы не спешили исчерпать все 5 попыток повтора слишком быстро.

  • rateLimits.maxConcurrentDispatch=6 : Одновременно может быть запущено не более 6 задач. Это помогает обеспечить стабильный поток запросов к базовой функции и сократить количество активных экземпляров и холодных запусков.

Функции очереди тестовых задач

В большинстве случаев эмулятор Cloud Functions — лучший способ протестировать функции очереди задач. См. документацию по Emulator Suite, чтобы узнать, как настроить ваше приложение для эмуляции функций очереди задач .

Кроме того, функции очереди задач доступны в виде простых HTTP-функций в Firebase Local Emulator Suite . Вы можете протестировать эмулируемую функцию задачи, отправив HTTP POST-запрос с данными в формате JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Развернуть функции очереди задач

Разверните функцию очереди задач с помощью Firebase CLI:

$ firebase deploy --only functions:backupapod

При первом развертывании функции очереди задач CLI создает очередь задач в Cloud Tasks с параметрами (ограничение скорости и количество повторных попыток), указанными в исходном коде.

Если при развертывании функций возникают ошибки доступа, убедитесь, что пользователю, выполняющему команды развертывания, назначены соответствующие роли IAM .

Функции очереди задач

Функции очереди задач можно добавлять в очередь в Cloud Tasks из доверенной серверной среды, такой как Cloud Functions for Firebase используя Firebase Admin SDK для Node.js или библиотеки Google Cloud для Python. Если вы новичок в Admin SDK , см. раздел «Добавление Firebase на сервер» , чтобы начать работу.

Типичный рабочий процесс создает новую задачу, добавляет ее в очередь в Cloud Tasks и задает конфигурацию для задачи:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • В приведенном примере кода предпринимается попытка распределить выполнение задач, устанавливая задержку в N минут для N-й задачи. Это означает запуск примерно 1 задачи в минуту. Обратите внимание, что вы также можете использовать scheduleTime (Node.js) или schedule_time (Python), если хотите, чтобы Cloud Tasks запускал задачу в определенное время.

  • В приведенном примере кода задается максимальное время, в течение которого Cloud Tasks будет ожидать завершения задачи. Cloud Tasks будет повторять выполнение задачи в соответствии с настройками очереди повторных попыток или до истечения этого срока. В примере очередь настроена на повторную попытку выполнения задачи до 5 раз, но задача автоматически отменяется, если весь процесс (включая попытки повторной попытки) занимает более 5 минут.

Поиск неисправностей

Включите ведение журнала Cloud Tasks .

Журналы Cloud Tasks содержат полезную диагностическую информацию, например, статус запроса, связанного с задачей. По умолчанию ведение журналов Cloud Tasks отключено из-за большого объема данных, которые оно может генерировать в вашем проекте. Мы рекомендуем включить отладочные журналы во время активной разработки и отладки функций очереди задач. См. раздел «Включение ведения журналов» .

Разрешения IAM

При добавлении задач в очередь или при попытке Cloud Tasks вызвать функции вашей очереди задач могут появляться ошибки PERMISSION DENIED . Убедитесь, что ваш проект имеет следующие привязки IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Для добавления задач в очередь в Cloud Tasks пользователю, использующему учетную запись службы, связанную с задачей в Cloud Tasks необходимы права доступа.

    В приведенном примере это учетная запись службы App Engine по умолчанию .

Инструкции по добавлению учетной записи службы App Engine App Engine умолчанию в качестве пользователя этой учетной записи см. в документации Google Cloud IAM.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker