Kafka Input Operator
Siyuan Hua, DataTorrent, Committer, Apache Apex
Apr 6, 2016

Feature Overview
Apache Apex Meetup

Feature              0.8 (Simple Consumer)                    0.9
LoC                  5900                                     2406
Fault-Tolerant       Yes (at least once, exactly once)        Yes (at least once, exactly once)
Scalability          Scale with Kafka (static and dynamic)    Scale with Kafka (static and dynamic)
Multi-Cluster/Topic  Yes                                      Yes
Throughput throttle  Yes                                      Yes
Idempotent           Yes                                      Yes

Feature Overview

Feature              0.8 (Simple Consumer)                      0.9
Offset Management    Customized management                      Implicit but out-of-the-box management
Partition Strategy   1:1, 1:M, Dynamic (unstable), Customized   1:1, 1:M, Customized
Dependency           Both public and internal API               Public API
Metrics report       Using old Counters API                     Using new Apex @AutoMetric

0.8 Kafka Input Operator
● Only the Simple Consumer can deliver all features
● The high-level consumer doesn’t support a customized assignor or sticky partitions
● The operator code has to deal with metadata changes
● One shared consumer per broker model
● 2.5 years old! (Tested and mature)

0.9 Kafka Input Operator
● Uses the assign API that comes with the 0.9 Consumer class
● The assign API is a good replacement for the Simple Consumer in the new Kafka Input Operator
● Partitions are explicitly assigned to each operator instance
● One consumer is shared by all assigned partitions
● The operator doesn’t need to handle metadata changes or broker failures
● 2 months old!

Workflow
[Diagram]

Partition Strategy
● 1 to 1 partition
● 1 to N partition

Customized Partition Strategy
public abstract class AbstractKafkaPartitioner {
  ...
  abstract List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata);
  ...
  void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map) …
  Response processStats(BatchedOperatorStats batchedOperatorStats)
}

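To make the idea of a custom assign step concrete, here is a minimal, self-contained sketch of a round-robin assignment. It is not the operator's code: the class name RoundRobinAssignSketch is hypothetical, plain strings such as "t-0" stand in for PartitionMeta, and the operator count is passed in directly rather than derived from cluster metadata.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: strings such as "t-0" stand in for PartitionMeta.
final class RoundRobinAssignSketch {

    // Spread Kafka partitions over a fixed number of operator instances (1:M style).
    static List<Set<String>> assign(List<String> kafkaPartitions, int operatorCount) {
        if (operatorCount <= 0) {
            throw new IllegalArgumentException("operatorCount must be positive");
        }
        List<Set<String>> assignment = new ArrayList<>();
        for (int i = 0; i < operatorCount; i++) {
            assignment.add(new LinkedHashSet<String>());
        }
        // Partition i goes to operator instance i mod operatorCount.
        for (int i = 0; i < kafkaPartitions.size(); i++) {
            assignment.get(i % operatorCount).add(kafkaPartitions.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4"), 2));
    }
}
```

Each returned set corresponds to one operator instance, which mirrors the List<Set<PartitionMeta>> return shape shown on the slide.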
Partition Strategy (Cont’d)
● Sticky partitioning (each operator instance consumes only from the Kafka partitions assigned to it by the Application Master) is the BEST practice!

Offset Checkpointing
[Diagram labels: W = last offset in window i; current offset; downstream operator window; checkpointed offsets with window id (i, j, k)]

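The checkpointing idea (remember the last offset per window, then resume from the offsets recorded for the recovered window) can be sketched as follows. This is an assumption-laden illustration, not the operator's implementation: the class OffsetCheckpointSketch, its method names, and the use of strings as partition keys are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: keeps the last consumed offset of each partition per window id.
final class OffsetCheckpointSketch {
    // window id -> (partition name -> last offset consumed in that window)
    private final TreeMap<Long, Map<String, Long>> offsetsByWindow = new TreeMap<>();

    // Record a snapshot of the offsets reached when the window ends.
    void endWindow(long windowId, Map<String, Long> lastOffsets) {
        offsetsByWindow.put(windowId, new HashMap<String, Long>(lastOffsets));
    }

    // Offsets recorded for the newest window at or before the recovered window id;
    // an empty map if nothing was recorded yet.
    Map<String, Long> restore(long recoveredWindowId) {
        Map.Entry<Long, Map<String, Long>> entry = offsetsByWindow.floorEntry(recoveredWindowId);
        if (entry == null) {
            return new HashMap<String, Long>();
        }
        return new HashMap<String, Long>(entry.getValue());
    }
}
```

Copying the map on both write and read keeps later consumer progress from leaking into an already recorded window.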
Offset Commitment (0.8 Operator)
[Diagram labels: W = last offset in window i; current offset; commit window i; Application Master; Offset Manager]

Offset Commitment (0.8 Operator)
public interface OffsetManager {
  ...
  public Map<KafkaPartition, Long> loadInitialOffsets();
  ...
  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
}

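As a rough illustration of what an implementation of those two methods might look like, the sketch below keeps offsets in memory. It is self-contained on purpose: OffsetManagerSketch is a local stand-in for the interface on the slide, String replaces KafkaPartition, and InMemoryOffsetManager is a hypothetical name. A real offset manager would persist offsets to durable storage so they survive an application restart.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the two methods on the slide; String replaces KafkaPartition.
interface OffsetManagerSketch {
    Map<String, Long> loadInitialOffsets();

    void updateOffsets(Map<String, Long> offsetsOfPartitions);
}

// Illustrative only: offsets live in memory and are lost when the process exits.
final class InMemoryOffsetManager implements OffsetManagerSketch {
    private final Map<String, Long> committed = new HashMap<>();

    // Called at startup to decide where each partition should resume.
    @Override
    public synchronized Map<String, Long> loadInitialOffsets() {
        return new HashMap<String, Long>(committed);
    }

    // Called with the latest offsets; newer values overwrite older ones per partition.
    @Override
    public synchronized void updateOffsets(Map<String, Long> offsetsOfPartitions) {
        committed.putAll(offsetsOfPartitions);
    }
}
```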
Offset Commitment (0.9 Operator)
[Diagram labels: W = last offset in window i; current offset; commit window i; offset topic contains app name]

Some important properties

0.9 Operator
● initialOffset
● topics
● clusters
● strategy
● maxTuplesPerWindow
● initialPartitionCount
● consumerProps

0.8 Operator
● initialOffset
● consumer.topic
● consumer.zookeeper
● strategy
● maxTuplesPerWindow
● initialPartitionCount
● offsetManager

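Of the properties above, maxTuplesPerWindow is the one whose name suggests a per-window cap on emitted tuples. Assuming that reading, the sketch below shows how such a cap could work. WindowThrottleSketch and its methods are hypothetical and are not taken from the operator's source.

```java
// Illustrative only: caps how many tuples may be emitted in one streaming window.
final class WindowThrottleSketch {
    private final int maxTuplesPerWindow;
    private int emittedInWindow;

    WindowThrottleSketch(int maxTuplesPerWindow) {
        if (maxTuplesPerWindow < 0) {
            throw new IllegalArgumentException("maxTuplesPerWindow must not be negative");
        }
        this.maxTuplesPerWindow = maxTuplesPerWindow;
    }

    // Reset the counter at the start of every window.
    void beginWindow() {
        emittedInWindow = 0;
    }

    // Returns true if one more tuple may be emitted in the current window.
    boolean tryEmit() {
        if (emittedInWindow >= maxTuplesPerWindow) {
            return false;
        }
        emittedInWindow++;
        return true;
    }
}
```

Messages that are refused in one window are not dropped in this scheme; the caller simply leaves them unread until the next window begins.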
MapR Streams support
● MapR Streams is compatible with the 0.9 Kafka client API
● The 0.9 Input Operator has been tested with the MapR sandbox, and all major features work without any code change
● Use the MapR Streams client library instead of the Kafka one
● Leave the “clusters” property empty because MapR doesn’t require broker host name settings
● The special character “/” is supported in topic names because a MapR Streams topic name is just the path to the topic file
● Multi-cluster is not supported

Performance: Kafka Input Operator
● 4 Kafka brokers, 8 partitions
● 1 ZooKeeper
● Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz
● 256 GB RAM
● 10 GigE between nodes
● Uses the Yahoo streaming benchmark application (https://github.com/yahoo/streaming-benchmarks)
● 940567 msg/s at 245 bytes/msg for the 0.8 Input Operator
● 850000 msg/s at 245 bytes/msg for the 0.9 Input Operator

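The message rates above can be converted to byte throughput by multiplying by the message size. The small sketch below does that arithmetic; ThroughputSketch is a hypothetical helper, and it assumes every message is exactly 245 bytes. With the slide's numbers this works out to roughly 230 MB/s for the 0.8 operator and roughly 208 MB/s for the 0.9 operator (decimal megabytes).

```java
// Illustrative only: converts a message rate into bytes per second.
final class ThroughputSketch {

    static long bytesPerSecond(long messagesPerSecond, long bytesPerMessage) {
        return messagesPerSecond * bytesPerMessage;
    }

    public static void main(String[] args) {
        long bytesPerMessage = 245L;
        System.out.println("0.8 operator: " + bytesPerSecond(940_567L, bytesPerMessage) + " bytes/s");
        System.out.println("0.9 operator: " + bytesPerSecond(850_000L, bytesPerMessage) + " bytes/s");
    }
}
```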
Q & A
Follow Apex meetups: http://apex.incubator.apache.org/announcements.html
Learn more about Apex: http://apex.incubator.apache.org/docs.html
