/* * Copyright (C) 2021 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ package com.google.cloud.teleport.v2.templates; import static com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.stringMappingFunction; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.cloud.teleport.metadata.MultiTemplate; import com.google.cloud.teleport.metadata.Template; import com.google.cloud.teleport.metadata.TemplateCategory; import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger; import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions; import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions; import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions; import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions; import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions; import com.google.cloud.teleport.v2.transforms.BigQueryConverters; import com.google.cloud.teleport.v2.transforms.ErrorConverters; import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage; import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf; import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer; import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn; import com.google.cloud.teleport.v2.utils.BigQueryIOUtils; import com.google.cloud.teleport.v2.utils.GCSUtils; import com.google.cloud.teleport.v2.utils.SchemaUtils; import com.google.cloud.teleport.v2.values.FailsafeElement; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.commons.lang3.ArrayUtils; /** * A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a> * records from Pub/Sub to BigQuery. * * <p>Persistent failures are written to a Pub/Sub unprocessed topic. * * <p>Check out <a * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Proto_to_BigQuery.md">README</a> * for instructions on how to use or modify this template. */ @MultiTemplate({ @Template( name = "PubSub_Proto_to_BigQuery_Flex", category = TemplateCategory.STREAMING, displayName = "Pub/Sub Proto to BigQuery", description = { "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. " + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n", "A JavaScript user-defined function (UDF) can be provided to transform data. " + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors." }, skipOptions = { "pythonExternalTextTransformGcsPath", "pythonExternalTextTransformFunctionName" }, optionsClass = PubSubProtoToBigQueryOptions.class, flexContainerName = "pubsub-proto-to-bigquery", documentation = "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery", contactInformation = "https://cloud.google.com/support", requirements = { "The input Pub/Sub subscription must exist.", "The schema file for the Proto records must exist on Cloud Storage.", "The output Pub/Sub topic must exist.", "The output BigQuery dataset must exist.", "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value." }, streaming = true, supportsAtLeastOnce = true), @Template( name = "PubSub_Proto_to_BigQuery_Xlang", category = TemplateCategory.STREAMING, displayName = "Pub/Sub Proto to BigQuery with Python UDF", type = Template.TemplateType.XLANG, description = { "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. " + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n", "A Python user-defined function (UDF) can be provided to transform data. " + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors." }, skipOptions = { "javascriptTextTransformGcsPath", "javascriptTextTransformFunctionName", "javascriptTextTransformReloadIntervalMinutes" }, optionsClass = PubSubProtoToBigQueryOptions.class, flexContainerName = "pubsub-proto-to-bigquery-xlang", documentation = "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery", contactInformation = "https://cloud.google.com/support", requirements = { "The input Pub/Sub subscription must exist.", "The schema file for the Proto records must exist on Cloud Storage.", "The output Pub/Sub topic must exist.", "The output BigQuery dataset must exist.", "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value." }, streaming = true, supportsAtLeastOnce = true) }) public final class PubsubProtoToBigQuery { private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>(); private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>(); private static final FailsafeElementCoder<String, String> FAILSAFE_CODER = FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); public static void main(String[] args) { UncaughtExceptionLogger.register(); run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class)); } /** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */ public interface PubSubProtoToBigQueryOptions extends ReadSubscriptionOptions, WriteOptions, WriteTopicOptions, PythonExternalTextTransformer.PythonExternalTextTransformerOptions, BigQueryStorageApiStreamingOptions { @TemplateParameter.GcsReadFile( order = 1, description = "Cloud Storage Path to the Proto Schema File", helpText = "The Cloud Storage location of the self-contained proto schema file. For example," + " `gs://path/to/my/file.pb`. You can generate this file with" + " the `--descriptor_set_out` flag of the protoc command." + " The `--include_imports` flag guarantees that the file is self-contained.") @Required String getProtoSchemaPath(); void setProtoSchemaPath(String value); @TemplateParameter.Text( order = 2, regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"}, description = "Full Proto Message Name", helpText = "The full proto message name. For example, `package.name`." + " `MessageName`, where `package.name` is the value provided for the" + " `package` statement and not the `java_package` statement.") @Required String getFullMessageName(); void setFullMessageName(String value); @TemplateParameter.Boolean( order = 3, optional = true, description = "Preserve Proto Field Names", helpText = "To preserve the original proto field name in JSON, set this property to `true`. " + "To use more standard JSON names, set to `false`." + " For example, `false` would change `field_name` to `fieldName`. Defaults to: `false`.") @Default.Boolean(false) Boolean getPreserveProtoFieldNames(); void setPreserveProtoFieldNames(Boolean value); @TemplateParameter.GcsReadFile( order = 4, optional = true, description = "BigQuery Table Schema Path", helpText = "The Cloud Storage path to the BigQuery schema path. " + "If this value isn't provided, then the schema is inferred from the Proto schema.", example = "gs://MyBucket/bq_schema.json") String getBigQueryTableSchemaPath(); void setBigQueryTableSchemaPath(String value); @TemplateParameter.PubsubTopic( order = 5, optional = true, description = "Pub/Sub output topic for UDF failures", helpText = "The Pub/Sub topic storing the UDF errors." + " If this value isn't provided, UDF errors are sent to the same topic as `outputTopic`.", example = "projects/your-project-id/topics/your-topic-name") String getUdfOutputTopic(); void setUdfOutputTopic(String udfOutputTopic); // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned // on when pipeline is running on ALO mode and using the Storage Write API @TemplateParameter.Boolean( order = 6, optional = true, parentName = "useStorageWriteApi", parentTriggerValues = {"true"}, description = "Use at at-least-once semantics in BigQuery Storage Write API", helpText = "When using the Storage Write API, specifies the write semantics." + " To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true`. To use exactly-once semantics, set the parameter to `false`." + " This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.", hiddenUi = true) @Default.Boolean(false) @Override Boolean getUseStorageWriteApiAtLeastOnce(); void setUseStorageWriteApiAtLeastOnce(Boolean value); } /** Runs the pipeline and returns the results. */ private static PipelineResult run(PubSubProtoToBigQueryOptions options) { BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options); Pipeline pipeline = Pipeline.create(options); Descriptor descriptor = getDescriptor(options); PCollection<String> maybeForUdf = pipeline .apply("Read From Pubsub", readPubsubMessages(options, descriptor)) .apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options)); WriteResult writeResult = runUdf(maybeForUdf, options) .apply("Write to BigQuery", writeToBigQuery(options, descriptor)); BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options) .apply( "Create Error Payload", ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder() .setPayloadCoder(StringUtf8Coder.of()) .setTranslateFunction(BigQueryConverters::tableRowToJson) .build()) .apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic())); return pipeline.run(); } /** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */ @VisibleForTesting static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) { String schemaPath = options.getProtoSchemaPath(); String messageName = options.getFullMessageName(); Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName); if (descriptor == null) { throw new IllegalArgumentException( messageName + " is not a recognized message in " + schemaPath); } return descriptor; } /** Returns the {@link PTransform} for reading Pub/Sub messages. */ private static Read<DynamicMessage> readPubsubMessages( PubSubProtoToBigQueryOptions options, Descriptor descriptor) { return PubsubIO.readProtoDynamicMessages(descriptor) .fromSubscription(options.getInputSubscription()) .withDeadLetterTopic(options.getOutputTopic()); } /** * Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}. * * <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is * specified in {@code options}. */ @VisibleForTesting static Write<String> writeToBigQuery( PubSubProtoToBigQueryOptions options, Descriptor descriptor) { Write<String> write = BigQueryConverters.<String>createWriteTransform(options) .withFormatFunction(BigQueryConverters::convertJsonToTableRow); String schemaPath = options.getBigQueryTableSchemaPath(); if (Strings.isNullOrEmpty(schemaPath)) { return write.withSchema( SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames())); } else { return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath)); } } /** {@link PTransform} that handles converting {@link PubsubMessage} values to JSON. */ private static class ConvertDynamicProtoMessageToJson extends PTransform<PCollection<DynamicMessage>, PCollection<String>> { private final boolean preserveProtoName; private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) { this.preserveProtoName = options.getPreserveProtoFieldNames(); } @Override public PCollection<String> expand(PCollection<DynamicMessage> input) { return input.apply( "Map to JSON", MapElements.into(TypeDescriptors.strings()) .via( message -> { try { JsonFormat.Printer printer = JsonFormat.printer(); return preserveProtoName ? printer.preservingProtoFieldNames().print(message) : printer.print(message); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } })); } } /** * Handles running the UDF. * * <p>If {@code options} is configured so as not to run the UDF, then the UDF will not be called. * * <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed * topic. * * @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF * @param options the options containing info on running the UDF * @return the {@link PCollection} of UDF output as JSON or {@code jsonCollection} if UDF not * called */ @VisibleForTesting static PCollection<String> runUdf( PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) { boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath()); boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath()); // In order to avoid generating a graph that makes it look like a UDF was called when none was // intended, simply return the input as "success" output. if (!useJavascriptUdf && !usePythonUdf) { return jsonCollection; } // For testing purposes, we need to do this check before creating the PTransform rather than // in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning // a value. if (useJavascriptUdf && Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) { throw new IllegalArgumentException( "JavaScript function name cannot be null or empty if file is set"); } if (usePythonUdf && Strings.isNullOrEmpty(options.getPythonExternalTextTransformFunctionName())) { throw new IllegalArgumentException( "Python function name cannot be null or empty if file is set"); } if (usePythonUdf && useJavascriptUdf) { throw new IllegalArgumentException( "Either javascript or Python gcs path must be provided, but not both."); } PCollectionTuple maybeSuccess; if (usePythonUdf) { maybeSuccess = jsonCollection.apply("Run UDF", new RunPythonUdf(options)); } else { maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options)); } maybeSuccess .get(UDF_FAILURE_TAG) .setCoder(FAILSAFE_CODER) .apply( "Get UDF Failures", ConvertFailsafeElementToPubsubMessage.<String, String>builder() .setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8))) .setErrorMessageAttributeKey("udfErrorMessage") .build()) .apply("Write Failed UDF", writeUdfFailures(options)); return maybeSuccess .get(UDF_SUCCESS_TAG) .setCoder(FAILSAFE_CODER) .apply( "Get UDF Output", MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload)) .setCoder(NullableCoder.of(StringUtf8Coder.of())); } /** {@link PTransform} that calls a UDF and returns both success and failure output. */ private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> { private final PubSubProtoToBigQueryOptions options; RunUdf(PubSubProtoToBigQueryOptions options) { this.options = options; } @Override public PCollectionTuple expand(PCollection<String> input) { return input .apply("Prepare Failsafe UDF", makeFailsafe()) .setCoder(FAILSAFE_CODER) .apply( "Call UDF", FailsafeJavascriptUdf.<String>newBuilder() .setFileSystemPath(options.getJavascriptTextTransformGcsPath()) .setFunctionName(options.getJavascriptTextTransformFunctionName()) .setReloadIntervalMinutes( options.getJavascriptTextTransformReloadIntervalMinutes()) .setSuccessTag(UDF_SUCCESS_TAG) .setFailureTag(UDF_FAILURE_TAG) .build()); } private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() { return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {}) .via((String json) -> FailsafeElement.of(json, json)); } } /** {@link PTransform} that calls a python UDF and returns both success and failure output. */ private static class RunPythonUdf extends PTransform<PCollection<String>, PCollectionTuple> { private final PubSubProtoToBigQueryOptions options; RunPythonUdf(PubSubProtoToBigQueryOptions options) { this.options = options; } @Override public PCollectionTuple expand(PCollection<String> input) { return input .apply("Prepare Failsafe row", stringMappingFunction()) .setCoder( RowCoder.of(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)) .apply( "InvokeUDF", PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder() .setFileSystemPath(options.getPythonExternalTextTransformGcsPath()) .setFunctionName(options.getPythonExternalTextTransformFunctionName()) .build()) .apply( "MapRowsToFailsafeElements", ParDo.of(new RowToStringFailsafeElementFn(UDF_SUCCESS_TAG, UDF_FAILURE_TAG)) .withOutputTags(UDF_SUCCESS_TAG, TupleTagList.of(UDF_FAILURE_TAG))); } } /** * Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output * topic. */ private static PubsubIO.Write<PubsubMessage> writeUdfFailures( PubSubProtoToBigQueryOptions options) { PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages(); return Strings.isNullOrEmpty(options.getUdfOutputTopic()) ? write.to(options.getOutputTopic()) : write.to(options.getUdfOutputTopic()); } }