Kolejkowanie funkcji za pomocą Cloud Tasks

.


Funkcje kolejki zadań wykorzystują możliwości Google Cloud Tasks aby pomóc w działaniu aplikacji, które zajmują dużo czasu bądź inne zasoby lub usługi o ograniczonej przepustowości. asynchronicznie poza głównym przepływem aplikacji.

Załóżmy na przykład, że chcesz utworzyć kopie zapasowe dużego zestawu obrazów pliki, które są obecnie hostowane w interfejsie API z limitem liczby żądań. Jeśli chcesz być odpowiedzialnym konsumentem tego interfejsu API, należy przestrzegać związanych z nim limitów liczby żądań. Dodatkowo tego rodzaju długotrwałe zadanie może być podatne na awarie z powodu przekroczenia limitów czasu i i ograniczenia pamięci.

Aby ograniczyć tę złożoność, możesz napisać funkcję kolejki zadań, która ustawi podstawowe opcje zadania, takie jak scheduleTime i dispatchDeadline, a następnie przekaż do kolejki w zadaniu Cloud Tasks. Cloud Tasks zaprojektowano tak, aby skutecznie kontrolować zatory zasad ponawiania w przypadku tego typu operacji.

Pakiet SDK Firebase dla Cloud Functions for Firebase w wersji 3.20.1 lub nowszej jest zgodny z Firebase Admin SDK w wersji 10.2.0 lub nowszej, aby obsługiwać funkcje kolejki zadań.

Korzystanie z funkcji kolejki zadań w Firebase może spowodować naliczenie opłat za Przetwarzam: Cloud Tasks. Zobacz Cennik Cloud Tasks .

Tworzenie funkcji kolejki zadań

Aby użyć funkcji kolejki zadań, wykonaj ten przepływ pracy:

  1. Napisz funkcję kolejki zadań za pomocą pakietu SDK Firebase dla Cloud Functions.
  2. Przetestuj funkcję przez aktywowanie jej za pomocą żądania HTTP.
  3. wdrożyć funkcję za pomocą interfejsu wiersza poleceń Firebase, Podczas wdrażania zadania kolejki po raz pierwszy, interfejs wiersza poleceń utworzy kolejka zadań w: Cloud Tasks z opcjami (ograniczenie liczby żądań i ponowienie próby) określonymi w kodzie źródłowym.
  4. Do nowo utworzonej kolejki zadań możesz dodawać zadania i przekazywać parametry, aby je skonfigurować w razie potrzeby harmonogram wykonywania. Aby to zrobić, napisz kod przy użyciu interfejsu Admin SDK i wdrażam go w Cloud Functions for Firebase.

Zapisywanie funkcji kolejki zadań

Przykładowe fragmenty kodu w tej sekcji są oparte na aplikacji, która ustawia tworzy usługę tworzenia kopii zapasowych wszystkich zdjęć z NASA Zdjęcie dnia poświęcone astronomii. Aby rozpocząć, zaimportuj wymagane moduły:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Użyj formatu onTaskDispatched lub on_task_dispatched na potrzeby funkcji kolejek zadań. Podczas zapisywania funkcji kolejki zadań możesz ustawić ponawianie prób w kolejce i ograniczanie liczby żądań.

Konfigurowanie funkcji kolejki zadań

Funkcje kolejki zadań mają rozbudowany zestaw ustawień konfiguracji precyzyjnego kontrolowania limitów liczby żądań i ponownego działania kolejki zadań:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: każde zadanie w kolejce zadań jest automatycznie ponawiać próby maksymalnie 5 razy. Pomaga to ograniczyć błędy przejściowe, takie jak sieć lub tymczasowe zakłócenia w działaniu zależnej usługi zewnętrznej.

  • retryConfig.minBackoffSeconds=60: każde zadanie jest powtarzane przez co najmniej 60 sekund niezależnie od każdej próby. Powoduje to duży bufor pomiędzy każdą próbą aby nie zużywać ich zbyt szybko.

  • rateLimits.maxConcurrentDispatch=6: maksymalnie 6 zadań jest wysyłanych w jednym miejscu danego czasu. Pomaga to zapewnić stały napływ żądań do źródła i pomaga zmniejszyć liczbę aktywnych instancji oraz uruchomień „na zimno”.

Testowanie funkcji kolejki zadań

W większości przypadków najlepszym sposobem testowania zadania jest emulator Cloud Functions funkcji kolejkowania. Zapoznaj się z dokumentacją pakietu emulatorów, by dowiedzieć się, jak: dostosować aplikację do emulacji funkcji kolejki zadań.

