Mengantrekan fungsi dengan Cloud Tasks


Fungsi task queue memanfaatkan Google Cloud Tasks untuk membantu aplikasi Anda menjalankan tugas yang memakan waktu, membutuhkan banyak resource, atau dibatasi bandwidth secara asinkron, di luar alur aplikasi utama Anda.

Misalnya, Anda ingin membuat cadangan untuk banyak kumpulan file gambar yang saat ini dihosting di API yang menerapkan pembatasan kapasitas. Untuk menjadi konsumen API yang bertanggung jawab, Anda perlu mematuhi batas kapasitasnya. Selain itu, pekerjaan jangka panjang semacam ini rentan mengalami kegagalan karena waktu tunggu habis dan batas memori.

Untuk mengurangi kompleksitas ini, Anda dapat menulis fungsi task queue yang menetapkan opsi tugas dasar seperti scheduleTime, dan dispatchDeadline, lalu melimpahkan fungsi tersebut ke antrean di Cloud Tasks. Lingkungan Cloud Tasks dirancang khusus untuk memastikan kontrol kemacetan dan kebijakan percobaan ulang yang efektif untuk jenis operasi ini.

Firebase SDK untuk Cloud Functions for Firebase v3.20.1 dan yang lebih tinggi memiliki interoperabilitas dengan Firebase Admin SDK v10.2.0 dan yang lebih tinggi untuk mendukung fungsi task queue.

Penggunaan fungsi task queue dengan Firebase dapat menyebabkan tagihan untuk pemrosesan Cloud Tasks. Baca artikel Harga Cloud Tasks untuk mengetahui informasi selengkapnya.

Membuat fungsi task queue

Untuk menggunakan fungsi task queue, ikuti alur kerja ini:

  1. Tulis fungsi task queue menggunakan Firebase SDK untuk Cloud Functions.
  2. Uji fungsi Anda menggunakan Firebase Local Emulator Suite.
  3. Deploy fungsi Anda dengan Firebase CLI. Saat men-deploy fungsi task queue untuk pertama kalinya, CLI akan membuat task queue di Cloud Tasks dengan opsi (pembatasan kapasitas dan percobaan ulang) yang ditentukan dalam kode sumber Anda.
  4. Tambahkan tugas ke task queue yang baru dibuat, dengan meneruskan parameter untuk menyiapkan jadwal eksekusi jika diperlukan. Anda dapat mencapainya dengan menulis kode menggunakan Admin SDK dan men-deploy-nya ke Cloud Functions for Firebase.

Menulis fungsi task queue

Contoh kode di bagian ini didasarkan pada aplikasi yang menyiapkan layanan yang mencadangkan semua gambar dari Astronomy Picture of the Day milik NASA: Untuk memulai, impor modul yang diperlukan:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
const HttpsError = functions.https.HttpsError;

Python (pratinjau)

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Gunakan onTaskDispatched atau on_task_dispatched untuk fungsi task queue. Saat menulis fungsi task queue, Anda dapat menetapkan percobaan ulang per antrean dan konfigurasi pembatasan kapasitas.

Konfigurasi task queue

Fungsi task queue dilengkapi dengan kumpulan setelan konfigurasi yang canggih untuk mengontrol secara akurat batas kapasitas dan perilaku percobaan ulang task queue:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python (pratinjau)

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Setiap tugas dalam task queue secara otomatis dicoba ulang hingga 5 kali. Hal ini membantu mengurangi error sementara seperti error jaringan atau gangguan layanan sementara dari layanan eksternal yang dependen.

  • retryConfig.minBackoffSeconds=60: Setiap tugas dicoba ulang dengan interval antar percobaan minimal 60 detik. Hal ini memberikan buffer besar di antara setiap percobaan sehingga batas 5 kali percobaan tidak akan terlalu cepat habis.

  • rateLimits.maxConcurrentDispatch=6: Maksimal 6 tugas dikirim secara bersamaan. Hal ini membantu memastikan aliran permintaan yang stabil ke fungsi yang mendasarinya serta membantu mengurangi jumlah instance aktif dan cold start.

Menguji fungsi task queue menggunakan Firebase Local Emulator Suite

Fungsi task queue di Firebase Local Emulator Suite diekspos sebagai fungsi HTTP sederhana. Anda dapat menguji fungsi tugas yang diemulasi dengan mengirimkan permintaan HTTP POST dengan payload data json:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Men-deploy fungsi task queue

Deploy fungsi task queue menggunakan Firebase CLI:

$ firebase deploy --only functions:backupapod

