จัดคิวฟังก์ชันด้วย Cloud Tasks


ฟังก์ชันของคิวงานใช้ประโยชน์จาก Google Cloud Tasks เพื่อช่วยให้แอปของคุณใช้เวลานาน ต้องใช้ทรัพยากรมาก หรือจำกัดแบนด์วิดท์ งานต่างๆ แบบไม่พร้อมกันภายนอกโฟลว์หลักของแอปพลิเคชัน

เช่น สมมติว่าคุณต้องการสร้างข้อมูลสำรองชุดรูปภาพขนาดใหญ่ ซึ่งปัจจุบันโฮสต์อยู่ใน API โดยมีขีดจำกัดอัตราคำขอ หากต้องการเป็น ผู้บริโภคที่รับผิดชอบของ API ดังกล่าว คุณจะต้องเคารพขีดจำกัดอัตราคำขอ นอกจากนี้ งานที่ใช้เวลานานนี้อาจเสี่ยงต่อการล้มเหลวเนื่องจากหมดเวลา ขีดจำกัดของหน่วยความจำ

เพื่อลดความซับซ้อนนี้ คุณสามารถเขียนฟังก์ชันคิวงานที่ตั้งค่าพื้นฐาน ตัวเลือกงาน เช่น scheduleTime และ dispatchDeadline แล้วมอบหมาย ปิดไปยังคิวใน Cloud Tasks Cloud Tasks ได้รับการออกแบบเป็นพิเศษเพื่อให้มั่นใจว่าการควบคุมความคับคั่งที่มีประสิทธิภาพ ลองกำหนดนโยบายอีกครั้งสำหรับการดำเนินการประเภทนี้

Firebase SDK สำหรับการทำงานร่วมกันระหว่าง Cloud Functions for Firebase เวอร์ชัน 3.20.1 ขึ้นไป มี Firebase Admin SDK v10.2.0 ขึ้นไปเพื่อรองรับฟังก์ชันคิวงาน

การใช้ฟังก์ชันคิวงานกับ Firebase อาจส่งผลให้มีการเรียกเก็บเงินสำหรับ กำลังประมวลผล Cloud Tasks โปรดดู ราคา Cloud Tasks เพื่อดูข้อมูลเพิ่มเติม

สร้างฟังก์ชันคิวงาน

หากต้องการใช้ฟังก์ชันคิวงาน ให้ทำตามขั้นตอนต่อไปนี้

  1. เขียนฟังก์ชันคิวงานโดยใช้ SDK ของ Firebase สำหรับ Cloud Functions
  2. ทดสอบฟังก์ชันโดยทริกเกอร์ด้วยคำขอ HTTP
  3. ทำให้ฟังก์ชันใช้งานได้ด้วย CLI ของ Firebase เมื่อทำให้งานของคุณใช้งานได้ เป็นครั้งแรก CLI จะสร้าง คิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราและลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
  4. เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ ส่งผ่านพารามิเตอร์เพื่อตั้งค่า กำหนดการดำเนินการหากจำเป็น คุณสามารถทำได้โดยเขียนโค้ด โดยใช้ Admin SDK และทำให้ใช้งานได้กับ Cloud Functions for Firebase

เขียนฟังก์ชันของคิวงาน

ตัวอย่างโค้ดในส่วนนี้อิงตามแอปที่ตั้งค่า บริการที่สำรองข้อมูลรูปภาพทั้งหมดจากองค์การนาซา ภาพประจำวันของดาราศาสตร์ ในการเริ่มต้น ให้นำเข้าโมดูลที่จำเป็น:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

ใช้ onTaskDispatched หรือ on_task_dispatched สำหรับฟังก์ชันของคิวงาน เมื่อเขียนฟังก์ชันคิวงาน คุณสามารถตั้งค่าการลองอีกครั้งต่อคิวและการกำหนดค่าการจำกัดอัตรา

กำหนดค่าฟังก์ชันคิวงาน

