Skip to content
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
BIN_DIR=_output/bin
RELEASE_VER=v1.16
RELEASE_VER=v1.17
CURRENT_DIR=$(shell pwd)
#MCAD_REGISTRY=$(shell docker ps --filter name=mcad-registry | grep -v NAME)
#LOCAL_HOST_NAME=localhost
Expand Down
8 changes: 4 additions & 4 deletions hack/run-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ function kind-up-cluster {

# clean up
function cleanup {
echo "Cleaning up..."
echo "==========================>>>>> Cleaning up... <<<<<=========================="
echo " "

echo "Custom Resource Definitions..."
Expand All @@ -104,7 +104,7 @@ function cleanup {
kind delete cluster ${CLUSTER_CONTEXT}
}

function kube-batch-up {
function kube-test-env-up {
cd ${ROOT_DIR}

export KUBECONFIG="$(kind get kubeconfig-path ${CLUSTER_CONTEXT})"
Expand Down Expand Up @@ -165,9 +165,9 @@ trap cleanup EXIT

kind-up-cluster

kube-batch-up
kube-test-env-up

cd ${ROOT_DIR}

echo "Running E2E tests..."
echo "==========================>>>>> Running E2E tests... <<<<<=========================="
go test ./test/e2e -v -timeout 30m
2 changes: 1 addition & 1 deletion pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func GetPodTemplate(qjobRes *arbv1.AppWrapperResource) (*v1.PodTemplateSpec, err

template, ok := obj.(*v1.PodTemplate)
if !ok {
return nil, fmt.Errorf("Queuejob resource template not define a Pod")
return nil, fmt.Errorf("Resource template not define as a PodTemplate")
}

return &template.Template, nil
Expand Down
110 changes: 66 additions & 44 deletions pkg/controller/queuejobresources/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,21 @@ func (qjrPod *QueueJobResPod) manageQueueJob(qj *arbv1.AppWrapper, pods []*v1.Po
go func(ix int32) {
defer wait.Done()
newPod := qjrPod.createQueueJobPod(qj, ix, ar)
_, err := qjrPod.clients.Core().Pods(newPod.Namespace).Create(newPod)
if err != nil {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same QueueJob created
// So gang-scheduling could schedule the QueueJob successfully
glog.Errorf("Failed to create pod %s for QueueJob %s, err %#v",
newPod.Name, qj.Name, err)

if newPod == nil {
err := fmt.Errorf("Job resource template item not define as a PodTemplate")
glog.Errorf("Failed to create a pod for Job %s, error: %#v.", qj.Name, err)
errs = append(errs, err)
} else {
_, err := qjrPod.clients.Core().Pods(newPod.Namespace).Create(newPod)
if err != nil {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same QueueJob created
// So gang-scheduling could schedule the QueueJob successfully
glog.Errorf("Failed to create pod %s for QueueJob %s, err %#v",
newPod.Name, qj.Name, err)
errs = append(errs, err)
}
}
}(i)
}
Expand Down Expand Up @@ -399,18 +406,23 @@ func (qjrPod *QueueJobResPod) manageQueueJobPods(activePods []*v1.Pod, succeeded
go func(ix int32) {
defer wait.Done()
newPod := qjrPod.createQueueJobPod(qj, ix, ar)
//newPod := buildPod(fmt.Sprintf("%s-%d-%s", qj.Name, ix, generateUUID()), qj.Namespace, qj.Spec.Template, []metav1.OwnerReference{*metav1.NewControllerRef(qj, controllerKind)}, ix)
for {
_, err := qjrPod.clients.Core().Pods(newPod.Namespace).Create(newPod)
if err == nil {
// Create Pod successfully
break
} else {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same QueueJob created
// So gang-scheduling could schedule the QueueJob successfully
glog.Warningf("Failed to create pod %s for QueueJob %s, err %#v, wait 2 seconds and re-create it", newPod.Name, qj.Name, err)
time.Sleep(2 * time.Second)
if newPod == nil {
err = fmt.Errorf("Job resource template item not define as a PodTemplate")
glog.Errorf("Failed to create pod %s for Job %s, err %#v",
newPod.Name, qj.Name, err)
} else {
for {
_, err := qjrPod.clients.Core().Pods(newPod.Namespace).Create(newPod)
if err == nil {
// Create Pod successfully
break
} else {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same QueueJob created
// So gang-scheduling could schedule the QueueJob successfully
glog.Warningf("Failed to create pod %s for Job %s, err %#v, wait 2 seconds and re-create it", newPod.Name, qj.Name, err)
time.Sleep(2 * time.Second)
}
}
}
}(i)
Expand Down Expand Up @@ -535,7 +547,7 @@ func (qjrPod *QueueJobResPod) GetPodTemplate(qjobRes *arbv1.AppWrapperResource)

template, ok := obj.(*v1.PodTemplate)
if !ok {
return nil, fmt.Errorf("Queuejob resource template not define a Pod")
return nil, fmt.Errorf("Job resource template item not define as a PodTemplate")
}

return &template.Template, nil
Expand All @@ -550,46 +562,56 @@ func (qjrPod *QueueJobResPod) GetAggregatedResources(job *arbv1.AppWrapper) *clu
//calculate scaling
for _, ar := range job.Spec.AggrResources.Items {
if ar.Type == arbv1.ResourceTypePod {
template, _ := qjrPod.GetPodTemplate(&ar)
replicas := ar.Replicas
myres := queuejobresources.GetPodResources(template)
myres.MilliCPU = float64(replicas) * myres.MilliCPU
myres.Memory = float64(replicas) * myres.Memory
myres.GPU = int64(replicas) * myres.GPU
total = total.Add(myres)
}
template, err := qjrPod.GetPodTemplate(&ar)
if err != nil {
glog.Errorf("Can not parse pod template in item: %+v error: %+v. Aggregated resources set to 0.", ar, err)
} else {
replicas := ar.Replicas
myres := queuejobresources.GetPodResources(template)

myres.MilliCPU = float64(replicas) * myres.MilliCPU
myres.Memory = float64(replicas) * myres.Memory
myres.GPU = int64(replicas) * myres.GPU
total = total.Add(myres)
}
}
}
}
return total
}

func (qjrPod *QueueJobResPod) GetAggregatedResourcesByPriority(priority int, job *arbv1.AppWrapper) *clusterstateapi.Resource {
total := clusterstateapi.EmptyResource()
if job.Spec.AggrResources.Items != nil {
//calculate scaling
for _, ar := range job.Spec.AggrResources.Items {
if ar.Priority < float64(priority) {
continue
}
if ar.Type == arbv1.ResourceTypePod {
template, _ := qjrPod.GetPodTemplate(&ar)
total = total.Add(queuejobresources.GetPodResources(template))
}
}
}
return total
total := clusterstateapi.EmptyResource()
if job.Spec.AggrResources.Items != nil {
//calculate scaling
for _, ar := range job.Spec.AggrResources.Items {
if ar.Priority < float64(priority) {
continue
}

if ar.Type == arbv1.ResourceTypePod {
template, err := qjrPod.GetPodTemplate(&ar)
if err != nil {
glog.Errorf("Cannot parse pod template in item: %+v error: %+v. Aggregated resources set to 0.", ar, err)
} else {
total = total.Add(queuejobresources.GetPodResources(template))
}
}
}
}
return total
}

func (qjrPod *QueueJobResPod) createQueueJobPod(qj *arbv1.AppWrapper, ix int32, qjobRes *arbv1.AppWrapperResource) *corev1.Pod {
templateCopy, err := qjrPod.GetPodTemplate(qjobRes)

if err != nil {
glog.Errorf("Cannot parse pod template for QJ")
glog.Errorf("Cannot parse PodTemplate in job: %+v, item: %+v error: %+v.", qj, qjobRes, err)
return nil
}
podName := fmt.Sprintf("%s-%d-%s", qj.Name, ix, generateUUID())

glog.Infof("I have template copy for the pod %+v", templateCopy)
glog.Infof("Template copy for the pod %+v", templateCopy)

tmpl := templateCopy.Labels

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/queuejobresources/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func GetPodResources(template *v1.PodTemplateSpec) *clusterstateapi.Resource {
glog.Errorf("Pod Spec not found in Pod Template: %+v. Aggregated resources set to 0.", template)
return total
}

for _, c := range template.Spec.Containers {
req.Add(clusterstateapi.NewResource(c.Resources.Requests))
limit.Add(clusterstateapi.NewResource(c.Resources.Limits))
Expand Down
29 changes: 28 additions & 1 deletion test/e2e/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,34 @@ var _ = Describe("Predicates E2E Test", func() {

Expect(err).NotTo(HaveOccurred())
})
/*

//NOTE: Recommend that this test not be the last test in the test suite: it
// may pass locally but may cause the controller to fail, which is not
// part of this test's validation.

It("Create AppWrapper - PodTemplate Only - 2 Pods", func() {
context := initTestContext()
defer cleanupTestContext(context)

aw := createBadPodTemplateAW(context,"aw-podtemplate-2")

err := waitAWReady(context, aw)

Expect(err).To(HaveOccurred())
})

It("Create AppWrapper - Bad PodTemplate", func() {
context := initTestContext()
defer cleanupTestContext(context)

aw := createPodTemplateAW(context,"aw-podtemplate-2")

err := waitAWReady(context, aw)

Expect(err).NotTo(HaveOccurred())
})

/*
It("Gang scheduling", func() {
context := initTestContext()
defer cleanupTestContext(context)
Expand Down
Loading