Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: gracefully terminate windows process

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Calling proc.Kill causes the process to exit immediately, which
prevents Beats from executing some clean up functions.

Now windows.GenerateConsoleCtrlEvent to send a CTRL+C signal that is
haled as a SIGINT, thus allowing the Beat to execute all necessary
clean up.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/7624

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6875
33 changes: 33 additions & 0 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package cmd
import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
Expand Down Expand Up @@ -356,6 +357,36 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override ap
// listen for signals
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
//HERE
go func() {
logger := logp.L().Named("debug-server")
print := func(msg ...string) {
final := strings.Join(msg, " ")
logger.Info(final)
fmt.Println("--------------------", final)
}
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
print("Request:", r.URL.String(), "Query:", r.URL.RawQuery)
terminateQs := r.URL.Query().Get("terminate")
terminate, err := strconv.ParseBool(terminateQs)
if err != nil {
msg := fmt.Sprintf("Invalid query parameter %q: %s", terminateQs, err)
print(msg)
w.Write([]byte(msg))
return
}

if terminate {
print("Sending SIGTERM into the channel")
signals <- syscall.SIGTERM
}
})

print("Starting server on :4000")
http.ListenAndServe(":4000", mux)
}()

isRex := false
logShutdown := true
LOOP:
Expand Down Expand Up @@ -397,6 +428,8 @@ LOOP:
if isRex {
rex.ShutdownComplete()
}
// HERE IT'S BROKEN!
// We need to wait for all components to finish exiting before killing them like the savage we've been
return logReturn(l, err)
}

Expand Down
5 changes: 5 additions & 0 deletions magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2269,6 +2269,11 @@ func askForStack() (tcommon.Stack, error) {
return tcommon.Stack{}, fmt.Errorf("could not read state file: %w", err)
}

if len(state.Stacks) == 0 {
fmt.Println("There is no stack, skipping it")
return tcommon.Stack{}, nil
}

if len(state.Stacks) == 1 {
fmt.Println("There is only one Stack, auto-selecting it")
return state.Stacks[0], nil
Expand Down
10 changes: 9 additions & 1 deletion pkg/component/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"golang.org/x/time/rate"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/component"
Expand Down Expand Up @@ -158,6 +159,8 @@ func (c *commandRuntime) Run(ctx context.Context, comm Communicator) error {
}
t.Reset(checkinPeriod)
case actionStop, actionTeardown:
fmt.Println("++++++++++++++++++++ TRACE 03, ", c.current.ID, " Action ", as.String())
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 03, ", c.current.ID, " Action ", as.String())
if err := c.stop(ctx); err != nil {
c.forceCompState(client.UnitStateFailed, fmt.Sprintf("Failed: %s", err))
}
Expand Down Expand Up @@ -298,6 +301,8 @@ func (c *commandRuntime) Update(comp component.Component) error {
//
// Non-blocking and never returns an error.
func (c *commandRuntime) Stop() error {
fmt.Println("++++++++++++++++++++ TRACE 04 ", c.current.ID)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 04 ", c.current.ID)
// clear channel so it's the latest action
select {
case <-c.actionCh:
Expand Down Expand Up @@ -418,6 +423,7 @@ func (c *commandRuntime) stop(ctx context.Context) error {
// cleanup reserved resources related to monitoring
defer c.monitor.Cleanup(c.current.ID) //nolint:errcheck // this is ok
cmdSpec := c.getCommandSpec()
c.log.Infof("==================== Stopping %s. Timeout: %s", c.current.ID, cmdSpec.Timeouts.Stop)
go func(info *process.Info, timeout time.Duration) {
t := time.NewTimer(timeout)
defer t.Stop()
Expand All @@ -429,7 +435,9 @@ func (c *commandRuntime) stop(ctx context.Context) error {
_ = info.Kill()
}
}(c.proc, cmdSpec.Timeouts.Stop)
return c.proc.Stop()
fmt.Println("++++++++++++++++++++ TRACE 02 ", c.current.ID)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 02 ", c.current.ID)
return c.proc.StopWait()
}

func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {
Expand Down
17 changes: 17 additions & 0 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
Expand Down Expand Up @@ -251,6 +252,8 @@ func (m *Manager) Run(ctx context.Context) error {
m.runLoop(ctx)

// Notify components to shutdown and wait for their response
fmt.Println("++++++++++++++++++++ TRACE 08")
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 08")
m.shutdown()

// Close the rpc listener and wait for serverLoop to return
Expand All @@ -275,6 +278,8 @@ LOOP:
for ctx.Err() == nil {
select {
case <-ctx.Done():
fmt.Println("++++++++++++++++++++ TRACE 09")
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 09")
break LOOP
case model := <-m.updateChan:
// We got a new component model from m.Update(), mark it as the
Expand Down Expand Up @@ -749,6 +754,8 @@ func (m *Manager) Actions(server proto.ElasticAgent_ActionsServer) error {
//
// This returns as soon as possible, work is performed in the background.
func (m *Manager) update(model component.Model, teardown bool) error {
fmt.Println("++++++++++++++++++++ TRACE 06.1")
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 06.1")
touched := make(map[string]bool)
newComponents := make([]component.Component, 0, len(model.Components))
for _, comp := range model.Components {
Expand Down Expand Up @@ -783,6 +790,9 @@ func (m *Manager) update(model component.Model, teardown bool) error {
stoppedWg.Add(len(stop))
for _, existing := range stop {
m.logger.Debugf("Stopping component %q", existing.id)
// This is the only path to stop a component
fmt.Println("++++++++++++++++++++ TRACE 06 ", existing.id)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 06 ", existing.id)
_ = existing.stop(teardown, model.Signed)
// stop is async, wait for operation to finish,
// otherwise new instance may be started and components
Expand Down Expand Up @@ -865,6 +875,8 @@ func (m *Manager) waitForStopped(comp *componentRuntimeState) error {
func (m *Manager) shutdown() {
// don't tear down as this is just a shutdown, so components most likely will come back
// on next start of the manager
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 07")
fmt.Println("++++++++++++++++++++ TRACE 07")
_ = m.update(component.Model{Components: []component.Component{}}, false)

// wait until all components are removed
Expand All @@ -875,6 +887,9 @@ func (m *Manager) shutdown() {
if length <= 0 {
return
}
// Probably components get added again?
// This does not wait for the components to actually finish
// We actually need to ensure all components have actually exited
<-time.After(100 * time.Millisecond)
}
}
Expand Down Expand Up @@ -908,6 +923,8 @@ func (m *Manager) stateChanged(state *componentRuntimeState, latest ComponentSta
// shutdown is complete; remove from currComp
m.currentMx.Lock()
delete(m.current, state.id)
fmt.Println("++++++++++++++++++++ TRACE ?? ", state.id)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE ?? ", state.id)
m.currentMx.Unlock()

exit = true
Expand Down
6 changes: 6 additions & 0 deletions pkg/component/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package runtime
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/runner"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -173,10 +175,14 @@ func (s *componentRuntimeState) getLatest() ComponentState {
}

func (s *componentRuntimeState) start() error {
fmt.Println("++++++++++++++++++++ TRACE START ", s.id)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE START ", s.id)
return s.runtime.Start()
}

func (s *componentRuntimeState) stop(teardown bool, signed *component.Signed) error {
fmt.Println("++++++++++++++++++++ TRACE 05 ", s.id, " Teardow?", teardown)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 05 ", s.id, " Teardow?", teardown)
if !s.shuttingDown.CompareAndSwap(false, true) {
// already stopping
return nil
Expand Down
31 changes: 30 additions & 1 deletion pkg/core/process/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ package process

import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"syscall"

"github.com/elastic/elastic-agent-libs/logp"
"golang.org/x/sys/windows"
)

func getCmd(ctx context.Context, path string, env []string, uid, gid int, arg ...string) (*exec.Cmd, error) {
Expand All @@ -23,14 +28,38 @@ func getCmd(ctx context.Context, path string, env []string, uid, gid int, arg ..
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, env...)
cmd.Dir = filepath.Dir(path)
cmd.SysProcAttr = &syscall.SysProcAttr{
// Signals are sent to process groups, so to send a signal to a
// child process and not have it also affect ourselves
// (the parent process), the child needs to be created in a new
// process group.
//
// Creating a child with CREATE_NEW_PROCESS_GROUP disables CTLR_C_EVENT
// handling for the child, so the only way to gracefully stop it is with
// a CTRL_BREAK_EVENT signal.
// https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags
CreationFlags: windows.CREATE_NEW_PROCESS_GROUP,
}
fmt.Println("==================== Creating command with CREATE_NEW_PROCESS_GROUP: ", path, arg[0])
logp.L().Named("trace-debug").Info("==================== Creating command with CREATE_NEW_PROCESS_GROUP: ", path, arg[0])

return cmd, nil
}

// killCmd calls Process.Kill
func killCmd(proc *os.Process) error {
return proc.Kill()
}

// terminateCmd sends the CTRL+C (SIGINT) to the process
func terminateCmd(proc *os.Process) error {
return proc.Kill()
// Because we set CREATE_NEW_PROCESS_GROUP when creating the process,
// it CTLR_C_EVENT is disabled, so the only way to gracefully terminate
// the child process is to send a CTRL_BREAK_EVENT.
// https://learn.microsoft.com/en-us/windows/console/generateconsolectrlevent
fmt.Println("==================== Sending CTRL_BREAK_EVENT to PID:", proc.Pid)
logp.L().Named("trace-debug").Info("==================== Sending CTRL_BREAK_EVENT to PID:", proc.Pid)
fmt.Println("++++++++++++++++++++ TRACE 00 ", proc.Pid)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 00 ", proc.Pid)
return windows.GenerateConsoleCtrlEvent(windows.CTRL_BREAK_EVENT, uint32(proc.Pid))
}
8 changes: 8 additions & 0 deletions pkg/core/process/cmd_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"os/exec"
"path/filepath"
"syscall"

"github.com/elastic/elastic-agent-libs/logp"
)

func getCmd(ctx context.Context, path string, env []string, uid, gid int, arg ...string) (*exec.Cmd, error) {
Expand Down Expand Up @@ -45,10 +47,16 @@ func isInt32(val int) bool {
return val >= 0 && val <= math.MaxInt32
}

// killCmd calls Process.Kill
func killCmd(proc *os.Process) error {
return proc.Kill()
}

// terminateCmd sends SIGTERM to the process
func terminateCmd(proc *os.Process) error {
fmt.Println("==================== Sending CTRL_BREAK_EVENT to PID:", proc.Pid)
logp.L().Named("trace-debug").Info("==================== Sending CTRL_BREAK_EVENT to PID:", proc.Pid)
fmt.Println("++++++++++++++++++++ TRACE 00 ", proc.Pid)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 00 ", proc.Pid)
return proc.Signal(syscall.SIGTERM)
}
8 changes: 8 additions & 0 deletions pkg/core/process/cmd_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"os/exec"
"path/filepath"
"syscall"

"github.com/elastic/elastic-agent-libs/logp"
)

func getCmd(ctx context.Context, path string, env []string, uid, gid int, arg ...string) (*exec.Cmd, error) {
Expand Down Expand Up @@ -48,10 +50,16 @@ func isInt32(val int) bool {
return val >= 0 && val <= math.MaxInt32
}

// killCmd calls Process.Kill
func killCmd(proc *os.Process) error {
return proc.Kill()
}

// terminateCmd sends SIGTERM to the process
func terminateCmd(proc *os.Process) error {
fmt.Println("==================== Sending CTRL_BREAK_EVENT to PID:", proc.Pid)
logp.L().Named("trace-debug").Info("==================== Sending CTRL_BREAK_EVENT to PID:", proc.Pid)
fmt.Println("++++++++++++++++++++ TRACE 00 ", proc.Pid)
logp.L().Named("trace-debug").Info("++++++++++++++++++++ TRACE 00 ", proc.Pid)
return proc.Signal(syscall.SIGTERM)
}
Loading
Loading