ฟังก์ชันคิวงานใช้ประโยชน์จาก Google Cloud Tasks เพื่อช่วยให้คุณแอปทำงานที่ใช้เวลานาน ใช้ทรัพยากรมาก หรือมีแบนด์วิดท์จำกัดแบบไม่พร้อมกันนอกลำดับการทำงานของแอปพลิเคชันหลัก
ตัวอย่างเช่น สมมติว่าคุณต้องการสร้างข้อมูลสํารองของไฟล์รูปภาพจํานวนมากที่โฮสต์อยู่ใน API ที่มีขีดจํากัดอัตรา คุณต้องเคารพขีดจำกัดอัตราของ API นั้นเพื่อแสดงให้เห็นว่าคุณเป็นผู้บริโภคที่รับผิดชอบ นอกจากนี้ งานประเภทที่ทำงานต่อเนื่องนานๆ นี้ยังอาจเสี่ยงที่จะล้มเหลวเนื่องจากหมดเวลาและขีดจํากัดของหน่วยความจํา
หากต้องการลดความซับซ้อนนี้ คุณสามารถเขียนฟังก์ชันคิวงานที่จะตั้งค่าตัวเลือกงานพื้นฐาน เช่น scheduleTime
และ dispatchDeadline
จากนั้นส่งฟังก์ชันไปยังคิวใน Cloud Tasks สภาพแวดล้อม Cloud Tasks
ได้รับการออกแบบมาโดยเฉพาะเพื่อให้การควบคุมความแออัดและนโยบายการลองอีกครั้งมีประสิทธิภาพสำหรับการดำเนินการประเภทเหล่านี้
Firebase SDK สําหรับ Cloud Functions for Firebase v3.20.1 ขึ้นไปทํางานร่วมกับ Firebase Admin SDK v10.2.0 ขึ้นไปเพื่อรองรับฟังก์ชันคิวงาน
การใช้ฟังก์ชันคิวงานกับ Firebase อาจส่งผลให้มีการเรียกเก็บค่าบริการCloud Tasksการประมวลผล ดูข้อมูลเพิ่มเติมเกี่ยวกับราคา Cloud Tasks
สร้างฟังก์ชันคิวงาน
หากต้องการใช้ฟังก์ชันคิวงาน ให้ทําตามเวิร์กโฟลว์นี้
- เขียนฟังก์ชันคิวงานโดยใช้ Firebase SDK สําหรับ Cloud Functions
- ทดสอบฟังก์ชันโดยเรียกใช้ด้วยคำขอ HTTP
- ติดตั้งใช้งานฟังก์ชันด้วย Firebase CLI เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks ด้วยตัวเลือก (การจำกัดอัตราการส่งและลองอีกครั้ง) ที่ระบุไว้ในซอร์สโค้ด
- เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ โดยส่งพารามิเตอร์เพื่อตั้งค่ากำหนดการเรียกใช้ หากจำเป็น ซึ่งทำได้โดยเขียนโค้ดโดยใช้ Admin SDK และนำไปใช้งานใน Cloud Functions for Firebase
เขียนฟังก์ชันคิวงาน
ตัวอย่างโค้ดในส่วนนี้อิงตามแอปที่สร้างบริการสํารองข้อมูลรูปภาพทั้งหมดจากรูปภาพดาราศาสตร์ประจำวันของ NASA หากต้องการเริ่มต้นใช้งาน ให้นําเข้าโมดูลที่จําเป็น ดังนี้
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
ใช้ onTaskDispatched
หรือ on_task_dispatched
สำหรับฟังก์ชันคิวงาน เมื่อเขียนฟังก์ชันคิวงาน คุณสามารถตั้งค่าการลองอีกครั้งต่อคิวและการกําหนดค่าการจํากัดอัตรา
กำหนดค่าฟังก์ชันคิวงาน
ฟังก์ชันคิวงานมาพร้อมกับการตั้งค่าการกําหนดค่าที่มีประสิทธิภาพชุดหนึ่งเพื่อควบคุมขีดจํากัดอัตราและลักษณะการลองอีกครั้งของคิวงานอย่างแม่นยํา
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: ระบบจะลองดำเนินการแต่ละงานในคิวงานใหม่โดยอัตโนมัติสูงสุด 5 ครั้ง ซึ่งจะช่วยบรรเทาข้อผิดพลาดชั่วคราว เช่น ข้อผิดพลาดของเครือข่ายหรือการหยุดชะงักชั่วคราวของบริการภายนอกที่ต้องอาศัยretryConfig.minBackoffSeconds=60
: ระบบจะลองทำแต่ละงานอีกครั้งโดยเว้นระยะเวลาอย่างน้อย 60 วินาทีจากแต่ละครั้งที่ลอง วิธีนี้จะช่วยให้มีบัฟเฟอร์ขนาดใหญ่ระหว่างแต่ละครั้งที่พยายามเพื่อไม่ให้พยายามใหม่ 5 ครั้งจนหมดเร็วเกินไปrateLimits.maxConcurrentDispatch=6
: ระบบจะส่งงานพร้อมกันได้สูงสุด 6 งานในช่วงเวลาหนึ่งๆ วิธีนี้ช่วยให้มั่นใจได้ว่าจะมีคําขอไปยังฟังก์ชันพื้นฐานอย่างต่อเนื่อง และช่วยจํากัดจํานวนอินสแตนซ์ที่ใช้งานอยู่และ Cold Start
ทดสอบฟังก์ชันคิวงาน
ในกรณีส่วนใหญ่ Cloud Functionsโปรแกรมจำลองเป็นวิธีที่ดีที่สุดในการทดสอบฟังก์ชันคิวงาน ดูเอกสารประกอบของ Emulator Suite เพื่อดูวิธีเครื่องมือวัดแอปสำหรับการจําลองฟังก์ชันคิวงาน
นอกจากนี้ ระบบจะแสดง functions_sdk สำหรับคิวงานเป็นฟังก์ชัน HTTP แบบง่ายใน Firebase Local Emulator Suite คุณสามารถทดสอบฟังก์ชันงานจำลองได้โดยส่งคำขอ HTTP POST ที่มีเพย์โหลดข้อมูล JSON ดังนี้
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
ทำให้ฟังก์ชันคิวงานใช้งานได้
ติดตั้งใช้งานฟังก์ชันคิวงานโดยใช้ Firebase CLI
$ firebase deploy --only functions:backupapod
เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราการส่งและลองอีกครั้ง) ที่ระบุไว้ในซอร์สโค้ด
หากพบข้อผิดพลาดเกี่ยวกับสิทธิ์เมื่อทําการติดตั้งใช้งานฟังก์ชัน ให้ตรวจสอบว่าได้กําหนดบทบาท IAM ที่เหมาะสมให้กับผู้ใช้ที่เรียกใช้คําสั่งการติดตั้งใช้งานแล้ว
จัดคิวฟังก์ชันคิวงาน
คุณสามารถจัดคิวฟังก์ชันคิวงานใน Cloud Tasks จากสภาพแวดล้อมเซิร์ฟเวอร์ที่เชื่อถือได้ เช่น Cloud Functions for Firebase โดยใช้ Firebase Admin SDK สำหรับ Node.js หรือไลบรารี Google Cloud สำหรับ Python หากคุณเพิ่งเริ่มใช้ Admin SDK โปรดดูหัวข้อเพิ่ม Firebase ลงในเซิร์ฟเวอร์เพื่อเริ่มต้นใช้งาน
ขั้นตอนทั่วไปจะสร้างงานใหม่ จัดคิวงานใน Cloud Tasks และกำหนดค่าสำหรับงาน ดังนี้
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
โค้ดตัวอย่างจะพยายามกระจายการดําเนินการของงานโดยเชื่อมโยงความล่าช้าเป็น N นาทีสําหรับงานลำดับที่ N ซึ่งหมายความว่าจะทริกเกอร์งานประมาณ 1 งาน/นาที โปรดทราบว่าคุณยังใช้
scheduleTime
(Node.js) หรือschedule_time
(Python) ได้ด้วยหากต้องการให้ Cloud Tasks ทริกเกอร์งานในเวลาที่เจาะจงโค้ดตัวอย่างจะกำหนดเวลาสูงสุดที่ Cloud Tasks จะรอให้งานเสร็จสมบูรณ์ Cloud Tasks จะลองดำเนินการกับงานอีกครั้งตามการกำหนดค่าการลองอีกครั้งของคิว หรือจนกว่าจะถึงกำหนดเวลานี้ ในตัวอย่างนี้ ระบบกําหนดค่าคิวให้ลองทํางานอีกครั้งสูงสุด 5 ครั้ง แต่ระบบจะยกเลิกงานโดยอัตโนมัติหากกระบวนการทั้งหมด (รวมถึงการพยายามทํางานอีกครั้ง) ใช้เวลานานกว่า 5 นาที
ดึงข้อมูลและใส่ URI เป้าหมาย
เนื่องจาก Cloud Tasks สร้างโทเค็นการตรวจสอบสิทธิ์เพื่อตรวจสอบสิทธิ์คําขอไปยังฟังก์ชันคิวงานที่เกี่ยวข้อง คุณจึงต้องระบุ URL ของ Cloud Run ของฟังก์ชันเมื่อจัดคิวงาน เราขอแนะนำให้คุณเรียกข้อมูล URL ของฟังก์ชันแบบเป็นโปรแกรมตามที่แสดงด้านล่าง
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
การแก้ปัญหา
Cloud Tasks">เปิดการบันทึก Cloud Tasks
บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่เป็นประโยชน์ เช่น สถานะคำขอที่เชื่อมโยงกับงาน โดยค่าเริ่มต้น ระบบจะปิดบันทึกจาก Cloud Tasks เนื่องจากมีปริมาณบันทึกจำนวนมากที่อาจสร้างขึ้นในโปรเจ็กต์ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่องขณะพัฒนาและแก้ไขข้อบกพร่องฟังก์ชันคิวงาน ดูหัวข้อการเปิดการบันทึก
สิทธิ์ IAM
คุณอาจเห็นข้อผิดพลาด PERMISSION DENIED
เมื่อจัดคิวงานหรือเมื่อ Cloud Tasks พยายามเรียกใช้ฟังก์ชันคิวงาน ตรวจสอบว่าโปรเจ็กต์มีการเชื่อมโยง IAM ต่อไปนี้
ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ IAM
cloudtasks.tasks.create
ในตัวอย่างนี้ App Engine คือบัญชีบริการเริ่มต้น
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks
ในตัวอย่างนี้ App Engine คือบัญชีบริการเริ่มต้น
ดูเอกสารประกอบของ Google Cloud IAM เพื่อดูวิธีการเพิ่มบัญชีบริการเริ่มต้น App Engine ในฐานะผู้ใช้บัญชีบริการเริ่มต้น App Engine
ข้อมูลประจำตัวที่ใช้เรียกใช้ฟังก์ชันคิวงานต้องมีสิทธิ์
cloudfunctions.functions.invoke
ในตัวอย่างนี้คือบัญชีบริการเริ่มต้นของ App Engine
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker