The Power of Both Choices Practical Load Balancing for Distributed Stream Processing Engines Muhammad Anis Uddin Nasir, Gianmarco De Francisci Morales, David Garcia-Soriano Nicolas Kourtellis, Marco Serafini International Conference on Data Engineering (ICDE 2015)
Stream Processing Engines • Streaming Application – Online Machine Learning – Real Time Query Processing – Continuous Computation • Streaming Frameworks – Storm, Borealis, S4, Samza, Spark Streaming 2The Power of Both Choices
Stream Processing Model • Streaming Applications are represented by Directed Acyclic Graphs (DAGs) Worker Worker Worker Source Source 3The Power of Both Choices Data Stream Operators Data Channels
Stream Grouping • Key or Fields Grouping – Hash-based assignment – Stateful operations, e.g., page rank, degree count • Shuffle Grouping – Round-robin assignment – Stateless operations, e.g., data logging, OLTP 4The Power of Both Choices
Key Grouping 5The Power of Both Choices • Key Grouping • Scalable ✔ • Low Memory ✔ • Load Imbalance ✖
Shuffle Grouping 6The Power of Both Choices • Shuffle Grouping • Load Balance ✔ • Memory O(W) ✖ • Aggregation O(W) ✖
Problem Formulation • Input is a unbounded sequence of messages from a key distribution • Each message is assigned to a worker for processing • Load balance properties – Memory Load Balance – Network Load Balance – Processing Load Balance • Metric: Load Imbalance The Power of Both Choices 7
Power of two choices • Balls-and-bins problem • Algorithm – For each ball, pick two bins uniformly at random – Assign the ball to least loaded of the two bins • Issues – Distributed ✖ – Consensus on Keys ✖ – Skewed distribution ✖ – Continuous Data✖ – Load Information ✖ 8The Power of Both Choices Img source: http://s17.postimg.org/qqctbpftr/Galton_prime_box.jpg
Partial Key Grouping (PKG) • Key Splitting – Split each key into two server – Assign each instance using power of two choices • Local Load Estimation – each source estimates load on – using the local routing history 9The Power of Both Choices
Partial Key Grouping (PKG) • Key Splitting • Local Load Estimation The Power of Both Choices 10 Source Source Worker Worker Worker 2 0 1 1 0 2 2 0 2
Partial Key Grouping (PKG) • Key Splitting – Distributed – Stateless – Handle Skew • Local load estimation – No coordination among sources – No communication with workers 11The Power of Both Choices
Partial Key Grouping The Power of Both Choices 12 • PKG • Load Balance ✔ • Memory O(1) ✔ • Aggregation O(1) ✔
Analysis: Chromatic Balls and Bins • Problem Formulation – If messages are drawn from a key distribution where probabilities of keys are p1≥p2≥p3….. ≥pn – Each key has d choices out of n workers • Minimize the difference between maximum and average workload The Power of Both Choices 13
Analysis • Necessary Condition: If pi represents the probability of occurrence of a key i • Bounds: The Power of Both Choices 14
Streaming Applications • Most algorithms that use Shuffle Grouping can be expressed using Partial Key Grouping to reduce: – Memory footprint – Aggregation overhead • Algorithms that use Key Grouping can be rewritten to achieve load balance The Power of Both Choices 15
Streaming Examples • Naïve Bayes Classifier • Streaming Parallel Decision Trees • Heavy Hitters and Space Saving The Power of Both Choices 16
Stream Grouping: Summary Grouping • Pros • Cons Key Grouping • Scalable • Memory • Load Imbalance Shuffle Grouping • Load Balance • Memory O(W) • Aggregation O(W) Partial Key Grouping • Scalable • Load Balance • Memory O(1) • Aggregation O(1) The Power of Both Choices 17
Experiments • What is the effect of key splitting on POTC? • How does local estimation compare to a global oracle? • How does PKG perform on a real deployment on Apache Storm? The Power of Both Choices 18
Experimental Setup • Metric – the difference of maximum and the average load of the workers at time t • Datasets – Twitter, 1.2G tweets (crawled July 2012) – Wikipedia, 22M access logs – Twitter, 690K cashtags (crawled Nov 2013) – Social Networks, 69M edges – Synthetic, 10M keys 19The Power of Both Choices
Effect of Key Splitting The Power of Both Choices 20
Local Load Estimation The Power of Both Choices 21
Real deployment: Apache Storm The Power of Both Choices 22 0 200 400 600 800 1000 1200 1400 1600 0 0.2 0.4 0.6 0.8 1 Throughput(keys/s) (a) CPU delay (ms) PKG SG KG 1000 1100 1200 0.10 0 2.10 6 4.10 6 6.10 6 (b) Memory (keys) 10s 10s 30s 30s 60s 60s 300s 300s 600s 600s PKG SG KG
Conclusion • Partial Key Grouping (PKG) reduces the load imbalance by up to seven orders of magnitude compared to Key Grouping • PKG imposes constant memory and aggregation overhead, i.e., O(1), compared to Shuffle Grouping that is O(W) • Apache Storm – 60% improvement in throughput – 45% improvement in latency • PKG has been integrated in Apache Storm ver 0.10. 23The Power of Both Choices
Future Work • Load Balancing for Stateful Operators using key migration • Adaptive Load Balancing for highly skewed data • Load Balancing for graph processing systems The Power of Both Choices 24
The Power of Both Choices Practical Load Balancing for Distributed Stream Processing Engines Muhammad Anis Uddin Nasir, Gianmarco De Francisci Morales, David Garcia-Soriano Nicolas Kourtellis, Marco Serafini International Conference on Data Engineering (ICDE) 2015

The Power of Both Choices: Practical Load Balancing for Distributed Stream Processing Engines

  • 1.
    The Power ofBoth Choices Practical Load Balancing for Distributed Stream Processing Engines Muhammad Anis Uddin Nasir, Gianmarco De Francisci Morales, David Garcia-Soriano Nicolas Kourtellis, Marco Serafini International Conference on Data Engineering (ICDE 2015)
  • 2.
    Stream Processing Engines •Streaming Application – Online Machine Learning – Real Time Query Processing – Continuous Computation • Streaming Frameworks – Storm, Borealis, S4, Samza, Spark Streaming 2The Power of Both Choices
  • 3.
    Stream Processing Model •Streaming Applications are represented by Directed Acyclic Graphs (DAGs) Worker Worker Worker Source Source 3The Power of Both Choices Data Stream Operators Data Channels
  • 4.
    Stream Grouping • Keyor Fields Grouping – Hash-based assignment – Stateful operations, e.g., page rank, degree count • Shuffle Grouping – Round-robin assignment – Stateless operations, e.g., data logging, OLTP 4The Power of Both Choices
  • 5.
    Key Grouping 5The Powerof Both Choices • Key Grouping • Scalable ✔ • Low Memory ✔ • Load Imbalance ✖
  • 6.
    Shuffle Grouping 6The Powerof Both Choices • Shuffle Grouping • Load Balance ✔ • Memory O(W) ✖ • Aggregation O(W) ✖
  • 7.
    Problem Formulation • Inputis a unbounded sequence of messages from a key distribution • Each message is assigned to a worker for processing • Load balance properties – Memory Load Balance – Network Load Balance – Processing Load Balance • Metric: Load Imbalance The Power of Both Choices 7
  • 8.
    Power of twochoices • Balls-and-bins problem • Algorithm – For each ball, pick two bins uniformly at random – Assign the ball to least loaded of the two bins • Issues – Distributed ✖ – Consensus on Keys ✖ – Skewed distribution ✖ – Continuous Data✖ – Load Information ✖ 8The Power of Both Choices Img source: http://s17.postimg.org/qqctbpftr/Galton_prime_box.jpg
  • 9.
    Partial Key Grouping(PKG) • Key Splitting – Split each key into two server – Assign each instance using power of two choices • Local Load Estimation – each source estimates load on – using the local routing history 9The Power of Both Choices
  • 10.
    Partial Key Grouping(PKG) • Key Splitting • Local Load Estimation The Power of Both Choices 10 Source Source Worker Worker Worker 2 0 1 1 0 2 2 0 2
  • 11.
    Partial Key Grouping(PKG) • Key Splitting – Distributed – Stateless – Handle Skew • Local load estimation – No coordination among sources – No communication with workers 11The Power of Both Choices
  • 12.
    Partial Key Grouping ThePower of Both Choices 12 • PKG • Load Balance ✔ • Memory O(1) ✔ • Aggregation O(1) ✔
  • 13.
    Analysis: Chromatic Ballsand Bins • Problem Formulation – If messages are drawn from a key distribution where probabilities of keys are p1≥p2≥p3….. ≥pn – Each key has d choices out of n workers • Minimize the difference between maximum and average workload The Power of Both Choices 13
  • 14.
    Analysis • Necessary Condition:If pi represents the probability of occurrence of a key i • Bounds: The Power of Both Choices 14
  • 15.
    Streaming Applications • Mostalgorithms that use Shuffle Grouping can be expressed using Partial Key Grouping to reduce: – Memory footprint – Aggregation overhead • Algorithms that use Key Grouping can be rewritten to achieve load balance The Power of Both Choices 15
  • 16.
    Streaming Examples • NaïveBayes Classifier • Streaming Parallel Decision Trees • Heavy Hitters and Space Saving The Power of Both Choices 16
  • 17.
    Stream Grouping: Summary Grouping• Pros • Cons Key Grouping • Scalable • Memory • Load Imbalance Shuffle Grouping • Load Balance • Memory O(W) • Aggregation O(W) Partial Key Grouping • Scalable • Load Balance • Memory O(1) • Aggregation O(1) The Power of Both Choices 17
  • 18.
    Experiments • What isthe effect of key splitting on POTC? • How does local estimation compare to a global oracle? • How does PKG perform on a real deployment on Apache Storm? The Power of Both Choices 18
  • 19.
    Experimental Setup • Metric –the difference of maximum and the average load of the workers at time t • Datasets – Twitter, 1.2G tweets (crawled July 2012) – Wikipedia, 22M access logs – Twitter, 690K cashtags (crawled Nov 2013) – Social Networks, 69M edges – Synthetic, 10M keys 19The Power of Both Choices
  • 20.
    Effect of KeySplitting The Power of Both Choices 20
  • 21.
    Local Load Estimation ThePower of Both Choices 21
  • 22.
    Real deployment: ApacheStorm The Power of Both Choices 22 0 200 400 600 800 1000 1200 1400 1600 0 0.2 0.4 0.6 0.8 1 Throughput(keys/s) (a) CPU delay (ms) PKG SG KG 1000 1100 1200 0.10 0 2.10 6 4.10 6 6.10 6 (b) Memory (keys) 10s 10s 30s 30s 60s 60s 300s 300s 600s 600s PKG SG KG
  • 23.
    Conclusion • Partial KeyGrouping (PKG) reduces the load imbalance by up to seven orders of magnitude compared to Key Grouping • PKG imposes constant memory and aggregation overhead, i.e., O(1), compared to Shuffle Grouping that is O(W) • Apache Storm – 60% improvement in throughput – 45% improvement in latency • PKG has been integrated in Apache Storm ver 0.10. 23The Power of Both Choices
  • 24.
    Future Work • LoadBalancing for Stateful Operators using key migration • Adaptive Load Balancing for highly skewed data • Load Balancing for graph processing systems The Power of Both Choices 24
  • 25.
    The Power ofBoth Choices Practical Load Balancing for Distributed Stream Processing Engines Muhammad Anis Uddin Nasir, Gianmarco De Francisci Morales, David Garcia-Soriano Nicolas Kourtellis, Marco Serafini International Conference on Data Engineering (ICDE) 2015

Editor's Notes

  • #2 Hey everyone: I am Anis. A PhD student from KTH Royal Institute of Technology. The work I will present today is a result of my internship at Yahoo Labs Barcelona For this work, I was working with people at the top. There is Gianmarco, David, Nicolas and Marco In our work, we propose a very simple and practical approach for load balancing for Stream processing Engines
  • #3 Stream processing systems are specialized for low latency and high throughput processing for real time data Few common domains of streaming applications are online machine learning, real time query processing, continuous computation Due to the need of real time processing, various frameworks have been proposed in the last decade.
  • #4 Streaming Applications are represented as DAGs In a DAG a vertex are set of operators that are distributed across cluster, which apply various light weight transformation of the incoming stream. For example, filters, join, union, aggregrate are common stream transformation. Edges are data channels that are use to route the data from one operator to another In stream processing systems, there are various stream grouping strategies for routing the keys from one level of operators to the never level of operators In todays talk, I will be concentrating on load balancing between the group of operators at each level of DAG
  • #5 Key grouping uses hash based assignment. It applies a hash function on the incoming data and assign the worker to the message by taking the mod of the hash Key grouping is a good choice for stateful operators like aggregates Shuffle grouping is a simple round roibin assignment scheme. Shuffle grouping is a good choice for stateless operators
  • #6 To understand stream grouping strategies, lets take an example of word count Suppose we want to count the words in the tweets from twitter To implement twitter wordcount, we need to set of operators, A first level of operator to split the tweets into set of words. And the next level of operator to count the words As we know that many of the real workloads are highly skewed.
  • #8 Talk about different load balance schemes. Memory Network Processing In particular, we are interested in balancing the workload for processing. So total number of messages processing
  • #9 An elegant solution is using the power of 2 choices: In the past it has been introduced as a balls and bins problem: Given a ball (that means a key in our setup) and a set of bins (workers) pick 2 random bins, check which one has the least load of balls and send the new ball to that one. Surprisingly, this simple strategy leads to an imbalance that is independent of the balls thrown at the bins. So this looks like a good solution. However, when applied in DSPEs, it has some complications Better than hashing Power of choices takes two bins and put at random, and it is better load balance
  • #13 It uses twice at most memory than key grouping
  • #17 Motivate the people to read the paper
  • #19 We performed various experiments to assess the performance of our technique. We compare with pure PoTC to study the effect of key splitting We study how well local load estimation reaches a solution which is similar in balance as with when having global information We study how robust PKG is to shifting skew We implemented the PKG on Apache Storm to see how well it does on a real DSPE system.
  • #20 We measured imbalance in the system: The maximum load observed across workers – the average load across workers. Compared global, local, and a version that does probing of load on workers at regular intervals Used different real and synthetic data: tweets, wikipedia page access, etc.
  • #21 Here, we compare PKG with regular POTC, a greedy online and offline algorithm and hashing. We show average imbalance across time, given different number of workers, for Wikipedia and Twitter Online greedy picks the least loaded worker to handle a new key. Offline greedy first sorts the keys by frequency and then executes online greedy. Hashing just applies a hash function on the keys (so it’s the KG version) -> Hashing performs the worst. -> PKG performs very well and similar to the Greedy algorithms -> Adding workers increases imbalance. Hashing is the single choice paradigm
  • #22 This is the average imbalance across the system and time, for Twitter, Wikipedia, Cashtags and a synthetic lognormal distribution. We study how well the local load estimation compares with global information. It is always very close, regardless of how many sources we allow the system to have. Some different patterns for the various datasets, but the trend is the same. Hashing is worst. Hashing is just for reference
  • #23 These are some results from a real deployment on Storm. We tried to simulate different working time per key. Fore example, reading 400KB from memory is 0.1ms and 1/10th of disk seek is 1ms On the left plot, we don't have aggregation phase but apply different CPU load per key and measure the throughput supported. On the right plot, we keep constant the CPU load to 0.4msecs and vary how often we do aggregation. 0.4 is close to saturation to the system. The less frequent is the aggregation, the more the memory cost. We see that PKG can offer similar or better throughput than SG for the same or smaller memory cost.