@@ -27,7 +27,7 @@ import (
2727
2828// Communicator provides an interface for a runtime to communicate with its running component.
2929type Communicator interface {
30- // WriteConnInfo writes the connection information to the writer, informing the component it has access
30+ // WriteStartUpInfo writes the connection information to the writer, informing the component it has access
3131// to the provided services.
3232WriteStartUpInfo (w io.Writer , services ... client.Service ) error
3333// CheckinExpected sends the expected state to the component.
@@ -63,6 +63,7 @@ type runtimeComm struct {
6363initCheckinObserved * proto.CheckinObserved
6464initCheckinExpectedCh chan * proto.CheckinExpected
6565initCheckinObservedMx sync.Mutex
66+ runtimeCheckinDone chan struct {}
6667
6768actionsConn bool
6869actionsDone chan bool
@@ -85,21 +86,22 @@ func newRuntimeComm(logger *logger.Logger, listenAddr string, ca *authority.Cert
8586return nil , err
8687}
8788return & runtimeComm {
88- logger : logger ,
89- listenAddr : listenAddr ,
90- ca : ca ,
91- agentInfo : agentInfo ,
92- name : name ,
93- token : token .String (),
94- cert : pair ,
95- maxMessageSize : maxMessageSize ,
96- chunkingAllowed : false , // not allow until the client says they support it
97- checkinConn : true ,
98- checkinExpected : make (chan * proto.CheckinExpected , 1 ),
99- checkinObserved : make (chan * proto.CheckinObserved ),
100- actionsConn : true ,
101- actionsRequest : make (chan * proto.ActionRequest ),
102- actionsResponse : make (chan * proto.ActionResponse ),
89+ logger : logger ,
90+ listenAddr : listenAddr ,
91+ ca : ca ,
92+ agentInfo : agentInfo ,
93+ name : name ,
94+ token : token .String (),
95+ cert : pair ,
96+ maxMessageSize : maxMessageSize ,
97+ chunkingAllowed : false , // not allow until the client says they support it
98+ checkinConn : true ,
99+ initCheckinExpectedCh : make (chan * proto.CheckinExpected ),
100+ checkinExpected : make (chan * proto.CheckinExpected , 1 ),
101+ checkinObserved : make (chan * proto.CheckinObserved ),
102+ actionsConn : true ,
103+ actionsRequest : make (chan * proto.ActionRequest ),
104+ actionsResponse : make (chan * proto.ActionResponse ),
103105}, nil
104106}
105107
@@ -152,7 +154,6 @@ func (c *runtimeComm) CheckinExpected(
152154expected * proto.CheckinExpected ,
153155observed * proto.CheckinObserved ,
154156) {
155- c .agentInfo .Unprivileged ()
156157if c .agentInfo != nil && c .agentInfo .AgentID () != "" {
157158expected .AgentInfo = & proto.AgentInfo {
158159Id : c .agentInfo .AgentID (),
@@ -165,35 +166,49 @@ func (c *runtimeComm) CheckinExpected(
165166expected .AgentInfo = nil
166167}
167168
168- // we need to determine if the communicator is currently in the initial observed message path
169- // in the case that it is we send the expected state over a different channel
169+ // we need to determine if the communicator is currently waiting to complete the initial checkin process
170+ // and if the given observed message is the same as the one the communicator is waiting for
170171c .initCheckinObservedMx .Lock ()
171- initObserved := c .initCheckinObserved
172- expectedCh := c .initCheckinExpectedCh
173- if initObserved != nil {
174- // the next call to `CheckinExpected` must be from the initial `CheckinObserved` message
175- if observed != initObserved {
176- // not the initial observed message; we don't send it
177- c .initCheckinObservedMx .Unlock ()
178- return
179- }
180- // it is the expected from the initial observed message
181- // clear the initial state
172+ // waitingForInitCheckin captures if communicator is waiting to complete the initial checkin process
173+ waitingForInitCheckin := c .initCheckinObserved != nil
174+ // shouldIgnore captures if we should ignore this checkin expected taking into account waitingForInitCheckin
175+ shouldIgnore := waitingForInitCheckin && c .initCheckinObserved != observed
176+ if waitingForInitCheckin && ! shouldIgnore {
177+ // here the given observed message is the same as the one the communicator is waiting for to complete
178+ // the initial checkin process, thus clear the state
182179c .initCheckinObserved = nil
183- c .initCheckinExpectedCh = nil
184- c .initCheckinObservedMx .Unlock ()
185- expectedCh <- expected
186- return
187180}
181+ // when doneCh is closed the communicator is done and sending to any of the checkin expected channels,
182+ // namely init (c.initCheckinExpectedCh) and regular (c.checkinExpected), should unblock
183+ doneCh := c .runtimeCheckinDone
188184c .initCheckinObservedMx .Unlock ()
189185
190- // not in the initial observed message path; send it over the standard channel
191- // clear channel making it the latest expected message
192- select {
193- case <- c .checkinExpected :
194- default :
186+ if shouldIgnore {
187+ // ignore this checkin expected
188+ return
189+ }
190+
191+ if waitingForInitCheckin {
192+ // send to the init checkin expected channel
193+ // no draining here as we don't want to lose any message
194+ select {
195+ case <- doneCh :
196+ case c .initCheckinExpectedCh <- expected :
197+ }
198+ } else {
199+ // send to the regular checkin expected channel and drain it
200+ // as we care only about the last message being sent
201+ select {
202+ case <- c .checkinExpected :
203+ default :
204+ }
205+
206+ select {
207+ case <- doneCh :
208+ // this isn't exactly required but better safe than SDH
209+ case c .checkinExpected <- expected :
210+ }
195211}
196- c .checkinExpected <- expected
197212}
198213
199214func (c * runtimeComm ) CheckinObserved () <- chan * proto.CheckinObserved {
@@ -225,85 +240,94 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p
225240c .checkinLock .Unlock ()
226241}()
227242
228- initExp := make (chan * proto.CheckinExpected )
229- recvDone := make (chan bool )
230- sendDone := make (chan bool )
231- go func () {
232- defer func () {
233- close (sendDone )
234- }()
235-
236- // initial startup waits for the first expected message from the dedicated initExp channel
237- select {
238- case <- checkinDone :
239- return
240- case <- recvDone :
241- return
242- case expected := <- initExp :
243- err := sendExpectedChunked (server , expected , c .chunkingAllowed , c .maxMessageSize )
244- if err != nil {
245- if reportableErr (err ) {
246- c .logger .Debugf ("check-in stream failed to send initial expected state: %s" , err )
247- }
248- return
249- }
250- }
251-
252- for {
253- var expected * proto.CheckinExpected
254- select {
255- case <- checkinDone :
256- return
257- case <- recvDone :
258- return
259- case expected = <- c .checkinExpected :
260- }
261-
262- err := sendExpectedChunked (server , expected , c .chunkingAllowed , c .maxMessageSize )
263- if err != nil {
264- if reportableErr (err ) {
265- c .logger .Debugf ("check-in stream failed to send expected state: %s" , err )
266- }
267- return
268- }
269- }
270- }()
271-
272- // at this point the client is connected, and it has sent it's first initial checkin
273- // the initial expected message must come before the sender goroutine will send any other
274- // expected messages. `CheckinExpected` method will also drop any expected messages that do not
275- // match the observed message to ensure that the expected that we receive is from the initial
276- // observed state.
277243c .initCheckinObservedMx .Lock ()
278- c .initCheckinObserved = init
279- c .initCheckinExpectedCh = initExp
280244// clears the latest queued expected message
281245select {
282246case <- c .checkinExpected :
283247default :
284248}
249+ c .initCheckinObserved = init
250+ runtimeCheckinDone := make (chan struct {})
251+ c .runtimeCheckinDone = runtimeCheckinDone
285252c .initCheckinObservedMx .Unlock ()
253+ defer func (ch chan struct {}) {
254+ close (ch )
255+ }(runtimeCheckinDone )
286256
287- // send the initial message (manager then calls `CheckinExpected` method with the result)
288- c .checkinObserved <- init
257+ // send the initial observed message, so the respective runtime (e.g. commandRuntime, serviceRuntime, etc. )
258+ // then calls CheckinExpected method with the result
259+ select {
260+ case <- checkinDone :
261+ // runtimeComm is destroyed return
262+ return status .Error (codes .Unavailable , "component is being destroyed" )
263+ case c .checkinObserved <- init :
264+ }
289265
266+ recvDone := make (chan bool )
290267go func () {
268+ // this goroutine will not be leaked, because when the server CheckinV2 function
269+ // returns (lives inside the manager) it will close the connection.
270+ // That will cause the chunk.RecvObserved function to return with an error and thus
271+ // this goroutine will exit. Another reason that this goroutine could exit for
272+ // is if the checkinDone channel is closed which happens when the runtimeComm is
273+ // destroyed (when the runtime.Run() exits).
274+ defer func () {
275+ close (recvDone )
276+ }()
277+
291278for {
292279// always allow a chunked observed message to be received
293280checkin , err := chunk .RecvObserved (server )
294281if err != nil {
295282if reportableErr (err ) {
296283c .logger .Debugf ("check-in stream failed to receive data: %s" , err )
297284}
298- close (recvDone )
299285return
300286}
301- c .checkinObserved <- checkin
287+ select {
288+ case <- checkinDone :
289+ // runtimeComm is destroyed return
290+ return
291+ case c .checkinObserved <- checkin :
292+ }
302293}
303294}()
304295
305- <- sendDone
306- return nil
296+ initCheckinCompleted := false
297+ var afterInitCheckinExpectedCh chan * proto.CheckinExpected
298+ for {
299+ var expected * proto.CheckinExpected
300+ select {
301+ case <- checkinDone :
302+ // runtimeComm is destroyed return
303+ return status .Error (codes .Unavailable , "component is being destroyed" )
304+ case <- recvDone :
305+ // the goroutine that receives observed messages has exited we can't continue
306+ // This acts also as a proxy to the server.Context().Done() method which
307+ // will be closed when the server is closed.
308+ return status .Error (codes .Unavailable , "component is being destroyed" )
309+ case expected = <- c .initCheckinExpectedCh :
310+ // unbuffered channel to receive the first expected state
311+ if ! initCheckinCompleted {
312+ initCheckinCompleted = true
313+ afterInitCheckinExpectedCh = c .checkinExpected
314+ } else {
315+ // this shouldn't occur, but better safe than SDH
316+ c .logger .Warn ("check-in stream received unexpected init expected state, ignoring..." )
317+ continue
318+ }
319+ case expected = <- afterInitCheckinExpectedCh :
320+ }
321+
322+ err := sendExpectedChunked (server , expected , c .chunkingAllowed , c .maxMessageSize )
323+ if err != nil {
324+ c .logger .Debugf ("check-in stream failed to send expected state: %s" , err )
325+ if reportableErr (err ) {
326+ return err
327+ }
328+ return nil
329+ }
330+ }
307331}
308332
309333func (c * runtimeComm ) actions (server proto.ElasticAgent_ActionsServer ) error {
@@ -419,7 +443,7 @@ func genServerName() (string, error) {
419443if err != nil {
420444return "" , err
421445}
422- return strings .Replace (u .String (), "-" , "" , - 1 ), nil
446+ return strings .ReplaceAll (u .String (), "-" , "" ), nil
423447}
424448
425449func sendExpectedChunked (server proto.ElasticAgent_CheckinV2Server , msg * proto.CheckinExpected , chunkingAllowed bool , maxSize int ) error {
0 commit comments