Commit 54291a5

Merge pull request kubernetes#132096 from pohly/dra-kubelet-refactoring
DRA kubelet: refactoring
2 parents: c1ed20c + 7b1f499

12 files changed: +624 -697 lines changed

pkg/kubelet/cm/container_manager_linux.go
3 additions & 3 deletions

@@ -131,8 +131,8 @@ type containerManagerImpl struct {
     memoryManager memorymanager.Manager
     // Interface for Topology resource co-ordination
     topologyManager topologymanager.Manager
-    // Interface for Dynamic Resource Allocation management.
-    draManager dra.Manager
+    // Implementation of Dynamic Resource Allocation (DRA).
+    draManager *dra.Manager
     // kubeClient is the interface to the Kubernetes API server. May be nil if the kubelet is running in standalone mode.
     kubeClient clientset.Interface
 }
@@ -310,7 +310,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
     // Initialize DRA manager
     if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
         klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
-        cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName)
+        cm.draManager, err = dra.NewManager(kubeClient, nodeConfig.KubeletRootDir)
         if err != nil {
             return nil, err
         }
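Worth calling out at this call site: NewManager drops the nodeName argument. Node access is now supplied through the GetNodeFunc that the caller passes to Start (see manager.go below). A minimal sketch of the resulting wiring, assuming illustrative names (setUpDRA, kubeletRootDir, activePods, getNode, sourcesReady) rather than the kubelet's real plumbing:

```go
// Sketch only: names and structure are assumptions, not the kubelet's
// actual initialization code.
package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/kubelet/cm/dra"
	"k8s.io/kubernetes/pkg/kubelet/config"
)

// setUpDRA shows where the former nodeName constructor argument went:
// node access is now provided through the GetNodeFunc given to Start.
func setUpDRA(
	ctx context.Context,
	kubeClient clientset.Interface,
	kubeletRootDir string,
	activePods func() []*v1.Pod,
	getNode func() (*v1.Node, error),
	sourcesReady config.SourcesReady,
) (*dra.Manager, error) {
	// Construct the manager; no node name needed anymore.
	m, err := dra.NewManager(kubeClient, kubeletRootDir)
	if err != nil {
		return nil, err
	}
	// Node access (and the context) arrive here instead.
	if err := m.Start(ctx, activePods, getNode, sourcesReady); err != nil {
		return nil, err
	}
	return m, nil
}
```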

pkg/kubelet/cm/dra/manager.go
61 additions & 49 deletions

@@ -32,7 +32,7 @@ import (
     "k8s.io/dynamic-resource-allocation/resourceclaim"
     "k8s.io/klog/v2"
     drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
-    dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
+    draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
     "k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
     "k8s.io/kubernetes/pkg/kubelet/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -46,14 +46,34 @@ const draManagerStateFileName = "dra_manager_state"
 // defaultReconcilePeriod is the default reconciliation period to keep all claim info state in sync.
 const defaultReconcilePeriod = 60 * time.Second
 
+// The time that DRA drivers have to come back after being unregistered
+// before the kubelet removes their ResourceSlices.
+//
+// This must be long enough to actually allow stopping a pod and
+// starting the replacement (otherwise ResourceSlices get deleted
+// unnecessarily) and not too long (otherwise the time window were
+// pods might still get scheduled to the node after removal of a
+// driver is too long).
+//
+// 30 seconds might be long enough for a simple container restart.
+// If a DRA driver wants to be sure that slices don't get wiped,
+// it should use rolling updates.
+const defaultWipingDelay = 30 * time.Second
+
 // ActivePodsFunc is a function that returns a list of pods to reconcile.
 type ActivePodsFunc func() []*v1.Pod
 
 // GetNodeFunc is a function that returns the node object using the kubelet's node lister.
 type GetNodeFunc func() (*v1.Node, error)
 
-// ManagerImpl is the structure in charge of managing DRA drivers.
-type ManagerImpl struct {
+// Manager is responsible for managing ResourceClaims.
+// It ensures that they are prepared before starting pods
+// and that they are unprepared before the last consuming
+// pod is declared as terminated.
+type Manager struct {
+    // draPlugins manages the registered plugins.
+    draPlugins *draplugin.DRAPluginManager
+
     // cache contains cached claim info
     cache *claimInfoCache
 
@@ -70,12 +90,9 @@ type ManagerImpl struct {
 
     // KubeClient reference
     kubeClient clientset.Interface
-
-    // getNode is a function that returns the node object using the kubelet's node lister.
-    getNode GetNodeFunc
 }
 
-// NewManagerImpl creates a new manager.
+// NewManager creates a new DRA manager.
 //
 // Most errors returned by the manager show up in the context of a pod.
 // They try to adhere to the following convention:
@@ -84,7 +101,7 @@ type ManagerImpl struct {
 // - Don't include the namespace, it can be inferred from the context.
 // - Avoid repeated "failed to ...: failed to ..." when wrapping errors.
 // - Avoid wrapping when it does not provide relevant additional information to keep the user-visible error short.
-func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) {
+func NewManager(kubeClient clientset.Interface, stateFileDirectory string) (*Manager, error) {
     claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
     if err != nil {
         return nil, fmt.Errorf("create ResourceClaim cache: %w", err)
@@ -94,7 +111,7 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
     // We should consider making it configurable in the future.
     reconcilePeriod := defaultReconcilePeriod
 
-    manager := &ManagerImpl{
+    manager := &Manager{
         cache: claimInfoCache,
         kubeClient: kubeClient,
         reconcilePeriod: reconcilePeriod,
@@ -105,34 +122,29 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
     return manager, nil
 }
 
-func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
-    // The time that DRA drivers have to come back after being unregistered
-    // before the kubelet removes their ResourceSlices.
-    //
-    // This must be long enough to actually allow stopping a pod and
-    // starting the replacement (otherwise ResourceSlices get deleted
-    // unnecessarily) and not too long (otherwise the time window were
-    // pods might still get scheduled to the node after removal of a
-    // driver is too long).
-    //
-    // 30 seconds might be long enough for a simple container restart.
-    // If a DRA driver wants to be sure that slices don't get wiped,
-    // it should use rolling updates.
-    wipingDelay := 30 * time.Second
-    return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode, wipingDelay))
+// GetWatcherHandler must be called after Start, it indirectly depends
+// on parameters which only get passed to Start, for example the context.
+func (m *Manager) GetWatcherHandler() cache.PluginHandler {
+    return m.draPlugins
 }
 
 // Start starts the reconcile loop of the manager.
-func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error {
+func (m *Manager) Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error {
+    m.initDRAPluginManager(ctx, getNode, defaultWipingDelay)
     m.activePods = activePods
-    m.getNode = getNode
     m.sourcesReady = sourcesReady
     go wait.UntilWithContext(ctx, func(ctx context.Context) { m.reconcileLoop(ctx) }, m.reconcilePeriod)
     return nil
 }
 
+// initPluginManager can be used instead of Start to make the manager useable
+// for calls to prepare/unprepare. It exists primarily for testing purposes.
+func (m *Manager) initDRAPluginManager(ctx context.Context, getNode GetNodeFunc, wipingDelay time.Duration) {
+    m.draPlugins = draplugin.NewDRAPluginManager(ctx, m.kubeClient, getNode, wipingDelay)
+}
+
 // reconcileLoop ensures that any stale state in the manager's claimInfoCache gets periodically reconciled.
-func (m *ManagerImpl) reconcileLoop(ctx context.Context) {
+func (m *Manager) reconcileLoop(ctx context.Context) {
     logger := klog.FromContext(ctx)
     // Only once all sources are ready do we attempt to reconcile.
     // This ensures that the call to m.activePods() below will succeed with
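The new doc comment on GetWatcherHandler encodes an ordering requirement: the DRAPluginManager only exists once Start has run initDRAPluginManager. A sketch of that ordering, assuming an addHandler callback and the "DRAPlugin" type string as stand-ins for the kubelet's plugin-watcher registration (not verified kubelet API):

```go
// Sketch only: addHandler and the plugin type string are assumptions about
// how the kubelet hands plugin handlers to its plugin watcher.
package example

import (
	"context"

	"k8s.io/kubernetes/pkg/kubelet/cm/dra"
	"k8s.io/kubernetes/pkg/kubelet/config"
	plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)

// wireDRAPluginWatcher illustrates why GetWatcherHandler must come after Start:
// Start (via initDRAPluginManager) creates the DRAPluginManager that
// GetWatcherHandler returns.
func wireDRAPluginWatcher(
	ctx context.Context,
	m *dra.Manager,
	activePods dra.ActivePodsFunc,
	getNode dra.GetNodeFunc,
	sourcesReady config.SourcesReady,
	addHandler func(pluginType string, h plugincache.PluginHandler),
) error {
	// 1. Start creates m.draPlugins and kicks off the reconcile loop.
	if err := m.Start(ctx, activePods, getNode, sourcesReady); err != nil {
		return err
	}
	// 2. Only now does GetWatcherHandler return the freshly created
	//    DRAPluginManager; before Start it would hand out a nil pointer.
	addHandler("DRAPlugin", m.GetWatcherHandler())
	return nil
}
```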
@@ -184,7 +196,7 @@ func (m *ManagerImpl) reconcileLoop(ctx context.Context) {
 // for the input container, issue NodePrepareResources rpc requests
 // for each new resource requirement, process their responses and update the cached
 // containerResources on success.
-func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error {
+func (m *Manager) PrepareResources(ctx context.Context, pod *v1.Pod) error {
     startTime := time.Now()
     err := m.prepareResources(ctx, pod)
     metrics.DRAOperationsDuration.WithLabelValues("PrepareResources", strconv.FormatBool(err == nil)).Observe(time.Since(startTime).Seconds())
@@ -194,10 +206,10 @@ func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error {
     return nil
 }
 
-func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
+func (m *Manager) prepareResources(ctx context.Context, pod *v1.Pod) error {
     var err error
     logger := klog.FromContext(ctx)
-    batches := make(map[*dra.Plugin][]*drapb.Claim)
+    batches := make(map[*draplugin.DRAPlugin][]*drapb.Claim)
     resourceClaims := make(map[types.UID]*resourceapi.ResourceClaim)
 
     // Do a validation pass *without* changing the claim info cache.
@@ -213,7 +225,7 @@ func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
         resourceClaim *resourceapi.ResourceClaim
         podClaim *v1.PodResourceClaim
         claimInfo *ClaimInfo
-        drivers map[string]*dra.Plugin
+        plugins map[string]*draplugin.DRAPlugin
     }, len(pod.Spec.ResourceClaims))
     for i := range pod.Spec.ResourceClaims {
         podClaim := &pod.Spec.ResourceClaims[i]
@@ -260,17 +272,17 @@ func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
             return fmt.Errorf("ResourceClaim %s: %w", resourceClaim.Name, err)
         }
         infos[i].claimInfo = claimInfo
-        infos[i].drivers = make(map[string]*dra.Plugin, len(claimInfo.DriverState))
+        infos[i].plugins = make(map[string]*draplugin.DRAPlugin, len(claimInfo.DriverState))
         for driverName := range claimInfo.DriverState {
-            if plugin := infos[i].drivers[driverName]; plugin != nil {
+            if plugin := infos[i].plugins[driverName]; plugin != nil {
                 continue
             }
-            client, err := dra.NewDRAPluginClient(driverName)
+            plugin, err := m.draPlugins.GetPlugin(driverName)
             if err != nil {
                 // No wrapping, error includes driver name already.
                 return err
             }
-            infos[i].drivers[driverName] = client
+            infos[i].plugins[driverName] = plugin
         }
     }
 
@@ -326,8 +338,8 @@ func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
             Name: claimInfo.ClaimName,
         }
         for driverName := range claimInfo.DriverState {
-            client := infos[i].drivers[driverName]
-            batches[client] = append(batches[client], claim)
+            plugin := infos[i].plugins[driverName]
+            batches[plugin] = append(batches[plugin], claim)
         }
     }
 
@@ -342,9 +354,9 @@ func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
     // Call NodePrepareResources for all claims in each batch.
     // If there is any error, processing gets aborted.
    // We could try to continue, but that would make the code more complex.
-    for client, claims := range batches {
+    for plugin, claims := range batches {
        // Call NodePrepareResources RPC for all resource handles.
-        response, err := client.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{Claims: claims})
+        response, err := plugin.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{Claims: claims})
         if err != nil {
             // General error unrelated to any particular claim.
             return fmt.Errorf("NodePrepareResources: %w", err)
@@ -367,7 +379,7 @@ func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
                 return fmt.Errorf("internal error: unable to get claim info for ResourceClaim %s", claim.Name)
             }
             for _, device := range result.GetDevices() {
-                info.addDevice(client.Name(), state.Device{PoolName: device.PoolName, DeviceName: device.DeviceName, RequestNames: device.RequestNames, CDIDeviceIDs: device.CDIDeviceIDs})
+                info.addDevice(plugin.DriverName(), state.Device{PoolName: device.PoolName, DeviceName: device.DeviceName, RequestNames: device.RequestNames, CDIDeviceIDs: device.CDIDeviceIDs})
             }
             return nil
         })
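The exported surface stays the same under the rename; only the receiver type changes to *Manager. For orientation, a sketch of the per-pod flow through PrepareResources, GetResources (next hunk), and UnprepareResources, assuming a hypothetical runPod helper rather than the kubelet's actual pod lifecycle code:

```go
// Sketch only: runPod is illustrative; the kubelet spreads these calls
// across several components.
package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/kubelet/cm/dra"
)

func runPod(ctx context.Context, m *dra.Manager, pod *v1.Pod) error {
	// Before any container starts: prepare all of the pod's ResourceClaims.
	if err := m.PrepareResources(ctx, pod); err != nil {
		return err
	}
	// While generating each container's runtime config: fetch the
	// prepared CDI devices from the claim info cache.
	for i := range pod.Spec.Containers {
		if _, err := m.GetResources(pod, &pod.Spec.Containers[i]); err != nil {
			return err
		}
	}
	// After the pod terminated: unprepare; the call is idempotent.
	return m.UnprepareResources(ctx, pod)
}
```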
@@ -421,7 +433,7 @@ func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
 
 // GetResources gets a ContainerInfo object from the claimInfo cache.
 // This information is used by the caller to update a container config.
-func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
+func (m *Manager) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
     cdiDevices := []kubecontainer.CDIDevice{}
 
     for i := range pod.Spec.ResourceClaims {
@@ -466,7 +478,7 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 // This function is idempotent and may be called multiple times against the same pod.
 // As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
 // already been successfully unprepared.
-func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error {
+func (m *Manager) UnprepareResources(ctx context.Context, pod *v1.Pod) error {
     startTime := time.Now()
     err := m.unprepareResourcesForPod(ctx, pod)
     metrics.DRAOperationsDuration.WithLabelValues("UnprepareResources", strconv.FormatBool(err == nil)).Observe(time.Since(startTime).Seconds())
@@ -476,7 +488,7 @@ func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error
     return nil
 }
 
-func (m *ManagerImpl) unprepareResourcesForPod(ctx context.Context, pod *v1.Pod) error {
+func (m *Manager) unprepareResourcesForPod(ctx context.Context, pod *v1.Pod) error {
     var claimNames []string
     for i := range pod.Spec.ResourceClaims {
         claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
@@ -495,7 +507,7 @@ func (m *ManagerImpl) unprepareResourcesForPod(ctx context.Context, pod *v1.Pod)
     return m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames)
 }
 
-func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID, namespace string, claimNames []string) error {
+func (m *Manager) unprepareResources(ctx context.Context, podUID types.UID, namespace string, claimNames []string) error {
     logger := klog.FromContext(ctx)
     batches := make(map[string][]*drapb.Claim)
     claimNamesMap := make(map[types.UID]string)
@@ -549,12 +561,12 @@ func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID,
     // We could try to continue, but that would make the code more complex.
     for driverName, claims := range batches {
         // Call NodeUnprepareResources RPC for all resource handles.
-        client, err := dra.NewDRAPluginClient(driverName)
-        if client == nil {
+        plugin, err := m.draPlugins.GetPlugin(driverName)
+        if plugin == nil {
             // No wrapping, error includes driver name already.
             return err
         }
-        response, err := client.NodeUnprepareResources(ctx, &drapb.NodeUnprepareResourcesRequest{Claims: claims})
+        response, err := plugin.NodeUnprepareResources(ctx, &drapb.NodeUnprepareResourcesRequest{Claims: claims})
         if err != nil {
             // General error unrelated to any particular claim.
             return fmt.Errorf("NodeUnprepareResources: %w", err)
@@ -601,14 +613,14 @@ func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID,
 
 // PodMightNeedToUnprepareResources returns true if the pod might need to
 // unprepare resources
-func (m *ManagerImpl) PodMightNeedToUnprepareResources(uid types.UID) bool {
+func (m *Manager) PodMightNeedToUnprepareResources(uid types.UID) bool {
     m.cache.Lock()
     defer m.cache.Unlock()
     return m.cache.hasPodReference(uid)
 }
 
 // GetContainerClaimInfos gets Container's ClaimInfo
-func (m *ManagerImpl) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) {
+func (m *Manager) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) {
     claimInfos := make([]*ClaimInfo, 0, len(pod.Spec.ResourceClaims))
 
     for i, podResourceClaim := range pod.Spec.ResourceClaims {
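The comment on initDRAPluginManager points at its use in tests: it can stand in for Start when only prepare/unprepare behavior matters. Since the method is unexported, such a test would live in package dra itself. A sketch, assuming a fake clientset and a temporary checkpoint directory; real tests in this package do considerably more:

```go
// Sketch only: minimal in-package test setup, not an actual test from this commit.
package dra

import (
	"context"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestManagerWithoutStart(t *testing.T) {
	kubeClient := fake.NewSimpleClientset()
	m, err := NewManager(kubeClient, t.TempDir())
	if err != nil {
		t.Fatalf("NewManager: %v", err)
	}

	// Instead of Start: no reconcile loop and no pod sources, just the
	// plugin manager, which is enough for prepare/unprepare calls.
	getNode := func() (*v1.Node, error) { return &v1.Node{}, nil }
	m.initDRAPluginManager(context.Background(), getNode, time.Second /* short wiping delay for a test */)

	if m.GetWatcherHandler() == nil {
		t.Fatal("expected a plugin handler after initDRAPluginManager")
	}
}
```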
