/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.StreamingDataGenerator.StreamingDataGeneratorOptions;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToJdbc;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToKafka;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToPubSub;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToSpanner;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a
 * specified rate to Pub/Sub, BigQuery, GCS, JDBC, Spanner, or Kafka. The messages are generated
 * according to a schema template which instructs the pipeline how to populate the messages with
 * fake data compliant with the schema's constraints.
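 *
 * <p>As an illustration only (see the json-data-generator documentation for the full set of
 * supported functions), a minimal schema file could look like the following, where each
 * {@code {{...}}} placeholder is replaced with generated fake data:
 *
 * <pre>{@code
 * {
 *   "eventId": "{{uuid()}}",
 *   "eventTimestamp": {{timestamp()}},
 *   "score": {{integer(0, 100)}}
 * }
 * }</pre>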
 *
 * <p>The number of workers executing the pipeline must be large enough to support the supplied
 * QPS. Use a general rule of 2,500 QPS per core in the worker pool.
 *
 * <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
 * for instructions on how to construct the schema file.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/streaming-data-generator/README_Streaming_Data_Generator.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Streaming_Data_Generator",
    category = TemplateCategory.UTILITIES,
    displayName = "Streaming Data Generator",
    description =
        "A pipeline to publish messages at a specified QPS. This template can be used to benchmark"
            + " the performance of streaming pipelines.",
    optionsClass = StreamingDataGeneratorOptions.class,
    flexContainerName = "streaming-data-generator",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/streaming-data-generator",
    contactInformation = "https://cloud.google.com/support",
    streaming = true,
    supportsAtLeastOnce = true)
public class StreamingDataGenerator {

  /**
   * The {@link StreamingDataGeneratorOptions} interface provides the custom execution options
   * passed by the executor at the command line.
   */
  public interface StreamingDataGeneratorOptions extends PipelineOptions {

    @TemplateParameter.Long(
        order = 1,
        description = "Required output rate",
        helpText = "Indicates rate of messages per second to be published to the sink")
    @Required
    Long getQps();

    void setQps(Long value);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {@TemplateEnumOption("GAME_EVENT")},
        optional = true,
        description = "Schema template to generate fake data",
        helpText = "Pre-existing schema template to use. The value must be one of: [GAME_EVENT]")
    SchemaTemplate getSchemaTemplate();

    void setSchemaTemplate(SchemaTemplate value);

    @TemplateParameter.GcsReadFile(
        order = 3,
        optional = true,
        description = "Location of Schema file to generate fake data",
        helpText = "Cloud Storage path of schema location.",
        example = "gs://<bucket-name>/prefix")
    String getSchemaLocation();

    void setSchemaLocation(String value);

    @TemplateParameter.PubsubTopic(
        order = 4,
        optional = true,
        description = "Output Pub/Sub topic",
        helpText = "The name of the topic to which the pipeline should publish data.",
        example = "projects/<project-id>/topics/<topic-name>")
    String getTopic();

    void setTopic(String value);

    @TemplateParameter.Long(
        order = 5,
        optional = true,
        description = "Maximum number of output messages",
        helpText =
            "Indicates maximum number of output messages to be generated. 0 means unlimited.")
    @Default.Long(0L)
    Long getMessagesLimit();

    void setMessagesLimit(Long value);

    @TemplateParameter.Enum(
        order = 6,
        enumOptions = {
          @TemplateEnumOption("AVRO"),
          @TemplateEnumOption("JSON"),
          @TemplateEnumOption("PARQUET")
        },
        optional = true,
        description = "Output Encoding Type",
        helpText = "The message output type. Default is JSON.")
    @Default.Enum("JSON")
    OutputType getOutputType();

    void setOutputType(OutputType value);

    @TemplateParameter.GcsReadFile(
        order = 7,
        optional = true,
        parentName = "outputType",
        parentTriggerValues = {"AVRO", "PARQUET"},
        description = "Location of Avro Schema file",
        helpText =
            "Cloud Storage path of Avro schema location. Mandatory when output type is AVRO or"
                + " PARQUET.",
        example = "gs://your-bucket/your-path/schema.avsc")
    String getAvroSchemaLocation();

    void setAvroSchemaLocation(String value);

    @TemplateParameter.Enum(
        order = 8,
        enumOptions = {
          @TemplateEnumOption("BIGQUERY"),
          @TemplateEnumOption("GCS"),
          @TemplateEnumOption("PUBSUB"),
          @TemplateEnumOption("JDBC"),
          @TemplateEnumOption("SPANNER"),
          @TemplateEnumOption("KAFKA")
        },
        optional = true,
        description = "Output Sink Type",
        helpText = "The message sink type. Default is PUBSUB.")
    @Default.Enum("PUBSUB")
    SinkType getSinkType();

    void setSinkType(SinkType value);

    @TemplateParameter.BigQueryTable(
        order = 9,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "Output BigQuery table",
        helpText = "Output BigQuery table. Mandatory when sinkType is BIGQUERY.",
        example = "<project>:<dataset>.<table_name>")
    String getOutputTableSpec();

    void setOutputTableSpec(String value);

    @TemplateParameter.Enum(
        order = 10,
        enumOptions = {
          @TemplateEnumOption("WRITE_APPEND"),
          @TemplateEnumOption("WRITE_EMPTY"),
          @TemplateEnumOption("WRITE_TRUNCATE")
        },
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "Write Disposition to use for BigQuery",
        helpText =
            "BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.")
    @Default.String("WRITE_APPEND")
    String getWriteDisposition();

    void setWriteDisposition(String writeDisposition);

    @TemplateParameter.BigQueryTable(
        order = 11,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "Messages that fail to reach the output table for any reason (e.g., mismatched"
                + " schema, malformed JSON) are written to this table. If it doesn't exist, it"
                + " will be created during pipeline execution.",
        example = "your-project-id:your-dataset.your-table-name")
    String getOutputDeadletterTable();

    void setOutputDeadletterTable(String outputDeadletterTable);

    @TemplateParameter.Duration(
        order = 12,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"GCS"},
        description = "Window duration",
        helpText =
            "The window duration/size in which data will be written to Cloud Storage. Allowed"
                + " formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh"
                + " (for hours, example: 2h).",
        example = "1m")
    @Default.String("1m")
    String getWindowDuration();

    void setWindowDuration(String windowDuration);

    @TemplateParameter.GcsWriteFolder(
        order = 13,
        optional = true,
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. Must end with a slash."
                + " DateTime formatting is used to parse directory path for date & time"
                + " formatters.",
        example = "gs://your-bucket/your-path/")
    String getOutputDirectory();

    void setOutputDirectory(String outputDirectory);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file.",
        example = "output-")
    @Default.String("output-")
    String getOutputFilenamePrefix();

    void setOutputFilenamePrefix(String outputFilenamePrefix);

    @TemplateParameter.Integer(
        order = 15,
        optional = true,
        description = "Maximum output shards",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of shards"
                + " means higher throughput for writing to Cloud Storage, but potentially higher"
                + " data aggregation cost across shards when processing output Cloud Storage files."
                + " Default value is decided by Dataflow.")
    @Default.Integer(0)
    Integer getNumShards();

    void setNumShards(Integer numShards);

    @TemplateParameter.Text(
        order = 16,
        optional = true,
        regexes = {"^.+$"},
        description = "JDBC driver class name.",
        helpText = "JDBC driver class name to use.",
        example = "com.mysql.jdbc.Driver")
    String getDriverClassName();

    void setDriverClassName(String driverClassName);

    @TemplateParameter.Text(
        order = 17,
        optional = true,
        regexes = {
          "(^jdbc:[a-zA-Z0-9/:@.?_+!*=&-;]+$)|(^([A-Za-z0-9+/]{4}){1,}([A-Za-z0-9+/]{0,3})={0,3})"
        },
        description = "JDBC connection URL string.",
        helpText = "URL connection string to connect to the JDBC source.",
        example = "jdbc:mysql://some-host:3306/sampledb")
    String getConnectionUrl();

    void setConnectionUrl(String connectionUrl);

    @TemplateParameter.Text(
        order = 18,
        optional = true,
        regexes = {"^.+$"},
        description = "JDBC connection username.",
        helpText = "User name to be used for the JDBC connection.")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 19,
        optional = true,
        description = "JDBC connection password.",
        helpText = "Password to be used for the JDBC connection.")
    String getPassword();

    void setPassword(String password);

    @TemplateParameter.Text(
        order = 20,
        optional = true,
        regexes = {"^[a-zA-Z0-9_;!*&=@#-:\\/]+$"},
        description = "JDBC connection property string.",
        helpText =
            "Properties string to use for the JDBC connection. Format of the string must be"
                + " [propertyName=property;]*.",
        example = "unicode=true;characterEncoding=UTF-8")
    String getConnectionProperties();

    void setConnectionProperties(String connectionProperties);

    @TemplateParameter.Text(
        order = 21,
        optional = true,
        regexes = {"^.+$"},
        description = "Statement which will be executed against the database.",
        helpText =
            "SQL statement which will be executed to write to the database. The statement must"
                + " specify the column names of the table in any order. Only the values of the"
                + " specified column names will be read from the JSON and added to the statement.",
        example = "INSERT INTO tableName (column1, column2) VALUES (?,?)")
    String getStatement();

    void setStatement(String statement);

    @TemplateParameter.ProjectId(
        order = 22,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "GCP Project ID where the Spanner table lives.",
        helpText = "GCP Project ID where the Spanner table lives.")
    String getProjectId();

    void setProjectId(String projectId);

    @TemplateParameter.Text(
        order = 23,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner instance name.",
        helpText = "Cloud Spanner instance name.")
    String getSpannerInstanceName();

    void setSpannerInstanceName(String spannerInstanceName);

    @TemplateParameter.Text(
        order = 24,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner database name.",
        helpText = "Cloud Spanner database name.")
    String getSpannerDatabaseName();

    void setSpannerDatabaseName(String spannerDBName);

    @TemplateParameter.Text(
        order = 25,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner table name.",
        helpText = "Cloud Spanner table name.")
    String getSpannerTableName();

    void setSpannerTableName(String spannerTableName);

    @TemplateParameter.Long(
        order = 26,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max mutated cells per batch.",
        helpText =
            "Specifies the cell mutation limit (maximum number of mutated cells per batch)."
                + " Default value is 5000")
    Long getMaxNumMutations();

    void setMaxNumMutations(Long value);

    @TemplateParameter.Long(
        order = 27,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max rows per batch.",
        helpText =
            "Specifies the row mutation limit (maximum number of mutated rows per batch)."
                + " Default value is 1000")
    Long getMaxNumRows();

    void setMaxNumRows(Long value);

    @TemplateParameter.Long(
        order = 28,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max batch size in bytes.",
        helpText =
            "Specifies the batch size limit (max number of bytes mutated per batch)."
                + " Default value is 1MB")
    Long getBatchSizeBytes();

    void setBatchSizeBytes(Long value);

    @TemplateParameter.Long(
        order = 29,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Commit deadline in seconds for write requests.",
        helpText = "Specifies the deadline in seconds for the Commit API call.")
    Long getCommitDeadlineSeconds();

    void setCommitDeadlineSeconds(Long value);

    @TemplateParameter.Text(
        order = 30,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"KAFKA"},
        regexes = {"[,:a-zA-Z0-9._-]+"},
        description = "Output Kafka Bootstrap Server",
        helpText = "Kafka Bootstrap Server",
        example = "localhost:9092")
    String getBootstrapServer();

    void setBootstrapServer(String bootstrapServer);

    @TemplateParameter.Text(
        order = 31,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"KAFKA"},
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "Kafka topic to write to",
        helpText = "Kafka topic to write to.",
        example = "topic")
    String getKafkaTopic();

    void setKafkaTopic(String outputTopic);
  }

  /** Allowed list of existing schema templates. */
  public enum SchemaTemplate {
    GAME_EVENT(
        "{\n"
            + " \"eventId\": \"{{uuid()}}\",\n"
            + " \"eventTimestamp\": {{timestamp()}},\n"
            + " \"ipv4\": \"{{ipv4()}}\",\n"
            + " \"ipv6\": \"{{ipv6()}}\",\n"
            + " \"country\": \"{{country()}}\",\n"
            + " \"username\": \"{{username()}}\",\n"
            + " \"quest\": \"{{random(\"A Break In the Ice\", \"Ghosts of Perdition\", \"Survive"
            + " the Low Road\")}}\",\n"
            + " \"score\": {{integer(100, 10000)}},\n"
            + " \"completed\": {{bool()}}\n"
            + "}"),
    LOG_ENTRY(
        "{\n"
            + " \"logName\": \"{{alpha(10,20)}}\",\n"
            + " \"resource\": {\n"
            + " \"type\": \"{{alpha(5,10)}}\"\n"
            + " },\n"
            + " \"timestamp\": {{timestamp()}},\n"
            + " \"receiveTimestamp\": {{timestamp()}},\n"
            + " \"severity\": \"{{random(\"DEFAULT\", \"DEBUG\", \"INFO\", \"NOTICE\","
            + " \"WARNING\", \"ERROR\", \"CRITICAL\", \"ERROR\")}}\",\n"
            + " \"insertId\": \"{{uuid()}}\",\n"
            + " \"trace\": \"{{uuid()}}\",\n"
            + " \"spanId\": \"{{uuid()}}\",\n"
            + " \"jsonPayload\": {\n"
            + " \"bytes_sent\": {{integer(1000,20000)}},\n"
            + " \"connection\": {\n"
            + " \"dest_ip\": \"{{ipv4()}}\",\n"
            + " \"dest_port\": {{integer(0,65000)}},\n"
            + " \"protocol\": {{integer(0,6)}},\n"
            + " \"src_ip\": \"{{ipv4()}}\",\n"
            + " \"src_port\": {{integer(0,65000)}}\n"
            + " },\n"
            + " \"dest_instance\": {\n"
            + " \"project_id\": \"{{concat(\"PROJECT\", integer(0,3))}}\",\n"
            + " \"region\": \"{{country()}}\",\n"
            + " \"vm_name\": \"{{username()}}\",\n"
            + " \"zone\": \"{{state()}}\"\n"
            + " },\n"
            + " \"end_time\": {{timestamp()}},\n"
            + " \"packets_sent\": {{integer(100,400)}},\n"
            + " \"reporter\": \"{{random(\"SRC\", \"DEST\")}}\",\n"
            + " \"rtt_msec\": {{integer(0,20)}},\n"
            + " \"start_time\": {{timestamp()}}\n"
            + " }\n"
            + "}");

    private final String schema;

    SchemaTemplate(String schema) {
      this.schema = schema;
    }

    public String getSchema() {
      return schema;
    }
  }

  /** Allowed list of message encoding types. */
  public enum OutputType {
    JSON(".json"),
    AVRO(".avro"),
    PARQUET(".parquet");

    private final String fileExtension;

    /** Sets file extension associated with output type. */
    OutputType(String fileExtension) {
      this.fileExtension = fileExtension;
    }

    /** Returns file extension associated with output type. */
    public String getFileExtension() {
      return fileExtension;
    }
  }
  /** Allowed list of sink types. */
  public enum SinkType {
    PUBSUB,
    BIGQUERY,
    GCS,
    JDBC,
    SPANNER,
    KAFKA
  }

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for its execution to finish. If blocking execution is required, use the {@link
   * StreamingDataGenerator#run(StreamingDataGeneratorOptions)} method to start the pipeline and
   * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args command-line args passed by the executor.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    StreamingDataGeneratorOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StreamingDataGeneratorOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline with the specified options. This method does not wait until the pipeline is
   * finished before returning. Invoke {@code result.waitUntilFinish()} on the result object to
   * block until the pipeline is finished running if blocking programmatic execution is required.
   *
   * @param options the execution options.
   * @return the pipeline result.
   */
  public static PipelineResult run(@Nonnull StreamingDataGeneratorOptions options) {
    checkNotNull(options, "options argument to run method cannot be null.");

    MetadataValidator.validate(options);

    // FileSystems does not set the default configuration in workers until Pipeline.run;
    // explicitly register the standard file systems.
    FileSystems.setDefaultPipelineOptions(options);

    String schema = getSchema(options.getSchemaTemplate(), options.getSchemaLocation());

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Trigger at the supplied QPS
     *  2) Generate messages containing fake data
     *  3) Write messages to the appropriate sink
     */
    PCollection<byte[]> generatedMessages =
        pipeline
            .apply("Trigger", createTrigger(options))
            .apply("Generate Fake Messages", ParDo.of(new MessageGeneratorFn(schema)));

    if (options.getSinkType().equals(SinkType.GCS)) {
      generatedMessages =
          generatedMessages.apply(
              options.getWindowDuration() + " Window",
              Window.into(
                  FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));
    }

    generatedMessages.apply(
        "Write To " + options.getSinkType().name(), createSink(options, schema));

    return pipeline.run();
  }

  /**
   * Creates either a bounded or an unbounded source, based on the messagesLimit pipeline option.
   *
   * @param options the pipeline options.
   */
  private static GenerateSequence createTrigger(@Nonnull StreamingDataGeneratorOptions options) {
    checkNotNull(options, "options argument to createTrigger method cannot be null.");

    GenerateSequence generateSequence =
        GenerateSequence.from(0L)
            .withRate(options.getQps(), /* periodLength= */ Duration.standardSeconds(1L));

    return options.getMessagesLimit() > 0
        ? generateSequence.to(options.getMessagesLimit())
        : generateSequence;
  }

  /**
   * The {@link MessageGeneratorFn} class generates fake messages based on the supplied schema.
   *
   * <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
   * for instructions on how to construct the schema file.
   */
  @VisibleForTesting
  static class MessageGeneratorFn extends DoFn<Long, byte[]> {

    // Not initialized inline or in the constructor because {@link JsonDataGenerator} is not
    // serializable.
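    // Instead, it is created once per DoFn instance in {@link #setup()}.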
    private transient JsonDataGenerator dataGenerator;
    private final String schema;

    MessageGeneratorFn(String schema) {
      this.schema = schema;
    }

    @Setup
    public void setup() {
      dataGenerator = new JsonDataGeneratorImpl();
    }

    @ProcessElement
    public void processElement(
        @Element Long element,
        @Timestamp Instant timestamp,
        OutputReceiver<byte[]> receiver,
        ProcessContext context)
        throws IOException, JsonDataGeneratorException {
      byte[] payload;

      // Generate the fake JSON according to the schema.
      try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
        dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
        payload = byteArrayOutputStream.toByteArray();
      }

      receiver.output(payload);
    }
  }

  /**
   * Creates the appropriate sink based on the sinkType pipeline option.
   *
   * @param options the pipeline options.
   */
  @VisibleForTesting
  static PTransform<PCollection<byte[]>, PDone> createSink(
      @Nonnull StreamingDataGeneratorOptions options, @Nonnull String schema) {
    checkNotNull(options, "options argument to createSink method cannot be null.");
    checkNotNull(schema, "schema argument to createSink method cannot be null.");

    switch (options.getSinkType()) {
      case PUBSUB:
        checkArgument(
            options.getTopic() != null,
            String.format(
                "Missing required value --topic for %s sink type", options.getSinkType().name()));
        return StreamingDataGeneratorWriteToPubSub.Writer.builder(options, schema).build();
      case BIGQUERY:
        checkArgument(
            options.getOutputTableSpec() != null,
            String.format(
                "Missing required value --outputTableSpec in format"
                    + " <project>:<dataset>.<table_name> for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToBigQuery.builder(options).build();
      case GCS:
        checkArgument(
            options.getOutputDirectory() != null,
            String.format(
                "Missing required value --outputDirectory in format gs:// for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToGcs.builder(options).build();
      case JDBC:
        checkArgument(
            options.getDriverClassName() != null,
            String.format(
                "Missing required value --driverClassName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getConnectionUrl() != null,
            String.format(
                "Missing required value --connectionUrl for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getStatement() != null,
            String.format(
                "Missing required value --statement for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToJdbc.builder(options).build();
      case SPANNER:
        checkArgument(
            options.getProjectId() != null,
            String.format(
                "Missing required value --projectId for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerInstanceName() != null,
            String.format(
                "Missing required value --spannerInstanceName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerDatabaseName() != null,
            String.format(
                "Missing required value --spannerDatabaseName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerTableName() != null,
            String.format(
                "Missing required value --spannerTableName for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToSpanner.builder(options).build();
      case KAFKA:
        checkArgument(
            options.getBootstrapServer() != null,
            String.format(
                "Missing required value --bootstrapServer for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getKafkaTopic() != null,
            String.format(
                "Missing required value --kafkaTopic for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToKafka.Writer.builder(options).build();
      default:
        throw new IllegalArgumentException("Unsupported Sink.");
    }
  }

  private static String getSchema(SchemaTemplate schemaTemplate, String schemaLocation) {
    checkArgument(
        schemaTemplate != null || schemaLocation != null,
        "Either schemaTemplate or schemaLocation argument of MessageGeneratorFn class must be"
            + " provided.");

    if (schemaLocation != null) {
      return GCSUtils.getGcsFileAsString(schemaLocation);
    } else {
      return schemaTemplate.getSchema();
    }
  }
}