Skip to content

Commit b6028a0

Browse files
committed
Merge from quota-management latest changes
Signed-off-by: dmatch01 <darroyo@us.ibm.com>
1 parent dfa534f commit b6028a0

File tree

6 files changed

+417
-42
lines changed

6 files changed

+417
-42
lines changed

CONTROLLER_VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.29.41
1+
1.29.43

pkg/apis/controller/v1beta1/zz_generated.deepcopy.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/controller/queuejob/queuejob_controller_ex.go

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616
/*
17-
Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors.
17+
Copyright 2019, 2021, 2022 The Multi-Cluster App Dispatcher Authors.
1818
1919
Licensed under the Apache License, Version 2.0 (the "License");
2020
you may not use this file except in compliance with the License.
@@ -420,15 +420,16 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
420420
func (qjm *XController) PreemptQueueJobs() {
421421
qjobs := qjm.GetQueueJobsEligibleForPreemption()
422422
var updateNewJob *arbv1.AppWrapper
423-
for _, q := range qjobs {
424-
if q.Status.Running < int32(q.Spec.SchedSpec.MinAvailable) {
425-
newjob, e := qjm.queueJobLister.AppWrappers(q.Namespace).Get(q.Name)
423+
var message string
424+
for _, aw := range qjobs {
425+
if aw.Status.Running < int32(aw.Spec.SchedSpec.MinAvailable) {
426+
newjob, e := qjm.queueJobLister.AppWrappers(aw.Namespace).Get(aw.Name)
426427
if e != nil {
427428
continue
428429
}
429430
newjob.Status.CanRun = false
430431

431-
message := fmt.Sprintf("Insufficient number of Running pods, minimum=%d, running=%v.", q.Spec.SchedSpec.MinAvailable, q.Status.Running)
432+
message = fmt.Sprintf("Insufficient number of Running pods, minimum=%d, running=%v.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running)
432433
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message)
433434
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
434435
updateNewJob = newjob.DeepCopy()
@@ -437,12 +438,12 @@ func (qjm *XController) PreemptQueueJobs() {
437438
//ignore co-scheduler failed scheduling events. This is a temp
438439
//work around until co-scheduler perf issues are resolved.
439440
} else {
440-
newjob, e := qjm.queueJobLister.AppWrappers(q.Namespace).Get(q.Name)
441+
newjob, e := qjm.queueJobLister.AppWrappers(aw.Namespace).Get(aw.Name)
441442
if e != nil {
442443
continue
443444
}
444445
newjob.Status.CanRun = false
445-
message := fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(q.Status.PendingPodConditions), q.Status.Running)
446+
message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(aw.Status.PendingPodConditions), aw.Status.Running)
446447
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling")
447448
if index < 0 {
448449
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "PodsFailedScheduling", message)
@@ -455,9 +456,11 @@ func (qjm *XController) PreemptQueueJobs() {
455456
updateNewJob = newjob.DeepCopy()
456457
}
457458
if err := qjm.updateEtcd(updateNewJob, "PreemptQueueJobs - CanRun: false"); err != nil {
458-
klog.Errorf("Failed to update status of AppWrapper %v/%v: %v", q.Namespace, q.Name, err)
459+
klog.Errorf("Failed to update status of AppWrapper %v/%v: %v", aw.Namespace, aw.Name, err)
459460
}
460-
461+
klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to backoff queue.",
462+
aw.Name, aw.Namespace)
463+
go qjm.backoff(aw, "PreemptionTriggered", string(message))
461464
}
462465
}
463466
func (qjm *XController) preemptAWJobs(preemptAWs []*arbv1.AppWrapper) {
@@ -1909,43 +1912,50 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
19091912
}
19101913

19111914
//Cleanup function
1912-
func (cc *XController) Cleanup(queuejob *arbv1.AppWrapper) error {
1913-
klog.V(3).Infof("[Cleanup] begin AppWrapper %s Version=%s Status=%+v\n", queuejob.Name, queuejob.ResourceVersion, queuejob.Status)
1915+
func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error {
1916+
klog.V(3).Infof("[Cleanup] begin AppWrapper %s Version=%s Status=%+v\n", appwrapper.Name, appwrapper.ResourceVersion, appwrapper.Status)
19141917

19151918
if !cc.isDispatcher {
1916-
if queuejob.Spec.AggrResources.Items != nil {
1919+
if appwrapper.Spec.AggrResources.Items != nil {
19171920
// we call clean-up for each controller
1918-
for _, ar := range queuejob.Spec.AggrResources.Items {
1919-
cc.qjobResControls[ar.Type].Cleanup(queuejob, &ar)
1921+
for _, ar := range appwrapper.Spec.AggrResources.Items {
1922+
err00 := cc.qjobResControls[ar.Type].Cleanup(appwrapper, &ar)
1923+
if err00 != nil {
1924+
klog.Errorf("[Cleanup] Error deleting item %s from job=%s Status=%+v err=%+v.",
1925+
ar.Type, appwrapper.Name, appwrapper.Status, err00)
1926+
}
1927+
}
1928+
}
1929+
if appwrapper.Spec.AggrResources.GenericItems != nil {
1930+
for _, ar := range appwrapper.Spec.AggrResources.GenericItems {
1931+
genericResourceName, gvk, err00 := cc.genericresources.Cleanup(appwrapper, &ar)
1932+
if err00 != nil {
1933+
klog.Errorf("[Cleanup] Error deleting generic item %s, GVK=%s.%s.%s from job=%s Status=%+v err=%+v.",
1934+
genericResourceName, gvk.Group, gvk.Version, gvk.Kind, appwrapper.Name, appwrapper.Status, err00)
1935+
}
19201936
}
19211937
}
1922-
// if queuejob.Spec.AggrResources.GenericItems != nil {
1923-
// // we call clean-up for each controller
1924-
// for _, ar := range queuejob.Spec.AggrResources.GenericItems {
1925-
// cc.qjobResControls[ar.Type].Cleanup(queuejob, &ar)
1926-
// }
1927-
// }
19281938
} else {
1929-
// klog.Infof("[Dispatcher] Cleanup: State=%s\n", queuejob.Status.State)
1930-
//if ! queuejob.Status.CanRun && queuejob.Status.IsDispatched {
1931-
if queuejob.Status.IsDispatched {
1932-
queuejobKey, _ := GetQueueJobKey(queuejob)
1939+
// klog.Infof("[Dispatcher] Cleanup: State=%s\n", appwrapper.Status.State)
1940+
//if ! appwrapper.Status.CanRun && appwrapper.Status.IsDispatched {
1941+
if appwrapper.Status.IsDispatched {
1942+
queuejobKey, _ := GetQueueJobKey(appwrapper)
19331943
if obj, ok := cc.dispatchMap[queuejobKey]; ok {
1934-
cc.agentMap[obj].DeleteJob(queuejob)
1944+
cc.agentMap[obj].DeleteJob(appwrapper)
19351945
}
1936-
queuejob.Status.IsDispatched = false
1946+
appwrapper.Status.IsDispatched = false
19371947
}
19381948
}
19391949

19401950
// Release quota if quota is enabled and quota manager instance exists
19411951
if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
1942-
cc.quotaManager.Release(queuejob)
1952+
cc.quotaManager.Release(appwrapper)
19431953
}
1944-
queuejob.Status.Pending = 0
1945-
queuejob.Status.Running = 0
1946-
queuejob.Status.Succeeded = 0
1947-
queuejob.Status.Failed = 0
1948-
klog.V(10).Infof("[Cleanup] end AppWrapper %s Version=%s Status=%+v\n", queuejob.Name, queuejob.ResourceVersion, queuejob.Status)
1954+
appwrapper.Status.Pending = 0
1955+
appwrapper.Status.Running = 0
1956+
appwrapper.Status.Succeeded = 0
1957+
appwrapper.Status.Failed = 0
1958+
klog.V(10).Infof("[Cleanup] end AppWrapper %s Version=%s Status=%+v\n", appwrapper.Name, appwrapper.ResourceVersion, appwrapper.Status)
19491959

19501960
return nil
19511961
}

pkg/controller/queuejobresources/genericresource/genericresource.go

Lines changed: 147 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,122 @@ func join(strs ...string) string {
6969
return result
7070
}
7171

72+
func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (genericResourceName string, groupversionkind *schema.GroupVersionKind, erro error) {
73+
var err error
74+
err = nil
75+
76+
// Default generic source group-version-kind
77+
default_gvk := &schema.GroupVersionKind{
78+
Group: "unknown",
79+
Version: "unknown",
80+
Kind: "unknown",
81+
}
82+
// Default generic resource name
83+
name :=""
84+
85+
namespaced := true
86+
//todo:DELETEME dd := common.KubeClient.Discovery()
87+
dd := gr.clients.Discovery()
88+
apigroups, err := restmapper.GetAPIGroupResources(dd)
89+
if err != nil {
90+
klog.Errorf("[Cleanup] Error getting API resources, err=%#v", err)
91+
return name, default_gvk, err
92+
}
93+
ext := awr.GenericTemplate
94+
restmapper := restmapper.NewDiscoveryRESTMapper(apigroups)
95+
_, gvk, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, default_gvk, nil)
96+
if err != nil {
97+
klog.Errorf("Decoding error, please check your CR! Aborting handling the resource creation, err: `%v`", err)
98+
return name, gvk, err
99+
}
100+
101+
mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version)
102+
if err != nil {
103+
klog.Errorf("mapping error from raw object: `%v`", err)
104+
return name, gvk, err
105+
}
106+
107+
//todo:DELETEME restconfig := common.KubeConfig
108+
restconfig := gr.kubeClientConfig
109+
restconfig.GroupVersion = &schema.GroupVersion{
110+
Group: mapping.GroupVersionKind.Group,
111+
Version: mapping.GroupVersionKind.Version,
112+
}
113+
dclient, err := dynamic.NewForConfig(restconfig)
114+
if err != nil {
115+
klog.Errorf("[Cleanup] Error creating new dynamic client, err=%#v.", err)
116+
return name, gvk, err
117+
}
118+
119+
_, apiresourcelist, err := dd.ServerGroupsAndResources()
120+
if err != nil {
121+
klog.Errorf("Error getting supported groups and resources, err=%#v", err)
122+
return name, gvk, err
123+
}
124+
rsrc := mapping.Resource
125+
for _, apiresourcegroup := range apiresourcelist {
126+
if apiresourcegroup.GroupVersion == join(mapping.GroupVersionKind.Group, "/", mapping.GroupVersionKind.Version) {
127+
for _, apiresource := range apiresourcegroup.APIResources {
128+
if apiresource.Name == mapping.Resource.Resource && apiresource.Kind == mapping.GroupVersionKind.Kind {
129+
rsrc = mapping.Resource
130+
namespaced = apiresource.Namespaced
131+
}
132+
}
133+
}
134+
}
135+
136+
// Unmarshal generic item raw object
137+
var unstruct unstructured.Unstructured
138+
unstruct.Object = make(map[string]interface{})
139+
var blob interface{}
140+
if err = json.Unmarshal(ext.Raw, &blob); err != nil {
141+
klog.Errorf("[Cleanup] Error unmarshalling, err=%#v", err)
142+
return name, gvk, err
143+
}
144+
145+
unstruct.Object = blob.(map[string]interface{}) //set object to the content of the blob after Unmarshalling
146+
namespace := ""
147+
if md, ok := unstruct.Object["metadata"]; ok {
148+
149+
metadata := md.(map[string]interface{})
150+
if objectName, ok := metadata["name"]; ok {
151+
name = objectName.(string)
152+
}
153+
if objectns, ok := metadata["namespace"]; ok {
154+
namespace = objectns.(string)
155+
}
156+
}
157+
158+
// Get the resource to see if it exists
159+
labelSelector := fmt.Sprintf("%s=%s, %s=%s", appwrapperJobName, aw.Name, resourceName, unstruct.GetName())
160+
inEtcd, err := dclient.Resource(rsrc).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector})
161+
if err != nil {
162+
return name, gvk, err
163+
}
164+
165+
// Check to see if object already exists in etcd, if not, create the object.
166+
if inEtcd != nil || len(inEtcd.Items) > 0 {
167+
newName := name
168+
if len(newName) > 63 {
169+
newName = newName[:63]
170+
}
171+
172+
err = deleteObject(namespaced, namespace, newName, rsrc, dclient)
173+
if err != nil {
174+
if errors.IsAlreadyExists(err) {
175+
klog.V(4).Infof("%v\n", err.Error())
176+
} else {
177+
klog.Errorf("[Cleanup] Error deleting the object `%v`, the error is `%v`.", newName, errors.ReasonForError(err))
178+
return name, gvk, err
179+
}
180+
}
181+
} else {
182+
klog.Warningf("[Cleanup] %s/%s not found using label selector: %s.\n", name, namespace, labelSelector)
183+
}
184+
185+
return name, gvk, err
186+
}
187+
72188
func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (podList []*v1.Pod, err error) {
73189
startTime := time.Now()
74190
defer func() {
@@ -340,6 +456,25 @@ func createObject(namespaced bool, namespace string, name string, rsrc schema.Gr
340456
}
341457
}
342458

459+
func deleteObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, dclient dynamic.Interface) (erro error) {
460+
var err error
461+
if !namespaced {
462+
res := dclient.Resource(rsrc)
463+
err = res.Delete(context.Background(), name, metav1.DeleteOptions{})
464+
} else {
465+
res := dclient.Resource(rsrc).Namespace(namespace)
466+
err = res.Delete(context.Background(), name, metav1.DeleteOptions{})
467+
}
468+
469+
if err != nil {
470+
klog.Errorf("[deleteObject] Error deleting the object `%v`, the error is `%v`.", name, errors.ReasonForError(err))
471+
return err
472+
} else {
473+
klog.V(4).Infof("[deleteObject] Resource `%v` deleted.\n", name)
474+
return nil
475+
}
476+
}
477+
343478
func GetListOfPodResourcesFromOneGenericItem(awr *arbv1.AppWrapperGenericResource) (resource []*clusterstateapi.Resource, er error) {
344479
var podResourcesList []*clusterstateapi.Resource
345480

@@ -380,21 +515,25 @@ func GetResources(awr *arbv1.AppWrapperGenericResource) (resource *clusterstatea
380515
var err error
381516
err = nil
382517
if awr.GenericTemplate.Raw != nil {
518+
if len(awr.CustomPodResources) > 0 {
519+
podresources := awr.CustomPodResources
520+
for _, item := range podresources {
521+
res := getPodResources(item)
522+
totalresource = totalresource.Add(res)
523+
}
524+
klog.V(4).Infof("[GetResources] Requested total allocation resource from custompodresources `%v`.\n", totalresource)
525+
return totalresource, err
526+
}
383527
hasContainer, replicas, containers := hasFields(awr.GenericTemplate)
384528
if hasContainer {
385529
for _, item := range containers {
386530
res := getContainerResources(item, replicas)
387531
totalresource = totalresource.Add(res)
388532
}
389-
klog.V(8).Infof("[GetResources] Requested total allocation resource from containers `%v`.\n", totalresource)
390-
} else {
391-
podresources := awr.CustomPodResources
392-
for _, item := range podresources {
393-
res := getPodResources(item)
394-
totalresource = totalresource.Add(res)
395-
}
396-
klog.V(8).Infof("[GetResources] Requested total allocation resource from pods `%v`.\n", totalresource)
533+
klog.V(4).Infof("[GetResources] Requested total allocation resource from containers `%v`.\n", totalresource)
534+
return totalresource, err
397535
}
536+
398537
} else {
399538
err = fmt.Errorf("generic template raw object is not defined (nil)")
400539
}

0 commit comments

Comments
 (0)