Funkcje kolejki zadań korzystają z Google Cloud Tasks , aby pomóc Twojej aplikacji asynchronicznie wykonywać czasochłonne, wymagające dużej ilości zasobów lub ograniczonej przepustowości zadania, poza głównym przepływem aplikacji.
Załóżmy na przykład, że chcesz utworzyć kopie zapasowe dużego zestawu plików obrazów, które są obecnie hostowane w interfejsie API z ograniczeniem szybkości. Aby być odpowiedzialnym konsumentem tego API, musisz przestrzegać jego limitów stawek. Ponadto tego rodzaju długotrwałe zadanie może być podatne na awarie ze względu na przekroczenia limitu czasu i limity pamięci.
Aby złagodzić tę złożoność, możesz napisać funkcję kolejki zadań, która ustawia podstawowe opcje zadania, takie scheduleTime
i dispatchDeadline
, a następnie przekazuje tę funkcję do kolejki w Cloud Tasks. Środowisko Cloud Tasks zostało zaprojektowane specjalnie w celu zapewnienia skutecznej kontroli zatorów i zasad ponawiania prób dla tego rodzaju operacji.
Pakiet Firebase SDK dla Cloud Functions dla Firebase w wersji 3.20.1 i nowszych współpracuje z Firebase Admin SDK w wersji 10.2.0 i nowszych, aby obsługiwać funkcje kolejki zadań.
Korzystanie z funkcji kolejki zadań w Firebase może skutkować naliczeniem opłat za przetwarzanie zadań w chmurze. Więcej informacji znajdziesz w cenniku Cloud Tasks .
Utwórz funkcje kolejki zadań
Aby skorzystać z funkcji kolejki zadań, postępuj zgodnie z poniższą procedurą:
- Napisz funkcję kolejki zadań, korzystając z pakietu SDK Firebase dla Cloud Functions.
- Przetestuj swoją funkcję, uruchamiając ją za pomocą żądania HTTP.
- Wdróż swoją funkcję za pomocą interfejsu CLI Firebase. Podczas pierwszego wdrażania funkcji kolejki zadań interfejs CLI utworzy kolejkę zadań w Cloud Tasks z opcjami (ograniczanie szybkości i ponawianie prób) określonymi w kodzie źródłowym.
- Dodaj zadania do nowo utworzonej kolejki zadań, przekazując parametry, aby w razie potrzeby ustawić harmonogram wykonania. Można to osiągnąć, pisząc kod przy użyciu pakietu Admin SDK i wdrażając go w Cloud Functions for Firebase.
Napisz funkcje kolejki zadań
Przykłady kodu w tej sekcji opierają się na aplikacji, która konfiguruje usługę tworzenia kopii zapasowych wszystkich zdjęć z Astronomicznego Zdjęcie Dnia NASA. Aby rozpocząć, zaimportuj wymagane moduły:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Pyton
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Użyj onTaskDispatched
lub on_task_dispatched
dla funkcji kolejki zadań. Pisząc funkcję kolejki zadań, możesz ustawić liczbę ponownych prób dla każdej kolejki i konfigurację ograniczającą szybkość.
Skonfiguruj funkcje kolejki zadań
Funkcje kolejki zadań obejmują rozbudowany zestaw ustawień konfiguracyjnych umożliwiających precyzyjną kontrolę limitów szybkości i zachowania kolejki zadań ponawiania:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Pyton
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: Każde zadanie w kolejce zadań jest automatycznie ponawiane maksymalnie 5 razy. Pomaga to ograniczyć błędy przejściowe, takie jak błędy sieciowe lub tymczasowe zakłócenia usług zależnej usługi zewnętrznej.retryConfig.minBackoffSeconds=60
: Każde zadanie jest ponawiane w odstępie co najmniej 60 sekund od każdej próby. Zapewnia to duży bufor pomiędzy każdą próbą, więc nie śpieszymy się z wyczerpaniem 5 ponownych prób zbyt szybko.rateLimits.maxConcurrentDispatch=6
: W danym momencie wysyłanych jest maksymalnie 6 zadań. Pomaga to zapewnić stały strumień żądań do podstawowej funkcji i pomaga zmniejszyć liczbę aktywnych instancji i zimnych startów.
Przetestuj funkcje kolejki zadań
Funkcje kolejki zadań w pakiecie Firebase Local Emulator Suite są udostępniane jako proste funkcje HTTP. Możesz przetestować emulowaną funkcję zadania, wysyłając żądanie HTTP POST z ładunkiem danych JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Wdróż funkcje kolejki zadań
Wdróż funkcję kolejki zadań za pomocą interfejsu CLI Firebase:
$ firebase deploy --only functions:backupapod
Podczas pierwszego wdrażania funkcji kolejki zadań interfejs CLI tworzy kolejkę zadań w Cloud Tasks z opcjami (ograniczanie szybkości i ponawianie prób) określonymi w kodzie źródłowym.
Jeśli podczas wdrażania funkcji napotkasz błędy uprawnień, upewnij się, że użytkownikowi uruchamiającemu polecenia wdrażania przypisano odpowiednie role IAM .
Kolejkuj funkcje kolejki zadań
Funkcje kolejki zadań można kolejkować w Cloud Tasks z zaufanego środowiska serwerowego, takiego jak Cloud Functions dla Firebase, przy użyciu pakietu Firebase Admin SDK dla Node.js lub bibliotek Google Cloud dla Pythona. Jeśli dopiero zaczynasz korzystać z zestawów Admin SDK, zobacz Dodawanie Firebase do serwera , aby rozpocząć.
Typowy przepływ tworzy nowe zadanie, umieszcza je w kolejce w Cloud Tasks i ustawia konfigurację zadania:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Pyton
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
tasks_client = tasks_v2.CloudTasksClient()
task_queue = tasks_client.queue_path(params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1,
"backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task = tasks_v2.Task(http_request={
"http_method": tasks_v2.HttpMethod.POST,
"url": target_uri,
"headers": {
"Content-type": "application/json"
},
"body": json.dumps(body).encode()
},
schedule_time=schedule_time)
tasks_client.create_task(parent=task_queue, task=task)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
Przykładowy kod próbuje rozłożyć wykonanie zadań, powiązując opóźnienie N-tych minut z N-tym zadaniem. Przekłada się to na uruchomienie ~ 1 zadania/minutę. Pamiętaj, że możesz także użyć
scheduleTime
(Node.js) lubschedule_time
(Python), jeśli chcesz, aby Cloud Tasks uruchamiało zadanie o określonej godzinie.Przykładowy kod określa maksymalny czas oczekiwania Cloud Tasks na zakończenie zadania. Cloud Tasks ponawia próbę wykonania zadania po ponownej konfiguracji kolejki lub do momentu osiągnięcia tego terminu. W przykładzie kolejka jest skonfigurowana tak, aby ponawiać wykonanie zadania maksymalnie 5 razy, ale zadanie jest automatycznie anulowane, jeśli cały proces (w tym ponowne próby) zajmie więcej niż 5 minut.
Pobierz i dołącz docelowy identyfikator URI
Ze względu na sposób, w jaki Cloud Tasks tworzy tokeny uwierzytelniające w celu uwierzytelniania żądań kierowanych do podstawowych funkcji kolejki zadań, podczas kolejkowania zadań należy określić adres URL funkcji Cloud Run. Zalecamy programowe pobranie adresu URL funkcji, jak pokazano poniżej:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Pyton
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Rozwiązywanie problemów
Włącz rejestrowanie zadań w chmurze
Dzienniki z Cloud Tasks zawierają przydatne informacje diagnostyczne, takie jak stan żądania powiązanego z zadaniem. Domyślnie logi z Cloud Tasks są wyłączone ze względu na dużą liczbę logów, które mogą potencjalnie wygenerować w Twoim projekcie. Zalecamy włączenie dzienników debugowania podczas aktywnego tworzenia i debugowania funkcji kolejki zadań. Zobacz Włączanie rejestrowania .
Uprawnienia
Podczas kolejkowania zadań lub gdy Cloud Tasks próbuje wywołać funkcje kolejki zadań, mogą pojawić się błędy PERMISSION DENIED
. Upewnij się, że Twój projekt ma następujące powiązania uprawnień:
Tożsamość używana do kolejkowania zadań w Cloud Tasks wymaga uprawnień
cloudtasks.tasks.create
.W przykładzie jest to domyślne konto usługi App Engine
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
Tożsamość używana do kolejkowania zadań w Cloud Tasks wymaga uprawnień do korzystania z konta usługi powiązanego z zadaniem w Cloud Tasks.
W przykładzie jest to domyślne konto usługi App Engine .
Instrukcje dodawania domyślnego konta usługi App Engine jako użytkownika domyślnego konta usługi App Engine znajdziesz w dokumentacji Google Cloud IAM .
Tożsamość używana do wyzwalania funkcji kolejki zadań wymaga uprawnień
cloudfunctions.functions.invoke
.W przykładzie jest to domyślne konto usługi App Engine
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker