/*  * Copyright (C) 2018 Google LLC  *  * Licensed under the Apache License, Version 2.0 (the "License"); you may not  * use this file except in compliance with the License. You may obtain a copy of  * the License at  *  * http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the  * License for the specific language governing permissions and limitations under  * the License.  */ package com.google.cloud.teleport.v2.templates; import static com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.wrapBigQueryInsertError; import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.teleport.metadata.MultiTemplate; import com.google.cloud.teleport.metadata.Template; import com.google.cloud.teleport.metadata.TemplateCategory; import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger; import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions; import com.google.cloud.teleport.v2.templates.PubSubToBigQuery.Options; import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow; import com.google.cloud.teleport.v2.transforms.ErrorConverters; import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf; import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer; import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.PythonExternalTextTransformerOptions; import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToPubSubFailsafeElementFn; import com.google.cloud.teleport.v2.utils.BigQueryIOUtils; import com.google.cloud.teleport.v2.utils.ResourceUtils; import com.google.cloud.teleport.v2.values.FailsafeElement; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import java.nio.charset.StandardCharsets; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /**  * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format  * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors  * which occur in the transformation of the data or execution of the UDF will be output to a  * separate errors table in BigQuery. The errors table will be created if it does not exist prior to  * execution. Both output and error tables are specified by the user as template parameters.  *  * <p><b>Pipeline Requirements</b>  *  * <ul>  * <li>The Pub/Sub topic exists.  * <li>The BigQuery output table exists.  * </ul>  *  * <p>Check out <a  * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_PubSub_to_BigQuery_Flex.md">README</a>  * for instructions on how to use or modify this template.  */ @MultiTemplate({  @Template(  name = "PubSub_to_BigQuery_Flex",  category = TemplateCategory.STREAMING,  displayName = "Pub/Sub to BigQuery",  description =  "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "  + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "  + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",  optionsClass = Options.class,  skipOptions = {  "pythonExternalTextTransformGcsPath",  "pythonExternalTextTransformFunctionName",  },  flexContainerName = "pubsub-to-bigquery",  documentation =  "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",  contactInformation = "https://cloud.google.com/support",  requirements = {  "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",  "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."  },  streaming = true,  supportsAtLeastOnce = true,  supportsExactlyOnce = true),  @Template(  name = "PubSub_to_BigQuery_Xlang",  category = TemplateCategory.STREAMING,  displayName = "Pub/Sub to BigQuery with Python UDFs",  type = Template.TemplateType.XLANG,  description =  "The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "  + "You can use the template as a quick solution to move Pub/Sub data to BigQuery. "  + "The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",  optionsClass = Options.class,  skipOptions = {  "javascriptTextTransformGcsPath",  "javascriptTextTransformFunctionName",  "javascriptTextTransformReloadIntervalMinutes"  },  flexContainerName = "pubsub-to-bigquery-xlang",  documentation =  "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",  contactInformation = "https://cloud.google.com/support",  requirements = {  "The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.",  "The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."  },  streaming = true,  supportsAtLeastOnce = true,  supportsExactlyOnce = true) }) public class PubSubToBigQuery {  /** The log to output status messages to. */  private static final Logger LOG = LoggerFactory.getLogger(PubSubToBigQuery.class);  /** The tag for the main output for the UDF. */  public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_OUT =  new TupleTag<FailsafeElement<PubsubMessage, String>>() {};  /** The tag for the main output of the json transformation. */  public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};  /** The tag for the dead-letter output of the udf. */  public static final TupleTag<FailsafeElement<PubsubMessage, String>> UDF_DEADLETTER_OUT =  new TupleTag<FailsafeElement<PubsubMessage, String>>() {};  /** The tag for the dead-letter output of the json to table row transform. */  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_DEADLETTER_OUT =  new TupleTag<FailsafeElement<PubsubMessage, String>>() {};  /** The default suffix for error tables if dead letter table is not specified. */  public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";  /** Pubsub message/string coder for pipeline. */  public static final FailsafeElementCoder<PubsubMessage, String> CODER =  FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());  /** String/String Coder for FailsafeElement. */  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =  FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());  /**  * The {@link Options} class provides the custom execution options passed by the executor at the  * command-line.  */  public interface Options  extends PipelineOptions,  BigQueryStorageApiStreamingOptions,  PythonExternalTextTransformerOptions,  DataflowPipelineWorkerPoolOptions {  @TemplateParameter.BigQueryTable(  order = 1,  groupName = "Target",  description = "BigQuery output table",  helpText =  "The BigQuery table to write to, formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`.")  String getOutputTableSpec();  void setOutputTableSpec(String value);  @TemplateParameter.PubsubTopic(  order = 2,  groupName = "Source",  optional = true,  description = "Input Pub/Sub topic",  helpText =  "The Pub/Sub topic to read from, formatted as `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")  String getInputTopic();  void setInputTopic(String value);  @TemplateParameter.PubsubSubscription(  order = 3,  groupName = "Source",  optional = true,  description = "Pub/Sub input subscription",  helpText =  "The Pub/Sub subscription to read from, "  + "formatted as `projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>`.")  String getInputSubscription();  void setInputSubscription(String value);  @TemplateParameter.BigQueryTable(  order = 4,  optional = true,  description =  "Table for messages failed to reach the output table (i.e., Deadletter table)",  helpText =  "The BigQuery table to use for messages that failed to reach the output table, "  + "formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`. If the table "  + "doesn't exist, it is created when the pipeline runs. "  + "If this parameter is not specified, "  + "the value `OUTPUT_TABLE_SPEC_error_records` is used instead.")  String getOutputDeadletterTable();  void setOutputDeadletterTable(String value);  @TemplateParameter.Boolean(  order = 5,  optional = true,  parentName = "useStorageWriteApi",  parentTriggerValues = {"true"},  description = "Use at at-least-once semantics in BigQuery Storage Write API",  helpText =  "When using the Storage Write API, specifies the write semantics. "  + "To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)"  + ", set this parameter to true. "  + "To use exactly-once semantics, set the parameter to `false`. "  + "This parameter applies only when `useStorageWriteApi` is `true`. "  + "The default value is `false`.")  @Default.Boolean(false)  @Override  Boolean getUseStorageWriteApiAtLeastOnce();  void setUseStorageWriteApiAtLeastOnce(Boolean value);  }  /**  * The main entry-point for pipeline execution. This method will start the pipeline but will not  * wait for it's execution to finish. If blocking execution is required, use the {@link  * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code  * result.waitUntilFinish()} on the {@link PipelineResult}.  *  * @param args The command-line args passed by the executor.  */  public static void main(String[] args) {  UncaughtExceptionLogger.register();  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);  BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);  // options.setWorkerDiskType(  //  // "compute.googleapis.com/projects/cloud-teleport-testing/zones/us-central1-a/diskTypes/t2a-test");  run(options);  }  /**  * Runs the pipeline to completion with the specified options. This method does not wait until the  * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result  * object to block until the pipeline is finished running if blocking programmatic execution is  * required.  *  * @param options The execution options.  * @return The pipeline result.  */  public static PipelineResult run(Options options) {  boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());  boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());  if (useInputSubscription == useInputTopic) {  throw new IllegalArgumentException(  "Either input topic or input subscription must be provided, but not both.");  }  Pipeline pipeline = Pipeline.create(options);  CoderRegistry coderRegistry = pipeline.getCoderRegistry();  coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);  /*  * Steps:  * 1) Read messages in from Pub/Sub  * 2) Transform the PubsubMessages into TableRows  * - Transform message payload via UDF  * - Convert UDF result to TableRow objects  * 3) Write successful records out to BigQuery  * 4) Write failed records out to BigQuery  */  /*  * Step #1: Read messages in from Pub/Sub  * Either from a Subscription or Topic  */  PCollection<PubsubMessage> messages = null;  if (useInputSubscription) {  messages =  pipeline.apply(  "ReadPubSubSubscription",  PubsubIO.readMessagesWithAttributes()  .fromSubscription(options.getInputSubscription()));  } else {  messages =  pipeline.apply(  "ReadPubSubTopic",  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));  }  PCollectionTuple convertedTableRows =  messages  /*  * Step #2: Transform the PubsubMessages into TableRows  */  .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));  /*  * Step #3: Write the successful records out to BigQuery  */  WriteResult writeResult =  convertedTableRows  .get(TRANSFORM_OUT)  .apply(  "WriteSuccessfulRecords",  BigQueryIO.writeTableRows()  .withoutValidation()  .withCreateDisposition(CreateDisposition.CREATE_NEVER)  .withWriteDisposition(WriteDisposition.WRITE_APPEND)  .withExtendedErrorInfo()  .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())  .to(options.getOutputTableSpec()));  /*  * Step 3 Contd.  * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement  */  PCollection<FailsafeElement<String, String>> failedInserts =  BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)  .apply(  "WrapInsertionErrors",  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())  .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))  .setCoder(FAILSAFE_ELEMENT_CODER);  /*  * Step #4: Write records that failed table row transformation  * or conversion out to BigQuery deadletter table.  */  PCollectionList.of(  ImmutableList.of(  convertedTableRows.get(UDF_DEADLETTER_OUT),  convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))  .apply("Flatten", Flatten.pCollections())  .apply(  "WriteFailedRecords",  ErrorConverters.WritePubsubMessageErrors.newBuilder()  .setErrorRecordsTable(  !Strings.isNullOrEmpty(options.getOutputDeadletterTable())  ? options.getOutputDeadletterTable()  : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)  .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())  .build());  // 5) Insert records that failed insert into deadletter table  failedInserts.apply(  "WriteFailedRecords",  ErrorConverters.WriteStringMessageErrors.newBuilder()  .setErrorRecordsTable(  !Strings.isNullOrEmpty(options.getOutputDeadletterTable())  ? options.getOutputDeadletterTable()  : options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX)  .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())  .build());  return pipeline.run();  }  /**  * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming  * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while  * applying an optional UDF to the input. The executions of the UDF and transformation to {@link  * TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload  * inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will  * output a {@link PCollectionTuple} which contains all output and dead-letter {@link  * PCollection}.  *  * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:  *  * <ul>  * <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records  * successfully processed by the optional UDF.  * <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement}  * records which failed processing during the UDF execution.  * <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from  * JSON to {@link TableRow} objects.  * <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement}  * records which couldn't be converted to table rows.  * </ul>  */  static class PubsubMessageToTableRow  extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {  private final Options options;  PubsubMessageToTableRow(Options options) {  this.options = options;  }  @Override  public PCollectionTuple expand(PCollection<PubsubMessage> input) {  boolean useJavascriptUdf =  !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());  boolean usePythonUdf =  !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());  if (useJavascriptUdf && usePythonUdf) {  throw new IllegalArgumentException(  "Either javascript or Python gcs path must be provided, but not both.");  }  PCollectionTuple udfOut;  if (usePythonUdf) {  PCollection<Row> udfRowsOut =  input  // Map the incoming messages into FailsafeElements so we can recover from failures  // across multiple transforms.  .apply(  "MapToRecord",  PythonExternalTextTransformer.FailsafeRowPythonExternalUdf  .pubSubMappingFunction())  .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)  .apply(  "InvokeUDF",  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()  .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())  .setFunctionName(options.getPythonExternalTextTransformFunctionName())  .build());  udfOut =  udfRowsOut.apply(  "MapRowsToFailsafeElements",  ParDo.of(new RowToPubSubFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))  .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));  } else {  udfOut =  input  // Map the incoming messages into FailsafeElements so we can recover from failures  // across multiple transforms.  .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))  .apply(  "InvokeUDF",  FailsafeJavascriptUdf.<PubsubMessage>newBuilder()  .setFileSystemPath(options.getJavascriptTextTransformGcsPath())  .setFunctionName(options.getJavascriptTextTransformFunctionName())  .setReloadIntervalMinutes(  options.getJavascriptTextTransformReloadIntervalMinutes())  .setSuccessTag(UDF_OUT)  .setFailureTag(UDF_DEADLETTER_OUT)  .build());  }  // Convert the records which were successfully processed by the UDF into TableRow objects.  PCollectionTuple jsonToTableRowOut =  udfOut  .get(UDF_OUT)  .apply(  "JsonToTableRow",  FailsafeJsonToTableRow.<PubsubMessage>newBuilder()  .setSuccessTag(TRANSFORM_OUT)  .setFailureTag(TRANSFORM_DEADLETTER_OUT)  .build());  // Re-wrap the PCollections so we can return a single PCollectionTuple  return PCollectionTuple.of(UDF_OUT, udfOut.get(UDF_OUT))  .and(UDF_DEADLETTER_OUT, udfOut.get(UDF_DEADLETTER_OUT))  .and(TRANSFORM_OUT, jsonToTableRowOut.get(TRANSFORM_OUT))  .and(TRANSFORM_DEADLETTER_OUT, jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));  }  }  /**  * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the  * {@link FailsafeElement} class so errors can be recovered from and the original message can be  * output to a error records table.  */  static class PubsubMessageToFailsafeElementFn  extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {  @ProcessElement  public void processElement(ProcessContext context) {  PubsubMessage message = context.element();  context.output(  FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));  }  } }