Low-latency ingestion and analytics with Apache Kafka and Apache Apex
Thomas Weise, Architect, DataTorrent; PPMC member, Apache Apex
March 28, 2016
Apache Apex Features
• In-memory stream processing
• Scale-out, distributed, parallel, high throughput
• Windowing (temporal boundaries)
• Reliability, fault tolerance
• Operability
• YARN native
• Compute locality
• Dynamic updates
Apex Platform Overview
Apache Apex Malhar Library
Apache Kafka
"A high-throughput distributed messaging system." "Fast, Scalable, Durable, Distributed"
Kafka is a natural fit to deliver events into Apex for low-latency processing.
Kafka Integration - Consumer
• Low-latency, high-throughput ingest
• Scales with Kafka topics
  ᵒ Auto-partitioning
  ᵒ Flexible and customizable partition mapping
• Fault tolerance (in 0.8, based on SimpleConsumer)
  ᵒ Metadata monitoring / failover to a new broker
  ᵒ Offset checkpointing
  ᵒ Idempotency
  ᵒ External offset storage
• Support for multiple clusters
  ᵒ Built for better resource utilization
• Bandwidth control
  ᵒ Bytes per second
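A minimal sketch of wiring the Malhar Kafka input operator into an Apex DAG. The operator and port names (KafkaSinglePortStringInputOperator, outputPort, ConsoleOutputOperator.input) are the Malhar 0.8-based names as best recalled; topic, ZooKeeper, and offset-storage settings are assumed to come from the application configuration rather than being hard-coded.

```java
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class KafkaIngestApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Malhar Kafka input (0.8 / SimpleConsumer based). The engine creates one
    // physical partition per Kafka partition of the topic (auto-partitioning)
    // and checkpoints the consumed offsets together with the operator state.
    KafkaSinglePortStringInputOperator input =
        dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());

    // Topic, ZooKeeper quorum, consumer type, and bandwidth limits are
    // normally supplied through the application's properties.xml.

    ConsoleOutputOperator console =
        dag.addOperator("console", new ConsoleOutputOperator());

    dag.addStream("messages", input.outputPort, console.input);
  }
}
```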
Kafka Integration - Producer
• The output operator is a Kafka producer
• Exactly-once strategy
  ᵒ On failure, data already sent to the message queue should not be re-sent
  ᵒ Sends a monotonically increasing key along with the data
  ᵒ On recovery, the operator asks the message queue for the last sent message and reads the recovery key from it
  ᵒ Ignores all replayed data whose key is less than or equal to the recovered key
  ᵒ If the key is not monotonically increasing, the data can be sorted on the key at the end of the window before being sent to the message queue
• Implemented in AbstractExactlyOnceKafkaOutputOperator in the apache/incubator-apex-malhar GitHub repository
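A conceptual sketch of the recovery check described above, not the actual Malhar code; lastKeyWrittenToKafka() and send() are hypothetical placeholders for reading the last published message and producing a new one.

```java
// Conceptual sketch of the exactly-once output strategy: skip replayed tuples
// whose (monotonically increasing) key was already written to Kafka.
public class ExactlyOnceOutputSketch
{
  private long recoveredKey = Long.MIN_VALUE; // key of the last message found in Kafka
  private boolean recovered;

  public void processTuple(long key, byte[] payload)
  {
    if (!recovered) {
      // On recovery, ask Kafka for the last message this operator produced
      // and remember its key before processing any replayed data.
      recoveredKey = lastKeyWrittenToKafka();
      recovered = true;
    }
    if (key <= recoveredKey) {
      return; // replayed tuple, already published before the failure -> drop
    }
    send(key, payload); // publish key + payload to the Kafka topic
  }

  // Hypothetical helpers standing in for the real producer/consumer calls.
  protected long lastKeyWrittenToKafka() { return recoveredKey; }

  protected void send(long key, byte[] payload) { /* producer.send(...) */ }
}
```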
Apex Application Specification
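The slide shows the application specification as an image; below is a hedged reconstruction of the typical shape of an Apex application, complementing the Kafka sketch above: a StreamingApplication annotated with @ApplicationAnnotation whose populateDAG adds operators and streams and can set stream locality. The Generator operator is a made-up example; Malhar and user libraries supply real operators.

```java
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;

@ApplicationAnnotation(name = "MyFirstApplication")
public class Application implements StreamingApplication
{
  // Made-up source operator: emits an increasing counter value.
  public static class Generator extends BaseOperator implements InputOperator
  {
    public final transient DefaultOutputPort<Long> out = new DefaultOutputPort<>();
    private long count;

    @Override
    public void emitTuples()
    {
      out.emit(count++);
    }
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    Generator gen = dag.addOperator("generator", new Generator());
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());

    // Streams connect output ports to input ports; locality hints let the
    // engine deploy both operators in the same container or thread.
    dag.addStream("numbers", gen.out, console.input)
       .setLocality(Locality.CONTAINER_LOCAL);
  }
}
```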
Logical and Physical Plan
Partitioning
(Diagram: a logical DAG of operators 0 → 1 → 2 → 3 and the corresponding physical DAGs, showing operator 1 with three partitions, NxM partitioning with (1a, 1b, 1c) and (2a, 2b), and how a single intermediate unifier can become a bottleneck.)
Advanced Partitioning
(Diagram: parallel partitioning, where downstream partitions are co-partitioned with upstream ones, and cascading unifiers for N = 4, M = 1, K = 2, with containers and NICs shown in the execution plan.)
Dynamic Scaling
 Partitioning can change while the application is running
• Change the number of partitions at runtime based on stats
• Determine the initial number of partitions dynamically – Kafka operators scale according to the number of Kafka partitions
• Supports redistribution of state when the number of partitions changes
• API for custom scaling or partitioning
(Diagram: partitions being split and merged at runtime; unifiers not shown.)
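A sketch of requesting static partitioning through the PARTITIONER operator attribute; StatelessPartitioner and OperatorContext.PARTITIONER are the Apex names as best recalled, and the Sink operator is a made-up placeholder. A custom Partitioner plus StatsListener can be supplied the same way for stats-driven repartitioning at runtime.

```java
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;

public class PartitionedApp implements StreamingApplication
{
  // Made-up downstream operator used only to demonstrate the attribute.
  public static class Sink extends BaseOperator
  {
    public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
    {
      @Override
      public void process(String tuple)
      {
        // process tuple
      }
    };
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    Sink sink = dag.addOperator("sink", new Sink());
    // Ask the engine for four physical partitions of "sink"; the engine also
    // inserts a unifier to merge the partitioned results. (A source operator
    // would be wired to sink.input in a real application.)
    dag.setAttribute(sink, OperatorContext.PARTITIONER, new StatelessPartitioner<Sink>(4));
  }
}
```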
Fault Tolerance
• Operator state is checkpointed to a persistent store
  ᵒ Performed automatically by the engine; no additional coding needed
  ᵒ Asynchronous and distributed
  ᵒ In case of failure, operators are restarted from the checkpointed state
• Automatic detection and recovery of failed containers
  ᵒ Heartbeat mechanism
  ᵒ YARN process status notification
• Buffering to enable replay of data from the recovery point
  ᵒ Fast, incremental recovery; spike handling
• Application master state is checkpointed
  ᵒ Snapshot of the physical (and logical) plan
  ᵒ Execution-layer change log
Streaming Windows
 Application window
 Sliding window and tumbling window
 Checkpoint window
 No artificial latency
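A sketch of how window sizes might be configured through DAG and operator attributes; the attribute names (STREAMING_WINDOW_SIZE_MILLIS, APPLICATION_WINDOW_COUNT, CHECKPOINT_WINDOW_COUNT) are the Apex attributes as best recalled, and the values are illustrative only.

```java
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class WindowConfigApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    // (source operator and stream omitted for brevity)

    // Engine streaming window: 500 ms (applies to the whole DAG).
    dag.setAttribute(DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 500);

    // Application window of this operator: 120 streaming windows = 60 s.
    dag.setAttribute(console, OperatorContext.APPLICATION_WINDOW_COUNT, 120);

    // Checkpoint every 60 streaming windows = 30 s (the default cadence).
    dag.setAttribute(console, OperatorContext.CHECKPOINT_WINDOW_COUNT, 60);
  }
}
```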
Checkpointing Operator State
• Saves the state of an operator so that it can be recovered on failure
• Pluggable storage handler
• Default implementation
  ᵒ Serialization with Kryo
  ᵒ All non-transient fields are serialized
  ᵒ Serialized state is written to HDFS
  ᵒ Writes are asynchronous and non-blocking
• Custom handlers can implement an alternative approach to extracting state or a different storage backend (such as an IMDG)
• Checkpointing applies to operators that rely on previous state for their computation
  ᵒ Operators can be marked @Stateless to skip checkpointing
• Checkpoint frequency is tunable (30 s by default)
  ᵒ Based on streaming windows for consistent state
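A small illustration of what gets checkpointed, assuming the default Kryo-based handler: non-transient fields such as the counts map are saved, while transient fields are skipped and must be rebuilt after recovery. The operators and their fields are made up for the example; @Stateless is the Apex marker annotation mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.BaseOperator;

// Non-transient fields are serialized with Kryo and written to HDFS at each
// checkpoint window; transient fields are skipped and must be re-created in
// setup() after recovery.
public class WordCountOperator extends BaseOperator
{
  private Map<String, Long> counts = new HashMap<>();   // checkpointed
  private transient StringBuilder scratch;              // not checkpointed

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      Long current = counts.get(word);
      counts.put(word, current == null ? 1L : current + 1L);
    }
  };
}

// An operator with no state worth saving can skip checkpointing entirely:
@Stateless
class UppercaseOperator extends BaseOperator
{
  // stateless transformation logic would go here
}
```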
Processing Guarantees
At-least-once
• On recovery, data is replayed from a previous checkpoint
  ᵒ No messages lost
  ᵒ Default mode; suitable for most applications
• Can be used to ensure data is written only once to a store
  ᵒ Transactions with meta information, rewinding output, feedback from an external entity, idempotent operations
At-most-once
• On recovery, the latest data is made available to the operator
  ᵒ Useful where some data loss is acceptable and the latest data is sufficient
Exactly-once
• At-least-once + idempotency + transactional mechanisms (operator logic) to achieve end-to-end exactly-once behavior
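A sketch of selecting the processing guarantee per operator via the PROCESSING_MODE attribute; Operator.ProcessingMode and OperatorContext.PROCESSING_MODE are the Apex names as best recalled, and at-least-once is the default so it needs no attribute at all.

```java
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator.ProcessingMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class ProcessingModeApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    // (source operator and stream omitted for brevity)

    // AT_LEAST_ONCE is the default; AT_MOST_ONCE trades possible data loss for
    // resuming with the latest data. End-to-end exactly-once is achieved with
    // at-least-once plus idempotency and transactional output (see above).
    dag.setAttribute(console, OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE);
  }
}
```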
Idempotency with Kafka Consumer
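The slide itself is a diagram; the idea is that the consumer records, per streaming window, which Kafka offsets it emitted, so that a replay after failure re-reads exactly the same offsets in the same windows. A purely conceptual sketch of that bookkeeping follows; the class, the offsetsPerWindow map, and replayUpTo() are hypothetical and not the Malhar API.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch of idempotent consumption: remember the offset range read
// in each streaming window so a replayed window emits exactly the same tuples.
public class IdempotentConsumeSketch
{
  // windowId -> (partitionId -> last offset emitted in that window); in a real
  // operator this map would be persisted (e.g. to HDFS) at window boundaries.
  private final Map<Long, Map<Integer, Long>> offsetsPerWindow = new HashMap<>();

  public void beginWindow(long windowId)
  {
    Map<Integer, Long> recorded = offsetsPerWindow.get(windowId);
    if (recorded != null) {
      // Replay: re-read only up to the offsets recorded for this window.
      replayUpTo(recorded);
    }
  }

  public void endWindow(long windowId, Map<Integer, Long> currentOffsets)
  {
    // First pass: record where this window ended so a future replay stops there.
    offsetsPerWindow.put(windowId, new HashMap<>(currentOffsets));
  }

  // Hypothetical helper: consume from Kafka up to the given per-partition offsets.
  protected void replayUpTo(Map<Integer, Long> offsets) { /* ... */ }
}
```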
Use Case – Ad Tech
Customer:
• Leading digital automation software company for publishers
• Helps publishers monetize their digital assets
• Enables publishers to make smarter inventory decisions and improve revenue
Features:
• Reporting of critical metrics from auctions and client logs
• Revenue, impression, and click information
• Aggregate counters and reporting on top-N metrics
• Low-latency querying using a pub-sub model
Use Case – Ad Tech (pipeline)
(Diagram: auction and client logs flow from ad servers and user browsers through REST proxies and a CDN into Kafka clusters; Apex Kafka input operators read both streams, decompress and flatten the messages, filter events, and feed dimension aggregators backed by a dimensions store; queries and results are exchanged with the middleware over Kafka.)
Use Case – Ad Tech
Use Case – Ad Tech
• 15+ billion impressions per day
• Average data inflow of 200K events/sec
• 64 Kafka input operators reading from 6 geographically distributed data centers
• 32 instances of the in-memory distributed store
• 64 aggregators
• ~150 container processes across 30+ nodes
• 1.2 TB memory footprint at peak load
Resources
• Exactly-once processing: https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/
• Examples with Kafka and Files: https://github.com/tweise/apex-samples/tree/master/exactly-once
• Learn more: http://apex.incubator.apache.org/docs.html
• Subscribe: http://apex.incubator.apache.org/community.html
• Download: http://apex.incubator.apache.org/downloads.html
• Apex website: http://apex.incubator.apache.org/
• Follow @ApacheApex: https://twitter.com/apacheapex
• Meetups: http://www.meetup.com/topics/apache-apex
Q&A


Editor's Notes

• Partitioning and scaling built in
  ᵒ Operators can be dynamically scaled based on throughput, latency, or any custom logic
  ᵒ Streams can be split in flexible ways: tuple hashcode, a tuple field, or custom logic
  ᵒ Parallel partitioning for parallel pipelines; MxN partitioning for generic pipelines
  ᵒ Unifier concept for merging results from partitions; helps in handling skew imbalance
• Advanced windowing support
  ᵒ Application window configurable per operator
  ᵒ Sliding window and tumbling window support
  ᵒ Checkpoint window control for fault recovery
  ᵒ Windowing does not introduce artificial latency
• Stateful fault tolerance out of the box
  ᵒ Operators recover automatically from a precise point before failure
  ᵒ At-least-once, at-most-once, and exactly-once (at window boundaries) processing guarantees