Saat men-deploy fungsi task queue untuk pertama kalinya, CLI akan membuat task queue di Cloud Tasks dengan opsi (pembatasan kapasitas dan percobaan ulang) yang ditentukan dalam kode sumber Anda.

Jika Anda mengalami error izin saat men-deploy fungsi, pastikan peran IAM yang sesuai ditetapkan kepada pengguna yang menjalankan perintah deployment.

Mengantrekan fungsi

Fungsi task queue dapat ditambahkan ke dalam antrean di Cloud Tasks dari lingkungan server tepercaya seperti Cloud Functions for Firebase menggunakan Firebase Admin SDK untuk Node.js atau library Google Cloud untuk Python. Jika Anda baru pertama kali menggunakan Admin SDK, baca artikel Menambahkan Firebase ke server untuk memulai.

Flow standar akan membuat tugas baru, menambahkan tugas ke dalam antrean di Cloud Tasks, dan menetapkan konfigurasi untuk tugas tersebut:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python (pratinjau)

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(
        params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1, "backupapod"
    )
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(
            http_request={
                "http_method": tasks_v2.HttpMethod.POST,
                "url": target_uri,
                "headers": {"Content-type": "application/json"},
                "body": json.dumps(body).encode(),
            },
            schedule_time=schedule_time,
        )
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(
        status=200, response=f"Enqueued {BACKUP_COUNT} tasks"
    )


  • Kode contoh akan mencoba menyebarkan eksekusi tugas dengan mengaitkan penundaan menit ke-N untuk tugas ke-N. Hal ini akan memicu ~ 1 tugas/menit. Perlu diperhatikan bahwa Anda juga dapat menggunakan scheduleTime (Node.js) atau schedule_time (Python) jika ingin Cloud Tasks memicu tugas pada waktu tertentu.

  • Kode contoh akan menetapkan jumlah waktu maksimum Cloud Tasks akan menunggu hingga tugas selesai. Cloud Tasks akan mencoba kembali tugas ini setelah konfigurasi percobaan ulang antrean atau sampai batas waktu ini tercapai. Dalam contoh, antrean dikonfigurasi untuk mencoba ulang tugas hingga 5 kali, tetapi tugas tersebut akan otomatis dibatalkan jika seluruh proses (termasuk upaya percobaan ulang) memerlukan waktu lebih dari 5 menit.

Mengambil dan menyertakan URI target

Terkait cara Cloud Tasks membuat token autentikasi untuk melakukan autentikasi permintaan pada fungsi task queue dasar, Anda harus menentukan URL Cloud Run dari fungsi tersebut saat menambahkan tugas ke dalam antrean. Sebaiknya Anda mengambil URL secara terprogram untuk fungsi Anda seperti yang ditunjukkan di bawah ini:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python (pratinjau)

def get_function_url(
    name: str, location: str = SupportedRegion.US_CENTRAL1
) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    authed_session = AuthorizedSession(credentials)
    url = (
        "https://cloudfunctions.googleapis.com/v2beta/"
        + f"projects/{project_id}/locations/{location}/functions/{name}"
    )
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url


Pemecahan masalah

Mengaktifkan logging Cloud Tasks

Log dari Cloud Tasks berisi informasi diagnostik yang berguna, seperti status permintaan yang terkait dengan tugas. Secara default, log dari Cloud Tasks dinonaktifkan karena banyaknya log yang dapat dihasilkan di project Anda. Sebaiknya aktifkan log debug saat Anda aktif mengembangkan dan men-debug fungsi task queue Anda. Baca artikel Mengaktifkan logging.

Izin IAM

Anda mungkin melihat error PERMISSION DENIED saat mengantrekan tugas atau saat Cloud Tasks mencoba memanggil fungsi task queue Anda. Pastikan project Anda memiliki binding IAM berikut:

  • Identitas yang digunakan untuk mengantrekan tugas ke Cloud Tasks memerlukan izin IAM cloudtasks.tasks.create.

    Dalam contoh, hal ini adalah akun layanan default App Engine

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Identitas yang digunakan untuk mengantrekan tugas ke Cloud Tasks memerlukan izin untuk menggunakan akun layanan yang terkait dengan tugas di Cloud Tasks.

    Dalam contoh, hal ini adalah akun layanan default App Engine.

Baca dokumentasi Google Cloud IAM untuk mengetahui petunjuk mengenai cara menambahkan akun layanan default App Engine sebagai pengguna akun layanan default App Engine.

  • Identitas yang digunakan untuk memicu fungsi task queue memerlukan izin cloudfunctions.functions.invoke.

    Dalam contoh, hal ini adalah akun layanan default App Engine

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker