/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.PubSubToMongoDB.Options;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToMongoDB} pipeline is a streaming pipeline which ingests data in JSON format
 * from Pub/Sub, applies a JavaScript or Python UDF if provided, and inserts the resulting records
 * as BSON documents in MongoDB. If an element fails to be processed, it is written to a
 * deadletter table in BigQuery.
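 *
 * <p>For orientation, a minimal sketch of launching the pipeline directly is shown below. All
 * values are illustrative placeholders, and a real launch would also supply standard runner
 * options (project, region, staging locations):
 *
 * <pre>{@code
 * String[] args =
 *     new String[] {
 *       "--inputSubscription=projects/your-project-id/subscriptions/your-subscription-name",
 *       "--mongoDBUri=host1:port,host2:port",
 *       "--database=my-db",
 *       "--collection=my-collection",
 *       "--deadletterTable=your-project-id:your-dataset.your-table-name",
 *       "--runner=DataflowRunner"
 *     };
 * PubSubToMongoDB.main(args);
 * }</pre>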
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The PubSub topic and subscriptions exist
 *   <li>The MongoDB is up and running
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-to-mongodb/README_Cloud_PubSub_to_MongoDB.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "Cloud_PubSub_to_MongoDB",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to MongoDB",
      description =
          "The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents. "
              + "If required, this pipeline supports additional transforms that can be included using a JavaScript user-defined function (UDF). "
              + "Any errors that occur due to schema mismatch, malformed JSON, or while executing transforms are recorded in a BigQuery table for unprocessed messages, along with the input message. "
              + "If a table for unprocessed records does not exist prior to execution, the pipeline automatically creates this table.",
      skipOptions = {
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      optionsClass = Options.class,
      flexContainerName = "pubsub-to-mongodb",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-mongodb",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.",
        "The MongoDB cluster must exist and should be accessible from the Dataflow worker machines."
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "Cloud_PubSub_to_MongoDB_Xlang",
      category = TemplateCategory.STREAMING,
      type = Template.TemplateType.XLANG,
      displayName = "Pub/Sub to MongoDB with Python UDFs",
      description =
          "The Pub/Sub to MongoDB template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them to MongoDB as documents. "
              + "If required, this pipeline supports additional transforms that can be included using a Python user-defined function (UDF). "
              + "Any errors that occur due to schema mismatch, malformed JSON, or while executing transforms are recorded in a BigQuery table for unprocessed messages, along with the input message. "
              + "If a table for unprocessed records does not exist prior to execution, the pipeline automatically creates this table.",
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      optionsClass = Options.class,
      flexContainerName = "pubsub-to-mongodb-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-mongodb",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The Pub/Sub Subscription must exist and the messages must be encoded in a valid JSON format.",
        "The MongoDB cluster must exist and should be accessible from the Dataflow worker machines."
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public class PubSubToMongoDB {

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** Pubsub message/string coder for pipeline. */
  public static final FailsafeElementCoder<PubsubMessage, String> CODER =
      FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToMongoDB.class);

  /**
   * The {@link Options} class provides the custom execution options passed by the executor at the
   * command-line.
   *
   * <p>Inherits standard configuration options, options from {@link
   * PythonExternalTextTransformer.PythonExternalTextTransformerOptions}.
   */
  public interface Options
      extends PythonExternalTextTransformer.PythonExternalTextTransformerOptions, PipelineOptions {

    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        description = "Pub/Sub input subscription",
        helpText = "Name of the Pub/Sub subscription.",
        example = "projects/your-project-id/subscriptions/your-subscription-name")
    @Validation.Required
    String getInputSubscription();

    void setInputSubscription(String inputSubscription);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        description = "MongoDB Connection URI",
        helpText = "Comma separated list of MongoDB servers.",
        example = "host1:port,host2:port,host3:port")
    @Validation.Required
    String getMongoDBUri();

    void setMongoDBUri(String mongoDBUri);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Target",
        description = "MongoDB Database",
        helpText = "Database in MongoDB to store the collection.",
        example = "my-db")
    @Validation.Required
    String getDatabase();

    void setDatabase(String database);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Target",
        description = "MongoDB collection",
        helpText = "Name of the collection in the MongoDB database.",
        example = "my-collection")
    @Validation.Required
    String getCollection();

    void setCollection(String collection);

    @TemplateParameter.BigQueryTable(
        order = 5,
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "The BigQuery table that stores messages caused by failures, such as mismatched schema, malformed JSON, and so on.",
        example = "your-project-id:your-dataset.your-table-name")
    @Validation.Required
    String getDeadletterTable();

    void setDeadletterTable(String deadletterTable);

    @TemplateParameter.Long(
        order = 6,
        optional = true,
        description = "Batch Size",
        helpText = "Batch size used for batch insertion of documents into MongoDB.")
    @Default.Long(1000)
    Long getBatchSize();

    void setBatchSize(Long batchSize);

    @TemplateParameter.Long(
        order = 7,
        optional = true,
        description = "Batch Size in Bytes",
        helpText = "Batch size in bytes.")
    @Default.Long(5242880)
    Long getBatchSizeBytes();

    void setBatchSizeBytes(Long batchSizeBytes);

    @TemplateParameter.Integer(
        order = 8,
        optional = true,
        description = "Max Connection idle time",
        helpText = "Maximum idle time allowed in seconds before connection timeout occurs.")
    @Default.Integer(60000)
    int getMaxConnectionIdleTime();

    void setMaxConnectionIdleTime(int maxConnectionIdleTime);

    @TemplateParameter.Boolean(
        order = 9,
        optional = true,
        description = "SSL Enabled",
        helpText = "Boolean value indicating whether the connection to MongoDB is SSL enabled.")
    @Default.Boolean(true)
    Boolean getSslEnabled();

    void setSslEnabled(Boolean sslEnabled);

    @TemplateParameter.Boolean(
        order = 10,
        optional = true,
        description = "Ignore SSL Certificate",
        helpText = "Boolean value indicating whether to ignore the SSL certificate.")
    @Default.Boolean(true)
    Boolean getIgnoreSSLCertificate();

    void setIgnoreSSLCertificate(Boolean ignoreSSLCertificate);

    @TemplateParameter.Boolean(
        order = 11,
        optional = true,
        description = "withOrdered",
        helpText = "Boolean value enabling ordered bulk insertions into MongoDB.")
    @Default.Boolean(true)
    Boolean getWithOrdered();

    void setWithOrdered(Boolean withOrdered);

    @TemplateParameter.Boolean(
        order = 12,
        optional = true,
        description = "withSSLInvalidHostNameAllowed",
        helpText =
            "Boolean value indicating whether an invalid hostname is allowed for the SSL connection.")
    @Default.Boolean(true)
    Boolean getWithSSLInvalidHostNameAllowed();

    void setWithSSLInvalidHostNameAllowed(Boolean withSSLInvalidHostNameAllowed);
  }

  /** DoFn that will parse the given string elements as BSON documents. */
  private static class ParseAsDocumentsFn extends DoFn<String, Document> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(Document.parse(context.element()));
    }
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line.
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {

    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for the pipeline.
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();

    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);

    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    if (usePythonUdf && useJavascriptUdf) {
      throw new IllegalArgumentException(
          "Either a JavaScript or a Python GCS path may be provided, but not both.");
    }

    /*
     * Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
     *        2) Apply Javascript or Python UDF if provided.
     *        3) Write to MongoDB
     */

    LOG.info("Reading from subscription: " + options.getInputSubscription());

    PCollection<PubsubMessage> readMessagesFromPubsub =
        pipeline
            /*
             * Step #1: Read from a PubSub subscription.
             */
            .apply(
                "Read PubSub Subscription",
                PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(options.getInputSubscription()));

    PCollectionTuple convertedPubsubMessages;
    if (usePythonUdf) {
      convertedPubsubMessages =
          readMessagesFromPubsub
              /*
               * Step #2: Apply the Python UDF, if provided, and transform the PubsubMessages
               * into JSON documents.
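               *
               * Elements that fail UDF processing are tagged with TRANSFORM_DEADLETTER_OUT and
               * are written to the BigQuery deadletter table in Step #3b below.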
               */
              .apply(
                  "Apply Python UDF",
                  PubSubMessageToJsonDocument.newBuilder()
                      .setPythonExternalTextTransformFunctionName(
                          options.getPythonExternalTextTransformFunctionName())
                      .setPythonExternalTextTransformGcsPath(
                          options.getPythonExternalTextTransformGcsPath())
                      .build());
    } else {
      convertedPubsubMessages =
          readMessagesFromPubsub
              /*
               * Step #2: Apply the JavaScript UDF, if provided, and transform the PubsubMessages
               * into JSON documents.
               */
              .apply(
                  "Apply Javascript UDF",
                  PubSubMessageToJsonDocument.newBuilder()
                      .setJavascriptTextTransformFunctionName(
                          options.getJavascriptTextTransformFunctionName())
                      .setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
                      .setJavascriptTextTransformReloadIntervalMinutes(
                          options.getJavascriptTextTransformReloadIntervalMinutes())
                      .build());
    }

    /*
     * Step #3a: Write Json documents into MongoDB using {@link MongoDbIO.write}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_OUT)
        .apply(
            "Get Json Documents",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply("Parse as BSON Document", ParDo.of(new ParseAsDocumentsFn()))
        .apply(
            "Put to MongoDB",
            MongoDbIO.write()
                .withBatchSize(options.getBatchSize())
                .withUri(prefixMongoDb(options.getMongoDBUri()))
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection())
                .withIgnoreSSLCertificate(options.getIgnoreSSLCertificate())
                .withMaxConnectionIdleTime(options.getMaxConnectionIdleTime())
                .withOrdered(options.getWithOrdered())
                .withSSLEnabled(options.getSslEnabled())
                .withSSLInvalidHostNameAllowed(options.getWithSSLInvalidHostNameAllowed()));

    /*
     * Step #3b: Write elements that failed processing to the deadletter table via {@link BigQueryIO}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_DEADLETTER_OUT)
        .apply(
            "Write Transform Failures To BigQuery",
            ErrorConverters.WritePubsubMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getDeadletterTable())
                .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                .build());

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /** Add the MongoDB protocol prefix only if the given uri doesn't have it. */
  private static String prefixMongoDb(String mongoDBUri) {
    if (mongoDBUri.startsWith("mongodb://") || mongoDBUri.startsWith("mongodb+srv://")) {
      return mongoDBUri;
    }
    return String.format("mongodb://%s", mongoDBUri);
  }

  /**
   * The {@link PubSubMessageToJsonDocument} class is a {@link PTransform} which transforms
   * incoming {@link PubsubMessage} objects into JSON objects for insertion into MongoDB while
   * applying an optional UDF to the input. The execution of the UDF and the transformation to JSON
   * objects are done in a fail-safe way by wrapping the element with its original payload inside
   * the {@link FailsafeElement} class. The {@link PubSubMessageToJsonDocument} transform will
   * output a {@link PCollectionTuple} which contains all output and dead-letter {@link
   * PCollection}s.
   *
   * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}s:
   *
   * <ul>
   *   <li>{@link PubSubToMongoDB#TRANSFORM_OUT} - Contains all records successfully converted to
   *       JSON objects.
   *   <li>{@link PubSubToMongoDB#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}
   *       records which couldn't be converted to table rows.
   * </ul>
   */
  @AutoValue
  public abstract static class PubSubMessageToJsonDocument
      extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

    public static Builder newBuilder() {
      return new AutoValue_PubSubToMongoDB_PubSubMessageToJsonDocument.Builder();
    }

    @Nullable
    public abstract String javascriptTextTransformGcsPath();

    @Nullable
    public abstract String javascriptTextTransformFunctionName();

    @Nullable
    public abstract String pythonExternalTextTransformGcsPath();

    @Nullable
    public abstract String pythonExternalTextTransformFunctionName();

    @Nullable
    public abstract Integer javascriptTextTransformReloadIntervalMinutes();

    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) {

      // Check for the Python UDF first. If the pipeline is not using a Python UDF, proceed as
      // normal.
      if (pythonExternalTextTransformGcsPath() != null) {
        PCollection<Row> failsafeElements =
            input
                // Map the incoming messages into FailsafeElements, so we can recover from failures
                // across multiple transforms.
                .apply(
                    "MapToRecord",
                    PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                        .pubSubMappingFunction())
                .setRowSchema(
                    PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA);
        return failsafeElements
            .apply(
                "InvokeUDF",
                PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                    .setFileSystemPath(pythonExternalTextTransformGcsPath())
                    .setFunctionName(pythonExternalTextTransformFunctionName())
                    .build())
            .apply(
                "MapRowsToFailsafeElements",
                ParDo.of(
                        new PythonExternalTextTransformer.RowToPubSubFailsafeElementFn(
                            TRANSFORM_OUT, TRANSFORM_DEADLETTER_OUT))
                    .withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
      }

      // If no Python UDF is provided, proceed as normal and check for a JavaScript UDF.
      // Map the incoming messages into FailsafeElements so we can recover from failures
      // across multiple transforms.
      PCollection<FailsafeElement<PubsubMessage, String>> failsafeElements =
          input.apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()));

      // If a UDF is supplied then use it to parse the PubSubMessages.
      if (javascriptTextTransformGcsPath() != null) {
        return failsafeElements.apply(
            "InvokeUDF",
            JavascriptTextTransformer.FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                .setFileSystemPath(javascriptTextTransformGcsPath())
                .setFunctionName(javascriptTextTransformFunctionName())
                .setReloadIntervalMinutes(javascriptTextTransformReloadIntervalMinutes())
                .setSuccessTag(TRANSFORM_OUT)
                .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                .build());
      } else {
        return failsafeElements.apply(
            "ProcessPubSubMessages",
            ParDo.of(new ProcessFailsafePubSubFn())
                .withOutputTags(TRANSFORM_OUT, TupleTagList.of(TRANSFORM_DEADLETTER_OUT)));
      }
    }

    /**
     * Builder for {@link PubSubMessageToJsonDocument}.
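     *
     * <p>A minimal, illustrative sketch of constructing the transform with a JavaScript UDF; the
     * GCS path and function name below are placeholders, not values shipped with this template:
     *
     * <pre>{@code
     * PubSubMessageToJsonDocument transform =
     *     PubSubMessageToJsonDocument.newBuilder()
     *         .setJavascriptTextTransformGcsPath("gs://your-bucket/your-udf.js")
     *         .setJavascriptTextTransformFunctionName("process")
     *         .build();
     * }</pre>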
     */
    @AutoValue.Builder
    public abstract static class Builder {

      public abstract Builder setJavascriptTextTransformGcsPath(
          String javascriptTextTransformGcsPath);

      public abstract Builder setJavascriptTextTransformFunctionName(
          String javascriptTextTransformFunctionName);

      public abstract Builder setPythonExternalTextTransformGcsPath(
          String pythonExternalTextTransformGcsPath);

      public abstract Builder setPythonExternalTextTransformFunctionName(
          String pythonExternalTextTransformFunctionName);

      public abstract Builder setJavascriptTextTransformReloadIntervalMinutes(
          Integer javascriptTextTransformReloadIntervalMinutes);

      public abstract PubSubMessageToJsonDocument build();
    }
  }

  /**
   * The {@link ProcessFailsafePubSubFn} class processes a {@link FailsafeElement} containing a
   * {@link PubsubMessage} and a String of the message's payload {@link PubsubMessage#getPayload()}
   * into a {@link FailsafeElement} of the original {@link PubsubMessage} and a JSON string that has
   * been processed with {@link Gson}.
   *
   * <p>If {@link PubsubMessage#getAttributeMap()} is not empty then the message attributes will be
   * serialized along with the message payload.
   */
  static class ProcessFailsafePubSubFn
      extends DoFn<FailsafeElement<PubsubMessage, String>, FailsafeElement<PubsubMessage, String>> {

    private static final Counter successCounter =
        Metrics.counter(PubSubMessageToJsonDocument.class, "successful-json-conversion");

    private static Gson gson = new Gson();

    private static final Counter failedCounter =
        Metrics.counter(PubSubMessageToJsonDocument.class, "failed-json-conversion");

    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage pubsubMessage = context.element().getOriginalPayload();

      JsonObject messageObject = new JsonObject();

      try {
        if (pubsubMessage.getPayload().length > 0) {
          messageObject =
              gson.fromJson(
                  new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8), JsonObject.class);
        }

        // If message attributes are present they will be serialized along with the message
        // payload.
        if (pubsubMessage.getAttributeMap() != null) {
          pubsubMessage.getAttributeMap().forEach(messageObject::addProperty);
        }

        context.output(FailsafeElement.of(pubsubMessage, messageObject.toString()));
        successCounter.inc();

      } catch (JsonSyntaxException e) {
        context.output(
            TRANSFORM_DEADLETTER_OUT,
            FailsafeElement.of(context.element())
                .setErrorMessage(e.getMessage())
                .setStacktrace(Throwables.getStackTraceAsString(e)));
        failedCounter.inc();
      }
    }
  }

  /**
   * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the
   * {@link FailsafeElement} class so errors can be recovered from and the original message can be
   * output to an error records table.
   */
  static class PubsubMessageToFailsafeElementFn
      extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage message = context.element();
      context.output(
          FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
    }
  }
}