Dodawanie funkcji do kolejki za pomocą Cloud Tasks


Funkcje kolejki zadań korzystają z zadań w chmurze Google, aby ułatwić aplikacji asynchroniczne wykonywanie zadań, które zajmują czasochłonne, wymagające znacznych zasobów lub z ograniczoną przepustowością, poza głównym przepływem aplikacji.

Załóżmy na przykład, że chcesz utworzyć kopie zapasowe dużego zestawu plików graficznych, które są obecnie hostowane w interfejsie API z limitem liczby żądań. Aby korzystać z tego interfejsu API w sposób odpowiedzialny, musisz przestrzegać związanych z nim limitów liczby żądań. Dodatkowo tego rodzaju długotrwałe zadanie może być podatne na awarie z powodu przekroczenia limitów czasu i limitów pamięci.

Aby zmniejszyć złożoność, możesz napisać funkcję kolejki zadań, która ustawia podstawowe opcje zadań, takie jak scheduleTime i dispatchDeadline, a następnie przekazać ją do kolejki w Cloud Tasks. Środowisko Cloud Tasks zostało zaprojektowane tak, aby zapewnić skuteczną kontrolę nad zatorami oraz konieczność ponawiania zasad w przypadku tego typu operacji.

Pakiet Firebase SDK dla Cloud Functions dla Firebase w wersji 3.20.1 lub nowszej współpracuje z pakietem Firebase Admin SDK w wersji 10.2.0 lub nowszej, aby obsługiwać funkcje kolejki zadań.

Korzystanie z funkcji kolejki zadań w Firebase może spowodować naliczenie opłat za przetwarzanie w Cloud Tasks. Więcej informacji znajdziesz w cenniku Cloud Tasks.

Tworzenie funkcji kolejki zadań

Aby użyć funkcji kolejki zadań, wykonaj ten przepływ pracy:

  1. Napisz funkcję kolejki zadań za pomocą pakietu SDK Firebase dla Cloud Functions.
  2. Przetestuj funkcję przez aktywowanie jej za pomocą żądania HTTP.
  3. wdrożyć funkcję za pomocą interfejsu wiersza poleceń Firebase, Przy pierwszym wdrożeniu funkcji kolejki zadań interfejs wiersza poleceń utworzy w Cloud Tasks kolejkę zadań z opcjami (ograniczenie liczby żądań i ponowienie próby) określonymi w kodzie źródłowym.
  4. Do nowo utworzonej kolejki zadań możesz dodawać zadania, przekazując parametry, aby w razie potrzeby skonfigurować harmonogram wykonywania. Aby to zrobić, napisz kod za pomocą pakietu Admin SDK i wdróż go w Cloud Functions dla Firebase.

Zapisywanie funkcji kolejki zadań

Przykładowe fragmenty kodu w tej sekcji opierają się na aplikacji konfigurującej usługę tworzącą kopie zapasowe wszystkich zdjęć ze zdjęcia dnia NASA Astronomy Picture of the Day. Aby rozpocząć, zaimportuj wymagane moduły:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Aby korzystać z funkcji kolejki zadań, użyj onTaskDispatched lub on_task_dispatched. Przy pisaniu funkcji kolejki zadań możesz ustawić ponawianie prób w kolejce i konfigurację ograniczania liczby żądań.

Konfigurowanie funkcji kolejki zadań

Funkcje kolejki zadań mają rozbudowany zestaw ustawień, które pozwalają precyzyjnie kontrolować limity liczby żądań i ponawiać próby działania kolejki zadań:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: każde zadanie w kolejce zadań jest automatycznie powtarzane maksymalnie 5 razy. Pomaga to ograniczyć przejściowe błędy, takie jak błędy sieci lub tymczasowe zakłócenia w działaniu zależnej usługi zewnętrznej.

  • retryConfig.minBackoffSeconds=60: każde zadanie jest powtarzane co najmniej 60 sekund po każdej próbie. Pozwala to utworzyć duży bufor pomiędzy każdą kolejną próbą, aby nie spieszyć się do zbyt szybkiego wykorzystania 5 prób.

  • rateLimits.maxConcurrentDispatch=6: wysyłanych jest maksymalnie 6 zadań naraz. Pomaga to zapewnić stały strumień żądań do funkcji bazowej oraz zmniejszyć liczbę aktywnych instancji i uruchomień „na zimno”.

Testowanie funkcji kolejki zadań

