Auf dieser Seite finden Sie Beispiele für die Verwendung von Dataflow zur Durchführung von Cloud Firestore-Massenvorgängen in einer Apache Beam- Pipeline . Apache Beam unterstützt einen Connector für Cloud Firestore. Mit diesem Connector können Sie Batch- und Streaming-Vorgänge in Dataflow ausführen.
Wir empfehlen die Verwendung von Dataflow und Apache Beam für große Datenverarbeitungs-Workloads.
Der Cloud Firestore-Connector für Apache Beam ist in Java verfügbar. Weitere Informationen zum Cloud Firestore-Connector finden Sie im Apache Beam SDK für Java .
Bevor Sie beginnen
Bevor Sie diese Seite lesen, sollten Sie mit dem Programmiermodell für Apache Beam vertraut sein.
Um die Beispiele auszuführen, müssen Sie die Dataflow-API aktivieren .Beispiele für Cloud Firestore-Pipelines
Die folgenden Beispiele veranschaulichen eine Pipeline, die Daten schreibt, und eine, die Daten liest und filtert. Sie können diese Beispiele als Ausgangspunkt für Ihre eigenen Pipelines verwenden.
Ausführen der Beispielpipelines
Der Quellcode für die Beispiele ist im GitHub-Repository googleapis/java-firestore verfügbar. Um diese Beispiele auszuführen, laden Sie den Quellcode herunter und sehen Sie sich die README-Datei an.
Beispiel Write
Pipeline
Das folgende Beispiel erstellt Dokumente in der Sammlung cities-beam-sample
:
public class ExampleFirestoreBeamWrite { private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance(); public static void main(String[] args) { runWrite(args, "cities-beam-sample"); } public static void runWrite(String[] args, String collectionId) { // create pipeline options from the passed in arguments PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); // create some writes Write write1 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC .setName(createDocumentName(collectionId, "NYC")) .putFields("name", Value.newBuilder().setStringValue("New York City").build()) .putFields("state", Value.newBuilder().setStringValue("New York").build()) .putFields("country", Value.newBuilder().setStringValue("USA").build())) .build(); Write write2 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK .setName(createDocumentName(collectionId, "TOK")) .putFields("name", Value.newBuilder().setStringValue("Tokyo").build()) .putFields("country", Value.newBuilder().setStringValue("Japan").build()) .putFields("capital", Value.newBuilder().setBooleanValue(true).build())) .build(); // batch write the data pipeline .apply(Create.of(write1, write2)) .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build()); // run the pipeline pipeline.run().waitUntilFinish(); } private static String createDocumentName(String collectionId, String cityDocId) { String documentPath = String.format( "projects/%s/databases/%s/documents", FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId()); return documentPath + "/" + collectionId + "/" + cityDocId; } }
Das Beispiel verwendet die folgenden Argumente, um eine Pipeline zu konfigurieren und auszuführen:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Beispiel Read
Die folgende Beispielpipeline liest Dokumente aus der Sammlung cities-beam-sample
, wendet einen Filter auf Dokumente an, bei denen das Feld „ country
auf USA
festgelegt ist, und gibt die Namen der übereinstimmenden Dokumente zurück.
public class ExampleFirestoreBeamRead { public static void main(String[] args) { runRead(args, "cities-beam-sample"); } public static void runRead(String[] args, String collectionId) { FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance(); PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); pipeline .apply(Create.of(collectionId)) .apply( new FilterDocumentsQuery( firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId())) .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build()) .apply( ParDo.of( // transform each document to its name new DoFn<RunQueryResponse, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(Objects.requireNonNull(c.element()).getDocument().getName()); } })) .apply( ParDo.of( // print the document name new DoFn<String, Void>() { @ProcessElement public void processElement(ProcessContext c) { System.out.println(c.element()); } })); pipeline.run().waitUntilFinish(); } private static final class FilterDocumentsQuery extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> { private final String projectId; private final String databaseId; public FilterDocumentsQuery(String projectId, String databaseId) { this.projectId = projectId; this.databaseId = databaseId; } @Override public PCollection<RunQueryRequest> expand(PCollection<String> input) { return input.apply( ParDo.of( new DoFn<String, RunQueryRequest>() { @ProcessElement public void processElement(ProcessContext c) { // select from collection "cities-collection-<uuid>" StructuredQuery.CollectionSelector collection = StructuredQuery.CollectionSelector.newBuilder() .setCollectionId(Objects.requireNonNull(c.element())) .build(); // filter where country is equal to USA StructuredQuery.Filter countryFilter = StructuredQuery.Filter.newBuilder() .setFieldFilter( StructuredQuery.FieldFilter.newBuilder() .setField( StructuredQuery.FieldReference.newBuilder() .setFieldPath("country") .build()) .setValue(Value.newBuilder().setStringValue("USA").build()) .setOp(StructuredQuery.FieldFilter.Operator.EQUAL)) .buildPartial(); RunQueryRequest runQueryRequest = RunQueryRequest.newBuilder() .setParent(DocumentRootName.format(projectId, databaseId)) .setStructuredQuery( StructuredQuery.newBuilder() .addFrom(collection) .setWhere(countryFilter) .build()) .build(); c.output(runQueryRequest); } })); } } }
Das Beispiel verwendet die folgenden Argumente, um eine Pipeline zu konfigurieren und auszuführen:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Preisgestaltung
Beim Ausführen einer Cloud Firestore-Arbeitslast in Dataflow fallen Kosten für die Cloud Firestore-Nutzung und die Dataflow-Nutzung an. Die Dataflow-Nutzung wird für die Ressourcen abgerechnet, die Ihre Jobs nutzen. Weitere Informationen finden Sie auf der Preisseite von Dataflow . Informationen zu den Preisen für Cloud Firestore finden Sie auf der Seite „Preise“ .
Was kommt als nächstes
- Ein weiteres Pipeline-Beispiel finden Sie unter Verwenden von Firestore und Apache Beam für die Datenverarbeitung .
- Weitere Informationen zu Dataflow und Apache Beam finden Sie in der Dataflow-Dokumentation .