הוספת פונקציות לתור באמצעות Cloud Tasks


הפונקציות של תור המשימות מנצלות את Cloud Tasks של Google כדי לעזור לאפליקציה להריץ משימות שדורשות זמן רב, משימות שצורכות הרבה משאבים או משימות עם מגבלת רוחב פס באופן אסינכרוני, מחוץ לתהליך העיבוד הראשי של האפליקציה.

לדוגמה, נניח שאתם רוצים ליצור גיבויים של קבוצה גדולה של קובצי תמונות שמתארחים כרגע ב-API עם הגבלת קצב שליחה. כדי להיות צרכנים אחראים של ה-API הזה, עליכם לפעול בהתאם למגבלות הקצב שלו. בנוסף, משימות ארוכות כאלה עלולות להיכשל בגלל זמן קצוב לתפוגה ומגבלות זיכרון.

כדי לצמצם את המורכבות הזו, אפשר לכתוב פונקציה של תור משימות שמגדירה אפשרויות משימות בסיסיות כמו scheduleTime ו-dispatchDeadline, ואז מעבירה את הפונקציה לתור ב-Cloud Tasks. הסביבה Cloud Tasks תוכננה במיוחד כדי להבטיח בקרה יעילה על עומסים וכללי מדיניות חוזרים לפעולות מהסוג הזה.

‏Firebase SDK עבור Cloud Functions for Firebase מגרסה 3.20.1 ואילך פועל בשילוב עם Firebase Admin SDK מגרסה 10.2.0 ואילך כדי לתמוך בפונקציות של תור המשימות.

שימוש בפונקציות של תור משימות עם Firebase עלול לגרום לחיוב על עיבוד Cloud Tasks. למידע נוסף, ראו תמחור Cloud Tasks.

יצירת פונקציות של רשימת משימות

כדי להשתמש בפונקציות של תור המשימות, פועלים לפי תהליך העבודה הבא:

  1. כותבים פונקציה של תור משימות באמצעות ה-SDK של Firebase ל-Cloud Functions.
  2. כדי לבדוק את הפונקציה, מפעילים אותה באמצעות בקשת HTTP.
  3. פורסים את הפונקציה באמצעות ה-CLI של Firebase. כשפורסים את הפונקציה של תור המשימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) שצוינו בקוד המקור.
  4. מוסיפים משימות לתור המשימות החדש שנוצר, ומעבירים פרמטרים כדי להגדיר לוח זמנים לביצוע אם צריך. כדי לעשות זאת, כותבים את הקוד באמצעות Admin SDK ופורסים אותו ב-Cloud Functions for Firebase.

כתיבת פונקציות של תור משימות

דוגמאות הקוד בקטע הזה מבוססות על אפליקציה שמגדירה שירות לגיבוי כל התמונות מ-Astronomy Picture of the Day של NASA. כדי להתחיל, מייבאים את המודולים הנדרשים:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

משתמשים ב-onTaskDispatched או ב-on_task_dispatched בפונקציות של תור המשימות. כשכותבים פונקציה של תור משימות, אפשר להגדיר הגדרות של ניסיונות חוזרים לכל תור והגבלת קצב.

הגדרת פונקציות של תור משימות

פונקציות של תור משימות מגיעות עם קבוצה חזקה של הגדרות אישיות, שמאפשרות לשלוט במדויק במגבלות הקצב ובהתנהגות של ניסיונות חוזרים של תור משימות:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: כל משימה בתור המשימות תנסה שוב באופן אוטומטי עד 5 פעמים. כך אפשר לצמצם שגיאות חולפות כמו שגיאות רשת או שיבושים זמניים בשירות של שירות חיצוני תלוי.

  • retryConfig.minBackoffSeconds=60: המערכת תנסה שוב כל משימה לפחות 60 שניות אחרי כל ניסיון. כך נוצר מאגר גדול בין כל ניסיון, כדי שלא נמהר להשתמש בכל 5 הניסיונות החוזרים מהר מדי.

  • rateLimits.maxConcurrentDispatch=6: עד 6 משימות נשלחות בכל רגע נתון. כך אפשר להבטיח זרם יציב של בקשות לפונקציה הבסיסית, ולצמצם את מספר המכונות הפעילות והפעלות במצב התחלתי (cold start).

בדיקת פונקציות של תור משימות

ברוב המקרים, המהדר של Cloud Functions הוא הדרך הטובה ביותר לבדוק פונקציות של תורים של משימות. במסמכי התיעוד של Emulator Suite מוסבר איך לספק לאפליקציה כלים להדמיה של פונקציות של תורי משימות.

בנוסף, פונקציות ה-functions_sdk של תור המשימות מוצגות כפונקציות HTTP פשוטות ב-Firebase Local Emulator Suite. כדי לבדוק פונקציית משימה שעברה הדמיה, שולחים בקשת HTTP POST עם מטען נתונים בפורמט JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

פריסת פונקציות של תור משימות

פורסים את הפונקציה של תור המשימות באמצעות CLI של Firebase:

$ firebase deploy --only functions:backupapod

כשפורסים פונקציה של תור משימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב ניסיון חוזר) שצוינו בקוד המקור.

אם נתקלתם בשגיאות הרשאה בזמן הפריסה של הפונקציות, ודאו שהתפקידי ה-IAM המתאימים הוקצו למשתמש שמריץ את פקודות הפריסה.

הוספת פונקציות של תור משימות לתור

אפשר להוסיף פונקציות של תורי משימות לתור ב-Cloud Tasks מסביבת שרת מהימנה כמו Cloud Functions for Firebase באמצעות Firebase Admin SDK ל-Node.js או ספריות Google Cloud ל-Python. אם אתם משתמשים חדשים ב-Admin SDK, תוכלו להיעזר במאמר הוספת Firebase לשרת כדי להתחיל.

תהליך טיפוסי יוצר משימה חדשה, מוסיפה אותה לתור ב-Cloud Tasks ומגדיר את ההגדרות של המשימה:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • הקוד לדוגמה מנסה לפזר את ביצוע המשימות על ידי שיוך עיכוב של N דקות למשימה ה-N. כלומר, הפעלה של משימה אחת בערך בכל דקה. הערה: אפשר גם להשתמש ב-scheduleTime (Node.js) או ב-schedule_time (Python) אם רוצים ש-Cloud Tasks תפעיל משימה בשעה ספציפית.

  • בקוד לדוגמה מוגדר משך הזמן המקסימלי ש-Cloud Tasks ימתין להשלמת המשימה. Cloud Tasks ינסה שוב לבצע את המשימה בהתאם להגדרות הניסיון החוזר של התור, או עד שיגיע למועד היעד הזה. בדוגמה, התור מוגדר לנסות שוב את המשימה עד 5 פעמים, אבל המשימה מבוטלת באופן אוטומטי אם התהליך כולו (כולל ניסיונות חוזרים) נמשך יותר מ-5 דקות.

אחזור של ה-URI של היעד וכללתו

בגלל האופן שבו Cloud Tasks יוצר אסימוני אימות כדי לאמת בקשות לפונקציות של תור המשימות הבסיסי, צריך לציין את כתובת ה-URL של הפונקציה ב-Cloud Run כששולחים משימות לתור. מומלץ לאחזר את כתובת ה-URL של הפונקציה באופן פרוגרמטי, כפי שמתואר בהמשך:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

פתרון בעיות

Cloud Tasks">הפעלת הרישום ביומן של Cloud Tasks

היומנים מ-Cloud Tasks מכילים מידע שימושי לאבחון, כמו סטטוס הבקשה שמשויכת למשימה. כברירת מחדל, יומנים מ-Cloud Tasks מושבתים בגלל הכמות הגדולה של יומנים שהם עלולים ליצור בפרויקט. מומלץ להפעיל את יומני ניפוי הבאגים בזמן הפיתוח והניפוי של פונקציות תור המשימות. הפעלת הרישום ביומן

הרשאות IAM

יכול להיות שתראו שגיאות מסוג PERMISSION DENIED כשאתם מוסיפים משימות לתור או כש-Cloud Tasks מנסה להפעיל את הפונקציות של תור המשימות. מוודאים שהפרויקט כולל את קישורי ה-IAM הבאים:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

הוראות להוספת חשבון השירות App Engine שמוגדר כברירת מחדל כמשתמש בחשבון השירות App Engine שמוגדר כברירת מחדל מפורטות במסמכי העזרה של Google Cloud IAM.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker