Pipeline option patterns

The samples on this page show you common pipeline configurations. For more information about pipeline configuration options, see Creating a pipeline and Configuring pipeline options.

Retroactively logging runtime parameters

Use the ValueProvider interface to access runtime parameters after completing a pipeline job.

You can use the ValueProvider interface to pass runtime parameters to your pipeline, but their values are resolved only when the pipeline runs, so you can log them only from within the Beam DAG. One solution is to add a pipeline branch with a DoFn that processes a placeholder value and then logs the runtime parameters:

 /** Sample of PipelineOptions with a ValueProvider option argument. */  public interface MyOptions extends PipelineOptions {  @Description("My option")  @Default.String("Hello world!")  ValueProvider<String> getStringValue();   void setStringValue(ValueProvider<String> value);  }   public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {   MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);   // Create pipeline.  Pipeline p = Pipeline.create(options);   // Add a branch for logging the ValueProvider value.  p.apply(Create.of(1))  .apply(  ParDo.of(  new DoFn<Integer, Integer>() {   // Define the DoFn that logs the ValueProvider value.  @ProcessElement  public void process(ProcessContext c) {   MyOptions ops = c.getPipelineOptions().as(MyOptions.class);  // This example logs the ValueProvider value, but you could store it by  // pushing it to an external database.   LOG.info("Option StringValue was {}", ops.getStringValue());  }  }));   // The main pipeline.  p.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());   p.run();  }
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider


class MyOptions(PipelineOptions):
  """Pipeline options that register ``--string_value`` as a value-provider argument."""
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--string_value', type=str)


class LogValueProvidersFn(beam.DoFn):
  """DoFn that logs the value held by a ValueProvider.

  This example only logs the value; it could instead be stored, for example
  by pushing it to an external database.
  """
  def __init__(self, string_vp):
    # Keep the provider itself; its value is read later, inside process().
    self.string_vp = string_vp

  def process(self, an_int):
    """Logs the string value; the incoming element is only a placeholder."""
    provided_value = self.string_vp.get()
    logging.info('The string_value is %s' % provided_value)

    # Another option (where you don't need to pass the value at all) is to
    # look the option up by name through RuntimeValueProvider.
    looked_up_value = RuntimeValueProvider.get_value('string_value', str, '')
    logging.info('The string value is %s' % looked_up_value)


beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)

# Create pipeline.
with beam.Pipeline(options=beam_options) as pipeline:

  # Branch that logs the ValueProvider value, driven by one placeholder element.
  placeholder_pc = pipeline | beam.Create([None])
  log_fn = LogValueProvidersFn(args.string_value)
  _ = placeholder_pc | 'LogValueProvs' >> beam.ParDo(log_fn)

  # The main pipeline.
  main_input_pc = pipeline | "main_pc" >> beam.Create([1, 2, 3])
  result_pc = main_input_pc | beam.CombineGlobally(sum)