Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/agent/dummy_data_gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ type dummyDataGatherer struct {
FailedAttempts int
}

func (g *dummyDataGatherer) Run(stopCh <-chan struct{}) error {
func (g *dummyDataGatherer) Run(ctx context.Context) error {
// no async functionality, see Fetch
return nil
}

func (g *dummyDataGatherer) WaitForCacheSync(stopCh <-chan struct{}) error {
func (g *dummyDataGatherer) WaitForCacheSync(ctx context.Context) error {
// no async functionality, see Fetch
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
// blocks until the supplied channel is closed.
// For this reason, we must allow these errgroup Go routines to exit
// without cancelling the other Go routines in the group.
if err := newDg.Run(gctx.Done()); err != nil {
if err := newDg.Run(gctx); err != nil {
return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
}
return nil
Expand Down Expand Up @@ -220,7 +220,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
// wait for the informer to complete an initial sync, we do this to
// attempt to have an initial set of data for the first upload of
// the run.
if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil {
if err := dg.WaitForCacheSync(bootCtx); err != nil {
// log sync failure, this might recover in future
if errors.Is(err, k8s.ErrCacheSyncTimeout) {
timedoutDGs = append(timedoutDGs, dgConfig.Name)
Expand Down
4 changes: 2 additions & 2 deletions pkg/datagatherer/datagatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ type DataGatherer interface {
Fetch() (data interface{}, count int, err error)
// Run starts the data gatherer's informers for resource collection.
// Returns error if the data gatherer informer wasn't initialized
Run(stopCh <-chan struct{}) error
Run(ctx context.Context) error
// WaitForCacheSync waits for the data gatherer's informers cache to sync.
WaitForCacheSync(stopCh <-chan struct{}) error
WaitForCacheSync(ctx context.Context) error
// Delete, clear the cache of the DataGatherer if one is being used
Delete() error
}
4 changes: 2 additions & 2 deletions pkg/datagatherer/k8s/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ type DataGathererDiscovery struct {
cl *discovery.DiscoveryClient
}

func (g *DataGathererDiscovery) Run(stopCh <-chan struct{}) error {
func (g *DataGathererDiscovery) Run(ctx context.Context) error {
// no async functionality, see Fetch
return nil
}

func (g *DataGathererDiscovery) WaitForCacheSync(stopCh <-chan struct{}) error {
func (g *DataGathererDiscovery) WaitForCacheSync(ctx context.Context) error {
// no async functionality, see Fetch
return nil
}
Expand Down
17 changes: 9 additions & 8 deletions pkg/datagatherer/k8s/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
dgCache := cache.New(5*time.Minute, 30*time.Second)

newDataGatherer := &DataGathererDynamic{
ctx: ctx,
groupVersionResource: c.GroupVersionResource,
fieldSelector: fieldSelector.String(),
namespaces: c.IncludeNamespaces,
Expand Down Expand Up @@ -217,7 +216,7 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
newDataGatherer.informer = factory.ForResource(c.GroupVersionResource).Informer()
}

registration, err := newDataGatherer.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
registration, err := newDataGatherer.informer.AddEventHandlerWithOptions(k8scache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
onAdd(log, obj, dgCache)
},
Expand All @@ -227,6 +226,8 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
DeleteFunc: func(obj interface{}) {
onDelete(log, obj, dgCache)
},
}, k8scache.HandlerOptions{
Logger: &log,
})
if err != nil {
return nil, err
Expand All @@ -243,7 +244,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
// This is to allow us to support arbitrary CRDs and resources that Preflight
// does not have registered as part of its `runtime.Scheme`.
type DataGathererDynamic struct {
ctx context.Context
// groupVersionResource is the name of the API group, version and resource
// that should be fetched by this data gatherer.
groupVersionResource schema.GroupVersionResource
Expand All @@ -269,8 +269,8 @@ type DataGathererDynamic struct {
// Run starts the dynamic data gatherer's informers for resource collection.
// Returns error if the data gatherer informer wasn't initialized, Run blocks
// until the stopCh is closed.
func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
log := klog.FromContext(g.ctx)
func (g *DataGathererDynamic) Run(ctx context.Context) error {
log := klog.FromContext(ctx)
if g.informer == nil {
return fmt.Errorf("informer was not initialized, impossible to start")
}
Expand All @@ -288,7 +288,7 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
}

// start shared informer
g.informer.Run(stopCh)
g.informer.RunWithContext(ctx)

return nil
}
Expand All @@ -298,8 +298,9 @@ var ErrCacheSyncTimeout = fmt.Errorf("timed out waiting for Kubernetes cache to
// WaitForCacheSync waits for the data gatherer's informers cache to sync before
// collecting the resources. Use errors.Is(err, ErrCacheSyncTimeout) to check if
// the cache sync failed.
func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error {
if !k8scache.WaitForCacheSync(stopCh, g.registration.HasSynced) {
func (g *DataGathererDynamic) WaitForCacheSync(ctx context.Context) error {
// Don't use WaitForNamedCacheSync, since we don't want to log extra messages.
if !k8scache.WaitForCacheSync(ctx.Done(), g.registration.HasSynced) {
return ErrCacheSyncTimeout
}

Expand Down
16 changes: 4 additions & 12 deletions pkg/datagatherer/k8s/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) {
}

expected := &DataGathererDynamic{
ctx: ctx,
groupVersionResource: config.GroupVersionResource,
// it's important that the namespaces are set as the IncludeNamespaces
// during initialization
Expand All @@ -144,9 +143,6 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) {

gatherer := dg.(*DataGathererDynamic)
// test gatherer's fields
if !reflect.DeepEqual(gatherer.ctx, expected.ctx) {
t.Errorf("expected %v, got %v", expected, dg)
}
if !reflect.DeepEqual(gatherer.groupVersionResource, expected.groupVersionResource) {
t.Errorf("expected %v, got %v", expected, dg)
}
Expand Down Expand Up @@ -180,7 +176,6 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) {
}

expected := &DataGathererDynamic{
ctx: ctx,
groupVersionResource: config.GroupVersionResource,
// it's important that the namespaces are set as the IncludeNamespaces
// during initialization
Expand All @@ -189,9 +184,6 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) {

gatherer := dg.(*DataGathererDynamic)
// test gatherer's fields
if !reflect.DeepEqual(gatherer.ctx, expected.ctx) {
t.Errorf("expected %v, got %v", expected, dg)
}
if !reflect.DeepEqual(gatherer.groupVersionResource, expected.groupVersionResource) {
t.Errorf("expected %v, got %v", expected, dg)
}
Expand Down Expand Up @@ -693,11 +685,11 @@ func TestDynamicGatherer_Fetch(t *testing.T) {
// start data gatherer informer
dynamiDg := dg
go func() {
if err = dynamiDg.Run(ctx.Done()); err != nil {
if err = dynamiDg.Run(ctx); err != nil {
t.Errorf("unexpected client error: %+v", err)
}
}()
err = dynamiDg.WaitForCacheSync(ctx.Done())
err = dynamiDg.WaitForCacheSync(ctx)
if err != nil {
t.Fatalf("unexpected client error: %+v", err)
}
Expand Down Expand Up @@ -1010,11 +1002,11 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) {
// start data gatherer informer
dynamiDg := dg
go func() {
if err = dynamiDg.Run(ctx.Done()); err != nil {
if err = dynamiDg.Run(ctx); err != nil {
t.Errorf("unexpected client error: %+v", err)
}
}()
err = dynamiDg.WaitForCacheSync(ctx.Done())
err = dynamiDg.WaitForCacheSync(ctx)
if err != nil {
t.Fatalf("unexpected client error: %+v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/datagatherer/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *Config) NewDataGatherer(ctx context.Context) (datagatherer.DataGatherer
}, nil
}

func (g *DataGatherer) Run(stopCh <-chan struct{}) error {
func (g *DataGatherer) Run(ctx context.Context) error {
// no async functionality, see Fetch
return nil
}
Expand All @@ -48,7 +48,7 @@ func (g *DataGatherer) Delete() error {
return nil
}

func (g *DataGatherer) WaitForCacheSync(stopCh <-chan struct{}) error {
func (g *DataGatherer) WaitForCacheSync(ctx context.Context) error {
// no async functionality, see Fetch
return nil
}
Expand Down