Skip to content
Merged
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,6 @@ kubernetes.tar.gz

# .devcontainer files
.devcontainer

#local debug files
cmd/kar-controllers/__debug_bin
apiserver.local.config
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.18
require (
github.com/emicklei/go-restful v2.16.0+incompatible
github.com/golang/protobuf v1.4.3
github.com/hashicorp/go-multierror v1.1.1
github.com/kubernetes-sigs/custom-metrics-apiserver v0.0.0-20210311094424-0ca2b1909cdc
github.com/onsi/ginkgo v1.11.0
github.com/onsi/gomega v1.7.0
Expand Down Expand Up @@ -49,6 +50,7 @@ require (
github.com/google/uuid v1.1.2 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/hpcloud/tail v1.0.0 // indirect
github.com/imdario/mergo v0.3.5 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,14 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ package queuejob

import (
"fmt"
qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"
"math"
"math/rand"
"reflect"
Expand All @@ -42,6 +41,8 @@ import (
"strings"
"time"

qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager"
dto "github.com/prometheus/client_model/go"

Expand Down Expand Up @@ -386,8 +387,12 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
// Setup Quota
if serverOption.QuotaEnabled {
dispatchedAWDemands, dispatchedAWs := cc.getDispatchedAppWrappers()
cc.quotaManager, _ = quotaforestmanager.NewQuotaManager(dispatchedAWDemands, dispatchedAWs, cc.queueJobLister,
cc.quotaManager, err = quotaforestmanager.NewQuotaManager(dispatchedAWDemands, dispatchedAWs, cc.queueJobLister,
config, serverOption)
if err != nil {
klog.Error("Failed to instantiate quota manager: %#v", err)
return nil
}
} else {
cc.quotaManager = nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"strings"

"github.com/hashicorp/go-multierror"
"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
listersv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/listers/controller/v1"
Expand Down Expand Up @@ -100,7 +101,7 @@ var _ = quota.QuotaManagerInterface(&QuotaManager{})
func getDispatchedAppWrapper(dispatchedAWs map[string]*arbv1.AppWrapper, awId string) *arbv1.AppWrapper {
// Find Appwrapper that is run (runnable)
aw := dispatchedAWs[awId]
if aw != nil && aw.Status.CanRun == true {
if aw != nil && aw.Status.CanRun {
return aw
}
return nil
Expand All @@ -109,7 +110,9 @@ func getDispatchedAppWrapper(dispatchedAWs map[string]*arbv1.AppWrapper, awId st
func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, dispatchedAWs map[string]*arbv1.AppWrapper,
awJobLister listersv1.AppWrapperLister, config *rest.Config, serverOptions *options.ServerOption) (*QuotaManager, error) {

if serverOptions.QuotaEnabled == false {
var err error

if !serverOptions.QuotaEnabled {
klog.
Infof("[NewQuotaManager] Quota management is not enabled.")
return nil, nil
Expand All @@ -128,25 +131,24 @@ func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, d
klog.V(10).Infof("[NewQuotaManager] Before initialization QuotaSubtree informer - %s", qm.quotaManagerBackend.String())

// Create a resource plan manager
qm.quotaSubtreeManager, _ = qstmanager.NewQuotaSubtreeManager(config, qm.quotaManagerBackend)
qm.quotaSubtreeManager, err = qstmanager.NewQuotaSubtreeManager(config, qm.quotaManagerBackend)
if err != nil {
klog.Errorf("[NewQuotaManager] failed to instantiate new quota subtree manager, err=%#v", err)
return nil, err
}

// Initialize Forest/Trees if new resource plan manager added to the cache
err := qm.updateForestFromCache()
err = qm.updateForestFromCache()
if err != nil {
klog.Errorf("[dispatchedAWDemands] Failure during Quota Manager Backend Cache refresh, err=%#v", err)
klog.Errorf("[NewQuotaManager] Failure during Quota Manager Backend Cache refresh, err=%#v", err)
return nil, err
}

// Add AppWrappers that have been evaluated as runnable to QuotaManager
err2 := qm.loadDispatchedAWs(dispatchedAWDemands, dispatchedAWs)
if err2 != nil {
klog.Errorf("[dispatchedAWDemands] Failure during Quota Manager Backend Cache refresh, err=%#v",
err2)
// Combine errors for function return
if err != nil {
err = fmt.Errorf("%w; Next error %s", err, err2.Error())
} else {
err = err2
}
err = qm.loadDispatchedAWs(dispatchedAWDemands, dispatchedAWs)
if err != nil {
klog.Errorf("[dispatchedAWDemands] Failure during Quota Manager Backend Cache refresh, err=%#v", err)
return nil, err
}
// Set mode of quota manager
qm.quotaManagerBackend.SetMode(qmbackend.Normal)
Expand All @@ -158,7 +160,7 @@ func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, d
}

qm.initializationDone = true
return qm, err
return qm, nil
}

func (qm *QuotaManager) loadDispatchedAWs(dispatchedAWDemands map[string]*clusterstateapi.Resource,
Expand All @@ -171,76 +173,55 @@ func (qm *QuotaManager) loadDispatchedAWs(dispatchedAWDemands map[string]*cluste
}

// Process list of AppWrappers that are already dispatched
var err error
err = nil
var result *multierror.Error

for k, v := range dispatchedAWDemands {
aw := getDispatchedAppWrapper(dispatchedAWs, k)
if aw != nil {

doesFit, preemptionIds, err2 := qm.Fits(aw, v, nil)
if doesFit == false {
doesFit, preemptionIds, errorMessage := qm.Fits(aw, v, nil)
if !doesFit {
klog.Errorf("[loadDispatchedAWs] Loading of AppWrapper %s/%s failed.",
aw.Namespace, aw.Name)
err = fmt.Errorf("Loading of AppWrapper %s/%s failed, %#v \n",
aw.Namespace, aw.Name, err2)
result = multierror.Append(result, fmt.Errorf("loading of AppWrapper %s/%s failed, %s",
aw.Namespace, aw.Name, errorMessage))
}

if preemptionIds != nil && len(preemptionIds) > 0 {
klog.Errorf("[loadDispatchedAWs] Loading of AppWrapper %s/%s caused invalid preemptions: %v. Quota Manager is in inconsistent state.",
aw.Namespace, aw.Name, preemptionIds)
if err == nil {
err = fmt.Errorf("Loading of AppWrapper %s/%s caused invalid preemptions: %v. Quota Manager is in inconsistent state. \n",
aw.Namespace, aw.Name, preemptionIds)
} else {
err = fmt.Errorf("%w; Next error %s Loading of AppWrapper %s/%s caused invalid preemptions: %v. Quota Manager is in inconsistent state. \n",
err, aw.Namespace, aw.Name, preemptionIds)
}
if len(preemptionIds) > 0 {
klog.Errorf("[loadDispatchedAWs] Loading of AppWrapper %s/%s caused invalid preemptions: %v. Quota Manager is in inconsistent state, reason:",
aw.Namespace, aw.Name, preemptionIds, errorMessage)
result = multierror.Append(result, fmt.Errorf("loading of AppWrapper %s/%s caused invalid preemptions: %v. Quota Manager is in inconsistent state",
aw.Namespace, aw.Name, preemptionIds))
}
klog.V(4).Infof("[loadDispatchedAWs] Dispatched AppWrappers %s/%s found to preload.", aw.Namespace, aw.Name)
} else {
klog.Warningf("[loadDispatchedAWs] Unable to obtain AppWrapper from key: %s. Loading of AppWrapper will be skipped.",
k)
klog.Warningf("[loadDispatchedAWs] Unable to obtain AppWrapper from key: %s. Loading of AppWrapper will be skipped.", k)
}
}

return err
return result.ErrorOrNil()
}

func (qm *QuotaManager) updateForestFromCache() error {
unallocatedConsumers, treeCacheCreateResponse, err := qm.quotaManagerBackend.UpdateForest(QuotaManagerForestName)

if treeCacheCreateResponse != nil {
for k, v := range treeCacheCreateResponse {
danglingNodeNames := v.DanglingNodeNames
if danglingNodeNames != nil {
for _, danglingNodeName := range danglingNodeNames {
klog.Errorf("[updateForestFromCache] Failure to link node %s to tree %s after Quota Manager Backend Cache refresh.", danglingNodeName, k)
}
}
klog.V(10).Infof("[updateForestFromCache] %s", qm.quotaManagerBackend.String())
if err != nil {
return err
}
var result *multierror.Error
for k, v := range treeCacheCreateResponse {
danglingNodeNames := v.DanglingNodeNames
for _, danglingNodeName := range danglingNodeNames {
klog.Errorf("[updateForestFromCache] Failure to link node %s to tree %s after Quota Manager Backend Cache refresh.", danglingNodeName, k)
result = multierror.Append(result, fmt.Errorf("failure to link node %s to tree %s after Quota Manager Backend Cache refresh", danglingNodeName, k))
}
klog.V(10).Infof("[updateForestFromCache] %s", qm.quotaManagerBackend.String())
}

if unallocatedConsumers != nil && len(unallocatedConsumers) > 0 {
for _, unallocatedConsumer := range unallocatedConsumers {
klog.Errorf("[updateForestFromCache] Failure to allocate %s after Quota Manager Backend Cache refresh.", unallocatedConsumer)
}
for _, unallocatedConsumer := range unallocatedConsumers {
klog.Errorf("[updateForestFromCache] Failure to allocate %s after Quota Manager Backend Cache refresh.", unallocatedConsumer)
result = multierror.Append(result, fmt.Errorf("failure to allocate %s after Quota Manager Backend Cache refresh", unallocatedConsumer))
}

return err
}

// Recrusive call to add names of Tree
func (qm *QuotaManager) addChildrenNodes(parentNode TreeNode, treeIDs []string) []string {
if len(parentNode.Children) > 0 {
for _, childNode := range parentNode.Children {
klog.V(10).Infof("[getQuotaTreeIDs] Quota tree response child node from quota mananger: %s", childNode.Name)
treeIDs = qm.addChildrenNodes(childNode, treeIDs)
}
}
treeIDs = append(treeIDs, parentNode.Name)
return treeIDs
return result.ErrorOrNil()
}

func isValidQuota(quotaGroup QuotaGroup, qmTreeIDs []string) bool {
Expand Down Expand Up @@ -324,7 +305,7 @@ func (qm *QuotaManager) getQuotaDesignation(aw *arbv1.AppWrapper) ([]QuotaGroup,
fmt.Fprintf(&allocationMessage, ".")
err = fmt.Errorf(allocationMessage.String())
} else {
err = fmt.Errorf("Unknown error verifying quota designations.")
err = fmt.Errorf("unknown error verifying quota designations")
}
klog.V(6).Infof("[getQuotaDesignation] No valid quota management IDs found for AppWrapper Job: %s/%s, err=%#v",
aw.Namespace, aw.Name, err)
Expand Down Expand Up @@ -446,7 +427,7 @@ func (qm *QuotaManager) buildRequest(aw *arbv1.AppWrapper,
awResDemands *clusterstateapi.Resource) (*qmbackend.ConsumerInfo, error) {
awId := util.CreateId(aw.Namespace, aw.Name)
if len(awId) <= 0 {
err := fmt.Errorf("[buildRequest] Request failed due to invalid AppWrapper due to empty namespace: %s or name:%s.", aw.Namespace, aw.Name)
err := fmt.Errorf("[buildRequest] Request failed due to invalid AppWrapper due to empty namespace: %s or name:%s", aw.Namespace, aw.Name)
return nil, err
}

Expand Down Expand Up @@ -501,14 +482,6 @@ func (qm *QuotaManager) buildRequest(aw *arbv1.AppWrapper,

return consumerInfo, err
}

func (qm *QuotaManager) refreshQuotaDefiniions() error {
// Initialize Forest/Trees if new resource plan manager added to the cache
err := qm.updateForestFromCache()

return err
}

func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi.Resource,
proposedPreemptions []*arbv1.AppWrapper) (bool, []*arbv1.AppWrapper, string) {

Expand All @@ -522,7 +495,7 @@ func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi
}

// If Quota Manager initialization is complete but quota manager backend is in maintenance mode assume quota
// Processing quota requests is allow during initialization and backend is in maitenace mode for recovery purposes
// Processing quota requests is allow during initialization and backend is in maintenance mode for recovery purposes
if qm.quotaManagerBackend.GetMode() == qmbackend.Maintenance && qm.initializationDone {
klog.Warningf("[Fits] Quota Manager backend in maintenance mode. Unable to process request for AppWrapper: %s/%s",
aw.Namespace, aw.Name)
Expand All @@ -533,7 +506,7 @@ func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi
if qm.quotaSubtreeManager.IsQuotasubtreeChanged() {
// Load QuotaSubtree Cache into Quoto Management Backend Cache
qm.quotaSubtreeManager.LoadQuotaSubtreesIntoBackend()
// Realize new Quoto Management tree(s) from Backend Cache
// Realize new Quota Management tree(s) from Backend Cache
err := qm.updateForestFromCache()
if err != nil {
klog.Errorf("[Fits] Failure during refresh of quota tree(s), err=%#v.", err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// b private
// ------------------------------------------------------ {COPYRIGHT-TOP} ---
// Copyright 2019, 2021, 2022, 2023 The Multi-Cluster App Dispatcher Authors.
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
Expand All @@ -18,12 +18,14 @@
package quotasubtmgr

import (
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota/core"
"k8s.io/klog/v2"
"errors"
"strconv"
"strings"
"sync"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota/core"
"k8s.io/klog/v2"

qstv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/quotaplugins/quotasubtree/v1"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr/quotasubtmgr/util"
qmlib "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota"
Expand All @@ -39,7 +41,6 @@ import (
qstinformers "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/quotasubtree/informers/externalversions/quotasubtree/v1"
)


// New returns a new implementation.
func NewQuotaSubtreeManager(config *rest.Config, quotaManagerBackend *qmlib.Manager) (*QuotaSubtreeManager, error) {
return newQuotaSubtreeManager(config, quotaManagerBackend)
Expand Down Expand Up @@ -95,7 +96,9 @@ func newQuotaSubtreeManager(config *rest.Config, quotaManagerBackend *qmlib.Mana
// Wait for cache sync
klog.V(10).Infof("[newQuotaSubtreeManager] Waiting for QuotaSubtree informer cache sync. to complete.")
qstm.qstSynced = qstm.quotaSubtreeInformer.Informer().HasSynced
cache.WaitForCacheSync(neverStop, qstm.qstSynced)
if ! cache.WaitForCacheSync(neverStop, qstm.qstSynced) {
return nil, errors.New("failed to wait for the quota sub tree informer to synch")
}

// Initialize Quota Trees
qstm.initializeQuotaTreeBackend()
Expand Down