Skip to content

Commit d728580

Browse files
authored
Merge pull request #217 from dmatch01/quota-management-fix-generic-items-cleanup
Fix to generic items cleanup. Merging per request from @asm582.
2 parents f9ef292 + b6337c5 commit d728580

File tree

5 files changed

+211
-23
lines changed

5 files changed

+211
-23
lines changed

CONTROLLER_VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.29.42
1+
1.29.43

pkg/controller/queuejob/queuejob_controller_ex.go

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -419,21 +419,25 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
419419

420420
func (qjm *XController) PreemptQueueJobs() {
421421
qjobs := qjm.GetQueueJobsEligibleForPreemption()
422-
for _, q := range qjobs {
423-
newjob, e := qjm.queueJobLister.AppWrappers(q.Namespace).Get(q.Name)
422+
for _, aw := range qjobs {
423+
newjob, e := qjm.queueJobLister.AppWrappers(aw.Namespace).Get(aw.Name)
424424
if e != nil {
425425
continue
426426
}
427427
newjob.Status.CanRun = false
428428

429-
message := fmt.Sprintf("Insufficient number of Running pods, minimum=%d, running=%v.", q.Spec.SchedSpec.MinAvailable, q.Status.Running)
429+
message := fmt.Sprintf("Insufficient number of Running pods, minimum=%d, running=%v.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running)
430430
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message)
431431
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
432432

433433
if err := qjm.updateEtcd(newjob, "PreemptQueueJobs - CanRun: false"); err != nil {
434434
klog.Errorf("Failed to update status of AppWrapper %v/%v: %v",
435-
q.Namespace, q.Name, err)
435+
aw.Namespace, aw.Name, err)
436436
}
437+
klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to backoff queue.",
438+
aw.Name, aw.Namespace)
439+
go qjm.backoff(aw, "PreemptionTriggered", message)
440+
437441
}
438442
}
439443
func (qjm *XController) preemptAWJobs(preemptAWs []*arbv1.AppWrapper) {
@@ -1862,37 +1866,51 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
18621866
}
18631867

18641868
//Cleanup function
1865-
func (cc *XController) Cleanup(queuejob *arbv1.AppWrapper) error {
1866-
klog.V(3).Infof("[Cleanup] begin AppWrapper %s Version=%s Status=%+v\n", queuejob.Name, queuejob.ResourceVersion, queuejob.Status)
1869+
func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error {
1870+
klog.V(3).Infof("[Cleanup] begin AppWrapper %s Version=%s Status=%+v\n", appwrapper.Name, appwrapper.ResourceVersion, appwrapper.Status)
18671871

18681872
if !cc.isDispatcher {
1869-
if queuejob.Spec.AggrResources.Items != nil {
1873+
if appwrapper.Spec.AggrResources.Items != nil {
18701874
// we call clean-up for each controller
1871-
for _, ar := range queuejob.Spec.AggrResources.Items {
1872-
cc.qjobResControls[ar.Type].Cleanup(queuejob, &ar)
1875+
for _, ar := range appwrapper.Spec.AggrResources.Items {
1876+
err00 := cc.qjobResControls[ar.Type].Cleanup(appwrapper, &ar)
1877+
if err00 != nil {
1878+
klog.Errorf("[Cleanup] Error deleting item %s from job=%s Status=%+v err=%+v.",
1879+
ar.Type, appwrapper.Name, appwrapper.Status, err00)
1880+
}
1881+
}
1882+
}
1883+
if appwrapper.Spec.AggrResources.GenericItems != nil {
1884+
for _, ar := range appwrapper.Spec.AggrResources.GenericItems {
1885+
genericResourceName, gvk, err00 := cc.genericresources.Cleanup(appwrapper, &ar)
1886+
if err00 != nil {
1887+
klog.Errorf("[Cleanup] Error deleting generic item %s, GVK=%s.%s.%s from job=%s Status=%+v err=%+v.",
1888+
genericResourceName, gvk.Group, gvk.Version, gvk.Kind, appwrapper.Name, appwrapper.Status, err00)
1889+
}
18731890
}
18741891
}
1892+
18751893
} else {
1876-
// klog.Infof("[Dispatcher] Cleanup: State=%s\n", queuejob.Status.State)
1877-
//if ! queuejob.Status.CanRun && queuejob.Status.IsDispatched {
1878-
if queuejob.Status.IsDispatched {
1879-
queuejobKey, _ := GetQueueJobKey(queuejob)
1894+
// klog.Infof("[Dispatcher] Cleanup: State=%s\n", appwrapper.Status.State)
1895+
//if ! appwrapper.Status.CanRun && appwrapper.Status.IsDispatched {
1896+
if appwrapper.Status.IsDispatched {
1897+
queuejobKey, _ := GetQueueJobKey(appwrapper)
18801898
if obj, ok := cc.dispatchMap[queuejobKey]; ok {
1881-
cc.agentMap[obj].DeleteJob(queuejob)
1899+
cc.agentMap[obj].DeleteJob(appwrapper)
18821900
}
1883-
queuejob.Status.IsDispatched = false
1901+
appwrapper.Status.IsDispatched = false
18841902
}
18851903
}
18861904

18871905
// Release quota if quota is enabled and quota manager instance exists
18881906
if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
1889-
cc.quotaManager.Release(queuejob)
1907+
cc.quotaManager.Release(appwrapper)
18901908
}
1891-
queuejob.Status.Pending = 0
1892-
queuejob.Status.Running = 0
1893-
queuejob.Status.Succeeded = 0
1894-
queuejob.Status.Failed = 0
1895-
klog.V(10).Infof("[Cleanup] end AppWrapper %s Version=%s Status=%+v\n", queuejob.Name, queuejob.ResourceVersion, queuejob.Status)
1909+
appwrapper.Status.Pending = 0
1910+
appwrapper.Status.Running = 0
1911+
appwrapper.Status.Succeeded = 0
1912+
appwrapper.Status.Failed = 0
1913+
klog.V(10).Infof("[Cleanup] end AppWrapper %s Version=%s Status=%+v\n", appwrapper.Name, appwrapper.ResourceVersion, appwrapper.Status)
18961914

18971915
return nil
18981916
}

pkg/controller/queuejobresources/genericresource/genericresource.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,122 @@ func join(strs ...string) string {
6969
return result
7070
}
7171

72+
func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (genericResourceName string, groupversionkind *schema.GroupVersionKind, erro error) {
73+
var err error
74+
err = nil
75+
76+
// Default generic source group-version-kind
77+
default_gvk := &schema.GroupVersionKind{
78+
Group: "unknown",
79+
Version: "unknown",
80+
Kind: "unknown",
81+
}
82+
// Default generic resource name
83+
name :=""
84+
85+
namespaced := true
86+
//todo:DELETEME dd := common.KubeClient.Discovery()
87+
dd := gr.clients.Discovery()
88+
apigroups, err := restmapper.GetAPIGroupResources(dd)
89+
if err != nil {
90+
klog.Errorf("[Cleanup] Error getting API resources, err=%#v", err)
91+
return name, default_gvk, err
92+
}
93+
ext := awr.GenericTemplate
94+
restmapper := restmapper.NewDiscoveryRESTMapper(apigroups)
95+
_, gvk, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, default_gvk, nil)
96+
if err != nil {
97+
klog.Errorf("Decoding error, please check your CR! Aborting handling the resource creation, err: `%v`", err)
98+
return name, gvk, err
99+
}
100+
101+
mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version)
102+
if err != nil {
103+
klog.Errorf("mapping error from raw object: `%v`", err)
104+
return name, gvk, err
105+
}
106+
107+
//todo:DELETEME restconfig := common.KubeConfig
108+
restconfig := gr.kubeClientConfig
109+
restconfig.GroupVersion = &schema.GroupVersion{
110+
Group: mapping.GroupVersionKind.Group,
111+
Version: mapping.GroupVersionKind.Version,
112+
}
113+
dclient, err := dynamic.NewForConfig(restconfig)
114+
if err != nil {
115+
klog.Errorf("[Cleanup] Error creating new dynamic client, err=%#v.", err)
116+
return name, gvk, err
117+
}
118+
119+
_, apiresourcelist, err := dd.ServerGroupsAndResources()
120+
if err != nil {
121+
klog.Errorf("Error getting supported groups and resources, err=%#v", err)
122+
return name, gvk, err
123+
}
124+
rsrc := mapping.Resource
125+
for _, apiresourcegroup := range apiresourcelist {
126+
if apiresourcegroup.GroupVersion == join(mapping.GroupVersionKind.Group, "/", mapping.GroupVersionKind.Version) {
127+
for _, apiresource := range apiresourcegroup.APIResources {
128+
if apiresource.Name == mapping.Resource.Resource && apiresource.Kind == mapping.GroupVersionKind.Kind {
129+
rsrc = mapping.Resource
130+
namespaced = apiresource.Namespaced
131+
}
132+
}
133+
}
134+
}
135+
136+
// Unmarshal generic item raw object
137+
var unstruct unstructured.Unstructured
138+
unstruct.Object = make(map[string]interface{})
139+
var blob interface{}
140+
if err = json.Unmarshal(ext.Raw, &blob); err != nil {
141+
klog.Errorf("[Cleanup] Error unmarshalling, err=%#v", err)
142+
return name, gvk, err
143+
}
144+
145+
unstruct.Object = blob.(map[string]interface{}) //set object to the content of the blob after Unmarshalling
146+
namespace := ""
147+
if md, ok := unstruct.Object["metadata"]; ok {
148+
149+
metadata := md.(map[string]interface{})
150+
if objectName, ok := metadata["name"]; ok {
151+
name = objectName.(string)
152+
}
153+
if objectns, ok := metadata["namespace"]; ok {
154+
namespace = objectns.(string)
155+
}
156+
}
157+
158+
// Get the resource to see if it exists
159+
labelSelector := fmt.Sprintf("%s=%s, %s=%s", appwrapperJobName, aw.Name, resourceName, unstruct.GetName())
160+
inEtcd, err := dclient.Resource(rsrc).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector})
161+
if err != nil {
162+
return name, gvk, err
163+
}
164+
165+
// Check to see if object already exists in etcd, if not, create the object.
166+
if inEtcd != nil || len(inEtcd.Items) > 0 {
167+
newName := name
168+
if len(newName) > 63 {
169+
newName = newName[:63]
170+
}
171+
172+
err = deleteObject(namespaced, namespace, newName, rsrc, dclient)
173+
if err != nil {
174+
if errors.IsAlreadyExists(err) {
175+
klog.V(4).Infof("%v\n", err.Error())
176+
} else {
177+
klog.Errorf("[Cleanup] Error deleting the object `%v`, the error is `%v`.", newName, errors.ReasonForError(err))
178+
return name, gvk, err
179+
}
180+
}
181+
} else {
182+
klog.Warningf("[Cleanup] %s/%s not found using label selector: %s.\n", name, namespace, labelSelector)
183+
}
184+
185+
return name, gvk, err
186+
}
187+
72188
func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (podList []*v1.Pod, err error) {
73189
startTime := time.Now()
74190
defer func() {
@@ -340,6 +456,25 @@ func createObject(namespaced bool, namespace string, name string, rsrc schema.Gr
340456
}
341457
}
342458

459+
func deleteObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, dclient dynamic.Interface) (erro error) {
460+
var err error
461+
if !namespaced {
462+
res := dclient.Resource(rsrc)
463+
err = res.Delete(context.Background(), name, metav1.DeleteOptions{})
464+
} else {
465+
res := dclient.Resource(rsrc).Namespace(namespace)
466+
err = res.Delete(context.Background(), name, metav1.DeleteOptions{})
467+
}
468+
469+
if err != nil {
470+
klog.Errorf("[deleteObject] Error deleting the object `%v`, the error is `%v`.", name, errors.ReasonForError(err))
471+
return err
472+
} else {
473+
klog.V(4).Infof("[deleteObject] Resource `%v` deleted.\n", name)
474+
return nil
475+
}
476+
}
477+
343478
func GetListOfPodResourcesFromOneGenericItem(awr *arbv1.AppWrapperGenericResource) (resource []*clusterstateapi.Resource, er error) {
344479
var podResourcesList []*clusterstateapi.Resource
345480

test/e2e/queue.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,37 @@ var _ = Describe("AppWrapper E2E Test", func() {
107107

108108
})
109109

110+
It("MCAD CPU Preemption Test", func() {
111+
fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Preemption Test - Started.\n")
112+
context := initTestContext()
113+
var appwrappers []*arbv1.AppWrapper
114+
appwrappersPtr := &appwrappers
115+
defer cleanupTestObjectsPtr(context, appwrappersPtr)
116+
117+
// This should fill up the worker node and most of the master node
118+
aw := createDeploymentAWwith550CPU(context, "aw-deployment-2-550cpu")
119+
appwrappers = append(appwrappers, aw)
120+
121+
err := waitAWPodsReady(context, aw)
122+
Expect(err).NotTo(HaveOccurred())
123+
124+
// This should not fit on cluster
125+
aw2 := createDeploymentAWwith426CPU(context, "aw-deployment-2-426cpu")
126+
appwrappers = append(appwrappers, aw2)
127+
128+
err = waitAWAnyPodsExists(context, aw2)
129+
Expect(err).To(HaveOccurred())
130+
131+
// This should fit on cluster, initially queued because of aw2 above but should eventually
132+
// run after prevention of aw2 above.
133+
aw3 := createDeploymentAWwith425CPU(context, "aw-deployment-2-425cpu")
134+
appwrappers = append(appwrappers, aw3)
135+
136+
// Since preemption takes some time, increasing timeout wait time to 2 minutes
137+
err = waitAWPodsExists(context, aw3, 120000*time.Millisecond)
138+
Expect(err).NotTo(HaveOccurred())
139+
})
140+
110141
It("Create AppWrapper - StatefulSet Only - 2 Pods", func() {
111142
fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - StatefulSet Only - 2 Pods - Started.\n")
112143

test/e2e/util.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,11 @@ func waitAWReadyQuiet(ctx *context, aw *arbv1.AppWrapper) error {
614614
}
615615

616616
func waitAWAnyPodsExists(ctx *context, aw *arbv1.AppWrapper) error {
617-
return wait.Poll(100*time.Millisecond, ninetySeconds, anyPodsExist(ctx, aw.Namespace, aw.Name))
617+
return waitAWPodsExists(ctx, aw, ninetySeconds)
618+
}
619+
620+
func waitAWPodsExists(ctx *context, aw *arbv1.AppWrapper, timeout time.Duration) error {
621+
return wait.Poll(100*time.Millisecond, timeout, anyPodsExist(ctx, aw.Namespace, aw.Name))
618622
}
619623

620624
func waitAWDeleted(ctx *context, aw *arbv1.AppWrapper, pods []*v1.Pod) error {

0 commit comments

Comments
 (0)