Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,47 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
return cc
}

// waitForPodCountUpdates reports whether the controller should keep waiting
// (i.e. keep the resource reservation) for pod counts to be updated after an
// AppWrapper was dispatched.
//
// searchCond is the dispatched condition of the AppWrapper; its
// LastUpdateMicroTime is taken as the time of the last dispatch.
//
// It returns true when:
//   - searchCond is nil (no dispatched condition was supplied),
//   - the current time is before the condition's last update time (clock
//     anomaly, logged as an error), or
//   - the time elapsed since the last update has not exceeded the configured
//     DispatchResourceReservationTimeout.
//
// It returns false only when the elapsed time is strictly greater than the
// timeout.
func (qjm *XController) waitForPodCountUpdates(searchCond *arbv1.AppWrapperCondition) bool {
	// Continue reserving resources if the dispatched condition was not found.
	if searchCond == nil {
		klog.V(10).Infof("[waitForPodCountUpdates] No dispatched condition found.")
		return true
	}

	// Current time and the last time the AW condition was updated.
	now := metav1.NowMicro()
	dispatchedTS := searchCond.LastUpdateMicroTime

	// Sanity check: a condition timestamp in the future cannot be used to
	// compute an elapsed duration, so keep waiting.
	if now.Before(&dispatchedTS) {
		klog.Errorf("[waitForPodCountUpdates] Current timestamp: %s is before condition latest update timestamp: %s",
			now.String(), dispatchedTS.String())
		return true
	}

	// Duration since the AW was last dispatched, in microseconds.
	elapsedMicroSeconds := now.Sub(dispatchedTS.Time).Microseconds()

	// Convert the configured timeout from milliseconds to microseconds.
	timeoutMicroSeconds := qjm.serverOption.DispatchResourceReservationTimeout * 1000

	// Stop reserving resources once the timeout has been exceeded. The log
	// statement must precede the return, otherwise it is unreachable.
	if elapsedMicroSeconds > timeoutMicroSeconds {
		klog.V(4).Infof("[waitForPodCountUpdates] Dispatch duration time %d microseconds has reached timeout value of %d microseconds",
			elapsedMicroSeconds, timeoutMicroSeconds)
		return false
	}

	klog.V(10).Infof("[waitForPodCountUpdates] Dispatch duration time %d microseconds has not reached timeout value of %d microseconds",
		elapsedMicroSeconds, timeoutMicroSeconds)
	return true
}

// TODO: We can use an informer to filter AWs that do not meet the minScheduling spec.
// We still need a thread for the dispatch duration, but the minScheduling spec can definitely be moved to an informer.
func (qjm *XController) PreemptQueueJobs() {
Expand Down Expand Up @@ -397,8 +438,17 @@ func (qjm *XController) PreemptQueueJobs() {
continue
}
}
// MCAD should allow other controllers enough time to spawn and initialize pods.
// The DISPATCH_RESOURCE_RESERVATION_TIMEOUT config option in MCAD is used as the wait period.
var canPreempt bool = false
// Find out whether the reservation time has run out by comparing against the AW dispatched condition time.
for _, cond := range aw.Status.Conditions {
if cond.Type == arbv1.AppWrapperCondDispatched {
canPreempt = qjm.waitForPodCountUpdates(&cond)
}
}

if ((aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable)) && aw.Status.State == arbv1.AppWrapperStateActive {
if canPreempt && ((aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable)) && aw.Status.State == arbv1.AppWrapperStateActive {
index := getIndexOfMatchedCondition(aw, arbv1.AppWrapperCondPreemptCandidate, "MinPodsNotRunning")
if index < 0 {
message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running, aw.Status.Succeeded)
Expand Down