Funkcje kolejki zadań w Pakiecie emulatorów lokalnych Firebase są dostępne jako proste funkcje HTTP. Możesz przetestować emulowaną funkcję zadania, wysyłając żądanie HTTP POST z ładunkiem danych JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Wdrażanie funkcji kolejki zadań

Wdróż funkcję kolejki zadań za pomocą interfejsu wiersza poleceń Firebase:

$ firebase deploy --only functions:backupapod

Przy pierwszym wdrażaniu funkcji kolejki zadań interfejs wiersza poleceń tworzy w Cloud Tasks kolejkę zadań z opcjami (ograniczanie liczby żądań i ponawianie prób) określonymi w kodzie źródłowym.

Jeśli podczas wdrażania funkcji wystąpią błędy uprawnień, sprawdź, czy do użytkownika, który wykonuje polecenia wdrażania, przypisane są odpowiednie role uprawnień.

Kolejkowanie funkcji kolejki zadań

Funkcje kolejki zadań w Cloud Tasks można dodawać do kolejki w zaufanym środowisku serwera, takim jak Cloud Functions dla Firebase, za pomocą pakietu Firebase Admin SDK dla Node.js lub bibliotek Google Cloud dla Pythona. Jeśli nie znasz jeszcze pakietów Admin SDK, przeczytaj artykuł Dodawanie Firebase do serwera.

Typowy przepływ tworzy nowe zadanie, dodaje je do kolejki w Cloud Tasks i ustawia jego konfigurację:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Przykładowy kod próbuje rozłożyć wykonywanie zadań przez powiązanie z n-tym zadaniem opóźnienie wynoszące n-te minuty. Oznacza to wyzwalanie ok. 1 zadania na minutę. Możesz też użyć polecenia scheduleTime (Node.js) lub schedule_time (Python), jeśli chcesz, aby usługa Cloud Tasks wyzwalała zadanie w określonym czasie.

  • Przykładowy kod określa maksymalny czas oczekiwania Cloud Tasks na wykonanie zadania. Cloud Tasks spróbuje ponownie wykonać zadanie po ponownej konfiguracji kolejki lub do upłynięcia tego terminu. W przykładzie kolejka jest skonfigurowana tak, aby ponawiać zadanie nawet 5 razy, ale zadanie jest automatycznie anulowane, jeśli cały proces (w tym ponowna próba) trwa dłużej niż 5 minut.

Pobieranie i uwzględnianie docelowego identyfikatora URI

Ze względu na sposób, w jaki Cloud Tasks tworzy tokeny uwierzytelniania w celu uwierzytelniania żądań do bazowych funkcji kolejki zadań, podczas dodawania zadań do kolejki musisz podać adres URL funkcji Cloud Run. Zalecamy programowe pobieranie adresu URL funkcji, jak pokazano poniżej:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Rozwiązywanie problemów

Włącz logowanie w Cloud Tasks

Logi z Cloud Tasks zawierają przydatne informacje diagnostyczne, takie jak stan żądania powiązanego z zadaniem. Domyślnie logi z Cloud Tasks są wyłączone ze względu na dużą liczbę logów, które może wygenerować w projekcie. Zalecamy włączenie logów debugowania na czas aktywnego rozwijania i debugowania funkcji kolejki zadań. Zobacz Włączanie rejestrowania.

Uprawnienia

Podczas dodawania zadań do kolejki lub próby wywołania funkcji kolejki zadań Cloud Tasks może wystąpić błąd PERMISSION DENIED. Sprawdź, czy Twój projekt ma te powiązania uprawnień:

  • Tożsamość używana do umieszczania zadań w kolejce w Cloud Tasks wymaga uprawnienia cloudtasks.tasks.create.

    W przykładzie jest to domyślne konto usługi App Engine.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Tożsamość używana do umieszczania zadań w kolejce w Cloud Tasks wymaga uprawnień do korzystania z konta usługi powiązanego z zadaniem w Cloud Tasks.

    W przykładzie jest to domyślne konto usługi App Engine.

Instrukcje dodawania domyślnego konta usługi App Engine jako użytkownika domyślnego konta usługi App Engine znajdziesz w dokumentacji Google Cloud IAM.

  • Tożsamość używana do aktywowania funkcji kolejki zadań wymaga uprawnienia cloudfunctions.functions.invoke.

    W przykładzie jest to domyślne konto usługi App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker