Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions pkg/controller/clusterstate/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type NodeInfo struct {
Taints []v1.Taint

Tasks map[TaskID]*TaskInfo
//Node status

IsReady v1.ConditionStatus
}

func NewNodeInfo(node *v1.Node) *NodeInfo {
Expand All @@ -75,11 +78,12 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
Allocatable: EmptyResource(),
Capability: EmptyResource(),

Labels: make(map[string]string),
Labels: make(map[string]string),
Unschedulable: false,
Taints: []v1.Taint{},
Taints: []v1.Taint{},

Tasks: make(map[TaskID]*TaskInfo),
Tasks: make(map[TaskID]*TaskInfo),
IsReady: "True",
}
}

Expand All @@ -94,11 +98,12 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
Allocatable: NewResource(node.Status.Allocatable),
Capability: NewResource(node.Status.Capacity),

Labels: node.GetLabels(),
Labels: node.GetLabels(),
Unschedulable: node.Spec.Unschedulable,
Taints: node.Spec.Taints,
Taints: node.Spec.Taints,

Tasks: make(map[TaskID]*TaskInfo),
Tasks: make(map[TaskID]*TaskInfo),
IsReady: GetCondition(*node),
}
}

Expand Down Expand Up @@ -133,6 +138,7 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {
ni.Labels = NewStringsMap(node.Labels)
ni.Unschedulable = node.Spec.Unschedulable
ni.Taints = NewTaints(node.Spec.Taints)
ni.IsReady = node.Status.Conditions[0].Status
}

func (ni *NodeInfo) PipelineTask(task *TaskInfo) error {
Expand Down Expand Up @@ -179,7 +185,7 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
}

func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
klog.V(10).Infof("Attempting to remove task: %s on node: %s", ti.Name, ni.Name)
klog.V(10).Infof("Attempting to remove task: %s on node: %s", ti.Name, ni.Name)

key := PodKey(ti.Pod)

Expand All @@ -190,15 +196,15 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
}

if ni.Node != nil {
klog.V(10).Infof("Found node for task: %s, node: %s, task status: %v", task.Name, ni.Name, task.Status)
klog.V(10).Infof("Found node for task: %s, node: %s, task status: %v", task.Name, ni.Name, task.Status)
if task.Status == Releasing {
ni.Releasing.Sub(task.Resreq)
}

ni.Idle.Add(task.Resreq)
ni.Used.Sub(task.Resreq)
} else {
klog.V(10).Infof("No node info found for task: %s, node: %s", task.Name, ni.Name)
klog.V(10).Infof("No node info found for task: %s, node: %s", task.Name, ni.Name)
}

delete(ni.Tasks, key)
Expand All @@ -207,7 +213,7 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
}

func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {
klog.V(10).Infof("Attempting to update task: %s on node: %s", ti.Name, ni.Name)
klog.V(10).Infof("Attempting to update task: %s on node: %s", ti.Name, ni.Name)
if err := ni.RemoveTask(ti); err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/clusterstate/api/node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"reflect"
"testing"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -70,6 +70,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
Releasing: EmptyResource(),
Allocatable: buildResource("8000m", "10G"),
Capability: buildResource("8000m", "10G"),
IsReady: v1.ConditionTrue,
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01_pod1),
"c1/p2": NewTaskInfo(case01_pod2),
Expand Down Expand Up @@ -120,6 +121,7 @@ func TestNodeInfo_RemovePod(t *testing.T) {
Releasing: EmptyResource(),
Allocatable: buildResource("8000m", "10G"),
Capability: buildResource("8000m", "10G"),
IsReady: "True",
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01_pod1),
"c1/p3": NewTaskInfo(case01_pod3),
Expand Down
16 changes: 15 additions & 1 deletion pkg/controller/clusterstate/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"fmt"
"math"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

type Resource struct {
Expand Down Expand Up @@ -84,6 +84,20 @@ func NewResource(rl v1.ResourceList) *Resource {
return r
}

func GetCondition(nodeStatus v1.Node) v1.ConditionStatus {
var currentNodeStatus v1.ConditionStatus = v1.ConditionFalse
condition := 0
for condition < len(nodeStatus.Status.Conditions) {
// return status of Ready condition.type only
if nodeStatus.Status.Conditions[condition].Type == "Ready" {
currentNodeStatus = nodeStatus.Status.Conditions[condition].Status
return currentNodeStatus
}
condition = condition + 1
}
return currentNodeStatus
}

func (r *Resource) IsEmpty() bool {
return r.MilliCPU < minMilliCPU && r.Memory < minMemory && r.GPU == 0
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/controller/clusterstate/api/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"fmt"
"reflect"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -78,6 +78,13 @@ func buildNode(name string, alloc v1.ResourceList) *v1.Node {
Status: v1.NodeStatus{
Capacity: alloc,
Allocatable: alloc,
Phase: "",
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
}
}
Expand Down
30 changes: 16 additions & 14 deletions pkg/controller/clusterstate/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ package cache
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"sync"
"time"

"github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -210,7 +211,6 @@ func (sc *ClusterStateCache) GetUnallocatedResources() *api.Resource {
return r.Add(sc.availableResources)
}


func (sc *ClusterStateCache) GetUnallocatedHistograms() map[string]*dto.Metric {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
Expand Down Expand Up @@ -238,7 +238,7 @@ func (sc *ClusterStateCache) GetResourceCapacities() *api.Resource {

// Save the cluster state.
func (sc *ClusterStateCache) saveState(available *api.Resource, capacity *api.Resource,
availableHistogram *api.ResourceHistogram) error {
availableHistogram *api.ResourceHistogram) error {
klog.V(12).Infof("Saving Cluster State")

sc.Mutex.Lock()
Expand All @@ -261,13 +261,15 @@ func (sc *ClusterStateCache) updateState() error {
idleMin := api.EmptyResource()
idleMax := api.EmptyResource()

firstNode := true
firstNode := true
for _, value := range cluster.Nodes {
// Do not use any Unschedulable nodes in calculations
if value.Unschedulable == true {
klog.V(6).Infof("[updateState] %s is marked as unschedulable node Total: %v, Used: %v, and Idle: %v will not be included in cluster state calculation.",
value.Name, value.Allocatable, value.Used, value.Idle)
continue
if !value.Unschedulable {
if value.IsReady == "true" {
klog.V(6).Infof("[updateState] %s is marked as unschedulable or not ready node Total: %v, Used: %v, and Idle: %v will not be included in cluster state calculation.",
value.Name, value.Allocatable, value.Used, value.Idle)
continue
}
}

total = total.Add(value.Allocatable)
Expand All @@ -277,12 +279,12 @@ func (sc *ClusterStateCache) updateState() error {
// Collect Min and Max for histogram
if firstNode {
idleMin.MilliCPU = idle.MilliCPU
idleMin.Memory = idle.Memory
idleMin.GPU = idle.GPU
idleMin.Memory = idle.Memory
idleMin.GPU = idle.GPU

idleMax.MilliCPU = idle.MilliCPU
idleMax.Memory = idle.Memory
idleMax.GPU = idle.GPU
idleMax.Memory = idle.Memory
idleMax.GPU = idle.GPU
firstNode = false
} else {
if value.Idle.MilliCPU < idleMin.MilliCPU {
Expand All @@ -293,7 +295,7 @@ func (sc *ClusterStateCache) updateState() error {

if value.Idle.Memory < idleMin.Memory {
idleMin.Memory = value.Idle.Memory
} else if value.Idle.Memory > idleMax.Memory{
} else if value.Idle.Memory > idleMax.Memory {
idleMax.Memory = value.Idle.Memory
}

Expand Down