Ставьте в очередь функции с помощью Cloud Tasks


Функции очереди задач используют преимущества Google Cloud Tasks , чтобы помочь вашему приложению выполнять трудоемкие, ресурсоемкие или ограниченные по пропускной способности задачи асинхронно, вне основного потока приложения.

Например, представьте, что вы хотите создать резервные копии большого набора файлов изображений, которые в настоящее время размещены в API с ограничением скорости. Чтобы быть ответственным потребителем этого API, вам необходимо соблюдать их ограничения по скорости. Кроме того, такого рода длительные задания могут быть подвержены сбоям из-за тайм-аутов и ограничений памяти.

Чтобы смягчить эту сложность, вы можете написать функцию очереди задач, которая устанавливает основные параметры задачи, такие как scheduleTime и dispatchDeadline , а затем передает функцию в очередь в Cloud Tasks. Среда облачных задач разработана специально для обеспечения эффективного контроля перегрузки и политики повторных попыток для подобных операций.

Firebase SDK для облачных функций для Firebase v3.20.1 и более поздних версий взаимодействует с Firebase Admin SDK v10.2.0 и более поздних версий для поддержки функций очереди задач.

Использование функций очереди задач с Firebase может привести к взиманию платы за обработку облачных задач. Дополнительную информацию см. в разделе цены на облачные задачи .

Создание функций очереди задач

Чтобы использовать функции очереди задач, следуйте следующему рабочему процессу:

  1. Напишите функцию очереди задач, используя Firebase SDK для облачных функций.
  2. Проверьте свою функцию, запустив ее с помощью HTTP-запроса.
  3. Разверните свою функцию с помощью Firebase CLI. При первом развертывании функции очереди задач интерфейс командной строки создаст очередь задач в Cloud Tasks с параметрами (ограничение скорости и повтор), указанными в исходном коде.
  4. Добавьте задачи во вновь созданную очередь задач, передав параметры для настройки расписания выполнения, если это необходимо. Этого можно добиться, написав код с помощью Admin SDK и развернув его в Cloud Functions for Firebase.

Написание функций очереди задач

Примеры кода в этом разделе основаны на приложении, которое настраивает службу, создающую резервные копии всех изображений из программы NASA Astronomy Picture of the Day . Для начала импортируйте необходимые модули:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Питон

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Используйте onTaskDispatched или on_task_dispatched для функций очереди задач. При написании функции очереди задач вы можете установить повторную попытку для каждой очереди и конфигурацию ограничения скорости.

Настройка функций очереди задач

Функции очереди задач имеют мощный набор настроек конфигурации для точного контроля ограничений скорости и повторного поведения очереди задач:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Питон

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : каждая задача в очереди задач автоматически повторяется до 5 раз. Это помогает смягчить временные ошибки, такие как сетевые ошибки или временные сбои в работе зависимой внешней службы.

  • retryConfig.minBackoffSeconds=60 : каждая задача повторяется с интервалом не менее 60 секунд после каждой попытки. Это обеспечивает большой буфер между каждой попыткой, поэтому мы не спешим слишком быстро исчерпать 5 повторных попыток.

  • rateLimits.maxConcurrentDispatch=6 : одновременно отправляется не более 6 задач. Это помогает обеспечить постоянный поток запросов к базовой функции и помогает сократить количество активных экземпляров и холодных запусков.

Функции очереди задач тестирования

Функции очереди задач в пакете локального эмулятора Firebase представлены как простые функции HTTP. Вы можете протестировать эмулируемую функцию задачи, отправив запрос HTTP POST с полезными данными JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Развертывание функций очереди задач

Разверните функцию очереди задач с помощью Firebase CLI:

$ firebase deploy --only functions:backupapod

При первом развертывании функции очереди задач интерфейс командной строки создает очередь задач в Cloud Tasks с параметрами (ограничение скорости и повтор), указанными в исходном коде.

Если вы столкнулись с ошибками разрешений при развертывании функций, убедитесь, что соответствующие роли IAM назначены пользователю, выполняющему команды развертывания.

Поставить в очередь функции очереди задач

Функции очереди задач можно поставить в очередь в Cloud Tasks из доверенной серверной среды, такой как Cloud Functions для Firebase, с помощью Firebase Admin SDK для Node.js или библиотек Google Cloud для Python. Если вы новичок в Admin SDK, для начала ознакомьтесь со статьей «Добавление Firebase на сервер ».

Типичный поток создает новую задачу, помещает ее в очередь в Cloud Tasks и устанавливает для нее конфигурацию:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Питон

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1,
                                         "backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(http_request={
            "http_method": tasks_v2.HttpMethod.POST,
            "url": target_uri,
            "headers": {
                "Content-type": "application/json"
            },
            "body": json.dumps(body).encode()
        },
                             schedule_time=schedule_time)
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Пример кода пытается распределить выполнение задач, связывая задержку N-й минуты для N-й задачи. Это означает запуск ~ 1 задачи в минуту. Обратите внимание, что вы также можете использовать scheduleTime (Node.js) или schedule_time (Python), если хотите, чтобы облачные задачи запускали задачу в определенное время.

  • В примере кода задается максимальное время, в течение которого Cloud Tasks будет ожидать завершения задачи. Cloud Tasks повторит задачу в соответствии с настройкой повтора очереди или до истечения этого крайнего срока. В примере очередь настроена на повторную попытку задачи до 5 раз, но задача автоматически отменяется, если весь процесс (включая попытки повторения) занимает более 5 минут.

Получить и включить целевой URI

Из-за того, что Cloud Tasks создает токены аутентификации для аутентификации запросов к базовым функциям очереди задач, при постановке задач в очередь необходимо указать URL-адрес Cloud Run для функции. Мы рекомендуем программно получить URL-адрес вашей функции, как показано ниже:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Питон

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Поиск неисправностей

Включите ведение журнала облачных задач

Журналы облачных задач содержат полезную диагностическую информацию, например статус запроса, связанного с задачей. По умолчанию журналы Cloud Tasks отключены из-за большого объема журналов, которые они потенциально могут создать в вашем проекте. Мы рекомендуем включать журналы отладки во время активной разработки и отладки функций очереди задач. См. Включение ведения журнала .

IAM-разрешения

Вы можете увидеть ошибки PERMISSION DENIED при постановке задач в очередь или когда Cloud Tasks пытается вызвать функции очереди задач. Убедитесь, что ваш проект имеет следующие привязки IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Удостоверению, используемому для постановки задач в очередь в Cloud Tasks, требуется разрешение на использование сервисной учетной записи, связанной с задачей в Cloud Tasks.

    В примере это учетная запись службы App Engine по умолчанию .

Инструкции по добавлению учетной записи службы по умолчанию App Engine в качестве пользователя учетной записи службы по умолчанию App Engine см. в документации Google Cloud IAM .

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker