Dataflow로 데이터 일괄 처리

이 페이지에서는 Dataflow를 사용하여 Apache Beam 파이프라인에서 일괄 Cloud Firestore 작업을 수행하는 방법에 대한 예시를 보여줍니다. Apache Beam은 Cloud Firestore 커넥터를 지원합니다. 이 커넥터를 사용하여 Dataflow에서 일괄 및 스트리밍 작업을 실행할 수 있습니다.

대규모 데이터 처리 워크로드에는 Dataflow 및 Apache Beam을 사용하는 것이 좋습니다.

Apache Beam용 Cloud Firestore 커넥터는 자바로 제공됩니다. Cloud Firestore 커넥터에 대한 자세한 내용은 Java용 Apache Beam SDK를 참조하세요.

시작하기 전에

이 페이지를 읽기 전에 Apache Beam 프로그래밍 모델을 숙지해야 합니다.

샘플을 실행하려면 Dataflow API를 사용 설정해야 합니다.

예시 Cloud Firestore 파이프라인

아래 예시는 데이터를 쓰는 파이프라인과 데이터를 읽고 필터링하는 파이프라인을 보여줍니다. 이 샘플을 자체 파이프라인의 시작점으로 사용할 수 있습니다.

샘플 파이프라인 실행

샘플의 소스 코드는 googleapis/java-firestore GitHub 저장소에서 제공됩니다. 다음 샘플을 실행하려면 소스 코드를 다운로드하고 README를 참조하세요.

Write 파이프라인 예시

다음은 cities-beam-sample 컬렉션에 문서를 만드는 예시입니다.

public class ExampleFirestoreBeamWrite {
  private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance();

  public static void main(String[] args) {
    runWrite(args, "cities-beam-sample");
  }

  public static void runWrite(String[] args, String collectionId) {
    // create pipeline options from the passed in arguments
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    // create some writes
    Write write1 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC
                    .setName(createDocumentName(collectionId, "NYC"))
                    .putFields("name", Value.newBuilder().setStringValue("New York City").build())
                    .putFields("state", Value.newBuilder().setStringValue("New York").build())
                    .putFields("country", Value.newBuilder().setStringValue("USA").build()))
            .build();

    Write write2 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK
                    .setName(createDocumentName(collectionId, "TOK"))
                    .putFields("name", Value.newBuilder().setStringValue("Tokyo").build())
                    .putFields("country", Value.newBuilder().setStringValue("Japan").build())
                    .putFields("capital", Value.newBuilder().setBooleanValue(true).build()))
            .build();

    // batch write the data
    pipeline
        .apply(Create.of(write1, write2))
        .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());

    // run the pipeline
    pipeline.run().waitUntilFinish();
  }

  private static String createDocumentName(String collectionId, String cityDocId) {
    String documentPath =
        String.format(
            "projects/%s/databases/%s/documents",
            FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId());

    return documentPath + "/" + collectionId + "/" + cityDocId;
  }
}

이 예시에서는 다음 인수를 사용하여 파이프라인을 구성하고 실행합니다.

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers

Read 파이프라인 예시

다음은 cities-beam-sample 컬렉션에서 문서를 읽고, country 필드가 USA로 설정된 문서에 대해 필터를 적용하고, 일치하는 문서 이름을 반환하는 예시 파이프라인입니다.

public class ExampleFirestoreBeamRead {

  public static void main(String[] args) {
    runRead(args, "cities-beam-sample");
  }

  public static void runRead(String[] args, String collectionId) {
    FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance();

    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    pipeline
        .apply(Create.of(collectionId))
        .apply(
            new FilterDocumentsQuery(
                firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId()))
        .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
        .apply(
            ParDo.of(
                // transform each document to its name
                new DoFn<RunQueryResponse, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(Objects.requireNonNull(c.element()).getDocument().getName());
                  }
                }))
        .apply(
            ParDo.of(
                // print the document name
                new DoFn<String, Void>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    System.out.println(c.element());
                  }
                }));

    pipeline.run().waitUntilFinish();
  }

  private static final class FilterDocumentsQuery
      extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> {

    private final String projectId;
    private final String databaseId;

    public FilterDocumentsQuery(String projectId, String databaseId) {
      this.projectId = projectId;
      this.databaseId = databaseId;
    }

    @Override
    public PCollection<RunQueryRequest> expand(PCollection<String> input) {
      return input.apply(
          ParDo.of(
              new DoFn<String, RunQueryRequest>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // select from collection "cities-collection-<uuid>"
                  StructuredQuery.CollectionSelector collection =
                      StructuredQuery.CollectionSelector.newBuilder()
                          .setCollectionId(Objects.requireNonNull(c.element()))
                          .build();
                  // filter where country is equal to USA
                  StructuredQuery.Filter countryFilter =
                      StructuredQuery.Filter.newBuilder()
                          .setFieldFilter(
                              StructuredQuery.FieldFilter.newBuilder()
                                  .setField(
                                      StructuredQuery.FieldReference.newBuilder()
                                          .setFieldPath("country")
                                          .build())
                                  .setValue(Value.newBuilder().setStringValue("USA").build())
                                  .setOp(StructuredQuery.FieldFilter.Operator.EQUAL))
                          .buildPartial();

                  RunQueryRequest runQueryRequest =
                      RunQueryRequest.newBuilder()
                          .setParent(DocumentRootName.format(projectId, databaseId))
                          .setStructuredQuery(
                              StructuredQuery.newBuilder()
                                  .addFrom(collection)
                                  .setWhere(countryFilter)
                                  .build())
                          .build();
                  c.output(runQueryRequest);
                }
              }));
    }
  }
}

이 예시에서는 다음 인수를 사용하여 파이프라인을 구성하고 실행합니다.

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers

가격 책정

Dataflow에서 Cloud Firestore 워크로드를 실행하면 Cloud Firestore 사용량 및 Dataflow 사용량에 대한 비용이 발생합니다. 작업에 사용된 리소스 비용이 Dataflow 사용량으로 청구됩니다. 자세한 내용은 Dataflow 가격 책정 페이지를 참조하세요. Cloud Firestore 가격은 가격 책정 페이지를 참조하세요.

다음 단계