Cloud Tasks ile işlevleri sıraya alma


Görev sırası işlevleri, uygulamanızın zaman alıcı, yoğun kaynak kullanan veya bant genişliği sınırlı görevleri ana uygulama akışınızın dışında eşzamansız olarak çalıştırmasına yardımcı olmak için Google Cloud Tasks'den yararlanır.

Örneğin, şu anda hız sınırı olan bir API'de barındırılan büyük bir resim dosyası grubunun yedeklerini oluşturmak istediğinizi varsayalım. Bu API'nin sorumlu bir kullanıcısı olmak için hız sınırlamalarına uymanız gerekir. Ayrıca, bu tür uzun süreli işler zaman aşımları ve bellek sınırları nedeniyle başarısızlığa uğrayabilir.

Bu karmaşıklığı azaltmak için scheduleTime ve dispatchDeadline gibi temel görev seçeneklerini ayarlayan ve ardından işlevi Cloud Tasks'deki bir kuyruğa aktaran bir görev kuyruğu işlevi yazabilirsiniz. Cloud Tasks ortamı, bu tür işlemler için etkili tıkanıklık kontrolü ve yeniden deneme politikaları sağlamak üzere özel olarak tasarlanmıştır.

Cloud Functions for Firebase v3.20.1 ve sonraki sürümler için Firebase SDK'sı, görev sırası işlevlerini desteklemek amacıyla Firebase Admin SDK v10.2.0 ve sonraki sürümlerle birlikte çalışır.

Görev kuyruğu işlevlerini Firebase ile kullanmak, işleme ücretlerine yol açabilir.Cloud Tasks Daha fazla bilgi için Cloud Tasks fiyatlandırmasına göz atın.

Görev sırası işlevleri oluşturma

Görev sırası işlevlerini kullanmak için aşağıdaki iş akışını uygulayın:

  1. Cloud Functions için Firebase SDK'sını kullanarak bir görev sırası işlevi yazın.
  2. İşlevinizi bir HTTP isteğiyle tetikleyerek test edin.
  3. İşlevinizi Firebase CLI ile dağıtın. Görev kuyruğu işlevinizi ilk kez dağıtırken CLI, kaynak kodunuzda belirtilen seçenekleri (hız sınırlaması ve yeniden deneme) içeren bir görev kuyruğu oluşturur.Cloud Tasks
  4. Gerekirse yürütme planı oluşturmak için parametreleri iletmek üzere yeni oluşturulan görev kuyruğuna görev ekleyin. Bunu, Admin SDK'ü kullanarak kodu yazıp Cloud Functions for Firebase'e dağıtarak yapabilirsiniz.

Görev sırası işlevleri yazma

Bu bölümdeki kod örnekleri, NASA'nın Günün Astronomi Fotoğrafı'ndaki tüm resimleri yedekleyen bir hizmet oluşturan bir uygulamaya dayanır. Başlamak için gerekli modülleri içe aktarın:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Görev kuyruğu işlevleri için onTaskDispatched veya on_task_dispatched kullanın. Görev sırası işlevi yazarken sıra başına yeniden deneme ve hız sınırlama yapılandırmasını ayarlayabilirsiniz.

Görev sırası işlevlerini yapılandırma

Görev sırası işlevleri, bir görev sırasının hız sınırlarını ve yeniden deneme davranışını hassas bir şekilde kontrol etmek için güçlü bir yapılandırma ayarları grubuyla birlikte gelir:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Görev kuyruğundaki her görev otomatik olarak en fazla 5 kez yeniden denenir. Bu, ağ hataları veya bağımlı bir harici hizmetin geçici olarak kesintiye uğraması gibi geçici hataları azaltmaya yardımcı olur.

  • retryConfig.minBackoffSeconds=60: Her görev, her deneme arasında en az 60 saniye bekledikten sonra tekrar denenir. Bu sayede, 5 yeniden deneme denemesini çok hızlı bir şekilde tüketmemek için her deneme arasında büyük bir tampon sağlanır.

  • rateLimits.maxConcurrentDispatch=6: Belirli bir zamanda en fazla 6 görev gönderilir. Bu, temel işleve sabit bir istek akışı sağlamaya ve etkin örneklerin ve baştan başlatma işlemlerinin sayısını azaltmaya yardımcı olur.

Görev sırası işlevlerini test etme

Çoğu durumda, görev kuyruğu işlevlerini test etmenin en iyi yolu Cloud Functions emülatörüdür. Uygulamanızı görev sırası işlevleri emülasyonu için nasıl donatacağınızı öğrenmek üzere Emulator Suite dokümanlarını inceleyin.

Ayrıca, görev kuyruğu functions_sdk, Firebase Local Emulator Suite içinde basit HTTP işlevleri olarak gösterilir. JSON veri yükü içeren bir HTTP POST isteği göndererek taklit edilmiş bir görev işlevini test edebilirsiniz:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Görev sırası işlevlerini dağıtma

Görev kuyruğu işlevini Firebase CLI'yi kullanarak dağıtma:

$ firebase deploy --only functions:backupapod

CLI, bir görev kuyruğu işlevini ilk kez dağıtırken kaynak kodunuzda belirtilen seçenekleri (hız sınırlaması ve yeniden deneme) kullanarak Cloud Tasks içinde bir görev kuyruğu oluşturur.

İşlevleri dağıtırken izin hatalarıyla karşılaşırsanız dağıtım komutlarını çalıştıran kullanıcıya uygun IAM rollerinin atandığından emin olun.

Görev sırası işlevlerini sıraya ekleme

Görev sırası işlevleri, Node.js için Firebase Admin SDK veya Python için Google Cloud kitaplıkları kullanılarak Cloud Functions for Firebase gibi güvenilir bir sunucu ortamından Cloud Tasks'e eklenebilir. Admin SDK'lerde yeniyseniz başlamak için Firebase'i bir sunucuya ekleme başlıklı makaleyi inceleyin.

Tipik bir akış, yeni bir görev oluşturur, bu görevi Cloud Tasks'te sıraya ekler ve görevin yapılandırmasını ayarlar:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Örnek kod, N. görev için N. dakikalık bir gecikme ilişkilendirerek görevlerin yürütülmesini dağıtmaya çalışır. Bu, dakika başına yaklaşık 1 görev tetiklenmesi anlamına gelir. Bir görevi belirli bir zamanda tetiklemek isterseniz scheduleTime (Node.js) veya schedule_time (Python) kullanabileceğinizi de unutmayın.Cloud Tasks

  • Örnek kod, Cloud Tasks'nin bir görevin tamamlanması için bekleyeceği maksimum süreyi belirler. Cloud Tasks, sıranın yeniden deneme yapılandırmasından sonra veya bu son tarihe kadar görevi yeniden dener. Örnekte, sıra, görevi 5 defaya kadar yeniden deneyecek şekilde yapılandırılmıştır ancak tüm işlem (yeniden deneme denemeleri dahil) 5 dakikadan uzun sürerse görev otomatik olarak iptal edilir.

Hedef URI'yi alıp dahil etme

Cloud Tasks, temel görev sırası işlevlerine yapılan istekleri doğrulamak için kimlik doğrulama jetonları oluşturma şekli nedeniyle, görevleri sıraya eklerken işlevin Cloud Run URL'sini belirtmeniz gerekir. Aşağıda gösterildiği gibi işlevinizin URL'sini programatik olarak almanızı öneririz:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Sorun giderme

Cloud Tasks">Cloud Tasks günlük kaydını etkinleştir

Cloud Tasks kaynaklı günlükler, bir görevle ilişkili isteğin durumu gibi yararlı teşhis bilgilerini içerir. Cloud Tasks kaynaklı günlükler, projenizde oluşturabileceği büyük miktardaki günlükler nedeniyle varsayılan olarak devre dışıdır. Görev kuyruğu işlevlerinizi aktif olarak geliştirip hata ayıklıyorken hata ayıklama günlüklerini etkinleştirmenizi öneririz. Günlüğe kaydetmeyi etkinleştirme başlıklı makaleyi inceleyin.

IAM İzinleri

Görevleri sıraya eklerken veya Cloud Tasks görev sırası işlevlerinizi çağırmaya çalışırken PERMISSION DENIED hataları görebilirsiniz. Projenizde aşağıdaki IAM bağlamalarının bulunduğundan emin olun:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

App Engine varsayılan hizmet hesabını, App Engine varsayılan hizmet hesabının kullanıcısı olarak ekleme talimatları için Google Cloud IAM belgelerine bakın.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker