/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.stringMappingFunction;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubProtoToBigQuery.PubSubProtoToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.FailsafeElementTransforms.ConvertFailsafeElementToPubsubMessage;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.ArrayUtils;

/**
 * A template for writing <a href="https://developers.google.com/protocol-buffers">Protobuf</a>
 * records from Pub/Sub to BigQuery.
 *
 * <p>Persistent failures are written to a Pub/Sub unprocessed topic.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Proto_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "PubSub_Proto_to_BigQuery_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub Proto to BigQuery",
      description = {
        "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
            + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
        "A JavaScript user-defined function (UDF) can be provided to transform data. "
            + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
      },
      skipOptions = {
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      optionsClass = PubSubProtoToBigQueryOptions.class,
      flexContainerName = "pubsub-proto-to-bigquery",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "The input Pub/Sub subscription must exist.",
        "The schema file for the Proto records must exist on Cloud Storage.",
        "The output Pub/Sub topic must exist.",
        "The output BigQuery dataset must exist.",
        "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "PubSub_Proto_to_BigQuery_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub Proto to BigQuery with Python UDF",
      type = Template.TemplateType.XLANG,
      description = {
        "The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto data from a Pub/Sub subscription into a BigQuery table. "
            + "Any errors that occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.\n",
        "A Python user-defined function (UDF) can be provided to transform data. "
            + "Errors while executing the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as the BigQuery errors."
      },
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      optionsClass = PubSubProtoToBigQueryOptions.class,
      flexContainerName = "pubsub-proto-to-bigquery-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-proto-to-bigquery",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "The input Pub/Sub subscription must exist.",
        "The schema file for the Proto records must exist on Cloud Storage.",
        "The output Pub/Sub topic must exist.",
        "The output BigQuery dataset must exist.",
        "If the BigQuery table exists, it must have a schema matching the proto data regardless of the <code>createDisposition</code> value."
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public final class PubsubProtoToBigQuery {
  private static final TupleTag<FailsafeElement<String, String>> UDF_SUCCESS_TAG = new TupleTag<>();
  private static final TupleTag<FailsafeElement<String, String>> UDF_FAILURE_TAG = new TupleTag<>();

  private static final FailsafeElementCoder<String, String> FAILSAFE_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();
    run(PipelineOptionsFactory.fromArgs(args).as(PubSubProtoToBigQueryOptions.class));
  }

  /** {@link org.apache.beam.sdk.options.PipelineOptions} for {@link PubsubProtoToBigQuery}. */
  public interface PubSubProtoToBigQueryOptions
      extends ReadSubscriptionOptions,
          WriteOptions,
          WriteTopicOptions,
          PythonExternalTextTransformer.PythonExternalTextTransformerOptions,
          BigQueryStorageApiStreamingOptions {

    @TemplateParameter.GcsReadFile(
        order = 1,
        description = "Cloud Storage Path to the Proto Schema File",
        helpText =
            "The Cloud Storage location of the self-contained proto schema file. For example,"
                + " `gs://path/to/my/file.pb`. You can generate this file with"
                + " the `--descriptor_set_out` flag of the protoc command."
                + " The `--include_imports` flag guarantees that the file is self-contained.")
    @Required
    String getProtoSchemaPath();

    void setProtoSchemaPath(String value);
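
    // For reference, a self-contained descriptor set like the one expected above is
    // typically generated with protoc (file names assumed for illustration):
    //   protoc --include_imports --descriptor_set_out=file.pb path/to/my_message.proto
    // For a proto declaring `package com.example;` and `message MyMessage`, the
    // matching `fullMessageName` below would be `com.example.MyMessage`.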

    @TemplateParameter.Text(
        order = 2,
        regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
        description = "Full Proto Message Name",
        helpText =
            "The full proto message name. For example, `package.name.MessageName`,"
                + " where `package.name` is the value provided for the"
                + " `package` statement and not the `java_package` statement.")
    @Required
    String getFullMessageName();

    void setFullMessageName(String value);

    @TemplateParameter.Boolean(
        order = 3,
        optional = true,
        description = "Preserve Proto Field Names",
        helpText =
            "To preserve the original proto field name in JSON, set this property to `true`. "
                + "To use more standard JSON names, set to `false`."
                + " For example, `false` would change `field_name` to `fieldName`. Defaults to: `false`.")
    @Default.Boolean(false)
    Boolean getPreserveProtoFieldNames();

    void setPreserveProtoFieldNames(Boolean value);
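
    // For example, given `message M { string field_name = 1; }`, the pipeline emits
    // {"fieldName": ...} by default and {"field_name": ...} when this option is `true`
    // (see ConvertDynamicProtoMessageToJson below).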
"  + "If this value isn't provided, then the schema is inferred from the Proto schema.",  example = "gs://MyBucket/bq_schema.json")  String getBigQueryTableSchemaPath();  void setBigQueryTableSchemaPath(String value);  @TemplateParameter.PubsubTopic(  order = 5,  optional = true,  description = "Pub/Sub output topic for UDF failures",  helpText =  "The Pub/Sub topic storing the UDF errors."  + " If this value isn't provided, UDF errors are sent to the same topic as `outputTopic`.",  example = "projects/your-project-id/topics/your-topic-name")  String getUdfOutputTopic();  void setUdfOutputTopic(String udfOutputTopic);  // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned  // on when pipeline is running on ALO mode and using the Storage Write API  @TemplateParameter.Boolean(  order = 6,  optional = true,  parentName = "useStorageWriteApi",  parentTriggerValues = {"true"},  description = "Use at at-least-once semantics in BigQuery Storage Write API",  helpText =  "When using the Storage Write API, specifies the write semantics."  + " To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true`. To use exactly-once semantics, set the parameter to `false`."  + " This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.",  hiddenUi = true)  @Default.Boolean(false)  @Override  Boolean getUseStorageWriteApiAtLeastOnce();  void setUseStorageWriteApiAtLeastOnce(Boolean value);  }  /** Runs the pipeline and returns the results. */  private static PipelineResult run(PubSubProtoToBigQueryOptions options) {  BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);  Pipeline pipeline = Pipeline.create(options);  Descriptor descriptor = getDescriptor(options);  PCollection<String> maybeForUdf =  pipeline  .apply("Read From Pubsub", readPubsubMessages(options, descriptor))  .apply("Dynamic Message to TableRow", new ConvertDynamicProtoMessageToJson(options));  WriteResult writeResult =  runUdf(maybeForUdf, options)  .apply("Write to BigQuery", writeToBigQuery(options, descriptor));  BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)  .apply(  "Create Error Payload",  ErrorConverters.BigQueryInsertErrorToPubsubMessage.<String>newBuilder()  .setPayloadCoder(StringUtf8Coder.of())  .setTranslateFunction(BigQueryConverters::tableRowToJson)  .build())  .apply("Write Failed BQ Records", PubsubIO.writeMessages().to(options.getOutputTopic()));  return pipeline.run();  }  /** Gets the {@link Descriptor} for the message type in the Pub/Sub topic. */  @VisibleForTesting  static Descriptor getDescriptor(PubSubProtoToBigQueryOptions options) {  String schemaPath = options.getProtoSchemaPath();  String messageName = options.getFullMessageName();  Descriptor descriptor = SchemaUtils.getProtoDomain(schemaPath).getDescriptor(messageName);  if (descriptor == null) {  throw new IllegalArgumentException(  messageName + " is not a recognized message in " + schemaPath);  }  return descriptor;  }  /** Returns the {@link PTransform} for reading Pub/Sub messages. */  private static Read<DynamicMessage> readPubsubMessages(  PubSubProtoToBigQueryOptions options, Descriptor descriptor) {  return PubsubIO.readProtoDynamicMessages(descriptor)  .fromSubscription(options.getInputSubscription())  .withDeadLetterTopic(options.getOutputTopic());  }  /**  * Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.  

  /**
   * Writes messages to BigQuery, creating the table if necessary and allowed in {@code options}.
   *
   * <p>The BigQuery schema will be inferred from {@code descriptor} unless a JSON schema path is
   * specified in {@code options}.
   */
  @VisibleForTesting
  static Write<String> writeToBigQuery(
      PubSubProtoToBigQueryOptions options, Descriptor descriptor) {
    Write<String> write =
        BigQueryConverters.<String>createWriteTransform(options)
            .withFormatFunction(BigQueryConverters::convertJsonToTableRow);

    String schemaPath = options.getBigQueryTableSchemaPath();
    if (Strings.isNullOrEmpty(schemaPath)) {
      return write.withSchema(
          SchemaUtils.createBigQuerySchema(descriptor, options.getPreserveProtoFieldNames()));
    } else {
      return write.withJsonSchema(GCSUtils.getGcsFileAsString(schemaPath));
    }
  }

  /** {@link PTransform} that handles converting {@link DynamicMessage} values to JSON. */
  private static class ConvertDynamicProtoMessageToJson
      extends PTransform<PCollection<DynamicMessage>, PCollection<String>> {
    private final boolean preserveProtoName;

    private ConvertDynamicProtoMessageToJson(PubSubProtoToBigQueryOptions options) {
      this.preserveProtoName = options.getPreserveProtoFieldNames();
    }

    @Override
    public PCollection<String> expand(PCollection<DynamicMessage> input) {
      return input.apply(
          "Map to JSON",
          MapElements.into(TypeDescriptors.strings())
              .via(
                  message -> {
                    try {
                      JsonFormat.Printer printer = JsonFormat.printer();
                      return preserveProtoName
                          ? printer.preservingProtoFieldNames().print(message)
                          : printer.print(message);
                    } catch (InvalidProtocolBufferException e) {
                      throw new RuntimeException(e);
                    }
                  }));
    }
  }
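
  // A minimal test-style sketch of the UDF step below (hypothetical values; assumes
  // Beam's Create transform and options configured with a JavaScript UDF):
  //   PCollection<String> json = pipeline.apply(Create.of("{\"name\":\"a\"}"));
  //   PCollection<String> output = PubsubProtoToBigQuery.runUdf(json, options);
  // With no UDF configured, runUdf returns `json` unchanged.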

  /**
   * Handles running the UDF.
   *
   * <p>If {@code options} is not configured to run a UDF, then the UDF is not called, and the
   * input is returned unchanged.
   *
   * <p>This may add a branch to the pipeline for outputting failed UDF records to an unprocessed
   * topic.
   *
   * @param jsonCollection {@link PCollection} of JSON strings for use as input to the UDF
   * @param options the options containing info on running the UDF
   * @return the {@link PCollection} of UDF output as JSON, or {@code jsonCollection} if the UDF
   *     is not called
   */
  @VisibleForTesting
  static PCollection<String> runUdf(
      PCollection<String> jsonCollection, PubSubProtoToBigQueryOptions options) {
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());

    // In order to avoid generating a graph that makes it look like a UDF was called when none was
    // intended, simply return the input as "success" output.
    if (!useJavascriptUdf && !usePythonUdf) {
      return jsonCollection;
    }

    // For testing purposes, we need to do this check before creating the PTransform rather than
    // in `expand`. Otherwise, we get a NullPointerException due to the PTransform not returning
    // a value.
    if (useJavascriptUdf
        && Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
      throw new IllegalArgumentException(
          "JavaScript function name cannot be null or empty if file is set");
    }
    if (usePythonUdf
        && Strings.isNullOrEmpty(options.getPythonExternalTextTransformFunctionName())) {
      throw new IllegalArgumentException(
          "Python function name cannot be null or empty if file is set");
    }
    if (usePythonUdf && useJavascriptUdf) {
      throw new IllegalArgumentException(
          "Only one of the JavaScript or Python GCS paths can be provided, but not both.");
    }

    PCollectionTuple maybeSuccess;
    if (usePythonUdf) {
      maybeSuccess = jsonCollection.apply("Run UDF", new RunPythonUdf(options));
    } else {
      maybeSuccess = jsonCollection.apply("Run UDF", new RunUdf(options));
    }

    maybeSuccess
        .get(UDF_FAILURE_TAG)
        .setCoder(FAILSAFE_CODER)
        .apply(
            "Get UDF Failures",
            ConvertFailsafeElementToPubsubMessage.<String, String>builder()
                .setOriginalPayloadSerializeFn(s -> ArrayUtils.toObject(s.getBytes(UTF_8)))
                .setErrorMessageAttributeKey("udfErrorMessage")
                .build())
        .apply("Write Failed UDF", writeUdfFailures(options));

    return maybeSuccess
        .get(UDF_SUCCESS_TAG)
        .setCoder(FAILSAFE_CODER)
        .apply(
            "Get UDF Output",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .setCoder(NullableCoder.of(StringUtf8Coder.of()));
  }

  /** {@link PTransform} that calls a UDF and returns both success and failure output. */
  private static class RunUdf extends PTransform<PCollection<String>, PCollectionTuple> {
    private final PubSubProtoToBigQueryOptions options;

    RunUdf(PubSubProtoToBigQueryOptions options) {
      this.options = options;
    }

    @Override
    public PCollectionTuple expand(PCollection<String> input) {
      return input
          .apply("Prepare Failsafe UDF", makeFailsafe())
          .setCoder(FAILSAFE_CODER)
          .apply(
              "Call UDF",
              FailsafeJavascriptUdf.<String>newBuilder()
                  .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                  .setFunctionName(options.getJavascriptTextTransformFunctionName())
                  .setReloadIntervalMinutes(
                      options.getJavascriptTextTransformReloadIntervalMinutes())
                  .setSuccessTag(UDF_SUCCESS_TAG)
                  .setFailureTag(UDF_FAILURE_TAG)
                  .build());
    }

    private static MapElements<String, FailsafeElement<String, String>> makeFailsafe() {
      return MapElements.into(new TypeDescriptor<FailsafeElement<String, String>>() {})
          .via((String json) -> FailsafeElement.of(json, json));
    }
  }
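
  // For reference, the JavaScript UDF loaded by RunUdf conventionally takes the
  // message JSON as a string and returns the transformed JSON as a string, e.g.
  // (sketch; function and field names assumed):
  //   function transform(inJson) {
  //     var obj = JSON.parse(inJson);
  //     obj.someField = obj.someField.toUpperCase();
  //     return JSON.stringify(obj);
  //   }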

  /** {@link PTransform} that calls a Python UDF and returns both success and failure output. */
  private static class RunPythonUdf extends PTransform<PCollection<String>, PCollectionTuple> {
    private final PubSubProtoToBigQueryOptions options;

    RunPythonUdf(PubSubProtoToBigQueryOptions options) {
      this.options = options;
    }

    @Override
    public PCollectionTuple expand(PCollection<String> input) {
      return input
          .apply("Prepare Failsafe row", stringMappingFunction())
          .setCoder(
              RowCoder.of(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA))
          .apply(
              "InvokeUDF",
              PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                  .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                  .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                  .build())
          .apply(
              "MapRowsToFailsafeElements",
              ParDo.of(new RowToStringFailsafeElementFn(UDF_SUCCESS_TAG, UDF_FAILURE_TAG))
                  .withOutputTags(UDF_SUCCESS_TAG, TupleTagList.of(UDF_FAILURE_TAG)));
    }
  }

  /**
   * Returns a {@link PubsubIO.Write} configured to write UDF failures to the appropriate output
   * topic.
   */
  private static PubsubIO.Write<PubsubMessage> writeUdfFailures(
      PubSubProtoToBigQueryOptions options) {
    PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages();
    return Strings.isNullOrEmpty(options.getUdfOutputTopic())
        ? write.to(options.getOutputTopic())
        : write.to(options.getUdfOutputTopic());
  }
}