Skip to content

Commit 60b84f8

Browse files
Merge pull request #134941 from p0lyn0mial/upstream-watchlist-unsupported-wl-semantics-cacher-consistency-checker
apiserver/pkg/storage/cacher/lister_watcher: pass RV for request from the watchlist consistency checker Kubernetes-commit: d8f8b017e19af984908153aa1c66eb563bdc5940
2 parents 6ac521f + 2936cfc commit 60b84f8

File tree

2 files changed

+112
-10
lines changed

2 files changed

+112
-10
lines changed

pkg/storage/cacher/lister_watcher.go

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,30 @@ import (
3131
"k8s.io/apiserver/pkg/storage"
3232
utilfeature "k8s.io/apiserver/pkg/util/feature"
3333
"k8s.io/client-go/tools/cache"
34+
"k8s.io/client-go/util/consistencydetector"
3435
"k8s.io/client-go/util/watchlist"
3536
)
3637

3738
// listerWatcher opaques storage.Interface to expose cache.ListerWatcher.
3839
type listerWatcher struct {
39-
storage storage.Interface
40-
resourcePrefix string
41-
newListFunc func() runtime.Object
42-
contextMetadata metadata.MD
43-
unsupportedWatchListSemantics bool
40+
storage storage.Interface
41+
resourcePrefix string
42+
newListFunc func() runtime.Object
43+
contextMetadata metadata.MD
44+
45+
unsupportedWatchListSemantics bool
46+
watchListConsistencyCheckEnabled bool
4447
}
4548

4649
// NewListerWatcher returns a storage.Interface backed ListerWatcher.
4750
func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object, contextMetadata metadata.MD) cache.ListerWatcher {
4851
return &listerWatcher{
49-
storage: storage,
50-
resourcePrefix: resourcePrefix,
51-
newListFunc: newListFunc,
52-
contextMetadata: contextMetadata,
53-
unsupportedWatchListSemantics: watchlist.DoesClientNotSupportWatchListSemantics(storage),
52+
storage: storage,
53+
resourcePrefix: resourcePrefix,
54+
newListFunc: newListFunc,
55+
contextMetadata: contextMetadata,
56+
unsupportedWatchListSemantics: watchlist.DoesClientNotSupportWatchListSemantics(storage),
57+
watchListConsistencyCheckEnabled: consistencydetector.IsDataConsistencyDetectionForWatchListEnabled(),
5458
}
5559
}
5660

@@ -69,6 +73,27 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error
6973
Predicate: pred,
7074
Recursive: true,
7175
}
76+
77+
// The ConsistencyChecker built into reflectors for the WatchList feature is responsible
78+
// for verifying that the data received from the server (potentially from the watch cache)
79+
// is consistent with the data stored in etcd.
80+
//
81+
// To perform this verification, the checker uses the ResourceVersion obtained from the initial request
82+
// and sets the ResourceVersionMatch so that it retrieves exactly the same data directly from etcd.
83+
// This allows comparing both data sources and confirming their consistency.
84+
//
85+
// The code below checks whether the incoming request originates from the ConsistencyChecker.
86+
// If so, it allows explicitly setting the ResourceVersion.
87+
//
88+
// As of Oct 2025, reflector on its own is not setting RVM=Exact.
89+
// However, even if that changes in the meantime, we would have to propagate that
90+
// down to storage to ensure the correct semantics of the request.
91+
watchListEnabled := utilfeature.DefaultFeatureGate.Enabled(features.WatchList)
92+
supportedRVM := options.ResourceVersionMatch == metav1.ResourceVersionMatchExact
93+
if watchListEnabled && lw.watchListConsistencyCheckEnabled && supportedRVM {
94+
storageOpts.ResourceVersion = options.ResourceVersion
95+
}
96+
7297
ctx := context.Background()
7398
if lw.contextMetadata != nil {
7499
ctx = metadata.NewOutgoingContext(ctx, lw.contextMetadata)

pkg/storage/cacher/lister_watcher_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/apimachinery/pkg/watch"
2626
"k8s.io/apiserver/pkg/apis/example"
2727
"k8s.io/apiserver/pkg/features"
28+
"k8s.io/apiserver/pkg/storage"
2829
storagetesting "k8s.io/apiserver/pkg/storage/testing"
2930
utilfeature "k8s.io/apiserver/pkg/util/feature"
3031
"k8s.io/client-go/tools/cache"
@@ -218,3 +219,79 @@ func TestCacherListerWatcherWhenListWatchDisabled(t *testing.T) {
218219
t.Fatalf("Expected error %q, but got %q", expectedErrMsg, err.Error())
219220
}
220221
}
222+
223+
func TestListerWatcherListResourceVersionPropagation(t *testing.T) {
224+
scenarios := []struct {
225+
name string
226+
options metav1.ListOptions
227+
watchListEnabled bool
228+
watchListConsistencyCheckEnabled bool
229+
expectedStorageResourceVer string
230+
}{
231+
{
232+
name: "WatchList FG disabled - RV not propagated",
233+
options: metav1.ListOptions{
234+
ResourceVersion: "123",
235+
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
236+
},
237+
watchListEnabled: false,
238+
watchListConsistencyCheckEnabled: true,
239+
expectedStorageResourceVer: "",
240+
},
241+
{
242+
name: "WatchList consistency check disabled - RV not propagated",
243+
options: metav1.ListOptions{
244+
ResourceVersion: "123",
245+
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
246+
},
247+
watchListEnabled: true,
248+
watchListConsistencyCheckEnabled: false,
249+
expectedStorageResourceVer: "",
250+
},
251+
{
252+
name: "Unsupported RVM - RV not propagated",
253+
options: metav1.ListOptions{
254+
ResourceVersion: "123",
255+
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
256+
},
257+
watchListEnabled: true,
258+
watchListConsistencyCheckEnabled: true,
259+
expectedStorageResourceVer: "",
260+
},
261+
{
262+
name: "all conditions satisfied - RV propagated",
263+
options: metav1.ListOptions{
264+
ResourceVersion: "123",
265+
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
266+
},
267+
watchListEnabled: true,
268+
watchListConsistencyCheckEnabled: true,
269+
expectedStorageResourceVer: "123",
270+
},
271+
}
272+
273+
for _, scenario := range scenarios {
274+
t.Run(scenario.name, func(t *testing.T) {
275+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, scenario.watchListEnabled)
276+
277+
backingStorage := &dummyStorage{}
278+
var capturedOpts storage.ListOptions
279+
backingStorage.getListFn = func(ctx context.Context, resPrefix string, opts storage.ListOptions, listObj runtime.Object) error {
280+
capturedOpts = opts
281+
return nil
282+
}
283+
284+
targetInterface := NewListerWatcher(backingStorage, "/pods/", func() runtime.Object { return &example.PodList{} }, nil)
285+
target := targetInterface.(*listerWatcher)
286+
target.watchListConsistencyCheckEnabled = scenario.watchListConsistencyCheckEnabled
287+
288+
if _, err := target.List(scenario.options); err != nil {
289+
t.Fatalf("List returned error: %v", err)
290+
}
291+
292+
if capturedOpts.ResourceVersion != scenario.expectedStorageResourceVer {
293+
t.Fatalf("expected storage ResourceVersion %q, got %q", scenario.expectedStorageResourceVer, capturedOpts.ResourceVersion)
294+
}
295+
})
296+
}
297+
}

0 commit comments

Comments
 (0)