Cloud Tasks로 큐에 함수 추가


태스크 큐 함수는 Google Cloud Tasks를 활용하여 앱이 기본 애플리케이션 흐름 외부에서 시간이 많이 걸리거나, 리소스를 많이 소모하거나, 대역폭이 제한적인 태스크를 비동기식으로 실행할 수 있게 해 줍니다.

예를 들어 현재 비율 제한이 있는 API에서 호스팅되는 대규모 이미지 파일 세트의 백업을 만든다고 가정해 보겠습니다. 책임감 있는 API 소비자가 되려면 비율 제한을 준수해야 합니다. 또한 이러한 종류의 장기 실행 작업은 시간 제한 및 메모리 한도로 인해 실패에 취약할 수 있습니다.

이러한 복잡성을 완화하기 위해 기본 IP 주소를 설정하는 태스크 큐 함수를 작성할 수 있습니다. scheduleTime, dispatchDeadline 같은 작업 옵션을 지정한 다음 함수를 Cloud Tasks의 큐에 추가합니다. Cloud Tasks 효과적인 혼잡 제어와 대응이 가능하도록 특별히 설계되고 재시도 정책을 참조하세요

Cloud Functions for Firebase용 Firebase SDK v3.20.1 이상은 상호 운용됨 Firebase Admin SDK v10.2.0 이상에서 태스크 큐 함수를 지원합니다.

Firebase에서 태스크 큐 함수를 사용하면 다음에 대한 요금이 부과될 수 있습니다. Cloud Tasks 처리 중. 자세한 내용은 Cloud Tasks 가격 책정을 참고하세요.

태스크 큐 함수 만들기

태스크 큐 함수를 사용하려면 다음 워크플로를 따르세요.

  1. Cloud FunctionsFirebase SDK를 사용하여 태스크 큐 함수를 작성합니다.
  2. HTTP 요청으로 트리거하여 함수를 테스트합니다.
  3. Firebase CLI를 사용하여 함수를 배포합니다. 태스크를 배포할 때 큐 함수를 처음 만들 때 CLI는 Cloud Tasks의 태스크 큐 소스 코드에 지정된 옵션 (비율 제한 및 재시도)을 사용합니다.
  4. 새로 생성된 태스크 큐에 태스크를 추가하고 필요한 경우 매개변수를 전달하여 실행 일정을 설정합니다. Admin SDK를 사용하여 코드를 작성하고 Cloud Functions for Firebase에 배포하면 됩니다.

태스크 큐 함수 작성

이 섹션에 나와 있는 코드 샘플은 NASA Astronomy Picture of the Day 사이트의 모든 이미지를 백업하는 서비스 설정 앱을 기반으로 합니다. 시작하려면 필수 모듈을 가져옵니다.

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

태스크 큐 함수에는 onTaskDispatched 또는 on_task_dispatched를 사용합니다. 태스크 큐 함수를 작성할 때 큐별 재시도와 비율 제한 구성을 설정할 수 있습니다.

태스크 큐 함수 구성

태스크 큐 함수에는 태스크 큐의 비율 제한과 재시도 동작을 정밀하게 제어하는 강력한 구성 설정 모음이 함께 제공됩니다.

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: 태스크 큐의 각 태스크가 최대 5회까지 자동으로 재시도됩니다. 이렇게 하면 네트워크 오류 또는 종속된 외부 서비스의 일시적인 중단과 같은 일시적인 오류를 완화할 수 있습니다.

  • retryConfig.minBackoffSeconds=60: 각 시도마다 최소 60초 간격으로 태스크가 재시도됩니다. 이렇게 하면 각 시도 간에 긴 버퍼가 제공되므로 재시도 횟수 5회가 그다지 빨리 소진되지 않습니다.

  • rateLimits.maxConcurrentDispatch=6: 지정된 시간에 최대 6개의 태스크가 디스패치됩니다. 이렇게 하면 기본 함수에 대한 요청이 꾸준히 이루어지고 활성 인스턴스 및 콜드 스타트 횟수를 줄일 수 있습니다.

태스크 큐 함수 테스트

대부분의 경우 Cloud Functions 에뮬레이터가 작업을 테스트하는 가장 좋은 방법입니다. 할 수 있습니다. 에뮬레이터 도구 모음 문서를 참고하여 태스크 큐 함수 에뮬레이션을 위해 앱 계측