ฟังก์ชันคิวงานมาพร้อมกับชุดการตั้งค่าการกำหนดค่าที่มีประสิทธิภาพ เพื่อควบคุมขีดจำกัดอัตราคำขอและการทำงานซ้ำของคิวงานอย่างละเอียด

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: ระบบจะดำเนินการแต่ละงานในคิวงานโดยอัตโนมัติ ลองใหม่ถึง 5 ครั้ง การดำเนินการนี้จะช่วยลดข้อผิดพลาดชั่วคราว เช่น เครือข่าย หรือบริการขัดข้องชั่วคราวของบริการภายนอกที่จำเป็น

  • retryConfig.minBackoffSeconds=60: แต่ละงานลองใหม่อย่างน้อย 60 วินาที นอกเหนือจากความพยายามแต่ละครั้ง ทำให้มีบัฟเฟอร์ขนาดใหญ่ระหว่างการพยายามแต่ละครั้ง เราจึงไม่ต้องรีบร้อนในการพยายามลองใหม่ทั้ง 5 ครั้งเร็วเกินไป

  • rateLimits.maxConcurrentDispatch=6: โดยส่งงานมากที่สุด 6 งานในเวลา ตามเวลาที่กำหนด วิธีนี้ช่วยให้มั่นใจว่าจะมีคำขอที่ส่งอย่างต่อเนื่องไปยังผู้ที่เกี่ยวข้อง และช่วยลดจำนวนอินสแตนซ์ที่ใช้งานอยู่และ Cold Start

ทดสอบฟังก์ชันของคิวงาน

ในกรณีส่วนใหญ่ โปรแกรมจำลอง Cloud Functions เป็นวิธีที่ดีที่สุดในการทดสอบงาน ของฟังก์ชันคิว โปรดดูวิธีดำเนินการจากเอกสารชุดโปรแกรมจำลอง สร้างเครื่องมือให้แก่แอปสำหรับการจำลองฟังก์ชันของคิวงาน

นอกจากนี้ Functions_sdk ของคิวงานจะแสดงในรูปแบบอย่างง่าย ฟังก์ชัน HTTP ใน Firebase Local Emulator Suite คุณทดสอบฟังก์ชันงานที่จำลองได้โดยส่ง HTTP POST คำขอที่มีเพย์โหลดข้อมูล JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

ทำให้ฟังก์ชันของคิวงานใช้งานได้

ทำให้ฟังก์ชันคิวงานใช้งานได้โดยใช้ Firebase CLI:

$ firebase deploy --only functions:backupapod

เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้าง คิวงานใน Cloud Tasks พร้อมตัวเลือก (จำกัดอัตราคำขอแล้วลองใหม่) ที่ระบุไว้ในซอร์สโค้ดของคุณ

หากคุณพบข้อผิดพลาดด้านสิทธิ์เมื่อทำให้ฟังก์ชันใช้งานได้ โปรดตรวจสอบว่า บทบาท IAM ที่เหมาะสม ได้รับการกำหนดให้กับผู้ใช้ที่เรียกใช้คำสั่งทำให้ใช้งานได้

จัดคิวฟังก์ชันของคิวงาน

ฟังก์ชันของคิวงานสามารถจัดคิวใน Cloud Tasks ได้จาก สภาพแวดล้อมของเซิร์ฟเวอร์อย่าง Cloud Functions for Firebase ที่ใช้ Firebase Admin SDK สำหรับ ไลบรารี Node.js หรือ Google Cloud สำหรับ Python หากคุณเพิ่งเริ่มใช้ Admin SDK โปรดดู เพิ่ม Firebase ไปยังเซิร์ฟเวอร์เพื่อเริ่มต้นใช้งาน

