Cloud-Native Spark Scheduling with YuniKorn Scheduler
Li Gao, Tech Lead and Engineer @ Databricks Compute Fabric; previously tech lead for data infrastructure @ Lyft. Weiwei Yang, Tech Lead @ Cloudera Compute Platform; Apache Hadoop Committer & PMC member; previously tech lead for Real-time Compute Infra @ Alibaba.
Agenda. Li Gao: Why Lyft is choosing Spark on K8s; The need for a custom K8s scheduler for Spark. Weiwei Yang: Spark Scheduling with YuniKorn; Deep Dive into YuniKorn Features; Community and Roadmap.
Role of K8s in Lyft’s Data Landscape
Why Choose K8s for Spark ▪ Containerized Spark compute that provides shared resources across different ML and ETL jobs ▪ Support for multiple Spark versions, Python versions, and version-controlled containers on shared K8s clusters, for both faster iteration and stable production ▪ A single, unified infrastructure for the majority of our data compute and microservices, with advanced, unified observability and resource isolation support ▪ Fine-grained access controls on shared clusters
The Spark K8s Infra @ Lyft
Multi-step creation for a Spark K8s job [diagram: Resource Labels, Jobs, Cluster Pool, K8s Cluster, Namespace Group, Namespace, Spark CRD, Spark Pods, Data Lake]
Problems of the existing Spark K8s infrastructure ▪ Complexity of the layers of custom K8s controllers needed to handle the scale of Spark jobs ▪ Tight coupling between controller layers amplifies latency issues in certain cases ▪ Priority queues across jobs, clusters, and namespaces are managed by multiple layers of controllers to achieve the desired performance
Why we need a customized K8s Scheduler ▪ High latency (~100 seconds) is observed with the default scheduler on a single K8s cluster under large volumes of batch workloads ▪ Fair sharing of large batch jobs within the same resource pool is unpredictable with the default scheduler ▪ A mix of FIFO and FAIR requirements on shared job clusters ▪ The need for elastic and hierarchical priority management for jobs in K8s ▪ Richer, online user visibility into scheduling behavior ▪ Simplified controller layers with a custom K8s scheduler
Spark Scheduling with YuniKorn
Flavors of Running Spark on K8s ▪ Native Spark on K8s: identify Spark jobs by pod labels ▪ Spark K8s Operator: identify Spark jobs by CRDs (e.g., SparkApplication)
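To make the operator flavor concrete, here is a minimal, illustrative SparkApplication manifest (a sketch against the spark-on-k8s-operator v1beta2 API; the image, jar path, and namespace are placeholder values, not anything prescribed by YuniKorn):

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-jobs            # placeholder namespace
spec:
  type: Scala
  mode: cluster
  image: gcr.io/spark-operator/spark:v3.0.0   # placeholder image
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar   # placeholder jar path
  sparkVersion: "3.0.0"
  driver:
    cores: 1
    memory: 1g
    serviceAccount: spark          # service account allowed to create executor pods
  executor:
    instances: 2
    cores: 1
    memory: 2g
```

The operator watches resources of this kind and turns them into driver and executor pods, which is how Spark jobs are identified without inspecting raw pod labels.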
Resource Scheduling in K8s Scheduler workflow in human language: The scheduler picks up a pod each time, find the best fit node and then launch the pod on that node.
Spark on K8s: the scheduling challenges ▪ Job scheduling requirements: job ordering/queueing; job-level priority; resource fairness (between jobs/queues); gang scheduling ▪ Resource sharing and utilization challenges: Spark driver pods can occupy all resources in a namespace; resource competition and deadlocks between large jobs; misbehaving jobs can abuse resources; high throughput is required ▪ Workload types: ad-hoc queries, batch jobs, workflows (DAG), streaming ▪ The need for a unified architecture across on-prem, cloud, multi-cloud, and hybrid-cloud deployments. The K8s default scheduler was NOT created to tackle these challenges.
Apache YuniKorn (Incubating) What is it: ▪ A standalone resource scheduler for K8s ▪ Focused on building scheduling capabilities to empower Big Data on K8s Simple to use: ▪ A stateless service on K8s ▪ A secondary K8s scheduler, or a replacement for the default scheduler
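Because YuniKorn can run as a second scheduler side by side with the default one, opting a workload in is just a matter of setting spec.schedulerName. A minimal sketch, assuming YuniKorn was installed under its default scheduler name yunikorn (pod name, namespace, and image are placeholders):

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: yunikorn-scheduled-pod     # placeholder name
  namespace: sandbox               # placeholder namespace
spec:
  schedulerName: yunikorn          # hand this pod to YuniKorn instead of the default scheduler
  containers:
    - name: sleep
      image: alpine:3.12           # placeholder image
      command: ["sleep", "3600"]
```

Pods that do not set schedulerName keep flowing through the default scheduler, so the two schedulers can coexist on one cluster.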
Resource Scheduling in YuniKorn (and compare w/ default scheduler) Apps API Server ETCD Resource Scheduler master Apps Nodes Queues Request Kubelet Filter Score Sort Extensions Queue Sort App Sort Node Sort Pluggable Policies YUNIKORN Default Scheduler 31 2 YuniKorn QUEUE, APP concepts are critical to provide advanced job scheduling and fine-grained resource management capabilities
Main differences (YuniKorn vs. the default scheduler):
▪ Scheduling at the app dimension: the app is a first-class citizen in YuniKorn; YuniKorn schedules apps with respect to, e.g., their submission order, priority, and resource usage.
▪ Job ordering: YuniKorn supports FIFO/FAIR/Priority (WIP) job ordering policies.
▪ Fine-grained resource capacity management: cluster resources are managed with hierarchical queues; a queue provides guaranteed resources (min) and a resource quota (max).
▪ Resource fairness: inter-queue resource fairness.
▪ Native support for Big Data workloads: the default scheduler is mainly for long-running services; YuniKorn is designed for Big Data workloads and natively supports Spark, Flink, TensorFlow, etc.
▪ Scale & performance: YuniKorn is optimized for performance and is suitable for high-throughput, large-scale environments.
Run Spark with YuniKorn. Submit a Spark job by 1) running spark-submit, or 2) creating a SparkApplication CRD.
▪ Job is starting: the api-server creates the driver pod (Pending); the Spark job is registered to YuniKorn in a leaf queue; YuniKorn sorts queues -> sorts apps -> selects a request -> selects a node, then asks the api-server to bind the driver pod to the assigned node.
▪ Spark driver is running: the driver pod is bound and started; it requests Spark executor pods from the api-server.
▪ Spark executors are created: the api-server creates the executor pods (Pending); the new executors are added to YuniKorn as pending requests.
▪ Spark job is running: YuniKorn schedules the executors and binds them to nodes.
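For the native spark-submit flavor, YuniKorn groups the driver and executor pods into one application and places it in a leaf queue based on pod labels. The applicationId and queue label keys below follow the convention documented by YuniKorn at the time; the queue name and image are illustrative, and Spark can inject such labels via its spark.kubernetes.driver.label.<key> / spark.kubernetes.executor.label.<key> configuration:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: spark-job-001-driver
  labels:
    applicationId: spark-job-001   # groups driver and executor pods into one YuniKorn app
    queue: root.sandbox            # leaf queue the job is registered into (example name)
spec:
  schedulerName: yunikorn          # let YuniKorn, not the default scheduler, place this pod
  containers:
    - name: spark-kubernetes-driver
      image: my-registry/spark:3.0.0   # placeholder image
```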
Deep Dive into YuniKorn Features/Performance
Job Ordering Why does this matter? ▪ If I submit my job earlier, I want it to run first ▪ I don't want my job to be starved while resources are used by others ▪ I have an urgent job, let me run first! Per-queue sorting policy ▪ FIFO - order jobs by submission time ▪ FAIR - order jobs by resource consumption ▪ Priority (WIP, 0.9) - order jobs by job-level priority within the same queue
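The ordering policy is set per leaf queue in YuniKorn's queue configuration. A sketch, assuming the queues.yaml layout and the application.sort.policy property key used around the 0.8/0.9 releases (queue names are illustrative):

```yaml
partitions:
  - name: default
    queues:
      - name: root
        queues:
          - name: batch
            properties:
              application.sort.policy: fifo   # order jobs by submission time
          - name: adhoc
            properties:
              application.sort.policy: fair   # order jobs by resource consumption
```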
Resource Quota Management: K8s Namespace ResourceQuota. A K8s namespace ResourceQuota ▪ defines resource limits ▪ is enforced by the quota admission controller. Problems ▪ Hard to control when resource quotas are overcommitted ▪ Users have no guaranteed resources ▪ Quota can be abused (e.g., by pending pods) ▪ No queueing for jobs… ▪ Low utilization?! Namespace ResourceQuota is suboptimal for supporting resource sharing between multiple tenants.
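For reference, this is the kind of namespace quota the slide is talking about; the admission controller simply rejects anything over the hard caps, with no queueing and no guaranteed minimum for anyone (namespace and numbers are illustrative):

```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-compute-quota
  namespace: spark-jobs            # placeholder namespace
spec:
  hard:
    requests.cpu: "100"            # total CPU that pods in this namespace may request
    requests.memory: 400Gi         # total memory that pods in this namespace may request
    pods: "200"                    # Pending pods count too, so quota can be held by pods doing no work
```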
Resource Quota Management: YuniKorn Queue Capacity. The YuniKorn queue provides an optimal solution for managing resource quotas ▪ A queue can map to one (or more) namespaces automatically -> zero-config queue management ▪ Capacity is elastic from min to max -> better resource sharing: the guarantee is ensured and the max is enforced ▪ Resource fairness is honored -> avoids starving jobs/users ▪ Quota is only counted for pods that actually consume resources -> accurate resource counting, improved utilization ▪ Job queueing is enabled -> jobs can be queued in the scheduler, keeping client-side logic simple [diagram: pods requesting CPU 1 / Memory 1024Mi, CPU 2 / 2048Mi, and CPU 2 / 2048Mi inside a namespace mapped to a YuniKorn queue with max CPU 5 / Memory 5120Mi]
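A minimal sketch of the corresponding YuniKorn queue configuration, assuming the queues.yaml schema of the 0.8 release: queues declare guaranteed (min) and max resources, and a tag-based placement rule can map each namespace onto a queue automatically. Queue names and numbers are illustrative; check the YuniKorn docs for the exact resource units:

```yaml
partitions:
  - name: default
    placementrules:
      - name: tag
        value: namespace           # map each namespace to a queue of the same name
        create: true               # create the queue on demand (zero-config queue mgmt)
    queues:
      - name: root
        queues:
          - name: spark-jobs       # leaf queue backing the spark-jobs namespace
            resources:
              guaranteed:          # min share the queue can always claim
                memory: 102400
                vcore: 50
              max:                 # elastic upper bound the queue may grow into
                memory: 512000
                vcore: 250
```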
Resource Fairness in YuniKorn Queues. Experiment: schedule workloads with different requests in 4 queues that have different guaranteed resources. Queue / Guaranteed Resource (Mem) / Requests (NumOfPods * Mem): root.default / 500,000 / 1000 * 10; root.search / 400,000 / 500 * 10; root.test / 100,000 / 200 * 10; root.sandbox / 100,000 / 200 * 50. Result: the usage ratios of the queues increased with a similar trend.
Scheduler Throughput Benchmark. Schedule 50,000 pods on 2,000 and 4,000 nodes and compare scheduling throughput (pods per second allocated by the scheduler), YuniKorn vs. the default scheduler. 50k pods on 2k nodes: 617 vs 263 pods/s (↑ 134%); 50k pods on 4k nodes: 373 vs 141 pods/s (↑ 164%). Detailed report: https://github.com/apache/incubator-yunikorn-core/blob/master/docs/evaluate-perf-function-with-Kubemark.md
Fully K8s Compatible ▪ Supports K8s predicates ▪ Node selector ▪ Pod affinity/anti-affinity ▪ Taints and tolerations ▪ … ▪ Supports PersistentVolumeClaim and PersistentVolume ▪ Volume bindings ▪ Dynamic provisioning ▪ Publishes key scheduling events to the K8s event system ▪ Works with the cluster autoscaler ▪ Supports management commands ▪ cordon nodes
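Since the standard predicates are honored, placement constraints are written exactly as for the default scheduler; only schedulerName changes. An illustrative pod spec (label keys, instance type, and image are placeholders):

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: spark-executor-demo        # placeholder name
  labels:
    spark-role: executor
spec:
  schedulerName: yunikorn          # scheduled by YuniKorn; the constraints below still apply
  nodeSelector:
    node.kubernetes.io/instance-type: r5.4xlarge    # example node selector
  tolerations:
    - key: dedicated               # example taint reserved for Spark nodes
      operator: Equal
      value: spark
      effect: NoSchedule
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            topologyKey: kubernetes.io/hostname
            labelSelector:
              matchLabels:
                spark-role: executor   # prefer spreading executors across nodes
  containers:
    - name: executor
      image: my-registry/spark:3.0.0   # placeholder image
```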
YuniKorn Management Console
Compare YuniKorn with other K8s schedulers (capabilities: resource sharing = hierarchy queues, queue elastic capacity; resource fairness = cross-queue, user-level, app-level fairness; preemption = basic, with fairness; gang scheduling; bin packing; throughput):
▪ K8s default scheduler: hierarchy queues x, queue elastic capacity x, cross-queue fairness x, user-level fairness x, app-level fairness x, basic preemption v, preemption with fairness x, gang scheduling x, bin packing v, throughput 260 allocs/s (2k nodes)
▪ Kube-batch: hierarchy queues x, queue elastic capacity x, cross-queue fairness v, user-level fairness x, app-level fairness v, basic preemption v, preemption with fairness x, gang scheduling v, bin packing v, throughput ? (likely slower than kube-default, per [1])
▪ YuniKorn: hierarchy queues v, queue elastic capacity v, cross-queue fairness v, user-level fairness v, app-level fairness v, basic preemption v, preemption with fairness v, gang scheduling v* (YUNIKORN-2), bin packing v, throughput 610 allocs/s (2k nodes)
[1] https://github.com/kubernetes-sigs/kube-batch/issues/930
Community, Summary and Next
Current Status ▪ Open sourced on July 17, 2019, under the Apache 2.0 License ▪ Entered the Apache Incubator on Jan 21, 2020 ▪ Latest stable version 0.8.0 released on May 4, 2020 ▪ Diverse community with members from Alibaba, Cloudera, Microsoft, LinkedIn, Apple, Tencent, Nvidia, and more…
The Community ▪ Deployed in non-production K8s clusters ▪ Launched 100s of large jobs per day on some of the YuniKorn queues ▪ Reduced our large-job scheduling latency by a factor of ~3x at peak time ▪ Overall K8s cluster resource utilization efficiency (cost per compute) improved over the default kube-scheduler for mixed workloads ▪ FIFO and FAIR requests are met more frequently than before ▪ Shipping with Cloudera Public Cloud offerings ▪ Provides resource quota management and advanced job scheduling capabilities for Spark ▪ Responsible for both micro-service and batch job scheduling ▪ Running in the cloud with auto-scaling enabled ▪ Deployed on a pre-production on-prem cluster with ~100 nodes ▪ Plan to deploy on a 1000+ node production K8s cluster this year ▪ Leverages YuniKorn features such as hierarchical queues and resource fairness to run large-scale Flink jobs on K8s ▪ Gained ~4x scheduling performance improvement
Roadmap Current (0.8.0) ● Hierarchical queues ● Cross-queue fairness ● Fair/FIFO job ordering policies ● Fair/bin-packing node sorting policies ● Self queue management ● Pluggable app discovery ● Metrics system and Prometheus integration Upcoming (0.9.0, targeted for Q3 2020) ● Gang scheduling ● Job/task priority support (scheduling & preemption) ● Support for Spark dynamic allocation
Our Vision - Resource Mgmt for Big Data & ML. Computes: data engineering, real-time streaming, machine learning. Types: micro services, batch jobs, long-running workloads, interactive sessions, model serving. Targets: multi-tenancy, SLA, resource utilization, cost mgmt, budget. A unified compute platform for Big Data & ML.
Join us in the YuniKorn Community! ▪ Project web site: http://yunikorn.apache.org/ ▪ GitHub repo: apache/incubator-yunikorn-core ▪ Mailing list: dev@yunikorn.apache.org ▪ Slack channel: ▪ Bi-weekly/monthly sync-up meetings for different time zones
Thank you!!