또한 작업 대기열 functions_sdk는 Firebase Local Emulator Suite의 HTTP 함수 HTTP POST를 전송하여 에뮬레이션된 작업 함수를 테스트할 수 있습니다. JSON 데이터 페이로드로 요청할 수 있습니다

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

태스크 큐 함수 배포

Firebase CLI를 사용하여 태스크 큐 함수를 배포합니다.

$ firebase deploy --only functions:backupapod

태스크 큐 함수를 처음 배포할 때 CLI가 옵션이 있는 Cloud Tasks의 태스크 큐 (비율 제한 및 재시도) 소스 코드에 지정되어 있습니다

함수를 배포할 때 권한 오류가 발생하면 배포 명령어를 실행하는 사용자에게 적절한 IAM 역할이 할당되었는지 확인합니다.

큐에 태스크 큐 함수 추가

태스크 큐 함수는 신뢰할 수 있는 Cloud Storage의 Cloud TasksFirebase Admin SDK를 사용하는 Cloud Functions for Firebase 같은 서버 환경 Node.js 또는 Python용 Google Cloud 라이브러리입니다. Admin SDK를 처음 사용하는 경우 다음을 참고하세요. 시작하려면 서버에 Firebase를 추가하세요.

일반적인 흐름은 새 작업을 만들어 Cloud Tasks로 설정하고 작업의 구성을 설정합니다.

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • 샘플 코드는 N 번째 태스크의 N분 지연을 연결하여 태스크 실행을 분산하려고 시도합니다. 즉, 분당 약 1개의 태스크가 트리거됩니다. 또한 원하는 경우 scheduleTime (Node.js) 또는 schedule_time (Python) Cloud Tasks: 특정 시간에 작업을 트리거합니다.

  • 샘플 코드는 최대 시간을 설정하고 Cloud Tasks님이 대기합니다. 작업을 완료할 때까지 기다려야 합니다 Cloud Tasks는 큐의 재시도 구성에 따라 또는 이 기한에 도달할 때까지 태스크를 재시도합니다. 샘플에는 태스크를 최대 5회까지 재시도하도록 큐가 구성되어 있지만, 전체 프로세스(재시도 포함)가 5분 넘게 걸리는 경우에는 태스크가 자동으로 취소됩니다.

대상 URI 검색 및 포함

Cloud Tasks에서 인증을 위한 인증 토큰을 만드는 방식으로 인해 요청을 보낼 때 기본 태스크 큐 함수에 태스크를 큐에 추가할 때 함수의 Cloud Run URL입니다. 아래에 나온 것처럼 함수의 URL을 프로그래매틱 방식으로 검색하는 것이 좋습니다.

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

문제 해결

Cloud Tasks">Cloud Tasks 로깅 사용 설정

Cloud Tasks의 로그에는 태스크와 관련된 요청의 상태입니다. 기본적으로 로그 볼륨이 크기 때문에 Cloud Tasks이(가) 사용 중지되었습니다. 프로젝트에서 생성할 수 있습니다. 태스크 큐 함수를 적극적으로 개발하고 디버깅하는 동안에는 디버그 로그를 사용 설정하는 것이 좋습니다. 자세한 내용은 사용 설정 중 로깅을 사용합니다.

IAM 권한

태스크를 큐에 추가할 때 또는PERMISSION DENIED Cloud Tasks는 태스크 큐 함수를 호출하려고 시도합니다. 프로젝트에 다음 IAM 바인딩이 있는지 확인하세요.

  • 태스크를 Cloud Tasks의 필요에 따라 큐에 추가하는 데 사용되는 ID cloudtasks.tasks.create IAM 권한입니다.

    샘플에서는 App Engine 기본 서비스 계정입니다.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • 태스크를 Cloud Tasks의 큐에 추가하는 데 사용되는 ID에 권한이 필요합니다. Cloud Tasks에서 태스크와 연결된 서비스 계정 사용

    샘플에서는 App Engine 기본 서비스 계정입니다.

Google Cloud IAM 문서 참조 App Engine 기본 서비스 계정을 추가하는 방법에 대한 안내 App Engine 기본 서비스 계정의 사용자로 사용할 수 있습니다.

  • 태스크 큐 함수를 트리거하는 데 사용되는 ID에는 cloudfunctions.functions.invoke 권한이 필요합니다.

    샘플에서는 App Engine 기본 서비스 계정입니다.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker