Kolejkowanie funkcji za pomocą Cloud Tasks


Funkcje kolejki zadań wykorzystują technologię Google Cloud Tasks, aby ułatwić aplikacji asynchroniczne wykonywanie zadań, które zajmują czasochłonne, wymagające znacznych zasobów lub z ograniczoną przepustowością poza głównym przepływem aplikacji.

Załóżmy na przykład, że chcesz utworzyć kopie zapasowe dużego zestawu plików graficznych, które są obecnie hostowane w interfejsie API z ograniczonym limitem liczby żądań. Aby być odpowiedzialnym użytkownikiem tego interfejsu API, musisz przestrzegać jego limitów szybkości. Poza tym tego typu długotrwałe zadania mogą być podatne na błędy z powodu limitów czasu i pamięci.

Aby zmniejszyć złożoność, możesz napisać funkcję kolejki zadań, która ustawia podstawowe opcje zadań, takie jak scheduleTime i dispatchDeadline, a następnie przekazać ją do kolejki w Cloud Tasks. Środowisko Cloud Tasks zostało zaprojektowane pod kątem skutecznego kontrolowania natężenia ruchu i zapewniania zasad ponownych prób w przypadku tego rodzaju operacji.

Pakiet SDK Firebase na Cloud Functions for Firebase w wersji 3.20.1 lub nowszej współpracuje z Firebase Admin SDK w wersji 10.2.0 lub nowszej, aby obsługiwać funkcje kolejki zadań.

Korzystanie z funkcji kolejki zadań w Firebase może spowodować obciążenia za przetwarzanieCloud Tasks. Więcej informacji znajdziesz w cenniku.

Tworzenie funkcji kolejki zadań

Aby używać funkcji kolejki zadań:

  1. Napisz funkcję kolejki zadań za pomocą pakietu SDK Firebase dla Cloud Functions.
  2. Przetestuj funkcję przez aktywowanie jej za pomocą żądania HTTP.
  3. wdrożyć funkcję za pomocą interfejsu wiersza poleceń Firebase, Podczas pierwszego wdrażania funkcji kolejki zadań narzędzie wiersza poleceń utworzy kolejkę zadań w Cloud Tasks z opcjami (ograniczeniem szybkości i ponowną próbą) określonymi w kodzie źródłowym.
  4. Do nowo utworzonej kolejki zadań możesz dodawać zadania, przekazując parametry, aby w razie potrzeby skonfigurować harmonogram wykonywania. Aby to zrobić, napisz kod za pomocą funkcji Admin SDK i wdróż go w kontekście Cloud Functions for Firebase.

Pisanie funkcji kolejki zadań

Przykłady kodu w tej sekcji są oparte na aplikacji, która konfiguruje usługę tworzącą kopię zapasową wszystkich obrazów z Astronomy Picture of the Day NASA. Aby rozpocząć, zaimportuj wymagane moduły:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Użyj onTaskDispatched lub on_task_dispatched do obsługi funkcji kolejki zadań. Podczas pisania funkcji kolejki zadań możesz skonfigurować ponowne próby i ograniczenie szybkości na poziomie kolejki.

Konfigurowanie funkcji kolejki zadań

Funkcje kolejki zadań są wyposażone w zaawansowany zestaw ustawień konfiguracji, który umożliwia dokładne kontrolowanie limitów szybkości i zachowania funkcji ponownego próbowania kolejki zadań:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: każde zadanie w kolejce zadań jest automatycznie powtarzane do 5 razy. Pomaga to ograniczyć przejściowe błędy, takie jak błędy sieci lub tymczasowe zakłócenia w działaniu zależnej usługi zewnętrznej.

  • retryConfig.minBackoffSeconds=60: każde zadanie jest powtarzane co co najmniej 60 sekund. Pozwala to utworzyć duży bufor pomiędzy każdą kolejną próbą, aby nie spieszyć się do zbyt szybkiego wykorzystania 5 prób.

  • rateLimits.maxConcurrentDispatch=6: w danym momencie wysyłanych jest maksymalnie 6 zadań. Pomaga to zapewnić stały napływ żądań do funkcji podstawowej i zmniejsza liczbę aktywnych instancji oraz uruchomień „na zimno”.

Testowanie funkcji kolejki zadań

W większości przypadków najlepszym sposobem na przetestowanie funkcji kolejki zadań jest użycie emulatora Cloud Functions. W dokumentacji pakietu emulatorów dowiesz się, jak przetestować aplikację pod kątem emulacji funkcji kolejki zadań.

