Skip to content
16 changes: 16 additions & 0 deletions config/crd/bases/mcad.ibm.com_appwrappers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ spec:
The associated item's level .status.conditions[].type field is monitored for any one of these conditions. Once all items with this
option is set and the conditionstatus is met the entire appwrapper state will be changed to one of the valid appwrapper completion state. Note :- this is an AND
operation for all items where this option is set. See the list of appwrapper states for a list of valid complete states.
type: string
custompodresources:
description: Optional section that specifies resource requirements
for non-standard k8s resources, follows same format as
Expand Down Expand Up @@ -261,6 +262,16 @@ spec:
additionalProperties:
type: string
type: object
dispatchDuration:
description: Wall clock duration time of appwrapper
properties:
expected:
format: int32
type: integer
limit:
format: int32
type: integer
type: object
type: object
selector:
description: A label selector is a label query over a set of resources.
Expand Down Expand Up @@ -726,6 +737,11 @@ spec:
QueueJob (by Informer)
format: date-time
type: string
controllerfirstdispatchtimestamp:
description: Microsecond level timestamp when controller first sees
Appwrapper in Running state
format: date-time
type: string
failed:
description: The number of resources which reached phase Failed.
format: int32
Expand Down
10 changes: 10 additions & 0 deletions config/crd/bases/mcad.ibm.com_queuejobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ spec:
properties:
minAvailable:
type: integer
dispatchDuration:
description: wall clock duration time of appwrapper
properties:
expected:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see `expected` used anywhere in queuejob_controller_ex.go, so it should be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Go struct has this field, which will be used in the multi-cluster scenario, so we have to keep it in the schema. For now the field is optional and will always be ignored.

format: int32
type: integer
limit:
format: int32
type: integer
type: object
nodeSelector:
additionalProperties:
type: string
Expand Down
10 changes: 10 additions & 0 deletions config/crd/bases/mcad.ibm.com_schedulingspecs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ spec:
properties:
minAvailable:
type: integer
dispatchDuration:
description: wall clock duration time of appwrapper
properties:
expected:
format: int32
type: integer
limit:
format: int32
type: integer
type: object
nodeSelector:
additionalProperties:
type: string
Expand Down
43 changes: 42 additions & 1 deletion deployment/mcad-controller/templates/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: v1
kind: Service
metadata:
name: custom-metrics-apiserver
Expand Down Expand Up @@ -136,6 +136,18 @@ spec:
properties:
minAvailable:
type: integer
# dispatchdurationinseconds:
# type: integer
dispatchDuration:
description: wall clock duration time of appwrapper
properties:
expected:
format: int32
type: integer
limit:
format: int32
type: integer
type: object
nodeSelector:
additionalProperties:
type: string
Expand Down Expand Up @@ -190,10 +202,22 @@ spec:
properties:
minAvailable:
type: integer
# dispatchdurationinseconds:
# type: integer
nodeSelector:
additionalProperties:
type: string
type: object
dispatchDuration:
description: wall clock duration time of appwrapper
properties:
expected:
format: int32
type: integer
limit:
format: int32
type: integer
type: object
type: object
taskSpecs:
description: TaskSpecs specifies the task specification of QueueJob
Expand Down Expand Up @@ -7225,6 +7249,18 @@ spec:
properties:
minAvailable:
type: integer
# dispatchdurationinseconds:
# type: integer
dispatchDuration:
description: wall clock duration time of appwrapper
properties:
expected:
format: int32
type: integer
limit:
format: int32
type: integer
type: object
nodeSelector:
additionalProperties:
type: string
Expand Down Expand Up @@ -7694,6 +7730,11 @@ spec:
QueueJob (by Informer)
format: date-time
type: string
controllerfirstdispatchtimestamp:
description: Microsecond level timestamp when controller first sees
Appwrapper in Running state
format: date-time
type: string
failed:
description: The number of resources which reached phase Failed.
format: int32
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/controller/v1beta1/appwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ type AppWrapperStatus struct {
// Microsecond level timestamp when controller first sees QueueJob (by Informer)
ControllerFirstTimestamp metav1.MicroTime `json:"controllerfirsttimestamp,omitempty"`

// Microsecond level timestamp when controller first sets appwrapper in state Running
ControllerFirstDispatchTimestamp metav1.MicroTime `json:"controllerfirstdispatchtimestamp,omitempty"`

// Tell Informer to ignore this update message (do not generate a controller event)
FilterIgnore bool `json:"filterignore,omitempty"`

Expand Down
33 changes: 31 additions & 2 deletions pkg/apis/controller/v1beta1/schedulingspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,38 @@ type SchedulingSpec struct {
Spec SchedulingSpecTemplate `json:"spec,omitempty" protobuf:"bytes,1,rep,name=spec"`
}

// ClusterReference identifies a cluster by its name.
type ClusterReference struct {
// Name is the name of the referenced cluster; it is always serialized
// (no omitempty).
Name string `json:"name"`
}

// ClusterSchedulingSpec carries cluster-level scheduling information: an
// explicit list of cluster references and/or a label selector over clusters.
// NOTE(review): presumably consumed in the multi-cluster (dispatcher)
// scenario — confirm against the controller code.
type ClusterSchedulingSpec struct {
// Clusters is an optional list of clusters referenced by name.
Clusters []ClusterReference `json:"clusters,omitempty"`
// ClusterSelector is an optional label selector; nil when not set.
ClusterSelector *metav1.LabelSelector `json:"clusterSelector,omitempty"`
}

// ScheduleTimeSpec describes a point in time as a minimum, desired and
// maximum timestamp.
type ScheduleTimeSpec struct {
// Min is serialized as "minTimestamp".
Min metav1.Time `json:"minTimestamp,omitempty"`
// Desired is serialized as "desiredTimestamp".
Desired metav1.Time `json:"desiredTimestamp,omitempty"`
// Max is serialized as "maxTimestamp".
Max metav1.Time `json:"maxTimestamp,omitempty"`
}

// DispatchDurationSpec describes the wall clock duration an AppWrapper is
// allowed to stay dispatched.
type DispatchDurationSpec struct {
// Expected is the expected duration. It is not read by the queue job
// controller code visible in this change.
// NOTE(review): presumably in seconds, like Limit — confirm.
Expected int `json:"expected,omitempty"`
// Limit is the maximum duration in seconds, measured from
// Status.ControllerFirstDispatchTimestamp. The limit is only enforced
// when it is greater than zero.
Limit int `json:"limit,omitempty"`
// Overrun is set to true by the controller
// (GetQueueJobsEligibleForPreemption) once Limit has been exceeded.
Overrun bool `json:"overrun,omitempty"`
}

// DispatchingWindowSpec describes a dispatching window by its start and end,
// each expressed as a ScheduleTimeSpec (min/desired/max timestamps).
type DispatchingWindowSpec struct {
// Start is the beginning of the window.
Start ScheduleTimeSpec `json:"start,omitempty"`
// End is the end of the window.
End ScheduleTimeSpec `json:"end,omitempty"`
}

type SchedulingSpecTemplate struct {
NodeSelector map[string]string `json:"nodeSelector,omitempty" protobuf:"bytes,1,rep,name=nodeSelector"`
MinAvailable int `json:"minAvailable,omitempty" protobuf:"bytes,2,rep,name=minAvailable"`
NodeSelector map[string]string `json:"nodeSelector,omitempty" protobuf:"bytes,1,rep,name=nodeSelector"`
MinAvailable int `json:"minAvailable,omitempty" protobuf:"bytes,2,rep,name=minAvailable"`
ClusterScheduling ClusterSchedulingSpec `json:"clusterScheduling,omitempty"`
DispatchingWindow DispatchingWindowSpec `json:"dispatchingWindow,omitempty"`
DispatchDuration DispatchDurationSpec `json:"dispatchDuration,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
52 changes: 49 additions & 3 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,34 @@ func (qjm *XController) PreemptQueueJobs() {
continue
}
newjob.Status.CanRun = false
//If dispatch deadline is exceeded no matter what the state of AW, kill the job and set status as Failed.
if (aw.Status.State == arbv1.AppWrapperStateActive) && (aw.Spec.SchedSpec.DispatchDuration.Limit > 0) {

if (aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable) {
if aw.Spec.SchedSpec.DispatchDuration.Overrun {
message = fmt.Sprintf("Dispatch deadline exceeded. allowed to run for %v seconds", aw.Spec.SchedSpec.DispatchDuration.Limit)
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "DispatchDeadlineExceeded", message)
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
//should the AW state be set in this method??
newjob.Status.State = arbv1.AppWrapperStateFailed
newjob.Status.QueueJobState = arbv1.AppWrapperCondFailed
newjob.Status.Running = 0
updateNewJob = newjob.DeepCopy()
if err := qjm.updateEtcd(updateNewJob, "PreemptQueueJobs - CanRun: false"); err != nil {
klog.Errorf("Failed to update status of AppWrapper %v/%v: %v", aw.Namespace, aw.Name, err)
}

go qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob)

//Move to next AW
continue
}
}

if ((aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable)) && aw.Status.State == arbv1.AppWrapperStateActive {
message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running, aw.Status.Succeeded)
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message)
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
updateNewJob = newjob.DeepCopy()

//If pods failed scheduling generate new preempt condition
} else {
message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(aw.Status.PendingPodConditions), aw.Status.Running)
Expand All @@ -457,9 +478,13 @@ func (qjm *XController) PreemptQueueJobs() {
if err := qjm.updateEtcd(updateNewJob, "PreemptQueueJobs - CanRun: false"); err != nil {
klog.Errorf("Failed to update status of AppWrapper %v/%v: %v", aw.Namespace, aw.Name, err)
}

klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to backoff queue.",
aw.Name, aw.Namespace)
go qjm.backoff(aw, "PreemptionTriggered", string(message))
//Only back-off AWs that are in state running and not in state Failed
if updateNewJob.Status.State != arbv1.AppWrapperStateCompleted {
go qjm.backoff(aw, "PreemptionTriggered", string(message))
}

}
}
Expand Down Expand Up @@ -494,6 +519,22 @@ func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper

if !qjm.isDispatcher { // Agent Mode
for _, value := range queueJobs {

if value.Status.State == arbv1.AppWrapperStateActive && value.Spec.SchedSpec.DispatchDuration.Limit > 0 {
awDispatchDurationLimit := value.Spec.SchedSpec.DispatchDuration.Limit
dispatchDuration := value.Status.ControllerFirstDispatchTimestamp.Add(time.Duration(awDispatchDurationLimit) * time.Second)
currentTime := time.Now()
hasDispatchTimeNotExceeded := currentTime.Before(dispatchDuration)

if !hasDispatchTimeNotExceeded {
klog.V(8).Infof("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v", currentTime, dispatchDuration)
value.Spec.SchedSpec.DispatchDuration.Overrun = true
qjobs = append(qjobs, value)
//Got AW which exceeded dispatch runtime limit, move to next AW
continue
}
}

replicas := value.Spec.SchedSpec.MinAvailable

// Skip if AW Pending or just entering the system and does not have a state yet.
Expand Down Expand Up @@ -1472,6 +1513,7 @@ func (qjm *XController) UpdateQueueJobs() {
klog.Errorf("[UpdateQueueJobs] List of queueJobs err=%+v", err)
return
}

for _, newjob := range queueJobs {
// UpdateQueueJobs can be the first to see a new AppWrapper job, under heavy load
if newjob.Status.QueueJobState == "" {
Expand All @@ -1489,6 +1531,10 @@ func (qjm *XController) UpdateQueueJobs() {
klog.V(3).Infof("[UpdateQueueJobs] %s 0Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s",
newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob.CreationTimestamp, newjob.Status.ControllerFirstTimestamp)
}
//only set if appwrapper is running and dispatch time is not set previously
if newjob.Status.QueueJobState == "Running" && newjob.Status.ControllerFirstDispatchTimestamp.String() == "0001-01-01 00:00:00 +0000 UTC" {
newjob.Status.ControllerFirstDispatchTimestamp = firstTime
}
klog.V(10).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
// check eventQueue, qjqueue in program sequence to make sure job is not in qjqueue
if _, exists, _ := qjm.eventQueue.Get(newjob); exists {
Expand Down
9 changes: 2 additions & 7 deletions pkg/controller/queuejob/scheduling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,10 @@ type SchedulingQueue interface {
Length() int
}



// NewSchedulingQueue initializes a new scheduling queue. The returned
// implementation is always the priority queue built by NewPriorityQueue.
func NewSchedulingQueue() SchedulingQueue {
	return NewPriorityQueue()
}

// UnschedulablePods is an interface for a queue that is used to keep unschedulable
Expand Down Expand Up @@ -131,7 +129,7 @@ func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool {
p.lock.Lock()
defer p.lock.Unlock()
_, exists, _ := p.activeQ.Get(qj)
if (p.unschedulableQ.Get(qj)!= nil || exists) {
if p.unschedulableQ.Get(qj) != nil || exists {
return true
}
return false
Expand Down Expand Up @@ -167,9 +165,6 @@ func (p *PriorityQueue) MoveToActiveQueueIfExists(aw *qjobv1.AppWrapper) error {
return nil
}




// Add adds a QJ to the active queue. It should be called only when a new QJ
// is added so there is no chance the QJ is already in either queue.
func (p *PriorityQueue) Add(qj *qjobv1.AppWrapper) error {
Expand Down
25 changes: 25 additions & 0 deletions test/e2e/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,31 @@ var _ = Describe("AppWrapper E2E Test", func() {

})

// Verifies that an AppWrapper created by createGenericAWTimeoutWithStatus
// ends up in state Failed after its pods became ready and the wait below
// has elapsed.
It("MCAD appwrapper timeout Test", func() {
	fmt.Fprintf(os.Stdout, "[e2e] MCAD appwrapper timeout Test - Started.\n")
	context := initTestContext()
	var appwrappers []*arbv1.AppWrapper
	appwrappersPtr := &appwrappers
	defer cleanupTestObjectsPtr(context, appwrappersPtr)

	aw := createGenericAWTimeoutWithStatus(context, "aw-test-jobtimeout-with-comp-1")
	// Register the AppWrapper for cleanup right away: a failing Expect below
	// aborts the spec, and the deferred cleanup must still see this object.
	appwrappers = append(appwrappers, aw)

	err := waitAWPodsReady(context, aw)
	Expect(err).NotTo(HaveOccurred())

	// NOTE(review): presumably longer than the dispatch duration limit set by
	// createGenericAWTimeoutWithStatus — confirm against that helper.
	time.Sleep(60 * time.Second)

	aw1, err := context.karclient.ArbV1().AppWrappers(aw.Namespace).Get(aw.Name, metav1.GetOptions{})
	// Fail the spec on a lookup error instead of printing it and then
	// asserting on an object that was never fetched.
	Expect(err).NotTo(HaveOccurred())

	fmt.Fprintf(os.Stdout, "[e2e] status of AW %v.\n", aw1.Status.State)
	// Compare the state directly so a failure reports the actual state.
	Expect(aw1.Status.State).To(Equal(arbv1.AppWrapperStateFailed))
	fmt.Fprintf(os.Stdout, "[e2e] MCAD appwrapper timeout Test - Completed.\n")
})

It("MCAD Job Completion No-requeue Test", func() {
fmt.Fprintf(os.Stdout, "[e2e] MCAD Job Completion No-requeue Test - Started.\n")
context := initTestContext()
Expand Down
Loading