WIFI SSID:SparkAISummit | Password: UnifiedAnalytics
Panagiotis Garefalakis (Imperial College London) Konstantinos Karanasos (Microsoft) Peter Pietzuch (Imperial College London) Cooperative Task Execution for Apache Spark #UnifiedAnalytics #SparkAISummit
Evolution of analytics
Batch frameworks (2010), stream frameworks (2014), unified stream/batch frameworks such as Structured Streaming (2018), and now frameworks with hybrid stream/batch applications
Unified application example
A training (batch) job learns a model from historical data, while an inference (stream) job applies the trained model to real-time data to produce low-latency responses; the two jobs iterate as stages of the same application.
Unified applications
Stream/Batch = unified applications combining
> latency-sensitive (stream) jobs with
> latency-tolerant (batch) jobs
as part of the same application
Advantages
> Sharing application logic & state
> Result consistency
> Sharing of computation
Structured Streaming
> Unified processing platform on top of Spark SQL: fast, scalable, fault-tolerant
> Large ecosystem of data sources: integrates with many storage systems
> Rich, unified, high-level APIs: deal with complex data and complex workloads

API example (batch training followed by streaming inference):

    // Batch: train a classification pipeline on historical data
    val trainData = context.read("malicious-train-data")
    val pipeline = new Pipeline().setStages(Array(
      new OneHotEncoderEstimator(),
      new VectorAssembler(),
      new Classifier(/* select estimator */)))
    val pipelineModel = pipeline.fit(trainData)

    // Stream: apply the trained model to real-time data
    val streamingData = context
      .readStream("kafkaTopic")
      .schema(/* input schema */)
      .groupBy("userId")
    val streamRates = pipelineModel.transform(streamingData)
    streamRates.start() /* start streaming */
Scheduling of Stream/Batch jobs
Challenge: schedule stream/batch jobs to satisfy their diverse requirements
Requirements
> Latency: execute the inference job with minimum delay
> Throughput: batch jobs should not be compromised
> Efficiency: achieve high cluster resource utilization
Stream/Batch application scheduling
[Figure: the application code is submitted through the Spark Context; the driver's DAG scheduler runs the jobs. The inference (stream) job has two stages of a few short tasks (duration T to 2T), while the training (batch) job has two stages of more numerous, longer tasks (up to 3T).]
Stream/Batch scheduling
> Static allocation: dedicate resources to each job
[Figure: core usage over time on executor1/executor2; cores dedicated to one job sit idle while the other job still has tasks to run.]
Resources cannot be shared across jobs, leading to wasted resources
> FIFO: first job runs to completion (shared executors)
[Figure: batch tasks occupy all cores first; stream tasks wait behind them.]
Long batch jobs increase stream job latency
> FAIR: weighted sharing of resources across jobs (shared executors)
[Figure: stream tasks still queue behind already-running batch tasks.]
Better packing, but non-optimal latency due to queuing
> KILL: avoid queuing by preempting batch tasks (shared executors)
[Figure: killed batch tasks are re-executed from scratch after the stream tasks finish.]
Better latency at the expense of extra work
> NEPTUNE: minimize queuing and wasted work (shared executors)
[Figure: batch tasks are suspended when stream tasks arrive and resumed afterwards, preserving their progress.]
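The suspend/resume behaviour that distinguishes NEPTUNE from KILL can be illustrated with a toy scheduler. The sketch below is in Python for brevity (Neptune itself uses Scala coroutines inside Spark executors); the task names, step counts, and the `run` helper are invented for illustration. Each `yield` is a cooperative suspension point, so pausing the batch task loses no work.

```python
def task(name, steps):
    """A task as a generator: each yield is a cooperative suspension point."""
    for step in range(steps):
        yield f"{name}:{step}"

def run(batch, stream, stream_arrives_at):
    """Run the batch task until the stream job arrives, then let the stream
    task bypass it; the batch task is suspended, not killed."""
    trace = []
    # Batch task runs alone until the stream job is submitted.
    for _ in range(stream_arrives_at):
        trace.append(next(batch))
    trace.extend(stream)   # stream task runs to completion with priority
    trace.extend(batch)    # batch task resumes where it stopped: no lost work
    return trace

trace = run(task("batch", 4), task("stream", 2), stream_arrives_at=2)
print(trace)
# → ['batch:0', 'batch:1', 'stream:0', 'stream:1', 'batch:2', 'batch:3']
```

Under KILL, the two completed batch steps would be discarded and re-executed; here they survive in `trace` exactly once.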
Challenges
> How to minimize queuing for latency-sensitive jobs and wasted work?
> How to natively support stream/batch applications?
> How to satisfy different stream/batch application requirements and high-level objectives?
NEPTUNE: an execution framework for Stream/Batch applications
> Minimize queuing and wasted work → suspendable tasks
> Natively support stream/batch applications → unified execution framework on top of Structured Streaming
> Satisfy diverse application requirements and high-level objectives → pluggable scheduling policies
Spark tasks
> Tasks apply a function to a partition of data
> They are subroutines that run in an executor to completion
> Preemption problem:
  > Loss of progress (kill)
  > Unpredictable preemption times (checkpointing)
Suspendable tasks
> Idea: use coroutines (https://github.com/storm-enroute/coroutines)
> Separate stacks store task state
> Yield points hand control back to the executor
> Cooperative preemption:
  > Suspends and resumes in milliseconds
  > Work-preserving
  > Transparent to the user
Suspendable tasks: subroutine vs. coroutine

    // Subroutine: runs to completion once started
    val collect: (TaskContext, Iterator[T]) => Array[T] =
      (context: TaskContext, itr: Iterator[T]) => {
        val result = new mutable.ArrayBuffer[T]
        while (itr.hasNext) {
          result.append(itr.next)
        }
        result.toArray
      }

    // Coroutine: yields control at suspension points, then resumes
    val collect = coroutine { (context: TaskContext, itr: Iterator[T]) =>
      val result = new mutable.ArrayBuffer[T]
      while (itr.hasNext) {
        result.append(itr.next)
        if (context.isPaused()) yieldval(0) // yield point
      }
      result.toArray
    }
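The same contrast can be mimicked with Python generators, which give coroutine-like suspension for free. This is only an analogy to the slide's Scala code: the `ctx` dictionary stands in for `TaskContext`, and its `"paused"` flag for `context.isPaused()`.

```python
def collect(context, itr):
    """Coroutine-style collect: after each element, yield control back to
    the 'executor' if the scheduler has asked this task to pause."""
    result = []
    for item in itr:
        result.append(item)
        if context["paused"]:
            yield None     # suspension point: progress so far is preserved
    yield result           # iteration finished: emit the collected partition

ctx = {"paused": True}
task = collect(ctx, iter(range(5)))
partial = next(task)       # runs until the first suspension point
ctx["paused"] = False      # the scheduler resumes the task
final = next(task)         # continues from exactly where it stopped
print(final)               # → [0, 1, 2, 3, 4]
```

The key property is the same as on the slide: suspension is cheap and work-preserving, because the task's local state (`result`, the iterator position) lives on and resumption picks up mid-loop.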
Execution framework
[Architecture: jobs carrying app and job priorities flow from the incrementalizer and optimizer through the DAG scheduler to the task scheduler; a pluggable scheduling policy launches tasks on executors, suspending running low-priority tasks to run high-priority ones (each task is either Running or Paused).]
Scheduling Policies
> Idea: policies trigger task suspension and resumption
  > Guarantee that stream tasks bypass batch tasks
  > Satisfy higher-level objectives, e.g. balancing cluster load
  > Avoid starvation by suspending a task only up to a bounded number of times
> Load-balancing: equalize the number of tasks per node & reduce preemption
> Cache-aware load-balancing: respect task locality preferences in addition to load-balancing
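The placement choice behind the two policies above can be sketched in a few lines. This is a simplified Python illustration, not Neptune's actual scheduler: the `load` map (running tasks per node) and `pick_executor` helper are invented for the example.

```python
def pick_executor(load, locality_prefs=None):
    """Pick an executor node for the next task: prefer the task's locality
    preferences (cache-aware policy), falling back to the least-loaded node
    (plain load-balancing policy)."""
    if locality_prefs:
        candidates = [n for n in locality_prefs if n in load]
        if candidates:
            # Cache-aware: least-loaded node among the preferred ones.
            return min(candidates, key=lambda n: load[n])
    # Plain load-balancing: equalize the number of running tasks per node.
    return min(load, key=lambda n: load[n])

load = {"node1": 3, "node2": 1, "node3": 2}
print(pick_executor(load))                      # → node2 (least loaded)
print(pick_executor(load, ["node1", "node3"]))  # → node3 (preferred & lighter)
```

Equalizing load per node also serves the anti-preemption goal on the slide: the fewer tasks queued on a node, the less often an arriving stream task needs to suspend a batch task there.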
Implementation
> Built as an extension to Spark 2.4.0 (code to be open-sourced)
> Ported all ResultTask and ShuffleMapTask functionality across programming interfaces to coroutines
> Extended Spark's DAG scheduler to allow job stages with different requirements (priorities)
Demo
> Run a simple unified application with
  > a high-priority latency-sensitive job
  > a low-priority latency-tolerant job
> Schedule them with default Spark and with Neptune
> Goal: show the benefit of Neptune and its ease of use
Azure deployment
> Cluster: 75 nodes, each with 4 cores and 32 GB of memory
> Workloads:
  – TPC-H: decision-support benchmark
  – Yahoo Streaming Benchmark: ad analytics on a stream of ad impressions
  – LDA: ML training/inference application uncovering hidden topics from a group of documents
Benefit of NEPTUNE in stream latency
[Chart: streaming latency (s) under static allocation, FIFO, FAIR, KILL, Neptune LB, Neptune CLB, and isolated execution; annotated improvements range from 13% to 61%.]
NEPTUNE achieves latencies comparable to the ideal (isolated execution) for the latency-sensitive jobs
Suspension mechanism effectiveness
[Chart: pause latency, resume latency, and task runtime per TPC-H query (Q1–Q22, scale factor 10), in ms on a log scale.]
> TPC-H: task runtime distributions per query range from 100s of milliseconds to 10s of seconds
> TPC-H: tasks continuously transition between Paused and Resumed states until completion
Tasks are effectively paused and resumed with sub-millisecond latency
Impact of resource demands on performance
[Chart: streaming latency (s) and batch throughput (Mevents/s) as the share of cores used for streaming grows from 0% to 100%; batch throughput varies by only ~1.5%.]
Neptune efficiently shares resources with low impact on batch throughput
Summary
Neptune supports complex unified applications with diverse job requirements!
> Suspendable tasks using coroutines
> Pluggable scheduling policies
> Continuous analytics
Thank you! Questions?
Panagiotis Garefalakis, pgaref@imperial.ac.uk