Dodatkowo funkcje kolejki zadań functions_sdk są udostępniane jako proste funkcje HTTP w Firebase Local Emulator Suite. Możesz przetestować emulowaną funkcję zadania, wysyłając żądanie HTTP POST z ładunkiem danych JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Wdrażanie funkcji kolejki zadań

Wdróż funkcję kolejki zadań za pomocą interfejsu wiersza poleceń Firebase:

$ firebase deploy --only functions:backupapod

Przy wdrażaniu funkcji kolejki zadań po raz pierwszy interfejs wiersza poleceń tworzy w Cloud Tasks kolejkę zadań z opcjami (ograniczanie liczby żądań i ponowienie prób) określonymi w kodzie źródłowym.

Jeśli podczas wdrażania funkcji wystąpią błędy uprawnień, sprawdź, czy do użytkownika, który wykonuje polecenia wdrażania, przypisane są odpowiednie role uprawnień.

Kolejkowanie funkcji kolejki zadań

Funkcje kolejki zadań można umieszczać w kolei Cloud Tasks z zaufanego środowiska serwera, takiego jak Cloud Functions for Firebase, za pomocą funkcji Firebase Admin SDK w Node.js lub bibliotek Google Cloud w Pythonie. Jeśli dopiero zaczynasz korzystać z Admin SDK, zapoznaj się z artykułem Dodawanie Firebase do serwera.

Typowy przepływ tworzy nowe zadanie, dodaje je do kolejki w zadaniu Cloud Tasks i ustawia jego konfigurację:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Przykładowy kod stara się rozłożyć wykonywanie zadań w czasie, przypisując opóźnienie N minut do N-tego zadania. Oznacza to wyzwalanie ok. 1 zadania na minutę. Jeśli chcesz, aby zadanie Cloud Tasks zostało uruchomione w określonym czasie, możesz użyć funkcji scheduleTime (Node.js) lub schedule_time (Python).

  • Przykładowy kod określa maksymalny czas, przez jaki Cloud Tasks będzie czekać na wykonanie zadania. Cloud Tasks spróbuje ponownie wykonać zadanie zgodnie z konfiguracją kolejki lub do momentu osiągnięcia tego terminu. W tym przykładzie kolejka jest skonfigurowana tak, aby próbować wykonać zadanie maksymalnie 5 razy, ale zadanie jest automatycznie anulowane, jeśli cały proces (w tym próby ponownego wykonania) trwa dłużej niż 5 minut.

Pobierz i uwzględnij docelowy identyfikator URI

Ze względu na sposób, w jaki Cloud Tasks tworzy tokeny uwierzytelniania, aby uwierzytelniać żądania kierowane do funkcji kolejki zadań, musisz podać adres URL funkcji Cloud Run podczas kolejkowania zadań. Zalecamy programowe pobieranie adresu URL funkcji, jak pokazano poniżej:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Rozwiązywanie problemów

„Zadania w chmurze” > Włącz logowanie Cloud Tasks

Logi z Cloud Tasks zawierają przydatne informacje diagnostyczne, takie jak stan żądania powiązanego z zadaniem. Domyślnie logi z usługi Cloud Tasks są wyłączone ze względu na dużą liczbę logów, które mogą potencjalnie wygenerować w Twoim projekcie. Zalecamy włączenie dzienników debugowania podczas aktywnego tworzenia i debugowania funkcji kolejki zadań. Zobacz Włączanie logowania.

Uprawnienia

Podczas kolejkowania zadań lub wywoływania funkcji kolejki zadań przez Cloud Tasks mogą wystąpić błędy PERMISSION DENIED. Sprawdź, czy Twój projekt ma te powiązania uprawnień:

  • Tożsamość używana do umieszczania zadań w kolejce w usłudze Cloud Tasks wymaga uprawnienia cloudtasks.tasks.create.

    W tym przykładzie jest to App Engine domyślne konto usługi.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Tożsamość używana do kolejkowania zadań w Cloud Tasks musi mieć uprawnienia do korzystania z konta usługi powiązanego z zadaniem w Cloud Tasks.

    W przykładzie jest to domyślne konto usługi App Engine.

Więcej informacji o dodawaniu domyślnego konta usługi App Engine jako użytkownika domyślnego konta usługi App Engine znajdziesz w dokumentacji Google Cloud IAM.

  • Tożsamość użyta do wywołania funkcji kolejki zadań musi mieć uprawnienia cloudfunctions.functions.invoke.

    W tym przykładzie jest to App Engine domyślne konto usługi.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker