Pipeline option patterns
The samples on this page show you common pipeline configurations. For more information about pipeline configuration options, see Creating a pipeline and Configuring pipeline options.
Retroactively logging runtime parameters
Use the ValueProvider interface to access runtime parameters after completing a pipeline job.
You can use the ValueProvider interface to pass runtime parameters to your pipeline. However, a value supplied at run time (for example, when launching a classic template) is only available while the pipeline executes, so you can only log it from within the Beam DAG. One solution is to add a pipeline branch with a DoFn that processes a placeholder value and then logs the runtime parameters:
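The timing constraint is easier to see in a toy model. The following stdlib-only sketch is hypothetical. `DeferredParam`, `make_logging_branch`, `run_job`, and the module-level `_RUNTIME_OPTIONS` are illustrative names, not Beam APIs. The model assumes that a runtime parameter can't be read while the pipeline is being constructed but can be read from inside per-element processing once the job runs. That is why the logging branch reads the value inside its process function instead of at construction time.

```python
import logging
from typing import Callable, Dict, List, Optional

# Hypothetical stand-in for options that only exist once a job executes
# (for example, parameters supplied when a classic template is launched).
_RUNTIME_OPTIONS: Optional[Dict[str, str]] = None


class DeferredParam:
    """Hypothetical analogue of a runtime ValueProvider: readable only at execution time."""

    def __init__(self, name: str, default: str = "") -> None:
        self.name = name
        self.default = default

    def is_accessible(self) -> bool:
        return _RUNTIME_OPTIONS is not None

    def get(self) -> str:
        options = _RUNTIME_OPTIONS
        if options is None:
            raise RuntimeError(f"{self.name}.get() called outside a runtime context")
        return options.get(self.name, self.default)


def make_logging_branch(param: DeferredParam, sink: List[str]) -> Callable[[object], None]:
    """Returns the per-element function of the logging branch; it reads the param lazily."""

    def process(unused_element: object) -> None:
        message = f"The string_value is {param.get()}"
        logging.info(message)
        sink.append(message)  # Stand-in for pushing the value to an external database.

    return process


def run_job(runtime_options: Dict[str, str], elements: List[object],
            branch: Callable[[object], None]) -> None:
    """Simulates execution: runtime options become readable, then elements are processed."""
    global _RUNTIME_OPTIONS
    _RUNTIME_OPTIONS = runtime_options
    try:
        for element in elements:
            branch(element)
    finally:
        _RUNTIME_OPTIONS = None


# Construction time: the parameter object exists, but its value is not known yet.
string_value = DeferredParam("string_value")
construction_error = None
try:
    string_value.get()
except RuntimeError as err:
    construction_error = str(err)

# Execution time: a single placeholder element (like Create([None])) triggers the branch once.
logged: List[str] = []
run_job({"string_value": "Hello world!"}, [None], make_logging_branch(string_value, logged))
```

In this model, reading `string_value` before `run_job` raises. The same read inside the branch succeeds and runs exactly once because the branch has a single placeholder element.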
Java SDK:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccessingValueProviderInfoAfterRun {
  private static final Logger LOG =
      LoggerFactory.getLogger(AccessingValueProviderInfoAfterRun.class);

  /** Sample of PipelineOptions with a ValueProvider option argument. */
  public interface MyOptions extends PipelineOptions {
    @Description("My option")
    @Default.String("Hello world!")
    ValueProvider<String> getStringValue();

    void setStringValue(ValueProvider<String> value);
  }

  public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {
    MyOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    // Create pipeline.
    Pipeline p = Pipeline.create(options);

    // Add a branch for logging the ValueProvider value.
    p.apply(Create.of(1))
        .apply(
            ParDo.of(
                new DoFn<Integer, Integer>() {

                  // Define the DoFn that logs the ValueProvider value.
                  @ProcessElement
                  public void process(ProcessContext c) {
                    MyOptions ops = c.getPipelineOptions().as(MyOptions.class);
                    // This example logs the ValueProvider value, but you could store it by
                    // pushing it to an external database. Call get() to log the value
                    // itself rather than the provider's toString().
                    LOG.info("Option StringValue was {}", ops.getStringValue().get());
                  }
                }));

    // The main pipeline.
    p.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());

    p.run();
  }

  public static void main(String[] args) {
    accessingValueProviderInfoAfterRunSnip1(args);
  }
}
```

Python SDK:

```python
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider


class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--string_value', type=str)


class LogValueProvidersFn(beam.DoFn):
  def __init__(self, string_vp):
    super().__init__()
    self.string_vp = string_vp

  # Define the DoFn that logs the ValueProvider value.
  # process() runs when the pipeline executes, so the runtime value is available.
  # This example logs the ValueProvider value, but
  # you could store it by pushing it to an external database.
  def process(self, unused_element):
    logging.info('The string_value is %s', self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is:
    logging.info(
        'The string value is %s',
        RuntimeValueProvider.get_value('string_value', str, ''))


beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)

# Create pipeline.
with beam.Pipeline(options=beam_options) as pipeline:

  # Add a branch for logging the ValueProvider value.
  _ = (
      pipeline
      | beam.Create([None])
      | 'LogValueProvs' >> beam.ParDo(LogValueProvidersFn(args.string_value)))

  # The main pipeline.
  result_pc = (
      pipeline
      | "main_pc" >> beam.Create([1, 2, 3])
      | beam.CombineGlobally(sum))
```

Last updated on 2025/12/12