Skip to content

Commit 11c3a5c

Browse files
richabankerk8s-publishing-bot
authored andcommitted
discovery/aggregated: Add invalidation callback support
Add callback mechanism to notify peer-aggregated discovery handler when local discovery changes, enabling efficient cache invalidation. Part of KEP-4020: Unknown Version Interoperability Proxy Kubernetes-commit: c72f9f73d92ff5e7a28f06c110acbc9626e4252d
1 parent 1ccb33e commit 11c3a5c

File tree

1 file changed

+46
-18
lines changed

1 file changed

+46
-18
lines changed

pkg/endpoints/discovery/aggregated/handler.go

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ type ResourceManager interface {
8585
// The group from the least-numbered source is used
8686
WithSource(source Source) ResourceManager
8787

88+
// AddInvalidationCallback adds a callback to be called when the discovery cache is invalidated.
89+
AddInvalidationCallback(callback func())
90+
8891
http.Handler
8992
}
9093

@@ -116,6 +119,10 @@ func (rm resourceManager) WithSource(source Source) ResourceManager {
116119
}
117120
}
118121

122+
func (rm resourceManager) AddInvalidationCallback(callback func()) {
123+
rm.resourceDiscoveryManager.AddInvalidationCallback(callback)
124+
}
125+
119126
type groupKey struct {
120127
name string
121128

@@ -138,16 +145,28 @@ type resourceDiscoveryManager struct {
138145

139146
// Writes protected by the lock.
140147
// List of all apigroups & resources indexed by the resource manager
141-
lock sync.RWMutex
142-
apiGroups map[groupKey]*apidiscoveryv2.APIGroupDiscovery
143-
versionPriorities map[groupVersionKey]priorityInfo
148+
lock sync.RWMutex
149+
apiGroups map[groupKey]*apidiscoveryv2.APIGroupDiscovery
150+
versionPriorities map[groupVersionKey]priorityInfo
151+
invalidationCallback atomic.Pointer[func()]
144152
}
145153

146154
type priorityInfo struct {
147155
GroupPriorityMinimum int
148156
VersionPriority int
149157
}
150158

159+
func (rdm *resourceDiscoveryManager) AddInvalidationCallback(callback func()) {
160+
rdm.invalidationCallback.Store(&callback)
161+
}
162+
163+
func (rdm *resourceDiscoveryManager) invalidateCacheLocked() {
164+
rdm.cache.Store(nil)
165+
if callback := rdm.invalidationCallback.Load(); callback != nil {
166+
(*callback)()
167+
}
168+
}
169+
151170
func NewResourceManager(path string) ResourceManager {
152171
scheme := runtime.NewScheme()
153172
utilruntime.Must(apidiscoveryv2.AddToScheme(scheme))
@@ -188,15 +207,15 @@ func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(source Source, gv m
188207
GroupPriorityMinimum: groupPriorityMinimum,
189208
VersionPriority: versionPriority,
190209
}
191-
rdm.cache.Store(nil)
210+
rdm.invalidateCacheLocked()
192211
}
193212

194213
func (rdm *resourceDiscoveryManager) SetGroups(source Source, groups []apidiscoveryv2.APIGroupDiscovery) {
195214
rdm.lock.Lock()
196215
defer rdm.lock.Unlock()
197216

198217
rdm.apiGroups = nil
199-
rdm.cache.Store(nil)
218+
rdm.invalidateCacheLocked()
200219

201220
for _, group := range groups {
202221
for _, version := range group.Versions {
@@ -297,7 +316,7 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(source Source, groupN
297316
}
298317

299318
// Reset response document so it is recreated lazily
300-
rdm.cache.Store(nil)
319+
rdm.invalidateCacheLocked()
301320
}
302321

303322
func (rdm *resourceDiscoveryManager) RemoveGroupVersion(source Source, apiGroup metav1.GroupVersion) {
@@ -338,7 +357,7 @@ func (rdm *resourceDiscoveryManager) RemoveGroupVersion(source Source, apiGroup
338357
}
339358

340359
// Reset response document so it is recreated lazily
341-
rdm.cache.Store(nil)
360+
rdm.invalidateCacheLocked()
342361
}
343362

344363
func (rdm *resourceDiscoveryManager) RemoveGroup(source Source, groupName string) {
@@ -521,53 +540,62 @@ func (rdm *resourceDiscoveryManager) serveHTTP(resp http.ResponseWriter, req *ht
521540
response := cache.cachedResponse
522541
etag := cache.cachedResponseETag
523542

524-
mediaType, _, err := negotiation.NegotiateOutputMediaType(req, rdm.serializer, DiscoveryEndpointRestrictions)
543+
writeDiscoveryResponse(&response, etag, rdm.serializer, resp, req)
544+
}
545+
546+
func writeDiscoveryResponse(
547+
resp *apidiscoveryv2.APIGroupDiscoveryList,
548+
etag string,
549+
serializer runtime.NegotiatedSerializer,
550+
w http.ResponseWriter,
551+
req *http.Request,
552+
) {
553+
mediaType, _, err := negotiation.NegotiateOutputMediaType(req, serializer, DiscoveryEndpointRestrictions)
525554
if err != nil {
526555
// Should never happen. wrapper.go will only proxy requests to this
527556
// handler if the media type passes DiscoveryEndpointRestrictions
528557
utilruntime.HandleError(err)
529-
resp.WriteHeader(http.StatusInternalServerError)
558+
w.WriteHeader(http.StatusInternalServerError)
530559
return
531560
}
532561
var targetGV schema.GroupVersion
533562
if mediaType.Convert == nil ||
534563
(mediaType.Convert.GroupVersion() != apidiscoveryv2.SchemeGroupVersion &&
535564
mediaType.Convert.GroupVersion() != apidiscoveryv2beta1.SchemeGroupVersion) {
536565
utilruntime.HandleError(fmt.Errorf("expected aggregated discovery group version, got group: %s, version %s", mediaType.Convert.Group, mediaType.Convert.Version))
537-
resp.WriteHeader(http.StatusInternalServerError)
566+
w.WriteHeader(http.StatusInternalServerError)
538567
return
539568
}
540569

541570
if mediaType.Convert.GroupVersion() == apidiscoveryv2beta1.SchemeGroupVersion &&
542571
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryRemoveBetaType) {
543572
klog.Errorf("aggregated discovery version v2beta1 is removed. Please update to use v2")
544-
resp.WriteHeader(http.StatusNotFound)
573+
w.WriteHeader(http.StatusNotFound)
545574
return
546575
}
547-
548576
targetGV = mediaType.Convert.GroupVersion()
549577

550578
if len(etag) > 0 {
551579
// Use proper e-tag headers if one is available
552580
ServeHTTPWithETag(
553-
&response,
581+
resp,
554582
etag,
555583
targetGV,
556-
rdm.serializer,
557-
resp,
584+
serializer,
585+
w,
558586
req,
559587
)
560588
} else {
561589
// Default to normal response in rare case etag is
562590
// not cached with the object for some reason.
563591
responsewriters.WriteObjectNegotiated(
564-
rdm.serializer,
592+
serializer,
565593
DiscoveryEndpointRestrictions,
566594
targetGV,
567-
resp,
595+
w,
568596
req,
569597
http.StatusOK,
570-
&response,
598+
resp,
571599
true,
572600
)
573601
}

0 commit comments

Comments
 (0)