/*  * Copyright (C) 2018 Google LLC  *  * Licensed under the Apache License, Version 2.0 (the "License"); you may not  * use this file except in compliance with the License. You may obtain a copy of  * the License at  *  * http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the  * License for the specific language governing permissions and limitations under  * the License.  */ package com.google.cloud.teleport.v2.templates; import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.teleport.metadata.MultiTemplate; import com.google.cloud.teleport.metadata.Template; import com.google.cloud.teleport.metadata.TemplateCategory; import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger; import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions; import com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.TextToBigQueryStreamingOptions; import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow; import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors; import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf; import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer; import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn; import com.google.cloud.teleport.v2.utils.BigQueryIOUtils; import com.google.cloud.teleport.v2.utils.GCSUtils; import com.google.cloud.teleport.v2.utils.ResourceUtils; import com.google.cloud.teleport.v2.values.FailsafeElement; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import java.io.IOException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Watch.Growth; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.commons.lang3.StringUtils; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /**  * The {@link TextToBigQueryStreaming} is a 
streaming version of {@link TextIOToBigQuery} pipeline  * that reads text files, applies a JavaScript UDF and writes the output to BigQuery. The pipeline  * continuously polls for new files, reads them row-by-row and processes each record into BigQuery.  * The polling interval is set at 10 seconds.  *  * <p>Check out <a  * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Stream_GCS_Text_to_BigQuery_Flex.md">README</a>  * for instructions on how to use or modify this template.  */ @MultiTemplate({  @Template(  name = "Stream_GCS_Text_to_BigQuery_Flex",  category = TemplateCategory.STREAMING,  displayName = "Cloud Storage Text to BigQuery (Stream)",  description = {  "The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",  "The pipeline runs indefinitely and needs to be terminated manually via a\n"  + " <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a\n"  + " <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the\n"  + " <code>Watch</code> transform, which is a splittable <code>DoFn</code> that does not support\n"  + " draining."  },  skipOptions = {  "pythonExternalTextTransfromGcsPath",  "pythonExternalTextTransformFunctionName"  },  optionsClass = TextToBigQueryStreamingOptions.class,  flexContainerName = "text-to-bigquery-streaming",  documentation =  "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",  contactInformation = "https://cloud.google.com/support",  requirements = {  "Create a JSON file that describes the schema of your output table in BigQuery.\n"  + " <p>\n"  + " Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"  + " contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"  + " For example:\n"  + " </p>\n"  + "<pre class=\"prettyprint lang-json\">\n"  + "{\n"  + " \"fields\": [\n"  + " {\n"  + " \"name\": \"location\",\n"  + " \"type\": \"STRING\"\n"  + " },\n"  + " {\n"  + " \"name\": \"name\",\n"  + " \"type\": \"STRING\"\n"  + " },\n"  + " {\n"  + " \"name\": \"age\",\n"  + " \"type\": \"STRING\"\n"  + " },\n"  + " {\n"  + " \"name\": \"color\",\n"  + " \"type\": \"STRING\",\n"  + " \"mode\": \"REQUIRED\"\n"  + " },\n"  + " {\n"  + " \"name\": \"coffee\",\n"  + " \"type\": \"STRING\",\n"  + " \"mode\": \"REQUIRED\"\n"  + " }\n"  + " ]\n"  + "}\n"  + "</pre>",  "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"  + " to transform the lines of text. 
Note that your function must return a JSON string.\n"  + " <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"  + " transforming the values.</p>\n"  + "<pre class=\"prettyprint\" suppresswarning>\n"  + "function transform(line) {\n"  + "var values = line.split(',');\n"  + "\n"  + "var obj = new Object();\n"  + "obj.location = values[0];\n"  + "obj.name = values[1];\n"  + "obj.age = values[2];\n"  + "obj.color = values[3];\n"  + "obj.coffee = values[4];\n"  + "var jsonString = JSON.stringify(obj);\n"  + "\n"  + "return jsonString;\n"  + "}\n"  + "</pre>"  },  streaming = true,  supportsAtLeastOnce = true),  @Template(  name = "Stream_GCS_Text_to_BigQuery_Xlang",  category = TemplateCategory.STREAMING,  displayName = "Cloud Storage Text to BigQuery (Stream) with Python UDF",  type = Template.TemplateType.XLANG,  description = {  "The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a Python User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",  "The pipeline runs indefinitely and needs to be terminated manually via a\n"  + " <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a\n"  + " <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the\n"  + " <code>Watch</code> transform, which is a splittable <code>DoFn</code> that does not support\n"  + " draining."  },  skipOptions = {  "javascriptTextTransformGcsPath",  "javascriptTextTransformFunctionName",  "javascriptTextTransformReloadIntervalMinutes"  },  optionsClass = TextToBigQueryStreamingOptions.class,  flexContainerName = "text-to-bigquery-streaming-xlang",  documentation =  "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",  contactInformation = "https://cloud.google.com/support",  requirements = {  "Create a JSON file that describes the schema of your output table in BigQuery.\n"  + " <p>\n"  + " Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"  + " contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"  + " For example:\n"  + " </p>\n"  + "<pre class=\"prettyprint lang-json\">\n"  + "{\n"  + " \"fields\": [\n"  + " {\n"  + " \"name\": \"location\",\n"  + " \"type\": \"STRING\"\n"  + " },\n"  + " {\n"  + " \"name\": \"name\",\n"  + " \"type\": \"STRING\"\n"  + " },\n"  + " {\n"  + " \"name\": \"age\",\n"  + " \"type\": \"STRING\"\n"  + " },\n"  + " {\n"  + " \"name\": \"color\",\n"  + " \"type\": \"STRING\",\n"  + " \"mode\": \"REQUIRED\"\n"  + " },\n"  + " {\n"  + " \"name\": \"coffee\",\n"  + " \"type\": \"STRING\",\n"  + " \"mode\": \"REQUIRED\"\n"  + " }\n"  + " ]\n"  + "}\n"  + "</pre>",  "Create a Python (<code>.py</code>) file with your UDF function that supplies the logic\n"  + " to transform the lines of text. 
Note that your function must return a JSON string.\n"  + " <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"  + " transforming the values.</p>\n"  + "<pre class=\"prettyprint\" suppresswarning>\n"  + "import json\n"  + "def transform(line):\n"  + " values = line.split(',')\n"  + "\n"  + " obj = {\n"  + " 'location' : values[0],\n"  + " 'name' : values[1],\n"  + " 'age' : values[2],\n"  + " 'color' : values[3],\n"  + " 'coffee' : values[4]\n"  + " }\n"  + " jsonString = json.dumps(obj)\n"  + "\n"  + " return jsonString\n"  + "\n"  + "</pre>"  },  streaming = true,  supportsAtLeastOnce = true) }) public class TextToBigQueryStreaming {  private static final Logger LOG = LoggerFactory.getLogger(TextToBigQueryStreaming.class);  /** The tag for the main output for the UDF. */  private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =  new TupleTag<FailsafeElement<String, String>>() {};  /** The tag for the dead-letter output of the udf. */  private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =  new TupleTag<FailsafeElement<String, String>>() {};  /** The tag for the main output of the json transformation. */  private static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};  /** The tag for the dead-letter output of the json to table row transform. */  private static final TupleTag<FailsafeElement<String, String>> TRANSFORM_DEADLETTER_OUT =  new TupleTag<FailsafeElement<String, String>>() {};  /** The default suffix for error tables if dead letter table is not specified. */  private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";  /** Default interval for polling files in GCS. */  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);  /** Coder for FailsafeElement. */  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =  FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();  /**  * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If  * blocking execution is required, use the {@link  * TextToBigQueryStreaming#run(TextToBigQueryStreamingOptions)} method to start the pipeline and  * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.  *  * @param args The command-line arguments to the pipeline.  */  public static void main(String[] args) {  UncaughtExceptionLogger.register();  // Parse the user options passed from the command-line  TextToBigQueryStreamingOptions options =  PipelineOptionsFactory.fromArgs(args)  .withValidation()  .as(TextToBigQueryStreamingOptions.class);  BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);  run(options);  }  /**  * Runs the pipeline with the supplied options.  *  * @param options The execution parameters to the pipeline.  * @return The result of the pipeline execution.  */  public static PipelineResult run(TextToBigQueryStreamingOptions options) {  // Create the pipeline  Pipeline pipeline = Pipeline.create(options);  // Register the coder for pipeline  FailsafeElementCoder<String, String> coder =  FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());  CoderRegistry coderRegistry = pipeline.getCoderRegistry();  coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);  // Determine if we are using Python UDFs or JS UDFs based on the provided options.  
boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());  boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());  if (useJavascriptUdf && usePythonUdf) {  throw new IllegalArgumentException(  "Either javascript or Python gcs path must be provided, but not both.");  }  /*  * Steps:  * 1) Read from the text source continuously.  * 2) Convert to FailsafeElement.  * 3) Apply Javascript udf transformation.  * - Tag records that were successfully transformed and those  * that failed transformation.  * 4) Convert records to TableRow.  * - Tag records that were successfully converted and those  * that failed conversion.  * 5) Insert successfully converted records into BigQuery.  * - Errors encountered while streaming will be sent to deadletter table.  * 6) Insert records that failed into deadletter table.  */  PCollection<String> sourceRead =  pipeline.apply(  TextIO.read()  .from(options.getInputFilePattern())  .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()));  PCollectionTuple transformedOutput;  if (usePythonUdf) {  transformedOutput =  sourceRead  .apply(  "MapToRecord",  PythonExternalTextTransformer.FailsafeRowPythonExternalUdf  .stringMappingFunction())  .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)  .apply(  "InvokeUDF",  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()  .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())  .setFunctionName(options.getPythonExternalTextTransformFunctionName())  .build())  .apply(  ParDo.of(new RowToStringFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))  .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));  } else {  transformedOutput =  pipeline  // 1) Read from the text source continuously.  .apply(  "ReadFromSource",  TextIO.read()  .from(options.getInputFilePattern())  .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()))  // 2) Convert to FailsafeElement.  .apply(  "ConvertToFailsafeElement",  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())  .via(input -> FailsafeElement.of(input, input)))  // 3) Apply Javascript udf transformation.  .apply(  "ApplyUDFTransformation",  FailsafeJavascriptUdf.<String>newBuilder()  .setFileSystemPath(options.getJavascriptTextTransformGcsPath())  .setFunctionName(options.getJavascriptTextTransformFunctionName())  .setReloadIntervalMinutes(  options.getJavascriptTextTransformReloadIntervalMinutes())  .setSuccessTag(UDF_OUT)  .setFailureTag(UDF_DEADLETTER_OUT)  .build());  }  PCollectionTuple convertedTableRows =  transformedOutput  // 4) Convert records to TableRow.  .get(UDF_OUT)  .apply(  "ConvertJSONToTableRow",  FailsafeJsonToTableRow.<String>newBuilder()  .setSuccessTag(TRANSFORM_OUT)  .setFailureTag(TRANSFORM_DEADLETTER_OUT)  .build());  WriteResult writeResult =  convertedTableRows  // 5) Insert successfully converted records into BigQuery.  
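// Note on the write step that follows: withExtendedErrorInfo() surfaces failed rows as
// BigQueryInsertError values on the WriteResult, and InsertRetryPolicy.retryTransientErrors()
// retries only transient failures, so rows that fail permanently are emitted to the
// failed-inserts output and routed to the dead-letter table further down instead of being
// retried indefinitely.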
.get(TRANSFORM_OUT)  .apply(  "InsertIntoBigQuery",  BigQueryIO.writeTableRows()  .withJsonSchema(GCSUtils.getGcsFileAsString(options.getJSONPath()))  .to(options.getOutputTable())  .withExtendedErrorInfo()  .withoutValidation()  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)  .withWriteDisposition(WriteDisposition.WRITE_APPEND)  .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())  .withCustomGcsTempLocation(  StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory())));  // Elements that failed insertion into BigQuery are extracted and converted to FailsafeElement  PCollection<FailsafeElement<String, String>> failedInserts =  BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)  .apply(  "WrapInsertionErrors",  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())  .via(TextToBigQueryStreaming::wrapBigQueryInsertError));  // 6) Insert records that failed transformation or conversion into deadletter table  PCollectionList.of(  ImmutableList.of(  transformedOutput.get(UDF_DEADLETTER_OUT),  convertedTableRows.get(TRANSFORM_DEADLETTER_OUT),  failedInserts))  .apply("Flatten", Flatten.pCollections())  .apply(  "WriteFailedRecords",  WriteStringMessageErrors.newBuilder()  .setErrorRecordsTable(  StringUtils.isNotEmpty(options.getOutputDeadletterTable())  ? options.getOutputDeadletterTable()  : options.getOutputTable() + DEFAULT_DEADLETTER_TABLE_SUFFIX)  .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())  .build());  return pipeline.run();  }  /**  * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.  *  * @param insertError BigQueryInsert error.  * @return FailsafeElement object.  */  static FailsafeElement<String, String> wrapBigQueryInsertError(BigQueryInsertError insertError) {  FailsafeElement<String, String> failsafeElement;  try {  String rowPayload = JSON_FACTORY.toString(insertError.getRow());  String errorMessage = JSON_FACTORY.toString(insertError.getError());  failsafeElement = FailsafeElement.of(rowPayload, rowPayload);  failsafeElement.setErrorMessage(errorMessage);  } catch (IOException e) {  throw new RuntimeException(e);  }  return failsafeElement;  }  /**  * The {@link TextToBigQueryStreamingOptions} class provides the custom execution options passed  * by the executor at the command-line.  */  public interface TextToBigQueryStreamingOptions  extends TextIOToBigQuery.Options, BigQueryStorageApiStreamingOptions {  @TemplateParameter.BigQueryTable(  order = 1,  optional = true,  description = "The dead-letter table name to output failed messages to BigQuery",  helpText =  "Table for messages that failed to reach the output table. If a table doesn't exist, it is created during "  + "pipeline execution. If not specified, `<outputTableSpec>_error_records` is used.",  example = "<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>")  String getOutputDeadletterTable();  void setOutputDeadletterTable(String value);  // Hide UseStorageWriteApiAtLeastOnce in the UI, because it is automatically turned  // on when the pipeline runs in ALO mode and uses the Storage Write API  @TemplateParameter.Boolean(  order = 2,  optional = true,  parentName = "useStorageWriteApi",  parentTriggerValues = {"true"},  description = "Use at-least-once semantics in BigQuery Storage Write API",  helpText =  "This parameter takes effect only if `Use BigQuery Storage Write API` is enabled. If"  + " enabled, at-least-once semantics are used for the Storage Write API; otherwise,"  + " exactly-once semantics are used.",  hiddenUi = true)  @Default.Boolean(false)  @Override  Boolean getUseStorageWriteApiAtLeastOnce();  void setUseStorageWriteApiAtLeastOnce(Boolean value);  } }