How to build an ETL pipeline with Apache Beam on Google Cloud Dataflow
Lucas Arruda | lucas@ciandt.com | ciandt.com
Table of Contents
● Apache Beam
  ○ Concepts
  ○ Bounded data (Batch)
  ○ Unbounded data (Streaming)
  ○ Runners / SDKs
● Big Data Landscape
● Google Cloud Dataflow
  ○ Capabilities of a managed service
  ○ Integration with other GCP products
  ○ Architecture Diagram Samples
  ○ GCP Trial + Always Free
Introducing myself
Lucas Arruda, Software Architect @ CI&T
Specialist on Google Cloud Platform:
● Google Cloud Professional - Cloud Architect Certified
● Google Developer Expert (GDE)
● Google Authorized Trainer
  ○ trained 200+ people so far
  ○ Member of Google Cloud Insiders
● Google Qualified Developer in: Compute Engine, App Engine, Cloud Storage, Cloud SQL and BigQuery
● Projects for Google and Fortune 200 companies
"Big Data Isn't That Important: It's The Small Insights That Will Change Everything" - forbes.com
"The real value of data and analytics lies in their ability to deliver rich insights." - localytics.com
The Evolution of Apache Beam
Google-internal technologies (MapReduce, BigTable, Dremel, Colossus, Flume, Megastore, Spanner, PubSub, MillWheel) led to Apache Beam and Google Cloud Dataflow.
What is Apache Beam?
Unified Programming Model for Batch & Streaming
● Clearly separates data processing logic from runtime requirements
● Expresses data-parallel Batch & Streaming algorithms using one unified API
● Supports execution on multiple distributed processing runtime environments
Apache Beam - Beam Model Concepts
Beam Concepts - PCollections
The PCollection abstraction represents a potentially distributed, multi-element data set. It is immutable and can be either bounded (size is fixed/known) or unbounded (unlimited size).

public static void main(String[] args) {
  // Create the pipeline.
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline p = Pipeline.create(options);

  // Create the PCollection 'lines' by applying a 'Read' transform.
  PCollection<String> lines = p.apply(
      "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
}
Beam Concepts - PTransforms
Transforms are the operations in your pipeline. A transform takes a PCollection (or more than one PCollection) as input, performs an operation that you specify on each element in that collection, and produces a new output PCollection.

[Output PCollection] = [Input PCollection].apply([Transform])

[Output PCollection] = [Input PCollection].apply([First Transform])
                                          .apply([Second Transform])
                                          .apply([Third Transform])

The Beam SDK provides the following core transforms:
● ParDo
● GroupByKey
● Combine
● Flatten and Partition
Beam Concepts - Core PTransforms

ParDo
// Apply a ParDo to the PCollection.
PCollection<Integer> wordLengths = words.apply(
    ParDo.of(new ComputeWordLengthFn()));

// Implementation of a DoFn.
static class ComputeWordLengthFn extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Get the input element.
    String word = c.element();
    // Emit the output element.
    c.output(word.length());
  }
}

Combine / Flatten
// Sum.SumIntegerFn() combines the elements in the input PCollection.
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
    Combine.globally(new Sum.SumIntegerFn()));

// Flatten a PCollectionList of PCollection objects of a given type.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

GroupByKey
Input (key, value):              Output (key, [values]):
cat, 1   cat, 5   cat, 9         cat, [1,5,9]
dog, 5   dog, 2                  dog, [5,2]
and, 1   and, 2   and, 6         and, [1,2,6]
jump, 3                          jump, [3]
tree, 2                          tree, [2]
...
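The grouping illustrated above maps directly onto the GroupByKey transform. A minimal sketch in the same style, with a hypothetical input collection of (word, count) pairs:

// Group all values that share a key, e.g. ("cat", 1), ("cat", 5), ("cat", 9) -> ("cat", [1, 5, 9]).
PCollection<KV<String, Integer>> wordCounts = ...;
PCollection<KV<String, Iterable<Integer>>> grouped =
    wordCounts.apply(GroupByKey.<String, Integer>create());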
Beam Concepts - Pipeline Options
Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline, any runner-specific configuration, or even to provide input that dynamically drives your data transformations. Options are passed on the command line as --<option>=<value>.

Custom Options
public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);
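A hedged usage sketch for the custom option above: the flag value and file pattern are hypothetical, and the option is simply read at pipeline-construction time and fed into a transform.

// Launched with e.g.: --myCustomOption=gs://my_bucket/path/to/input-*.csv
Pipeline p = Pipeline.create(options);
p.apply("ReadInput", TextIO.read().from(options.getMyCustomOption()));
p.run().waitUntilFinish();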
Beam Concepts - Pipeline I/O
Beam provides read and write transforms for a number of common data storage types, including file-based, messaging and database types. Google Cloud Platform sources and sinks are also supported: BigQuery, Cloud Pub/Sub, Cloud Storage, Cloud Bigtable, Cloud Datastore and Cloud Spanner.

File-based reading
p.apply("ReadFromText",
    TextIO.read().from("protocol://my_bucket/path/to/input-*.csv"));

File-based writing
records.apply("WriteToText",
    TextIO.write().to("protocol://my_bucket/path/to/numbers")
        .withSuffix(".csv"));
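Beyond file-based I/O, here is a sketch of reading from two of the GCP connectors named above, assuming the beam-sdks-java-io-google-cloud-platform module is on the classpath and a recent Beam release. The topic and subscription names are illustrative; the BigQuery table is a public sample dataset.

// Read messages from a Cloud Pub/Sub subscription (unbounded source).
PCollection<String> events = p.apply("ReadFromPubSub",
    PubsubIO.readStrings().fromSubscription("projects/my-project/subscriptions/my-sub"));

// Read rows from a BigQuery table (bounded source).
PCollection<TableRow> weather = p.apply("ReadFromBigQuery",
    BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));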
Bounded Data (Batch)
Batch Processing on Bounded Data
Image: Tyler Akidau. A finite pool of unstructured data on the left is run through a data processing engine, resulting in corresponding structured data on the right.
Batch Processing on Bounded Data (Image: Tyler Akidau)
Unbounded Data (Streaming)
Stream Processing on Unbounded Data
Two windowing strategies: windowing into fixed windows by processing time, and windowing into fixed windows by event time. If you care about correctness in a system that also cares about time, you need to use event-time windowing.
➔ What results are calculated?
➔ Where in event time are results calculated?
➔ When in processing time are results materialized?
➔ How do refinements of results relate?
What results are calculated?

PCollection<KV<String, Integer>> scores = input
    .apply(Sum.integersPerKey());
Where in Event Time are results calculated?
Where in Event Time?

PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
    .apply(Sum.integersPerKey());
When in processing time are results materialized?

PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
        .triggering(AtWatermark()
            .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))))
    .apply(Sum.integersPerKey());
How do refinements of results relate?

PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
        .triggering(AtWatermark()
            .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
            .withLateFirings(AtCount(1)))
        .accumulatingFiredPanes())
    .apply(Sum.integersPerKey());
➔ What results are calculated? Transformations
➔ Where in event time are results calculated? Windowing
➔ When in processing time are results materialized? Watermark / Triggers
➔ How do refinements of results relate? Accumulations
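For the last question, the earlier snippet uses accumulating mode; the Beam Java SDK can instead discard each pane's contribution after firing. A sketch using the concrete SDK names (AfterWatermark / AfterProcessingTime / AfterPane correspond to the AtWatermark / AtPeriod / AtCount shorthand on the previous slides); note that a non-default trigger also requires an explicit allowed lateness, chosen arbitrarily here:

PCollection<KV<String, Integer>> scores = input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(2)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardMinutes(30))
        .discardingFiredPanes())  // each firing reports only the elements that arrived since the last pane
    .apply(Sum.integersPerKey());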
Runners & SDKs
Beam Runners & SDKs
★ Beam Capability Matrix: shows which features of the Beam model each runner supports
★ Choice of SDK: users write their pipelines in a language that's familiar and integrated with their other tooling
★ Future Proof: keep the code you invested so much to write while big data processing solutions evolve
★ Choice of Runner: ability to choose from a variety of runtimes according to your current needs
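A sketch of the "choice of runner" point: the same code targets different runtimes purely through pipeline options, provided the matching runner artifact (e.g. beam-runners-direct-java or beam-runners-google-cloud-dataflow-java) is on the classpath. The project, region and bucket values below are placeholders.

// Local development run:
//   --runner=DirectRunner
// Managed execution on Google Cloud Dataflow:
//   --runner=DataflowRunner --project=my-project --region=us-central1 \
//   --tempLocation=gs://my-bucket/temp
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
// ... apply the same transforms regardless of the chosen runner ...
p.run().waitUntilFinish();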
Google Cloud Dataflow - Capabilities of a managed service

Fully managed, no-ops environment
Dataflow fully automates management of required processing resources. Auto-scaling means optimum throughput and better overall price-to-performance. Monitor your jobs in near real-time.

Integrated with GCP products
Integrates with Cloud Storage, Cloud Pub/Sub, Cloud Datastore, Cloud Bigtable, and BigQuery for seamless data processing, and can be extended to interact with other sources and sinks such as Apache Kafka and HDFS.

Based on Apache Beam's SDK
Developers wishing to extend the Dataflow programming model can fork and/or submit pull requests on the Apache Beam SDKs. Pipelines can also run on alternate runtimes like Spark and Flink.

Reliable & Consistent Processing
Cloud Dataflow provides built-in support for fault-tolerant execution that is consistent and correct regardless of data size, cluster size, processing pattern or pipeline complexity.
Built on top of Compute Engine
Google Compute Engine ranked #1 in price-performance by Cloud Spectator.
Examples of Data Source/Sink in GCP

Cloud Pub/Sub
Fully-managed real-time messaging service that allows you to send and receive messages between independent applications.

Google BigQuery
Fully-managed, serverless, petabyte-scale, low-cost enterprise data warehouse for analytics. BigQuery can scan terabytes in seconds.
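As an illustration of how these two services combine in a streaming ETL job, here is a hedged end-to-end sketch: the topic, dataset, table and the "player,score" CSV format are all hypothetical, and the destination table is assumed to already exist. It reads events from Pub/Sub, windows and aggregates them, and appends the results to BigQuery.

p.apply("ReadEvents", PubsubIO.readStrings()
        .fromTopic("projects/my-project/topics/game-events"))
 .apply("ParseKV", MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
        .via((String line) -> KV.of(line.split(",")[0], Integer.valueOf(line.split(",")[1]))))
 .apply("FixedWindows", Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(2))))
 .apply("SumPerKey", Sum.integersPerKey())
 .apply("ToTableRow", MapElements.into(TypeDescriptor.of(TableRow.class))
        .via((KV<String, Integer> kv) ->
            new TableRow().set("player", kv.getKey()).set("score", kv.getValue())))
 .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
        .to("my-project:game_analytics.scores")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));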
Architecture Diagram - Gaming Analytics (Architecture: Gaming > Gaming Analytics)
● Gaming logs from multiple platforms arrive as real-time events (streaming) and as batch loads
● Authentication: App Engine
● Async messaging: Cloud Pub/Sub
● Log data: Cloud Storage
● Data processing: Cloud Dataflow
● Analytics engine: BigQuery
● Data exploration: Cloud Datalab
● Report & share for business analysis
Monitoring & Alerting Architecture Diagram - Data Analytics Platform Architecture
Components: source databases (on premise? AWS?), file staging (Cloud Storage), event-based Cloud Functions, scheduled cron (App Engine), data processing (Cloud Dataflow), raw files (Cloud Storage), analytics (BigQuery), report & share for business analysis, and Stackdriver Monitoring, Logging and Error Reporting.
1. Ingestion with gsutil
2. Streamed processing of data
3. Scheduled processing of data
4. Data transformation process
5. Data prepared for analysis
6. Staging of processed data
7. Generation of reports
8. Monitoring of all resources
GCP Trial + Always Free: cloud.google.com/free
Thank you! - Q&A - ciandt.com