Dodatkowo funkcje kolejki zadań (function_sdk) są udostępniane jako proste Funkcje HTTP w interfejsie Firebase Local Emulator Suite. Możesz przetestować emulowaną funkcję zadania, wysyłając żądanie HTTP POST żądanie z ładunkiem danych JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Wdrażanie funkcji kolejki zadań

Wdróż funkcję kolejki zadań za pomocą interfejsu wiersza poleceń Firebase:

$ firebase deploy --only functions:backupapod

Przy wdrażaniu funkcji kolejki zadań po raz pierwszy interfejs wiersza poleceń tworzy kolejka zadań w Cloud Tasks z opcjami (ograniczanie liczby żądań i ponowienie próby) podany w kodzie źródłowym.

Jeśli podczas wdrażania funkcji wystąpią błędy uprawnień, upewnij się, że odpowiednie role uprawnień są przypisane do użytkownika wykonującego polecenia wdrożeniowe.

Kolejkowanie funkcji kolejki zadań

Funkcje kolejki zadań można dodać do kolejki w usłudze Cloud Tasks z zaufanego środowiska serwera, takiego jak Cloud Functions for Firebase, używając interfejsu Firebase Admin SDK dla Node.js lub Google Cloud dla Pythona. Jeśli jeszcze nie znasz Admin SDK, zobacz Aby rozpocząć, dodaj Firebase do serwera.

Typowy przepływ tworzy nowe zadanie, dodaje je do kolejki Cloud Tasks i ustawia konfigurację zadania:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Przykładowy kod próbuje rozłożyć wykonanie zadania przez powiązanie opóźnienia n-tego minuty z n-tym zadaniem. Ten przekłada się na wyzwalanie ~ 1 zadanie/min. Pamiętaj, że możesz też użyć scheduleTime (Node.js) lub schedule_time (Python), jeśli chcesz Cloud Tasks, aby aktywować zadanie w określonym czasie.

  • Przykładowy kod ustawia maksymalny czas Cloud Tasks zaczeka na wykonanie danego zadania. Cloud Tasks spróbuje ponownie zadanie po ponownej próbie konfiguracji kolejki lub do upłynięcia tego terminu. W próbce kolejka jest skonfigurowana tak, aby ponawiać zadanie maksymalnie 5 razy, ale zadanie jest automatycznie anulowane, jeśli cały proces (w tym ponowna próba) trwa to dłużej niż 5 minut.

Pobieranie i uwzględnianie docelowego identyfikatora URI

Ze względu na sposób, w jaki Cloud Tasks tworzy tokeny uwierzytelniania do uwierzytelnienia do bazowych funkcji kolejki zadań, musisz określić Adres URL funkcji w Cloud Run podczas dodawania zadań do kolejki. Śr zalecamy programowe pobieranie adresu URL dla funkcji poniżej:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Rozwiązywanie problemów

Cloud Tasks">Włącz logowanie w usłudze Cloud Tasks

Dzienniki z Cloud Tasks zawierają przydatne informacje diagnostyczne, takie jak stanu żądania powiązanego z zadaniem. Domyślnie logi z Funkcja Cloud Tasks została wyłączona ze względu na dużą liczbę logów, które może przesłać co możesz potencjalnie wygenerować w swoim projekcie. Zalecamy włączenie dzienników debugowania podczas aktywnego rozwijania i debugowania funkcji kolejki zadań. Zobacz Włączam .

Uprawnienia

Podczas dodawania zadań do kolejki lub podczas dodawania do kolejki mogą wystąpić PERMISSION DENIED błędy Cloud Tasks próbuje wywołać funkcje kolejki zadań. Upewnij się, że atrybuty projekt ma te powiązania uprawnień:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Tożsamość używana do kolejkowania zadań w usłudze Cloud Tasks wymaga uprawnień aby używać konta usługi powiązanego z zadaniem w Cloud Tasks.

    W przykładzie jest to domyślne konto usługi App Engine.

Zobacz dokumentację Google Cloud IAM. instrukcje dodawania domyślnego konta usługi App Engine jako użytkownik domyślnego konta usługi App Engine.

  • Tożsamość używana do aktywowania funkcji kolejki zadań wymaga Uprawnienie cloudfunctions.functions.invoke.

    W przykładzie jest to domyślne konto usługi App Engine

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker