הוספת פונקציות לתור באמצעות Cloud Tasks


הפונקציות של תור המשימות מנצלים את היתרונות של Google Cloud Tasks כדי לעזור לאפליקציה שלכם לפעול שהיא גוזלת זמן רב, צורכת משאבים רבים או שהיא מוגבלת ברוחב פס באופן אסינכרוני, מחוץ לתהליך הראשי של האפליקציה.

לדוגמה, נניח שאתם רוצים ליצור גיבויים של קבוצה גדולה של תמונות קבצים שמתארחים כרגע ב-API עם מגבלת קצב של יצירת בקשות. כדי להיות לצרכן האחראי של ה-API הזה, עליכם לציית להגבלות הקצב של יצירת הבקשות. בנוסף, משימה ממושכת מהסוג הזה עלולה להיות חשופה לכשל בגלל חסימות זמניות מגבלות זיכרון.

כדי לצמצם את המורכבות הזו, אפשר לכתוב פונקציית תור משימות שמגדירה אפשרויות כמו scheduleTime ו-dispatchDeadline, ואז נותנים הפונקציה מושבתת לתור ב-Cloud Tasks. Cloud Tasks עוצבה במיוחד כדי להבטיח בקרת צפיפות יעילה לביצוע ניסיונות חוזרים של כללי מדיניות מהסוגים האלה.

ה-SDK של Firebase for Cloud Functions for Firebase בגרסה 3.20.1 ואילך פועל באופן הדדי עם Firebase Admin SDK מגרסה 10.2.0 ואילך, כדי לתמוך בפונקציות של תור המשימות.

שימוש בפונקציות של תור משימות עם Firebase עלול לגרום לחיוב על עיבוד Cloud Tasks. למידע נוסף, ראו תמחור Cloud Tasks.

יצירת פונקציות בתור המשימות

כדי להשתמש בפונקציות של תור המשימות, צריך לפעול לפי השלבים הבאים:

  1. כתיבת פונקציה של תור המשימות באמצעות ה-SDK Firebase של Cloud Functions.
  2. כדי לבדוק את הפונקציה, מפעילים אותה באמצעות בקשת HTTP.
  3. פורסים את הפונקציה עם ה-CLI Firebase. כשפורסים את הפונקציה של תור המשימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) שצוינו בקוד המקור.
  4. להוסיף משימות לתור המשימות החדש שנוצר, ולהעביר פרמטרים להגדרה לוח זמנים לביצוע במידת הצורך. אפשר לעשות זאת באמצעות כתיבת הקוד באמצעות Admin SDK ולפרוס אותו ב-Cloud Functions for Firebase.

כתיבת פונקציות של תור המשימות

דוגמאות הקוד בקטע הזה מבוססות על אפליקציה שמגדירה שירות שמגבה את כל התמונות תמונת האסטרונומיה של היום. כדי להתחיל, מייבאים את המודולים הנדרשים:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

שימוש ב-onTaskDispatched או on_task_dispatched לפונקציות של תור המשימות. כשכותבים פונקציה של תור משימות, אפשר להגדיר הגדרות של ניסיונות חוזרים לכל תור והגבלת קצב.

הגדרת פונקציות של תור משימות

הפונקציות של 'הבאים בתור' כוללות קבוצה עוצמתית של הגדרות תצורה כדי לשלוט במדויק במגבלות הקצב של יצירת הבקשות ולנסות שוב את ההתנהגות של תור המשימות:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: כל משימה בתור המשימות מתעדכנת באופן אוטומטי בוצעה ניסיון חוזר של עד 5 פעמים. כך אפשר לצמצם שגיאות חולפות כמו שגיאות רשת או שיבושים זמניים בשירות של שירות חיצוני תלוי.

  • retryConfig.minBackoffSeconds=60: צריך לבצע ניסיון חוזר של כל משימה במשך 60 שניות לפחות בנפרד מכל ניסיון. כך נוצר מרווח פנימי גדול בין כל ניסיון כך שלא נמהר למצות את חמשת הניסיונות החוזרים מהר מדי.

  • rateLimits.maxConcurrentDispatch=6: 6 משימות לכל היותר נשלחות בבת אחת בזמן נתון. כך אפשר להבטיח זרם יציב של בקשות לפונקציה הבסיסית, ולצמצם את מספר המכונות הפעילות והפעלות במצב התחלתי (cold start).

בדיקת הפונקציות בתור המשימה

ברוב המקרים, האמולטור Cloud Functions הוא הדרך הטובה ביותר לבדוק את המשימה בתור. כדי לקבל מידע נוסף על: כלים לאפליקציה לאמולציה של פונקציות בתור משימות.

בנוסף, הפונקציות של תור המשימות נחשפות כפשוטות פונקציות HTTP בFirebase Local Emulator Suite. אפשר לבדוק פונקציית אמולציה של פונקציית המשימה על ידי שליחת HTTP POST בקשה באמצעות מטען ייעודי (payload) של נתוני JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

פריסת הפונקציות של תור המשימות

פורסים את הפונקציה של תור המשימות באמצעות ה-CLI Firebase:

$ firebase deploy --only functions:backupapod

בעת פריסת פונקציית תור משימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וביצוע ניסיון חוזר) שצוין בקוד המקור שלכם.

אם נתקלתם בשגיאות הרשאות בזמן הפריסה של פונקציות, ודאו שהתפקידי ה-IAM המתאימים הוקצו למשתמש שמריץ את פקודות הפריסה.

הוספת פונקציות של תור משימות לתור

אפשר להוסיף את הפונקציות לתור למשימות לתור ב-Cloud Tasks מחשבון מהימן סביבת שרת כמו Cloud Functions for Firebase באמצעות Firebase Admin SDK ספריות Node.js או Google Cloud בשביל Python. אם זו הפעם הראשונה שיש לך Admin SDK, כדאי לעיין במאמר הבא: כדי להתחיל, מוסיפים את Firebase לשרת.

תהליך טיפוסי יוצר משימה חדשה, מוסיפה אותה לתור ב-Cloud Tasks ומגדיר את ההגדרות של המשימה:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • הקוד לדוגמה מנסה לפזר את הביצוע של משימות באמצעות שיוך השהיה של N דקות למשימה N. הזה מתורגמת להפעלה של כמשימה אחת בדקה. חשוב לזכור שאפשר גם להשתמש scheduleTime (Node.js) או schedule_time (Python) אם רוצים Cloud Tasks כדי להפעיל משימה במועד ספציפי.

  • הקוד לדוגמה מגדיר את משך הזמן המקסימלי Cloud Tasks ימתין לביצוע משימה. ניסיון נוסף של Cloud Tasks המשימה שאחרי הניסיון החוזר של התור או עד שהמועד האחרון יגיע. בדוגמה, התור מוגדר לנסות שוב את המשימה עד 5 פעמים, אבל המשימה מבוטלת באופן אוטומטי אם התהליך כולו (כולל ניסיונות חוזרים) נמשך יותר מ-5 דקות.

אחזור והכללה של URI היעד

בגלל האופן שבו Cloud Tasks יוצר אסימוני אימות לצורך אימות לפונקציות הבסיסיות של תור המשימות, צריך לציין כתובת ה-URL של הפונקציה ב-Cloud Run במהלך הוספת משימות לתור. רביעי שמומלץ לאחזר באופן פרוגרמטי את כתובת ה-URL של הפונקציה הדוגמה הבאה:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

פתרון בעיות

Cloud Tasks">הפעלת הרישום ביומן של Cloud Tasks

היומנים מ-Cloud Tasks מכילים מידע שימושי לאבחון, כמו סטטוס הבקשה שמשויכת למשימה. כברירת מחדל, יומנים מ- Cloud Tasks מושבתים בגלל כמות היומנים הגדולה שיכולים שיש אפשרות ליצור בפרויקט שלכם. מומלץ להפעיל את יומני ניפוי הבאגים בזמן הפיתוח והניפוי של פונקציות תור המשימות. צפייה הפעלה רישום ביומן.

הרשאות IAM

יכול להיות שיופיעו PERMISSION DENIED שגיאות ברשימת המשימות או Cloud Tasks מנסה להפעיל את הפונקציות בתור המשימות. צריך לוודא שהמאפיינים לפרויקט יש את קישורי ה-IAM הבאים:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

מאמרי העזרה של Google Cloud IAM איך מוסיפים את חשבון השירות App Engine שמוגדר כברירת מחדל? כמשתמש בחשבון השירות App Engine שמוגדר כברירת מחדל.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker