@@ -302,14 +302,21 @@ func (qjrPod *QueueJobResPod) manageQueueJob(qj *arbv1.AppWrapper, pods []*v1.Po
302302go func (ix int32 ) {
303303defer wait .Done ()
304304newPod := qjrPod .createQueueJobPod (qj , ix , ar )
305- _ , err := qjrPod .clients .Core ().Pods (newPod .Namespace ).Create (newPod )
306- if err != nil {
307- // Failed to create Pod, wait a moment and then create it again
308- // This is to ensure all pods under the same QueueJob created
309- // So gang-scheduling could schedule the QueueJob successfully
310- glog .Errorf ("Failed to create pod %s for QueueJob %s, err %#v" ,
311- newPod .Name , qj .Name , err )
305+
306+ if newPod == nil {
307+ err := fmt .Errorf ("Job resource template item not define as a PodTemplate" )
308+ glog .Errorf ("Failed to create a pod for Job %s, error: %#v." , qj .Name , err )
312309errs = append (errs , err )
310+ } else {
311+ _ , err := qjrPod .clients .Core ().Pods (newPod .Namespace ).Create (newPod )
312+ if err != nil {
313+ // Failed to create Pod, wait a moment and then create it again
314+ // This is to ensure all pods under the same QueueJob created
315+ // So gang-scheduling could schedule the QueueJob successfully
316+ glog .Errorf ("Failed to create pod %s for QueueJob %s, err %#v" ,
317+ newPod .Name , qj .Name , err )
318+ errs = append (errs , err )
319+ }
313320}
314321}(i )
315322}
@@ -399,18 +406,23 @@ func (qjrPod *QueueJobResPod) manageQueueJobPods(activePods []*v1.Pod, succeeded
399406go func (ix int32 ) {
400407defer wait .Done ()
401408newPod := qjrPod .createQueueJobPod (qj , ix , ar )
402- //newPod := buildPod(fmt.Sprintf("%s-%d-%s", qj.Name, ix, generateUUID()), qj.Namespace, qj.Spec.Template, []metav1.OwnerReference{*metav1.NewControllerRef(qj, controllerKind)}, ix)
403- for {
404- _ , err := qjrPod .clients .Core ().Pods (newPod .Namespace ).Create (newPod )
405- if err == nil {
406- // Create Pod successfully
407- break
408- } else {
409- // Failed to create Pod, wait a moment and then create it again
410- // This is to ensure all pods under the same QueueJob created
411- // So gang-scheduling could schedule the QueueJob successfully
412- glog .Warningf ("Failed to create pod %s for QueueJob %s, err %#v, wait 2 seconds and re-create it" , newPod .Name , qj .Name , err )
413- time .Sleep (2 * time .Second )
409+ if newPod == nil {
410+ err = fmt .Errorf ("Job resource template item not define as a PodTemplate" )
411+ glog .Errorf ("Failed to create pod %s for Job %s, err %#v" ,
412+ newPod .Name , qj .Name , err )
413+ } else {
414+ for {
415+ _ , err := qjrPod .clients .Core ().Pods (newPod .Namespace ).Create (newPod )
416+ if err == nil {
417+ // Create Pod successfully
418+ break
419+ } else {
420+ // Failed to create Pod, wait a moment and then create it again
421+ // This is to ensure all pods under the same QueueJob created
422+ // So gang-scheduling could schedule the QueueJob successfully
423+ glog .Warningf ("Failed to create pod %s for Job %s, err %#v, wait 2 seconds and re-create it" , newPod .Name , qj .Name , err )
424+ time .Sleep (2 * time .Second )
425+ }
414426}
415427}
416428}(i )
@@ -535,7 +547,7 @@ func (qjrPod *QueueJobResPod) GetPodTemplate(qjobRes *arbv1.AppWrapperResource)
535547
536548template , ok := obj .(* v1.PodTemplate )
537549if ! ok {
538- return nil , fmt .Errorf ("Queuejob resource template not define a Pod " )
550+ return nil , fmt .Errorf ("Job resource template item not define as a PodTemplate " )
539551}
540552
541553return & template .Template , nil
@@ -550,46 +562,56 @@ func (qjrPod *QueueJobResPod) GetAggregatedResources(job *arbv1.AppWrapper) *clu
550562 //calculate scaling
551563 for _ , ar := range job .Spec .AggrResources .Items {
552564 if ar .Type == arbv1 .ResourceTypePod {
553- template , _ := qjrPod .GetPodTemplate (& ar )
554- replicas := ar .Replicas
555- myres := queuejobresources .GetPodResources (template )
556- myres .MilliCPU = float64 (replicas ) * myres .MilliCPU
557- myres .Memory = float64 (replicas ) * myres .Memory
558- myres .GPU = int64 (replicas ) * myres .GPU
559- total = total .Add (myres )
560- }
565+ template , err := qjrPod .GetPodTemplate (& ar )
566+ if err != nil {
567+ glog .Errorf ("Can not parse pod template in item: %+v error: %+v. Aggregated resources set to 0." , ar , err )
568+ } else {
569+ replicas := ar .Replicas
570+ myres := queuejobresources .GetPodResources (template )
571+
572+ myres .MilliCPU = float64 (replicas ) * myres .MilliCPU
573+ myres .Memory = float64 (replicas ) * myres .Memory
574+ myres .GPU = int64 (replicas ) * myres .GPU
575+ total = total .Add (myres )
576+ }
577+ }
561578 }
562579 }
563580 return total
564581}
565582
566583func (qjrPod * QueueJobResPod ) GetAggregatedResourcesByPriority (priority int , job * arbv1.AppWrapper ) * clusterstateapi.Resource {
567- total := clusterstateapi .EmptyResource ()
568- if job .Spec .AggrResources .Items != nil {
569- //calculate scaling
570- for _ , ar := range job .Spec .AggrResources .Items {
571- if ar .Priority < float64 (priority ) {
572- continue
573- }
574- if ar .Type == arbv1 .ResourceTypePod {
575- template , _ := qjrPod .GetPodTemplate (& ar )
576- total = total .Add (queuejobresources .GetPodResources (template ))
577- }
578- }
579- }
580- return total
584+ total := clusterstateapi .EmptyResource ()
585+ if job .Spec .AggrResources .Items != nil {
586+ //calculate scaling
587+ for _ , ar := range job .Spec .AggrResources .Items {
588+ if ar .Priority < float64 (priority ) {
589+ continue
590+ }
591+
592+ if ar .Type == arbv1 .ResourceTypePod {
593+ template , err := qjrPod .GetPodTemplate (& ar )
594+ if err != nil {
595+ glog .Errorf ("Cannot parse pod template in item: %+v error: %+v. Aggregated resources set to 0." , ar , err )
596+ } else {
597+ total = total .Add (queuejobresources .GetPodResources (template ))
598+ }
599+ }
600+ }
601+ }
602+ return total
581603}
582604
583605func (qjrPod * QueueJobResPod ) createQueueJobPod (qj * arbv1.AppWrapper , ix int32 , qjobRes * arbv1.AppWrapperResource ) * corev1.Pod {
584606templateCopy , err := qjrPod .GetPodTemplate (qjobRes )
585607
586608if err != nil {
587- glog .Errorf ("Cannot parse pod template for QJ" )
609+ glog .Errorf ("Cannot parse PodTemplate in job: %+v, item: %+v error: %+v." , qj , qjobRes , err )
588610return nil
589611}
590612podName := fmt .Sprintf ("%s-%d-%s" , qj .Name , ix , generateUUID ())
591613
592- glog .Infof ("I have template copy for the pod %+v" , templateCopy )
614+ glog .Infof ("Template copy for the pod %+v" , templateCopy )
593615
594616tmpl := templateCopy .Labels
595617
0 commit comments