2 changes: 1 addition & 1 deletion CONTROLLER_VERSION
@@ -1 +1 @@
-1.29.46
+1.29.47
11 changes: 9 additions & 2 deletions pkg/controller/clusterstate/api/job_info.go
@@ -34,6 +34,7 @@ import (
 	"fmt"
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
 
 	"github.com/IBM/multi-cluster-app-dispatcher/pkg/apis/controller/utils"
 	arbv1 "github.com/IBM/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
@@ -224,10 +225,16 @@ func (ps *JobInfo) deleteTaskIndex(ti *TaskInfo) {
 
 func (ps *JobInfo) DeleteTaskInfo(pi *TaskInfo) error {
 	if task, found := ps.Tasks[pi.UID]; found {
-		ps.TotalRequest.Sub(task.Resreq)
+		_, err := ps.TotalRequest.Sub(task.Resreq)
+		if err != nil {
+			klog.Warningf("[DeleteTaskInfo] Total requested subtraction err=%v", err)
+		}
 
 		if AllocatedStatus(task.Status) {
-			ps.Allocated.Sub(task.Resreq)
+			_, err := ps.Allocated.Sub(task.Resreq)
+			if err != nil {
+				klog.Warningf("[DeleteTaskInfo] Allocated subtraction err=%v", err)
+			}
 		}
 
 		delete(ps.Tasks, task.UID)
28 changes: 23 additions & 5 deletions pkg/controller/clusterstate/api/node_info.go
@@ -121,7 +121,11 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {
 			ni.Releasing.Add(task.Resreq)
 		}
 
-		ni.Idle.Sub(task.Resreq)
+		_, err := ni.Idle.Sub(task.Resreq)
+		if err != nil {
+			klog.Warningf("[SetNode] Node idle amount subtraction err=%v", err)
+		}
+
 		ni.Used.Add(task.Resreq)
 	}
 }
@@ -145,7 +149,11 @@ func (ni *NodeInfo) PipelineTask(task *TaskInfo) error {
 	ti := task.Clone()
 
 	if ni.Node != nil {
-		ni.Releasing.Sub(ti.Resreq)
+		_, err := ni.Releasing.Sub(ti.Resreq)
+		if err != nil {
+			klog.Warningf("[PipelineTask] Node release subtraction err=%v", err)
+		}
+
 		ni.Used.Add(ti.Resreq)
 	}
 
@@ -169,7 +177,11 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
 		if ti.Status == Releasing {
 			ni.Releasing.Add(ti.Resreq)
 		}
-		ni.Idle.Sub(ti.Resreq)
+		_, err := ni.Idle.Sub(ti.Resreq)
+		if err != nil {
+			klog.Warningf("[AddTask] Idle resource subtract err=%v", err)
+		}
+
 		ni.Used.Add(ti.Resreq)
 	}
 
@@ -192,11 +204,17 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
 	if ni.Node != nil {
 		klog.V(10).Infof("Found node for task: %s, node: %s, task status: %v", task.Name, ni.Name, task.Status)
 		if task.Status == Releasing {
-			ni.Releasing.Sub(task.Resreq)
+			_, err := ni.Releasing.Sub(task.Resreq)
+			if err != nil {
+				klog.Warningf("[RemoveTask] Node release subtraction err=%v", err)
+			}
 		}
 
 		ni.Idle.Add(task.Resreq)
-		ni.Used.Sub(task.Resreq)
+		_, err := ni.Used.Sub(task.Resreq)
+		if err != nil {
+			klog.Warningf("[RemoveTask] Node usage subtraction err=%v", err)
+		}
 	} else {
 		klog.V(10).Infof("No node info found for task: %s, node: %s", task.Name, ni.Name)
 	}
56 changes: 33 additions & 23 deletions pkg/controller/clusterstate/api/resource_info.go
@@ -34,7 +34,7 @@ import (
 	"fmt"
 	"math"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 )
 
 type Resource struct {
@@ -88,16 +88,17 @@ func (r *Resource) IsEmpty() bool {
 	return r.MilliCPU < minMilliCPU && r.Memory < minMemory && r.GPU == 0
 }
 
-func (r *Resource) IsZero(rn v1.ResourceName) bool {
+func (r *Resource) IsZero(rn v1.ResourceName) (bool, error) {
 	switch rn {
 	case v1.ResourceCPU:
-		return r.MilliCPU < minMilliCPU
+		return r.MilliCPU < minMilliCPU, nil
 	case v1.ResourceMemory:
-		return r.Memory < minMemory
+		return r.Memory < minMemory, nil
 	case GPUResourceName:
-		return r.GPU == 0
+		return r.GPU == 0, nil
 	default:
-		panic("unknown resource")
+		e := fmt.Errorf("unknown resource %v", rn)
+		return false, e
 	}
 }
 
@@ -116,38 +117,46 @@ func (r *Resource) Replace(rr *Resource) *Resource {
 }
 
 //Sub subtracts two Resource objects.
-func (r *Resource) Sub(rr *Resource) *Resource {
-	if rr.LessEqual(r) {
-		r.MilliCPU -= rr.MilliCPU
-		r.Memory -= rr.Memory
-		r.GPU -= rr.GPU
-		return r
-	}
-
-	panic(fmt.Errorf("Resource is not sufficient to do operation: <%v> sub <%v>",
-		r, rr))
+func (r *Resource) Sub(rr *Resource) (*Resource, error) {
+	return r.NonNegSub(rr)
 }
 
 //Sub subtracts two Resource objects and return zero for negative subtractions.
-func (r *Resource) NonNegSub(rr *Resource) *Resource {
+func (r *Resource) NonNegSub(rr *Resource) (*Resource, error) {
 	// Check for negative calculation
+	var isNegative bool
+	var err error = nil
+	var rCopy *Resource = nil
 	if r.MilliCPU < rr.MilliCPU {
 		r.MilliCPU = 0
+		isNegative = true
+		rCopy = r.Clone()
 	} else {
 		r.MilliCPU -= rr.MilliCPU
 	}
 	if r.Memory < rr.Memory {
 		r.Memory = 0
+		isNegative = true
+		if rCopy == nil {
+			rCopy = r.Clone()
+		}
 	} else {
 		r.Memory -= rr.Memory
 	}
 
 	if r.GPU < rr.GPU {
 		r.GPU = 0
+		isNegative = true
+		if rCopy == nil {
+			rCopy = r.Clone()
+		}
 	} else {
 		r.GPU -= rr.GPU
 	}
-	return r
+	if isNegative {
+		err = fmt.Errorf("resource subtraction resulted in negative value, total resource: %v, subtracting resource: %v", rCopy, rr)
+	}
+	return r, err
 }
 
 func (r *Resource) Less(rr *Resource) bool {
@@ -165,16 +174,17 @@ func (r *Resource) String() string {
 		r.MilliCPU, r.Memory, r.GPU)
 }
 
-func (r *Resource) Get(rn v1.ResourceName) float64 {
+func (r *Resource) Get(rn v1.ResourceName) (float64, error) {
 	switch rn {
 	case v1.ResourceCPU:
-		return r.MilliCPU
+		return r.MilliCPU, nil
 	case v1.ResourceMemory:
-		return r.Memory
+		return r.Memory, nil
 	case GPUResourceName:
-		return float64(r.GPU)
+		return float64(r.GPU), nil
 	default:
-		panic("not support resource.")
+		err := fmt.Errorf("resource not supported %v", rn)
+		return 0.0, err
 	}
 }
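Taken together, the resource_info.go changes replace panics with returned errors: Sub now simply delegates to NonNegSub, which floors each dimension at zero and reports the underflow instead of crashing the controller. Below is a minimal runnable sketch of those semantics; the simplified Resource type, the clampSub helper, and the concrete numbers are illustrative assumptions, not the controller's actual implementation.

package main

import "fmt"

// Simplified stand-in for the controller's Resource type; the field set
// matches what the diff above shows (MilliCPU, Memory, GPU).
type Resource struct {
	MilliCPU float64
	Memory   float64
	GPU      int64
}

// clampSub subtracts b from a, flooring at zero, and reports whether the
// exact result would have been negative.
func clampSub(a, b float64) (float64, bool) {
	if a < b {
		return 0, true
	}
	return a - b, false
}

// NonNegSub mirrors the new semantics: subtract in place, clamp every
// dimension at zero, and return an error instead of panicking on underflow.
func (r *Resource) NonNegSub(rr *Resource) (*Resource, error) {
	var negative, n bool
	r.MilliCPU, n = clampSub(r.MilliCPU, rr.MilliCPU)
	negative = negative || n
	r.Memory, n = clampSub(r.Memory, rr.Memory)
	negative = negative || n
	gpu, n := clampSub(float64(r.GPU), float64(rr.GPU))
	r.GPU = int64(gpu)
	negative = negative || n
	if negative {
		return r, fmt.Errorf("subtracting %+v from resource went negative", rr)
	}
	return r, nil
}

func main() {
	idle := &Resource{MilliCPU: 500, Memory: 1024, GPU: 0}
	req := &Resource{MilliCPU: 1000, Memory: 512, GPU: 1}

	// Before this PR the equivalent Sub call panicked here; now the caller
	// receives the clamped result plus an error it can log and move past.
	if _, err := idle.NonNegSub(req); err != nil {
		fmt.Println("warning:", err)
	}
	fmt.Printf("idle after subtraction: %+v\n", idle) // {MilliCPU:0 Memory:512 GPU:0}
}

Accounting paths that should never underflow, like those in job_info.go and node_info.go above, surface the returned error as a klog warning and continue.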
2 changes: 1 addition & 1 deletion pkg/controller/queuejob/queuejob_controller_ex.go
@@ -880,7 +880,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
 
 	klog.V(6).Infof("[getAggAvaiResPri] Schedulable idle cluster resources: %+v, subtracting dispatched resources: %+v and adding preemptable cluster resources: %+v", r, pending, preemptable)
 	r = r.Add(preemptable)
-	r = r.NonNegSub(pending)
+	r, _ = r.NonNegSub(pending)
 
 	klog.V(3).Infof("[getAggAvaiResPri] %+v available resources to schedule", r)
 	return r, proposedPremptions
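The blank identifier in r, _ = r.NonNegSub(pending) looks deliberate: when computing aggregate schedulable resources, a result floored at zero is exactly the value the dispatcher needs, so the underflow error carries no actionable signal, which is presumably why this call site, unlike the accounting paths above, does not log a warning.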
26 changes: 26 additions & 0 deletions test/e2e/queue.go
@@ -487,6 +487,32 @@ var _ = Describe("AppWrapper E2E Test", func() {
 
 	})
 
+	It("MCAD Job Large Compute Requirement Test", func() {
+		fmt.Fprintf(os.Stdout, "[e2e] MCAD Job Large Compute Requirement Test - Started.\n")
+		context := initTestContext()
+		var appwrappers []*arbv1.AppWrapper
+		appwrappersPtr := &appwrappers
+		defer cleanupTestObjectsPtr(context, appwrappersPtr)
+
+		aw := createGenericJobAWtWithLargeCompute(context, "aw-test-job-with-large-comp-1")
+		err1 := waitAWPodsReady(context, aw)
+		Expect(err1).NotTo(HaveOccurred())
+		time.Sleep(1 * time.Minute)
+		aw1, err := context.karclient.ArbV1().AppWrappers(aw.Namespace).Get(aw.Name, metav1.GetOptions{})
+		if err != nil {
+			fmt.Fprintf(os.Stdout, "Error getting status")
+		}
+		pass := false
+		fmt.Fprintf(os.Stdout, "[e2e] status of AW %v.\n", aw1.Status.State)
+		if aw1.Status.State == arbv1.AppWrapperStateEnqueued {
+			pass = true
+		}
+		Expect(pass).To(BeTrue())
+		appwrappers = append(appwrappers, aw)
+		fmt.Fprintf(os.Stdout, "[e2e] MCAD Job Large Compute Requirement Test - Completed.\n")
+
+	})
+
 	It("MCAD CPU Accounting Queuing Test", func() {
 		fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Accounting Queuing Test - Started.\n")
 		context := initTestContext()
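The new test drives this behavior end to end: it submits an AppWrapper whose pod requests 100 CPUs (100000m) and 100 GPUs, far more than a test cluster can supply, waits a minute, and asserts the wrapper is still in AppWrapperStateEnqueued rather than dispatched. Under the old panicking Sub, accounting for a request this far beyond capacity was exactly the kind of subtraction that could crash the controller.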
97 changes: 93 additions & 4 deletions test/e2e/util.go
@@ -1607,6 +1607,91 @@ func createGenericJobAWWithStatus(context *context, name string) *arbv1.AppWrapp
 	return appwrapper
 }
 
+func createGenericJobAWtWithLargeCompute(context *context, name string) *arbv1.AppWrapper {
+	rb := []byte(`{
+	"apiVersion": "batch/v1",
+	"kind": "Job",
+	"metadata": {
+		"name": "aw-test-job-with-large-comp-1",
+		"namespace": "test"
+	},
+	"spec": {
+		"completions": 1,
+		"parallelism": 1,
+		"template": {
+			"metadata": {
+				"labels": {
+					"appwrapper.mcad.ibm.com": "aw-test-job-with-large-comp-1"
+				}
+			},
+			"spec": {
+				"containers": [
+					{
+						"args": [
+							"sleep 5"
+						],
+						"command": [
+							"/bin/bash",
+							"-c",
+							"--"
+						],
+						"image": "ubuntu:latest",
+						"imagePullPolicy": "IfNotPresent",
+						"name": "aw-test-job-with-comp-1",
+						"resources": {
+							"limits": {
+								"cpu": "10000m",
+								"memory": "256M",
+								"nvidia.com/gpu": "100"
+							},
+							"requests": {
+								"cpu": "100000m",
+								"memory": "256M",
+								"nvidia.com/gpu": "100"
+							}
+						}
+					}
+				],
+				"restartPolicy": "Never"
+			}
+		}
+	}
+}`)
+	//var schedSpecMin int = 1
+
+	aw := &arbv1.AppWrapper{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: "test",
+		},
+		Spec: arbv1.AppWrapperSpec{
+			SchedSpec: arbv1.SchedulingSpecTemplate{
+				//MinAvailable: schedSpecMin,
+			},
+			AggrResources: arbv1.AppWrapperResourceList{
+				GenericItems: []arbv1.AppWrapperGenericResource{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      fmt.Sprintf("%s-%s", name, "aw-test-job-with-large-comp-1"),
+							Namespace: "test",
+						},
+						DesiredAvailable: 1,
+						GenericTemplate: runtime.RawExtension{
+							Raw: rb,
+						},
+						//CompletionStatus: "Complete",
+					},
+				},
+			},
+		},
+	}
+
+	appwrapper, err := context.karclient.ArbV1().AppWrappers(context.namespace).Create(aw)
+	Expect(err).NotTo(HaveOccurred())
+
+	return appwrapper
+}
+
 func createGenericServiceAWWithNoStatus(context *context, name string) *arbv1.AppWrapper {
 	rb := []byte(`{
 		"apiVersion": "v1",
@@ -2965,11 +3050,13 @@ func clusterSize(ctx *context, req v1.ResourceList) int32 {
 
 		// Removed used resources.
 		if res, found := used[node.Name]; found {
-			alloc.Sub(res)
+			_, err := alloc.Sub(res)
+			Expect(err).NotTo(HaveOccurred())
 		}
 
 		for slot.LessEqual(alloc) {
-			alloc.Sub(slot)
+			_, err := alloc.Sub(slot)
+			Expect(err).NotTo(HaveOccurred())
 			res++
 		}
 	}
@@ -3033,11 +3120,13 @@ func computeNode(ctx *context, req v1.ResourceList) (string, int32) {
 
 		// Removed used resources.
 		if res, found := used[node.Name]; found {
-			alloc.Sub(res)
+			_, err := alloc.Sub(res)
+			Expect(err).NotTo(HaveOccurred())
 		}
 
 		for slot.LessEqual(alloc) {
-			alloc.Sub(slot)
+			_, err := alloc.Sub(slot)
+			Expect(err).NotTo(HaveOccurred())
 			res++
 		}
 