وظائف قائمة الانتظار مع المهام السحابية


تستفيد وظائف قائمة انتظار المهام من مهام Google Cloud لمساعدة تطبيقك على تشغيل المهام التي تستغرق وقتًا طويلاً أو كثيفة الاستخدام للموارد أو محدودة النطاق الترددي بشكل غير متزامن، خارج تدفق التطبيق الرئيسي.

على سبيل المثال، تخيل أنك تريد إنشاء نسخ احتياطية لمجموعة كبيرة من ملفات الصور المستضافة حاليًا على واجهة برمجة التطبيقات (API) مع حد للمعدل. لكي تكون مستهلكًا مسؤولاً لواجهة برمجة التطبيقات (API) تلك، يتعين عليك احترام حدود الأسعار الخاصة بها. بالإضافة إلى ذلك، قد يكون هذا النوع من المهام طويلة الأمد عرضة للفشل بسبب انتهاء المهلات وحدود الذاكرة.

للتخفيف من هذا التعقيد، يمكنك كتابة دالة قائمة انتظار المهام التي تحدد خيارات المهام الأساسية مثل scheduleTime و dispatchDeadline ، ثم تسليم الوظيفة إلى قائمة انتظار في Cloud Tasks. تم تصميم بيئة Cloud Tasks خصيصًا لضمان التحكم الفعال في الازدحام وإعادة محاولة السياسات لهذه الأنواع من العمليات.

تتفاعل حزمة Firebase SDK للوظائف السحابية لـ Firebase v3.20.1 والإصدارات الأحدث مع Firebase Admin SDK الإصدار 10.2.0 والإصدارات الأحدث لدعم وظائف قائمة انتظار المهام.

يمكن أن يؤدي استخدام وظائف قائمة انتظار المهام مع Firebase إلى فرض رسوم على معالجة المهام السحابية. راجع تسعير المهام السحابية لمزيد من المعلومات.

إنشاء وظائف قائمة انتظار المهام

لاستخدام وظائف قائمة انتظار المهام، اتبع سير العمل التالي:

  1. اكتب وظيفة قائمة انتظار المهام باستخدام Firebase SDK for Cloud Functions.
  2. اختبر وظيفتك عن طريق تشغيلها بطلب HTTP.
  3. انشر وظيفتك باستخدام Firebase CLI. عند نشر وظيفة قائمة انتظار المهام الخاصة بك لأول مرة، ستقوم واجهة سطر الأوامر (CLI) بإنشاء قائمة انتظار مهام في Cloud Tasks مع خيارات (تحديد المعدل وإعادة المحاولة) المحددة في التعليمات البرمجية المصدر الخاصة بك.
  4. أضف المهام إلى قائمة انتظار المهام التي تم إنشاؤها حديثًا، وقم بتمرير المعلمات لإعداد جدول التنفيذ إذا لزم الأمر. يمكنك تحقيق ذلك عن طريق كتابة التعليمات البرمجية باستخدام Admin SDK ونشرها على Cloud Functions for Firebase.

كتابة وظائف قائمة انتظار المهام

تعتمد نماذج التعليمات البرمجية الموجودة في هذا القسم على تطبيق يقوم بإعداد خدمة تدعم جميع الصور من صورة اليوم لعلم الفلك التابعة لناسا. للبدء، قم باستيراد الوحدات المطلوبة:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

بايثون

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

استخدم onTaskDispatched أو on_task_dispatched لوظائف قائمة انتظار المهام. عند كتابة وظيفة قائمة انتظار المهام، يمكنك تعيين إعادة المحاولة لكل قائمة انتظار وتكوين تحديد المعدل.

تكوين وظائف قائمة انتظار المهام

تأتي وظائف قائمة انتظار المهام مع مجموعة قوية من إعدادات التكوين للتحكم بدقة في حدود المعدل وإعادة محاولة سلوك قائمة انتظار المهام:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

بايثون

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : تتم إعادة محاولة كل مهمة في قائمة انتظار المهام تلقائيًا حتى 5 مرات. ويساعد ذلك في تخفيف الأخطاء العابرة مثل أخطاء الشبكة أو الانقطاع المؤقت للخدمة لخدمة خارجية تابعة.

  • retryConfig.minBackoffSeconds=60 : تتم إعادة محاولة كل مهمة بعد 60 ثانية على الأقل من كل محاولة. يوفر هذا مساحة تخزين مؤقت كبيرة بين كل محاولة حتى لا نتسرع في استنفاد محاولات إعادة المحاولة الخمس بسرعة كبيرة.

  • rateLimits.maxConcurrentDispatch=6 : يتم إرسال 6 مهام على الأكثر في وقت معين. يساعد هذا على ضمان التدفق المستمر للطلبات إلى الوظيفة الأساسية ويساعد على تقليل عدد المثيلات النشطة وحالات البدء البارد.

اختبار وظائف قائمة انتظار المهام

يتم عرض وظائف قائمة انتظار المهام في Firebase Local Emulator Suite كوظائف HTTP بسيطة. يمكنك اختبار وظيفة مهمة تمت محاكاتها عن طريق إرسال طلب HTTP POST مع حمولة بيانات JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

نشر وظائف قائمة انتظار المهام

نشر وظيفة قائمة انتظار المهام باستخدام Firebase CLI:

$ firebase deploy --only functions:backupapod

عند نشر وظيفة قائمة انتظار المهام لأول مرة، تقوم واجهة سطر الأوامر (CLI) بإنشاء قائمة انتظار مهام في Cloud Tasks مع خيارات (تحديد المعدل وإعادة المحاولة) المحددة في التعليمات البرمجية المصدر الخاصة بك.

إذا واجهت أخطاء في الأذونات عند نشر الوظائف، فتأكد من تعيين أدوار IAM المناسبة للمستخدم الذي يقوم بتشغيل أوامر النشر.

Enqueue وظائف قائمة انتظار المهام

يمكن إدراج وظائف قائمة انتظار المهام في Cloud Tasks من بيئة خادم موثوقة مثل Cloud Functions for Firebase باستخدام Firebase Admin SDK لـ Node.js أو مكتبات Google Cloud لـ Python. إذا كنت مستخدمًا جديدًا لحزم SDK للمشرف، فراجع إضافة Firebase إلى خادم للبدء.

ينشئ التدفق النموذجي مهمة جديدة، ويضعها في قائمة المهام السحابية، ويعين تكوين المهمة:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

بايثون

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1,
                                         "backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(http_request={
            "http_method": tasks_v2.HttpMethod.POST,
            "url": target_uri,
            "headers": {
                "Content-type": "application/json"
            },
            "body": json.dumps(body).encode()
        },
                             schedule_time=schedule_time)
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • يحاول نموذج التعليمات البرمجية نشر تنفيذ المهام عن طريق ربط تأخير قدره Nth دقيقة للمهمة Nth. يُترجم هذا إلى تشغيل مهمة واحدة تقريبًا في الدقيقة. لاحظ أنه يمكنك أيضًا استخدام scheduleTime (Node.js) أو schedule_time (Python) إذا كنت تريد أن تقوم Cloud Tasks بتشغيل مهمة في وقت محدد.

  • يقوم نموذج التعليمات البرمجية بتعيين الحد الأقصى للوقت الذي ستنتظره Cloud Tasks حتى تكتمل المهمة. ستقوم Cloud Tasks بإعادة محاولة المهمة بعد تكوين إعادة المحاولة لقائمة الانتظار أو حتى الوصول إلى هذا الموعد النهائي. في العينة، تم تكوين قائمة الانتظار لإعادة محاولة المهمة حتى 5 مرات، ولكن يتم إلغاء المهمة تلقائيًا إذا استغرقت العملية بأكملها (بما في ذلك محاولات إعادة المحاولة) أكثر من 5 دقائق.

استرداد وتضمين URI الهدف

نظرًا للطريقة التي تنشئ بها Cloud Tasks رموز المصادقة المميزة لمصادقة الطلبات إلى وظائف قائمة انتظار المهام الأساسية، يجب عليك تحديد عنوان URL الخاص بـ Cloud Run للوظيفة عند وضع المهام في قائمة الانتظار. نوصيك باسترداد عنوان URL لوظيفتك برمجيًا كما هو موضح أدناه:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

بايثون

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

استكشاف الأخطاء وإصلاحها

قم بتشغيل تسجيل المهام السحابية

تحتوي السجلات من المهام السحابية على معلومات تشخيصية مفيدة مثل حالة الطلب المرتبط بالمهمة. افتراضيًا، يتم إيقاف تشغيل السجلات من "المهام السحابية" نظرًا للحجم الكبير للسجلات التي من المحتمل أن تنشئها في مشروعك. نوصيك بتشغيل سجلات تصحيح الأخطاء أثناء قيامك بتطوير وتصحيح وظائف قائمة انتظار المهام بشكل نشط. راجع تشغيل التسجيل .

أذونات إدارة الهوية وإمكانية الوصول (IAM).

قد ترى أخطاء PERMISSION DENIED عند وضع المهام في قائمة الانتظار أو عندما تحاول Cloud Tasks استدعاء وظائف قائمة انتظار المهام الخاصة بك. تأكد من أن مشروعك يحتوي على روابط IAM التالية:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

راجع وثائق Google Cloud IAM للحصول على إرشادات حول كيفية إضافة حساب خدمة App Engine الافتراضي كمستخدم لحساب خدمة App Engine الافتراضي.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker