Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
simply acc logic
  • Loading branch information
asm582 committed Jun 30, 2023
commit 25166ce1f8e216c9a6235fa5389d88ca78847130
69 changes: 12 additions & 57 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,8 +905,10 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
continue
} else if !value.Status.CanRun {
klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it can not run.", time.Now().String(), value.Name)
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
preemptable = preemptable.Add(totalResource)
continue
} else if value.Status.SystemPriority < targetpr {
} else if value.Status.SystemPriority < targetpr || !value.Status.CanRun {
// Dispatcher Mode: Ensure this job is part of the target cluster
if qjm.isDispatcher {
// Get the job key
Expand Down Expand Up @@ -938,69 +940,22 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
klog.V(10).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since priority %f is >= %f of requesting job: %s.", time.Now().String(),
value.Name, value.Status.SystemPriority, targetpr, requestingJob.Name)
continue
} else if value.Status.State == arbv1.AppWrapperStateEnqueued {
// Don't count the resources that can run but not yet realized (job orchestration pending or partially running).
} else if value.Status.CanRun && (targetpr < value.Status.SystemPriority) {
qjv := clusterstateapi.EmptyResource()
for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
pending = pending.Add(qjv)
qjv = resctrl.GetAggregatedResources(value)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
pending = pending.Add(qjv)
qjv, _ = genericresource.GetResources(&genericItem)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
}

continue
} else if value.Status.State == arbv1.AppWrapperStateActive {
if value.Status.Pending > 0 {
//Don't count partially running jobs with pods still pending.
for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
pending = pending.Add(qjv)
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
pending = pending.Add(qjv)
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State)
}

} else {
// TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs).
// This hack relies on Go's zero-value behavior for struct fields declared without a value: for an
// 'int32' field, "no value" and a value of 0 are the same.
if value.Status.Pending == 0 && value.Status.Running == 0 && value.Status.Succeeded == 0 && value.Status.Failed == 0 {

// In some cases the object wrapped in the appwrapper never creates a pod. This likely happens
// in a custom resource that does some processing and hits errors before creating the pod, or
// even when there is no problem within the CR controller but the K8s quota is hit, so the
// admission controller does not allow pods to be created. This check puts a timeout
// on reserving these resources that are "in-flight".
dispatchedCond := qjm.getLatestStatusConditionType(value, arbv1.AppWrapperCondDispatched)

// If pod counts for AW have not updated within the timeout window, account for
// this object's resources to give the object controller more time to start creating
// pods. This matters when resources are scarce. Once the timeout expires,
// resources for this object will not be held and other AW may be dispatched which
// could consume resources initially allocated for this object. This is to handle
// object controllers (essentially custom resource controllers) that do not work as
// expected by creating pods.
if qjm.waitForPodCountUpdates(dispatchedCond) {
for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
pending = pending.Add(qjv)
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
pending = pending.Add(qjv)
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State)
}
} else {
klog.V(4).Infof("[getAggAvaiResPri] Resources will no longer be reserved for %s/%s due to timeout of %d ms for pod creating.", value.Name, value.Namespace, qjm.serverOption.DispatchResourceReservationTimeout)
}
}
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
pending, err = qjv.NonNegSub(totalResource)
if err != nil {
klog.Errorf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v", qjv)
pending = qjv
}
continue
} else {
Expand Down