Dataflow로 데이터 일괄 처리

이 페이지에서는 Dataflow를 사용하여 작업을 수행하는 방법의 예시를 제공합니다. Apache Beam 파이프라인의 일괄 Cloud Firestore 작업 Apache Beam은 Cloud Firestore 커넥터를 지원합니다. 이 커넥터를 사용하여 Dataflow에서 일괄 및 스트리밍 작업을 실행할 수 있습니다.

대규모 데이터 처리 워크로드에는 Dataflow 및 Apache Beam을 사용하는 것이 좋습니다.

Apache Beam용 Cloud Firestore 커넥터는 자바로 제공됩니다. 자세한 내용은 Cloud Firestore 커넥터에 대한 자세한 내용은 Java용 Apache Beam SDK.

시작하기 전에

이 페이지를 읽기 전에 Apache Beam 프로그래밍 모델을 숙지해야 합니다.

샘플을 실행하려면 Dataflow API를 사용 설정해야 합니다.

Cloud Firestore 파이프라인 예시

아래 예시는 데이터를 쓰는 파이프라인과 데이터를 읽고 필터링하는 파이프라인을 보여줍니다. 이 샘플을 자체 파이프라인의 시작점으로 사용할 수 있습니다.

샘플 파이프라인 실행

샘플의 소스 코드는 googleapis/java-firestore GitHub 저장소에서 제공됩니다. 다음 샘플을 실행하려면 소스 코드를 다운로드하고 리드미를 참조하세요.

Write 파이프라인 예시

다음은 cities-beam-sample 컬렉션에 문서를 만드는 예시입니다.

public class ExampleFirestoreBeamWrite {
  private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance();

  public static void main(String[] args) {
    runWrite(args, "cities-beam-sample");
  }

  public static void runWrite(String[] args, String collectionId) {
    // create pipeline options from the passed in arguments
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    // create some writes
    Write write1 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC
                    .setName(createDocumentName(collectionId, "NYC"))
                    .putFields("name", Value.newBuilder().setStringValue("New York City").build())
                    .putFields("state", Value.newBuilder().setStringValue("New York").build())
                    .putFields("country", Value.newBuilder().setStringValue("USA").build()))
            .build();

    Write write2 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK
                    .setName(createDocumentName(collectionId, "TOK"))
                    .putFields("name", Value.newBuilder().setStringValue("Tokyo").build())
                    .putFields("country", Value.newBuilder().setStringValue("Japan").build())
                    .putFields("capital", Value.newBuilder().setBooleanValue(true).build()))
            .build();

    // batch write the data
    pipeline
        .apply(Create.of(write1, write2))
        .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());

    // run the pipeline
    pipeline.run().waitUntilFinish();
  }

  private static String createDocumentName(String collectionId, String cityDocId) {
    String documentPath =
        String.format(
            "projects/%s/databases/%s/documents",
            FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId());

    return documentPath + "/" + collectionId + "/" + cityDocId;
  }
}

이 예시에서는 다음 인수를 사용하여 파이프라인을 구성하고 실행합니다.

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers

Read 파이프라인 예시

다음은 cities-beam-sample 컬렉션에서 문서를 읽고, country 필드가 USA로 설정된 문서에 대해 필터를 적용하고, 일치하는 문서 이름을 반환하는 예시 파이프라인입니다.

public class ExampleFirestoreBeamRead {

  public static void main(String[] args) {
    runRead(args, "cities-beam-sample");
  }

  public static void runRead(String[] args, String collectionId) {
    FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance();

    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    pipeline
        .apply(Create.of(collectionId))
        .apply(
            new FilterDocumentsQuery(
                firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId()))
        .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
        .apply(
            ParDo.of(
                // transform each document to its name
                new DoFn<RunQueryResponse, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(Objects.requireNonNull(c.element()).getDocument().getName());
                  }
                }))
        .apply(
            ParDo.of(
                // print the document name
                new DoFn<String, Void>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    System.out.println(c.element());
                  }
                }));

    pipeline.run().waitUntilFinish();
  }

  private static final class FilterDocumentsQuery
      extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> {

    private final String projectId;
    private final String databaseId;

    public FilterDocumentsQuery(String projectId, String databaseId) {
      this.projectId = projectId;
      this.databaseId = databaseId;
    }

    @Override
    public PCollection<RunQueryRequest> expand(PCollection<String> input) {
      return input.apply(
          ParDo.of(
              new DoFn<String, RunQueryRequest>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // select from collection "cities-collection-<uuid>"
                  StructuredQuery.CollectionSelector collection =
                      StructuredQuery.CollectionSelector.newBuilder()
                          .setCollectionId(Objects.requireNonNull(c.element()))
                          .build();
                  // filter where country is equal to USA
                  StructuredQuery.Filter countryFilter =
                      StructuredQuery.Filter.newBuilder()
                          .setFieldFilter(
                              StructuredQuery.FieldFilter.newBuilder()
                                  .setField(
                                      StructuredQuery.FieldReference.newBuilder()
                                          .setFieldPath("country")
                                          .build())
                                  .setValue(Value.newBuilder().setStringValue("USA").build())
                                  .setOp(StructuredQuery.FieldFilter.Operator.EQUAL))
                          .buildPartial();

                  RunQueryRequest runQueryRequest =
                      RunQueryRequest.newBuilder()
                          .setParent(DocumentRootName.format(projectId, databaseId))
                          .setStructuredQuery(
                              StructuredQuery.newBuilder()
                                  .addFrom(collection)
                                  .setWhere(countryFilter)
                                  .build())
                          .build();
                  c.output(runQueryRequest);
                }
              }));
    }
  }
}

이 예시에서는 다음 인수를 사용하여 파이프라인을 구성하고 실행합니다.

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers

가격 책정

Dataflow에서 Cloud Firestore 워크로드를 실행하면 비용이 발생합니다. Cloud Firestore 사용량, Dataflow 사용량 기준 작업에 사용된 리소스 비용이 Dataflow 사용량으로 청구됩니다. 자세한 내용은 Dataflow 가격 책정 페이지 참조하세요. Cloud Firestore 가격은 다음을 참조하세요. 가격 책정 페이지

다음 단계