Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: bug-fix
summary: Resolve deadlocks in runtime checkin communication
component: elastic-agent
pr: https://github.com/elastic/elastic-agent/pull/8881
issue: https://github.com/elastic/elastic-agent/issues/7944
222 changes: 123 additions & 99 deletions pkg/component/runtime/runtime_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// Communicator provides an interface for a runtime to communicate with its running component.
type Communicator interface {
// WriteConnInfo writes the connection information to the writer, informing the component it has access
// WriteStartUpInfo writes the connection information to the writer, informing the component it has access
// to the provided services.
WriteStartUpInfo(w io.Writer, services ...client.Service) error
// CheckinExpected sends the expected state to the component.
Expand Down Expand Up @@ -63,6 +63,7 @@ type runtimeComm struct {
initCheckinObserved *proto.CheckinObserved
initCheckinExpectedCh chan *proto.CheckinExpected
initCheckinObservedMx sync.Mutex
runtimeCheckinDone chan struct{}

actionsConn bool
actionsDone chan bool
Expand All @@ -85,21 +86,22 @@ func newRuntimeComm(logger *logger.Logger, listenAddr string, ca *authority.Cert
return nil, err
}
return &runtimeComm{
logger: logger,
listenAddr: listenAddr,
ca: ca,
agentInfo: agentInfo,
name: name,
token: token.String(),
cert: pair,
maxMessageSize: maxMessageSize,
chunkingAllowed: false, // not allow until the client says they support it
checkinConn: true,
checkinExpected: make(chan *proto.CheckinExpected, 1),
checkinObserved: make(chan *proto.CheckinObserved),
actionsConn: true,
actionsRequest: make(chan *proto.ActionRequest),
actionsResponse: make(chan *proto.ActionResponse),
logger: logger,
listenAddr: listenAddr,
ca: ca,
agentInfo: agentInfo,
name: name,
token: token.String(),
cert: pair,
maxMessageSize: maxMessageSize,
chunkingAllowed: false, // not allow until the client says they support it
checkinConn: true,
initCheckinExpectedCh: make(chan *proto.CheckinExpected),
checkinExpected: make(chan *proto.CheckinExpected, 1),
checkinObserved: make(chan *proto.CheckinObserved),
actionsConn: true,
actionsRequest: make(chan *proto.ActionRequest),
actionsResponse: make(chan *proto.ActionResponse),
}, nil
}

Expand Down Expand Up @@ -128,7 +130,7 @@ func (c *runtimeComm) WriteStartUpInfo(w io.Writer, services ...client.Service)
Services: srvs,
// chunking is always allowed if the client supports it
Supports: []proto.ConnectionSupports{proto.ConnectionSupports_CheckinChunking},
MaxMessageSize: uint32(c.maxMessageSize),
MaxMessageSize: uint32(c.maxMessageSize), //nolint:gosec // G115 Conversion from int to uint32 is safe here.
AgentInfo: &proto.AgentInfo{
Id: c.agentInfo.AgentID(),
Version: c.agentInfo.Version(),
Expand All @@ -152,7 +154,6 @@ func (c *runtimeComm) CheckinExpected(
expected *proto.CheckinExpected,
observed *proto.CheckinObserved,
) {
c.agentInfo.Unprivileged()
if c.agentInfo != nil && c.agentInfo.AgentID() != "" {
expected.AgentInfo = &proto.AgentInfo{
Id: c.agentInfo.AgentID(),
Expand All @@ -165,35 +166,49 @@ func (c *runtimeComm) CheckinExpected(
expected.AgentInfo = nil
}

// we need to determine if the communicator is currently in the initial observed message path
// in the case that it is we send the expected state over a different channel
// we need to determine if the communicator is currently waiting to complete the initial checkin process
// and if the given observed message is the same as the one the communicator is waiting for
c.initCheckinObservedMx.Lock()
initObserved := c.initCheckinObserved
expectedCh := c.initCheckinExpectedCh
if initObserved != nil {
// the next call to `CheckinExpected` must be from the initial `CheckinObserved` message
if observed != initObserved {
// not the initial observed message; we don't send it
c.initCheckinObservedMx.Unlock()
return
}
// it is the expected from the initial observed message
// clear the initial state
// waitingForInitCheckin captures if communicator is waiting to complete the initial checkin process
waitingForInitCheckin := c.initCheckinObserved != nil
// shouldIgnore captures if we should ignore this checkin expected taking into account waitingForInitCheckin
shouldIgnore := waitingForInitCheckin && c.initCheckinObserved != observed
if waitingForInitCheckin && !shouldIgnore {
// here the given observed message is the same as the one the communicator is waiting for to complete
// the initial checkin process, thus clear the state
c.initCheckinObserved = nil
c.initCheckinExpectedCh = nil
c.initCheckinObservedMx.Unlock()
expectedCh <- expected
return
}
// when doneCh is closed the communicator is done and sending to any of the checkin expected channels,
// namely init (c.initCheckinExpectedCh) and regular (c.checkinExpected), should unblock
doneCh := c.runtimeCheckinDone
c.initCheckinObservedMx.Unlock()

// not in the initial observed message path; send it over the standard channel
// clear channel making it the latest expected message
select {
case <-c.checkinExpected:
default:
if shouldIgnore {
// ignore this checkin expected
return
}

if waitingForInitCheckin {
// send to the init checkin expected channel
// no draining here as we don't want to lose any message
select {
case <-doneCh:
case c.initCheckinExpectedCh <- expected:
}
} else {
// send to the regular checkin expected channel and drain it
// as we care only about the last message being sent
select {
case <-c.checkinExpected:
default:
}

select {
case <-doneCh:
// this isn't exactly required but better safe than SDH
case c.checkinExpected <- expected:
}
}
c.checkinExpected <- expected
}

func (c *runtimeComm) CheckinObserved() <-chan *proto.CheckinObserved {
Expand Down Expand Up @@ -225,85 +240,94 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p
c.checkinLock.Unlock()
}()

initExp := make(chan *proto.CheckinExpected)
recvDone := make(chan bool)
sendDone := make(chan bool)
go func() {
defer func() {
close(sendDone)
}()

// initial startup waits for the first expected message from the dedicated initExp channel
select {
case <-checkinDone:
return
case <-recvDone:
return
case expected := <-initExp:
err := sendExpectedChunked(server, expected, c.chunkingAllowed, c.maxMessageSize)
if err != nil {
if reportableErr(err) {
c.logger.Debugf("check-in stream failed to send initial expected state: %s", err)
}
return
}
}

for {
var expected *proto.CheckinExpected
select {
case <-checkinDone:
return
case <-recvDone:
return
case expected = <-c.checkinExpected:
}

err := sendExpectedChunked(server, expected, c.chunkingAllowed, c.maxMessageSize)
if err != nil {
if reportableErr(err) {
c.logger.Debugf("check-in stream failed to send expected state: %s", err)
}
return
}
}
}()

// at this point the client is connected, and it has sent its first initial checkin
// the initial expected message must come before the sender goroutine will send any other
// expected messages. `CheckinExpected` method will also drop any expected messages that do not
// match the observed message to ensure that the expected that we receive is from the initial
// observed state.
c.initCheckinObservedMx.Lock()
c.initCheckinObserved = init
c.initCheckinExpectedCh = initExp
// clears the latest queued expected message
select {
case <-c.checkinExpected:
default:
}
c.initCheckinObserved = init
runtimeCheckinDone := make(chan struct{})
c.runtimeCheckinDone = runtimeCheckinDone
c.initCheckinObservedMx.Unlock()
defer func(ch chan struct{}) {
close(ch)
}(runtimeCheckinDone)

// send the initial message (manager then calls `CheckinExpected` method with the result)
c.checkinObserved <- init
// send the initial observed message, so that the respective runtime (e.g. commandRuntime, serviceRuntime, etc.)
// then calls the CheckinExpected method with the result
select {
case <-checkinDone:
// runtimeComm is destroyed; return
return status.Error(codes.Unavailable, "component is being destroyed")
case c.checkinObserved <- init:
}

recvDone := make(chan bool)
go func() {
// this goroutine will not be leaked, because when the server CheckinV2 function
// returns (lives inside the manager) it will close the connection.
// That will cause the chunk.RecvObserved function to return with an error and thus
// this goroutine will exit. This goroutine can also exit when the checkinDone
// channel is closed, which happens when the runtimeComm is destroyed
// (when the runtime.Run() exits).
defer func() {
close(recvDone)
}()

for {
// always allow a chunked observed message to be received
checkin, err := chunk.RecvObserved(server)
if err != nil {
if reportableErr(err) {
c.logger.Debugf("check-in stream failed to receive data: %s", err)
}
close(recvDone)
return
}
c.checkinObserved <- checkin
select {
case <-checkinDone:
// runtimeComm is destroyed; return
return
case c.checkinObserved <- checkin:
}
}
}()

<-sendDone
return nil
initCheckinCompleted := false
var afterInitCheckinExpectedCh chan *proto.CheckinExpected
for {
var expected *proto.CheckinExpected
select {
case <-checkinDone:
// runtimeComm is destroyed; return
return status.Error(codes.Unavailable, "component is being destroyed")
case <-recvDone:
// the goroutine that receives observed messages has exited, so we can't continue.
// This also acts as a proxy for the server.Context().Done() channel, which
// will be closed when the server is closed.
return status.Error(codes.Unavailable, "component is being destroyed")
case expected = <-c.initCheckinExpectedCh:
// unbuffered channel to receive the first expected state
if !initCheckinCompleted {
initCheckinCompleted = true
afterInitCheckinExpectedCh = c.checkinExpected
} else {
// this shouldn't occur, but better safe than SDH
c.logger.Warn("check-in stream received unexpected init expected state, ignoring...")
continue
}
case expected = <-afterInitCheckinExpectedCh:
}

err := sendExpectedChunked(server, expected, c.chunkingAllowed, c.maxMessageSize)
if err != nil {
c.logger.Debugf("check-in stream failed to send expected state: %s", err)
if reportableErr(err) {
return err
}
return nil
}
}
}

func (c *runtimeComm) actions(server proto.ElasticAgent_ActionsServer) error {
Expand Down Expand Up @@ -419,7 +443,7 @@ func genServerName() (string, error) {
if err != nil {
return "", err
}
return strings.Replace(u.String(), "-", "", -1), nil
return strings.ReplaceAll(u.String(), "-", ""), nil
}

func sendExpectedChunked(server proto.ElasticAgent_CheckinV2Server, msg *proto.CheckinExpected, chunkingAllowed bool, maxSize int) error {
Expand Down
Loading