@@ -6,6 +6,8 @@ package fleet
 
 import (
 	"context"
+	stderrors "errors"
+	"sync"
 	"time"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
@@ -15,6 +17,7 @@ import (
 	eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
+	"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
 	"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
 	"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
 	"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
@@ -83,8 +86,8 @@ type FleetGateway struct {
 	acker              acker.Acker
 	unauthCounter      int
 	checkinFailCounter int
-	stateFetcher       func() coordinator.State
 	stateStore         stateStore
+	stateFetcher       StateFetcher
 	errCh              chan error
 	actionCh           chan []fleetapi.Action
 }
@@ -95,19 +98,22 @@ func New(
 	agentInfo agentInfo,
 	client client.Sender,
 	acker acker.Acker,
-	stateFetcher func() coordinator.State,
 	stateStore stateStore,
+	stateFetcher StateFetcher,
+	cfg configuration.FleetCheckin,
 ) (*FleetGateway, error) {
 	scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
+	st := defaultGatewaySettings
+	st.Backoff = getBackoffSettings(cfg)
 	return newFleetGatewayWithScheduler(
 		log,
-		defaultGatewaySettings,
+		st,
 		agentInfo,
 		client,
 		scheduler,
 		acker,
-		stateFetcher,
 		stateStore,
+		stateFetcher,
 	)
 }
 
@@ -118,8 +124,8 @@ func newFleetGatewayWithScheduler(
 	client client.Sender,
 	scheduler scheduler.Scheduler,
 	acker acker.Acker,
-	stateFetcher func() coordinator.State,
 	stateStore stateStore,
+	stateFetcher StateFetcher,
 ) (*FleetGateway, error) {
 	return &FleetGateway{
 		log: log,
@@ -144,7 +150,6 @@ func (f *FleetGateway) Run(ctx context.Context) error {
 	if f.settings.Backoff == nil {
 		requestBackoff = RequestBackoff(ctx.Done())
 	} else {
-		// this is only used in tests
 		requestBackoff = backoff.NewEqualJitterBackoff(
 			ctx.Done(),
 			f.settings.Backoff.Init,
@@ -193,11 +198,20 @@ func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
 	f.log.Debugf("Checking started")
 	resp, took, err := f.execute(ctx)
 	if err != nil {
-		f.checkinFailCounter++
+		becauseOfStateChanged := errors.Is(err, errComponentStateChanged)
+
+		// don't count this as a failed attempt
+		if !becauseOfStateChanged {
+			f.checkinFailCounter++
+		}
 
+		warnMsg := "Possible transient error during checkin with fleet-server, retrying"
+		if becauseOfStateChanged {
+			warnMsg = "Checkin cancelled because of a state change, retrying"
+		}
 		// Report the first two failures at warn level as they may be recoverable with retries.
 		if f.checkinFailCounter <= 2 {
-			f.log.Warnw("Possible transient error during checkin with fleet-server, retrying",
+			f.log.Warnw(warnMsg,
 				"error.message", err, "request_duration_ns", took, "failed_checkins", f.checkinFailCounter,
 				"retry_after_ns", bo.NextWait())
 		} else {
@@ -348,7 +362,7 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 	}
 
 	// get current state
-	state := f.stateFetcher()
+	state, stateCtx := f.stateFetcher.FetchState(ctx)
 
 	// convert components into checkin components structure
 	components := f.convertToCheckinComponents(state.Components, state.Collector)
@@ -374,7 +388,8 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 		PolicyRevisionIDX: policyRevisionIDX,
 	}
 
-	resp, took, err := cmd.Execute(ctx, req)
+	resp, took, err := cmd.Execute(stateCtx, req)
+	f.stateFetcher.Done()
 	if isUnauth(err) {
 		f.unauthCounter++
 		if f.shouldUseLongSched() {
@@ -390,6 +405,9 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 
 	f.unauthCounter = 0
 	if err != nil {
+		if errors.Is(err, context.Canceled) && errors.Is(context.Cause(stateCtx), errComponentStateChanged) {
+			return nil, took, stderrors.Join(err, errComponentStateChanged)
+		}
 		return nil, took, err
 	}
 
@@ -494,3 +512,118 @@ func getPolicyRevisionIDX(action fleetapi.Action) int64 {
 		return 0
 	}
 }
+
+// errComponentStateChanged is the cancellation cause used when the component state changes
+// while a check-in request is in flight.
+var errComponentStateChanged = errors.New("component state changed")
+
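+// StateFetcher abstracts how the gateway obtains the coordinator state reported in a check-in request.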
+type StateFetcher interface {
+	// FetchState returns the current state and a context that is valid as long as the returned state is valid to use.
+	FetchState(ctx context.Context) (coordinator.State, context.Context)
+	// Done should be called once the checkin call is complete.
+	Done()
+	// StartStateWatch watches for state changes until ctx is cancelled.
+	StartStateWatch(ctx context.Context) error
+}
+
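+// FastCheckinStateFetcher is a StateFetcher that invalidates an in-flight check-in when the
+// component state changes, so the next check-in reports the updated state.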
+type FastCheckinStateFetcher struct {
+	log       *logger.Logger
+	fetcher   func() coordinator.State
+	stateChan chan coordinator.State
+
+	cancel context.CancelCauseFunc
+	mutex  sync.Mutex
+}
+
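+// NewFastCheckinStateFetcher creates a FastCheckinStateFetcher that reads state changes from stateChan.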
+func NewFastCheckinStateFetcher(log *logger.Logger, fetcher func() coordinator.State, stateChan chan coordinator.State) *FastCheckinStateFetcher {
+	return &FastCheckinStateFetcher{
+		log:       log,
+		fetcher:   fetcher,
+		stateChan: stateChan,
+		cancel:    nil,
+		mutex:     sync.Mutex{},
+	}
+}
+
+// FetchState fetches, under the checkin state mutex, the state to send in the check-in request.
+// After the state is fetched the checkin cancellation function is initialized and the new context
+// is returned.
+func (s *FastCheckinStateFetcher) FetchState(ctx context.Context) (coordinator.State, context.Context) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	if s.cancel != nil {
+		s.cancel(nil) // ensure ctx cleanup
+	}
+
+	ctx2, ctxCancel := context.WithCancelCause(ctx)
+	state := s.fetcher()
+	s.cancel = ctxCancel
+	return state, ctx2
+}
+
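+// Done releases the context created by FetchState once the check-in request has completed.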
+func (s *FastCheckinStateFetcher) Done() {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	if s.cancel != nil {
+		s.cancel(nil) // ensure ctx cleanup
+		s.cancel = nil
+	}
+}
+
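+// invalidateState cancels the context handed out by FetchState with errComponentStateChanged,
+// aborting any check-in request that is using stale state.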
+func (s *FastCheckinStateFetcher) invalidateState() {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	if s.cancel != nil {
+		s.cancel(errComponentStateChanged)
+		s.cancel = nil
+	}
+}
+
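+// StartStateWatch blocks, invalidating the fetched state whenever a change arrives on stateChan,
+// until ctx is cancelled or the channel is closed.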
+func (s *FastCheckinStateFetcher) StartStateWatch(ctx context.Context) error {
+	s.log.Info("FleetGateway state watching started")
+	for {
+		select {
+		case <-ctx.Done():
+			s.log.Info("FleetGateway state watching stopped")
+			return ctx.Err()
+		case _, isOpen := <-s.stateChan:
+			if !isOpen {
+				s.log.Info("FleetGateway state watching channel closed, stopping loop.")
+				return nil
+			}
+			// TODO: consider checking for specific changes, e.g. degraded?
+			s.invalidateState()
+		}
+	}
+}
+
+// CheckinStateFetcher implements simple state fetching, without any invalidation or fast check-in logic.
+type CheckinStateFetcher struct {
+	fetcher func() coordinator.State
+}
+
+// NewCheckinStateFetcher creates a CheckinStateFetcher using the given fetcher function.
+func NewCheckinStateFetcher(fetcher func() coordinator.State) *CheckinStateFetcher {
+	return &CheckinStateFetcher{fetcher: fetcher}
+}
+
+// FetchState returns the current state and the given ctx, because the current state is always valid to use.
+func (s *CheckinStateFetcher) FetchState(ctx context.Context) (coordinator.State, context.Context) {
+	state := s.fetcher()
+	return state, ctx
+}
+
+// Done is a no-op: the fetched state is never invalidated.
+func (s *CheckinStateFetcher) Done() {}
+
+// StartStateWatch is a no-op: the simple fetcher does not watch for state changes.
+func (s *CheckinStateFetcher) StartStateWatch(ctx context.Context) error { return nil }
+
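+// getBackoffSettings returns the default fleet backoff settings, overridden by any
+// values set in the fleet check-in configuration.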
+func getBackoffSettings(cfg configuration.FleetCheckin) *backoffSettings {
+	bo := defaultFleetBackoffSettings
+
+	if cfg.RequestBackoffInit > 0 {
+		bo.Init = cfg.RequestBackoffInit
+	}
+	if cfg.RequestBackoffMax > 0 {
+		bo.Max = cfg.RequestBackoffMax
+	}
+
+	return &bo
+}