ขั้นตอนทั่วไปเป็นการสร้างงานใหม่ และเพิ่มคิวงานลงในคิว Cloud Tasks แล้วกำหนดค่าสำหรับงานดังนี้

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • โค้ดตัวอย่างจะพยายามกระจายการดำเนินการของ งานด้วยการเชื่อมโยงความล่าช้าของนาทีที่ N กับงานที่ N ช่วงเวลานี้ แปลเป็นทริกเกอร์ ~ 1 งาน/นาที โปรดทราบว่าคุณยังสามารถ scheduleTime (Node.js) หรือ schedule_time (Python) หากต้องการ Cloud Tasks เพื่อทริกเกอร์งานในเวลาที่เจาะจง

  • โค้ดตัวอย่างจะกำหนดระยะเวลาสูงสุด Cloud Tasks จะรอ เพื่อให้งานเสร็จสมบูรณ์ Cloud Tasks จะลองอีกครั้ง งานหลังจากการลองใหม่ การกำหนดค่าของคิว หรือจนกว่าจะถึงกำหนดเวลานี้ ในตัวอย่างนี้ คิวจะได้รับการกำหนดค่าให้ลองทำงานอีกครั้งได้สูงสุด 5 ครั้ง แต่งาน ยกเลิกโดยอัตโนมัติหากทั้งกระบวนการ (รวมถึงการพยายามลองอีกครั้ง) ใช้เวลานานกว่า 5 นาที

ดึงข้อมูลและรวม URI เป้าหมาย

เนื่องจากวิธีที่ Cloud Tasks สร้างโทเค็นการตรวจสอบสิทธิ์เพื่อตรวจสอบสิทธิ์ ไปยังฟังก์ชันของคิวงานที่จำเป็น คุณต้องระบุ URL ของ Cloud Run ของฟังก์ชันเมื่อจัดคิวงาน พ ขอแนะนำให้คุณเรียกข้อมูล URL สำหรับฟังก์ชันโดยใช้โปรแกรมดังนี้ ดังที่แสดงด้านล่าง

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

การแก้ปัญหา

Cloud Tasks">เปิดการบันทึก Cloud Tasks

บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่มีประโยชน์ เช่น สถานะคำขอที่เชื่อมโยงกับงาน โดยค่าเริ่มต้น บันทึกจาก Cloud Tasks ปิดอยู่เนื่องจากมีบันทึกจำนวนมาก ที่อาจสร้างขึ้นในโปรเจ็กต์ของคุณได้ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่อง ขณะที่คุณกำลังพัฒนาและแก้ไขข้อบกพร่องของฟังก์ชันคิวงาน โปรดดู กำลังเปิด Logging

สิทธิ์ IAM

คุณอาจเห็นข้อผิดพลาด PERMISSION DENIED รายการเมื่อจัดคิวงานหรือเมื่อ Cloud Tasks พยายามเรียกใช้ฟังก์ชันคิวงาน ตรวจสอบว่า โปรเจ็กต์มีการเชื่อมโยง IAM ต่อไปนี้

  • ข้อมูลประจำตัวที่ใช้เพื่อจัดคิวงานให้ตรงกับความต้องการของ Cloud Tasks สิทธิ์ IAM cloudtasks.tasks.create

    ในตัวอย่างนี้ บัญชีนี้คือบัญชีบริการเริ่มต้น App Engine

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • ข้อมูลประจำตัวที่ใช้เพื่อจัดคิวงานให้กับ Cloud Tasks ต้องการสิทธิ์ เพื่อใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks

    ในตัวอย่างนี้ นี่คือบัญชีบริการเริ่มต้น App Engine บัญชี

ดูเอกสารประกอบ Google Cloud IAM เพื่อดูวิธีการเพิ่มบัญชีบริการเริ่มต้น App Engine ในฐานะผู้ใช้บัญชีบริการเริ่มต้น App Engine

  • ข้อมูลประจำตัวที่ใช้ในการทริกเกอร์ความต้องการฟังก์ชันคิวงาน สิทธิ์cloudfunctions.functions.invoke

    ในตัวอย่างนี้ บัญชีนี้คือบัญชีบริการเริ่มต้น App Engine

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker