Cloud Tasks ile işlevleri sıraya alma


Görev sırası işlevleri, uygulamanızın zaman alan, kaynak yoğun veya bant genişliği sınırlı görevleri ana uygulama akışınızın dışında eşzamanlı olarak çalıştırmasına yardımcı olmak için Google Cloud Tasks'dan yararlanır.

Örneğin, şu anda bir hız sınırına sahip API'de barındırılan büyük bir resim dosyası grubunun yedeklerini oluşturmak istediğinizi varsayalım. Bu API'nin sorumlu bir tüketicisi olmak için hız sınırlarına uymanız gerekir. Ayrıca, bu tür uzun süren işler, zaman aşımları ve bellek sınırları nedeniyle başarısızlığa karşı savunmasız olabilir.

Bu karmaşıklığı azaltmak için scheduleTime ve dispatchDeadline gibi temel görev seçeneklerini ayarlayan bir görev sırası işlevi yazabilir, ardından işlevi Cloud Tasks içindeki bir sıraya aktarabilirsiniz. Cloud Tasks ortamı, özellikle bu tür işlemler için etkili tıkanıklık kontrolü ve yeniden deneme politikaları sağlamak üzere tasarlanmıştır.

Cloud Functions for Firebase v3.20.1 ve üzeri sürümler için Firebase SDK, görev sırası işlevlerini desteklemek üzere Firebase Admin SDK v10.2.0 ve üzeri sürümlerle birlikte çalışır.

Firebase ile görev sırası işlevlerini kullanmak, işleme için ücret alınmasına neden olabilir.Cloud Tasks Daha fazla bilgi için Cloud Tasks fiyatlandırması bölümüne bakın.

Görev sırası işlevleri oluşturma

Görev sırası işlevlerini kullanmak için şu iş akışını izleyin:

  1. Firebase SDK'sını kullanarak Cloud Functions için bir görev sırası işlevi yazın.
  2. HTTP isteğiyle tetikleyerek işlevinizi test edin.
  3. İşlevinizi Firebase CLI ile dağıtın. Görev sırası işlevinizi ilk kez dağıtırken CLI, kaynak kodunuzda belirtilen seçeneklerle (hız sınırlama ve yeniden deneme) Cloud Tasks içinde bir görev sırası oluşturur.
  4. Yeni oluşturulan görev sırasına görevler ekleyin. Gerekirse yürütme planı ayarlamak için parametreleri iletin. Bunu, Admin SDK kullanarak kodu yazıp Cloud Functions for Firebase'ye dağıtarak yapabilirsiniz.

Görev sırası işlevlerini yazma

Bu bölümdeki kod örnekleri, NASA'nın Günün Gökbilim Fotoğrafı'ndaki tüm resimleri yedekleyen bir hizmeti ayarlayan bir uygulamaya dayanmaktadır. Başlamak için gerekli modülleri içe aktarın:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Görev sırası işlevleri için onTaskDispatched veya on_task_dispatched kullanın. Görev sırası işlevi yazarken sıra başına yeniden deneme ve sıklık sınırlama yapılandırması ayarlayabilirsiniz.

Görev sırası işlevlerini yapılandırma

Görev sırası işlevleri, görev sırasının hız sınırlarını ve yeniden deneme davranışını hassas bir şekilde kontrol etmek için güçlü bir yapılandırma ayarları grubuyla birlikte gelir:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Görev sırasındaki her görev otomatik olarak en fazla 5 kez yeniden denenir. Bu, ağ hataları veya bağımlı bir harici hizmetin geçici olarak kesintiye uğraması gibi geçici hataların azaltılmasına yardımcı olur.

  • retryConfig.minBackoffSeconds=60: Her görev, her denemeden en az 60 saniye sonra yeniden denenir. Bu sayede, 5 yeniden deneme hakkını çok hızlı bir şekilde tüketmemek için her deneme arasında büyük bir arabellek sağlanır.

  • rateLimits.maxConcurrentDispatch=6: Belirli bir zamanda en fazla 6 görev gönderilir. Bu, temel işlevlere yönelik istek akışının sabit olmasını sağlar ve etkin örneklerin sayısını ve baştan başlatma işlemlerini azaltır.

Görev sırası işlevlerini test etme

Çoğu durumda, görev sırası işlevlerini test etmenin en iyi yolu Cloud Functions emülatörüdür. Uygulamanızı görev sırası işlevleri emülasyonu için nasıl ayarlayacağınızı öğrenmek üzere Emulator Suite dokümanlarına bakın.

Ayrıca, task queue functions_sdk, Firebase Local Emulator Suite içinde basit HTTP işlevleri olarak sunulur. Bir JSON veri yüküyle HTTP POST isteği göndererek emüle edilmiş bir görev işlevini test edebilirsiniz:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Görev sırası işlevlerini dağıtma

Firebase CLI'yı kullanarak görev sırası işlevini dağıtın:

$ firebase deploy --only functions:backupapod

Bir görev sırası işlevini ilk kez dağıtırken CLI, kaynak kodunuzda belirtilen seçeneklerle (hız sınırlama ve yeniden deneme) Cloud Tasks içinde bir görev sırası oluşturur.

İşlevleri dağıtırken izin hatalarıyla karşılaşırsanız dağıtım komutlarını çalıştıran kullanıcıya uygun IAM rollerinin atandığından emin olun.

Görev sırası işlevlerini sıraya alma

Görev sırası işlevleri, Node.js için Firebase Admin SDK veya Python için Google Cloud kitaplıkları kullanılarak Cloud Functions for Firebase gibi güvenilir bir sunucu ortamından Cloud Tasks'ya sıraya alınabilir. Admin SDK kullanmaya yeni başladıysanız başlamak için Sunucuya Firebase ekleme başlıklı makaleyi inceleyin.

Tipik bir akışta yeni bir görev oluşturulur, bu görev Cloud Tasks'ya eklenir ve görevin yapılandırması ayarlanır:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Örnek kod, N. görev için N. dakikalık bir gecikme ilişkilendirerek görevlerin yürütülmesini yaymaya çalışır. Bu, dakikada yaklaşık 1 görevin tetiklenmesi anlamına gelir. Cloud Tasks ile belirli bir zamanda bir görevi tetiklemek istiyorsanız scheduleTime (Node.js) veya schedule_time (Python) kullanabileceğinizi unutmayın.

  • Örnek kod, Cloud Tasks öğesinin bir görevin tamamlanmasını bekleyeceği maksimum süreyi ayarlar. Cloud Tasks, kuyruğun yeniden deneme yapılandırmasına göre veya bu son tarihe ulaşılana kadar görevi yeniden dener. Örnekte, sıra görevi en fazla 5 kez yeniden deneyecek şekilde yapılandırılmıştır ancak tüm süreç (yeniden deneme girişimleri dahil) 5 dakikadan uzun sürerse görev otomatik olarak iptal edilir.

Hedef URI'yi alıp ekleme

Cloud Tasks, temel görev sırası işlevlerine yönelik isteklerin kimliğini doğrulamak için kimlik doğrulama jetonları oluşturma şekli nedeniyle, görevleri sıraya alırken işlevin Cloud Run URL'sini belirtmeniz gerekir. İşlevinizin URL'sini aşağıda gösterildiği gibi programatik olarak almanızı öneririz:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Sorun giderme

Cloud Tasks günlük kaydını etkinleştirme

Cloud Tasks günlükleri, bir görevle ilişkili isteğin durumu gibi faydalı teşhis bilgileri içerir. Cloud Tasks, projenizde çok sayıda günlük oluşturabileceğinden bu hizmetin günlükleri varsayılan olarak devre dışıdır. Görev sırası işlevlerinizi aktif olarak geliştirip hatalarını ayıklarken hata ayıklama günlüklerini etkinleştirmenizi öneririz. Günlüğe kaydetmeyi etkinleştirme başlıklı makaleyi inceleyin.

IAM İzinleri

Görevleri sıraya alırken veya PERMISSION DENIED görev kuyruğu işlevlerinizi çağırmaya çalışırken Cloud Tasks hataları görebilirsiniz. Projenizin aşağıdaki IAM bağlamalarına sahip olduğundan emin olun:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Görevleri Cloud Tasks'ya sıraya almak için kullanılan kimliğin, Cloud Tasks'daki bir görevle ilişkili hizmet hesabını kullanma izni olmalıdır.

    Örnekte bu, App Engine varsayılan hizmet hesabıdır.

App Engine varsayılan hizmet hesabınıApp Engine varsayılan hizmet hesabının kullanıcısı olarak ekleme talimatları için Google Cloud IAM belgelerine bakın.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker