As funções de fila de tarefas usam o Google Cloud Tasks para ajudar seu app a executar tarefas demoradas, que consomem muitos recursos ou limitadas por largura de banda de maneira assíncrona e fora do fluxo principal do aplicativo.
Por exemplo, imagine que você quer criar backups de um grande conjunto de arquivos de imagem hospedados atualmente em uma API com um limite de taxa. Para fazer o consumo responsável dessa API, precisamos respeitar esses limites. Além disso, esse tipo de job de longa duração pode ser vulnerável a falhas devido a tempos limite e limites de memória.
Para atenuar essa complexidade, grave uma função de fila de tarefas que defina opções
básicas de tarefa, como scheduleTime
e dispatchDeadline
e a distribua
para uma fila no Cloud Tasks. O ambiente do Cloud Tasks
foi projetado especificamente para garantir um controle de congestionamento e
políticas de repetição eficientes para esses tipos de operações.
O SDK do Firebase para o Cloud Functions para Firebase v3.20.1 e versões posteriores interopera com o SDK Admin do Firebase v10.2.0 e versões mais recentes para oferecer suporte a funções da fila de tarefas.
O uso de funções da fila de tarefas com o Firebase pode resultar em cobranças pelo processamento do Cloud Tasks. Consulte Preços do Cloud Tasks para mais informações.
Como criar funções da fila de tarefas
Para usar as funções da fila de tarefas, siga este fluxo de trabalho:
- Grave uma função da fila de tarefas usando o SDK do Firebase para Cloud Functions.
- Teste suas funções localmente usando o Pacote de emuladores locais do Firebase.
- Implante a função com a CLI do Firebase. Ao implantar a função de fila de tarefas pela primeira vez, a CLI vai criar uma fila de tarefas no Cloud Tasks com opções (limitação de taxa e nova tentativa) especificadas no código-fonte.
- Adicione tarefas à fila de tarefas recém-criada, transmitindo parâmetros para configurar uma programação de execução. Para isso, escreva o código usando o SDK Admin e implante-o no Cloud Functions para Firebase.
Como criar funções da fila de tarefas
Os exemplos de código desta seção são baseados em um app que configura um serviço de backup de todas as imagens do site de fotos astronômicas diárias (em inglês) da NASA: Para começar, importe os módulos necessários:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
const HttpsError = functions.https.HttpsError;
Python (pré-lançamento)
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Use onTaskDispatched
ou on_task_dispatched
para funções da fila de tarefas. Ao escrever uma função da fila de tarefas, você pode definir uma configuração de limitação de taxa e nova tentativa por fila.
Configuração da fila de tarefas
As funções da fila de tarefas têm um conjunto avançado de configurações para controlar com precisão os limites de taxa e o comportamento de repetição de uma fila de tarefas:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python (pré-lançamento)
@tasks_fn.on_task_dispatched(
retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: cada tarefa na fila é repetida automaticamente até cinco vezes. Isso nos ajuda a reduzir falhas transitórias, como erros de rede, ou a interrupção temporária de um serviço externo e dependente.retryConfig.minBackoffSeconds=60
: cada tarefa é executada novamente após um intervalo mínimo de 60 segundos. Com isso, temos um grande buffer entre cada tentativa, e não há pressa para realizar as cinco tentativas muito rapidamente.rateLimits.maxConcurrentDispatch=6
: são enviadas, no máximo, seis tarefas em um determinado momento. Isso ajuda a garantir um fluxo estável de solicitações para a função, além de reduzir o número de instâncias ativas e inicializações a frio.
Como testar funções da fila de tarefas usando o Pacote de emuladores locais do Firebase
As funções da fila de tarefas no Pacote de emuladores locais do Firebase são expostas como funções HTTP simples. Para testar uma função de tarefa emulada, envie uma solicitação HTTP POST com um payload de dados json:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Como implantar a função da fila de tarefas
Implante a função da fila de tarefas usando a CLI do Firebase:
$ firebase deploy --only functions:backupapod
Ao implantar a função de fila de tarefas pela primeira vez, a CLI cria uma fila de tarefas no Cloud Tasks com opções (limitação de taxa e nova tentativa) especificadas no código-fonte.
Se você encontrar erros de permissão ao implantar funções, verifique se os papéis do IAM apropriados estão atribuídos ao usuário que executa os comandos de implantação.
Enfileirar a função
As funções da fila de tarefas podem ser enfileiradas no Cloud Tasks a partir de um ambiente de servidor confiável, como o Cloud Functions para Firebase, usando o SDK Admin do Firebase para Node.js ou as bibliotecas do Google Cloud para Python. Se você não tiver experiência com os SDKs Admin, consulte Adicionar o Firebase a um servidor para começar.
Um fluxo típico cria uma nova tarefa, a enfileira no Cloud Tasks e define a configuração dela:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python (pré-lançamento)
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
tasks_client = tasks_v2.CloudTasksClient()
task_queue = tasks_client.queue_path(
params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1, "backupapod"
)
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task = tasks_v2.Task(
http_request={
"http_method": tasks_v2.HttpMethod.POST,
"url": target_uri,
"headers": {"Content-type": "application/json"},
"body": json.dumps(body).encode(),
},
schedule_time=schedule_time,
)
tasks_client.create_task(parent=task_queue, task=task)
return https_fn.Response(
status=200, response=f"Enqueued {BACKUP_COUNT} tasks"
)
O exemplo de código tenta distribuir a execução das tarefas associando um atraso de N minutos à Nª tarefa. Isso se traduz no acionamento de aproximadamente 1 tarefa/minuto. Observe que você também pode usar
scheduleTime
(Node.js) ouschedule_time
(Python) se quiser que o Cloud Tasks acione uma tarefa em um horário específico.O código de amostra define o tempo máximo que o Cloud Tasks aguardará a conclusão de uma tarefa. O Cloud Tasks vai tentar realizar a tarefa novamente após a configuração da nova tentativa da fila ou até que esse prazo seja atingido. No exemplo, a fila é configurada para tentar realizar a tarefa novamente até cinco vezes, mas a tarefa é cancelada automaticamente se todo o processo (incluindo novas tentativas) levar mais de cinco minutos.
Recuperar e incluir o URI de destino
O Cloud Tasks cria um token de autenticação para autenticar solicitações para a função da fila de tarefas. Sendo assim, você precisa especificar o URL do Cloud Run da função ao enfileirar uma tarefa. Recomendamos que você recupere de maneira programática o URL da função, conforme demonstrado abaixo:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python (pré-lançamento)
def get_function_url(
name: str, location: str = SupportedRegion.US_CENTRAL1
) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = AuthorizedSession(credentials)
url = (
"https://cloudfunctions.googleapis.com/v2beta/"
+ f"projects/{project_id}/locations/{location}/functions/{name}"
)
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Solução de problemas
Ativar a geração de registros do Cloud Tasks
Os registros do Cloud Tasks têm informações de diagnóstico úteis, como o status da solicitação associada a uma tarefa. Por padrão, os registros do Cloud Tasks são desativados devido ao grande volume de registros que podem ser gerados no projeto. Recomendamos ativar os registros de depuração enquanto você desenvolve e depura ativamente as funções da fila de tarefas. Consulte Como ativar a geração de registros.
Permissões do IAM
É possível ver erros PERMISSION DENIED
ao enfileirar tarefas ou quando o Cloud Tasks
tenta invocar as funções da fila de tarefas. Verifique se o projeto tem as
seguintes vinculações do IAM:
A identidade usada para enfileirar tarefas para o Cloud Tasks precisa da permissão
cloudtasks.tasks.create
do IAM.No exemplo, a conta de serviço padrão do App Engine
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
A identidade usada para enfileirar tarefas para o Cloud Tasks precisa de permissão para usar a conta de serviço associada a uma tarefa no Cloud Tasks.
No exemplo, a conta de serviço padrão do App Engine.
Consulte a documentação do Google Cloud IAM para ver instruções sobre como adicionar a conta de serviço padrão do App Engine como um usuário da conta de serviço padrão do App Engine.
A identidade usada para acionar a função da fila de tarefas precisa da permissão
cloudfunctions.functions.invoke
.No exemplo, a conta de serviço padrão do App Engine
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker