Skip to content

Commit dfcf308

Browse files
authored
[cmd/opampsupervisor] Control Collectors with only the OpAMP extension (#38809)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Right now three components are required for the Supervisor to control a Collector: * nopreceiver * nopexporter * opampextension The `service.AllowNoPipelines` feature gate allows the Collector to be started without pipelines, which was released in v0.122.0. This can be used to start the Collector with only the OpAMP extension during the bootstrapping process, obviating the need for the nopreceiver and nopexporter. --------- Co-authored-by: Evan Bradley <evan-bradley@users.noreply.github.com>
1 parent 04003f1 commit dfcf308

File tree

6 files changed

+330
-70
lines changed

6 files changed

+330
-70
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Allow controlling Collectors that don't include the nopreceiver and nopexporter
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38809]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This requires Collectors built with Collector API v0.122.0+. The nopreceiver
20+
and nopexporter will continue to be supported for a few releases, after which
21+
only v0.122.0+ will be supported.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: []

cmd/opampsupervisor/e2e_test.go

Lines changed: 93 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -617,77 +617,109 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {
617617
}
618618

619619
func TestSupervisorBootstrapsCollector(t *testing.T) {
620-
agentDescription := atomic.Value{}
620+
tests := []struct {
621+
name string
622+
cfg string
623+
env []string
624+
precheck func(t *testing.T)
625+
}{
626+
{
627+
name: "With service.AllowNoPipelines",
628+
cfg: "nocap",
629+
},
630+
{
631+
name: "Without service.AllowNoPipelines",
632+
cfg: "no_fg",
633+
env: []string{
634+
"COLLECTOR_BIN=../../bin/otelcontribcol_" + runtime.GOOS + "_" + runtime.GOARCH,
635+
},
636+
precheck: func(t *testing.T) {
637+
if runtime.GOOS == "windows" {
638+
t.Skip("This test requires a shell script, which may not be supported by Windows")
639+
}
640+
},
641+
},
642+
}
621643

622-
// Load the Supervisor config so we can get the location of
623-
// the Collector that will be run.
624-
var cfg config.Supervisor
625-
cfgFile := getSupervisorConfig(t, "nocap", map[string]string{})
626-
k := koanf.New("::")
627-
err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser())
628-
require.NoError(t, err)
629-
err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{
630-
Tag: "mapstructure",
631-
})
632-
require.NoError(t, err)
644+
for _, tt := range tests {
645+
t.Run(tt.name, func(t *testing.T) {
646+
agentDescription := atomic.Value{}
633647

634-
// Get the binary name and version from the Collector binary
635-
// using the `components` command that prints a YAML-encoded
636-
// map of information about the Collector build. Some of this
637-
// information will be used as defaults for the telemetry
638-
// attributes.
639-
agentPath := cfg.Agent.Executable
640-
componentsInfo, err := exec.Command(agentPath, "components").Output()
641-
require.NoError(t, err)
642-
k = koanf.New("::")
643-
err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser())
644-
require.NoError(t, err)
645-
buildinfo := k.StringMap("buildinfo")
646-
command := buildinfo["command"]
647-
version := buildinfo["version"]
648+
// Load the Supervisor config so we can get the location of
649+
// the Collector that will be run.
650+
var cfg config.Supervisor
651+
cfgFile := getSupervisorConfig(t, tt.cfg, map[string]string{})
652+
k := koanf.New("::")
653+
err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser())
654+
require.NoError(t, err)
655+
err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{
656+
Tag: "mapstructure",
657+
})
658+
require.NoError(t, err)
648659

649-
server := newOpAMPServer(
650-
t,
651-
defaultConnectingHandler,
652-
types.ConnectionCallbacks{
653-
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
654-
if message.AgentDescription != nil {
655-
agentDescription.Store(message.AgentDescription)
656-
}
660+
// Get the binary name and version from the Collector binary
661+
// using the `components` command that prints a YAML-encoded
662+
// map of information about the Collector build. Some of this
663+
// information will be used as defaults for the telemetry
664+
// attributes.
665+
agentPath := cfg.Agent.Executable
666+
cmd := exec.Command(agentPath, "components")
667+
for _, env := range tt.env {
668+
cmd.Env = append(cmd.Env, env)
669+
}
670+
componentsInfo, err := cmd.Output()
671+
require.NoError(t, err)
672+
k = koanf.New("::")
673+
err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser())
674+
require.NoError(t, err)
675+
buildinfo := k.StringMap("buildinfo")
676+
command := buildinfo["command"]
677+
version := buildinfo["version"]
678+
679+
server := newOpAMPServer(
680+
t,
681+
defaultConnectingHandler,
682+
types.ConnectionCallbacks{
683+
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
684+
if message.AgentDescription != nil {
685+
agentDescription.Store(message.AgentDescription)
686+
}
657687

658-
return &protobufs.ServerToAgent{}
659-
},
660-
})
688+
return &protobufs.ServerToAgent{}
689+
},
690+
})
661691

662-
s := newSupervisor(t, "nocap", map[string]string{"url": server.addr})
692+
s := newSupervisor(t, "nocap", map[string]string{"url": server.addr})
663693

664-
require.Nil(t, s.Start())
665-
defer s.Shutdown()
694+
require.Nil(t, s.Start())
695+
defer s.Shutdown()
666696

667-
waitForSupervisorConnection(server.supervisorConnected, true)
697+
waitForSupervisorConnection(server.supervisorConnected, true)
668698

669-
require.Eventually(t, func() bool {
670-
ad, ok := agentDescription.Load().(*protobufs.AgentDescription)
671-
if !ok {
672-
return false
673-
}
699+
require.Eventually(t, func() bool {
700+
ad, ok := agentDescription.Load().(*protobufs.AgentDescription)
701+
if !ok {
702+
return false
703+
}
674704

675-
var agentName, agentVersion string
676-
identAttr := ad.IdentifyingAttributes
677-
for _, attr := range identAttr {
678-
switch attr.Key {
679-
case semconv.AttributeServiceName:
680-
agentName = attr.Value.GetStringValue()
681-
case semconv.AttributeServiceVersion:
682-
agentVersion = attr.Value.GetStringValue()
683-
}
684-
}
705+
var agentName, agentVersion string
706+
identAttr := ad.IdentifyingAttributes
707+
for _, attr := range identAttr {
708+
switch attr.Key {
709+
case semconv.AttributeServiceName:
710+
agentName = attr.Value.GetStringValue()
711+
case semconv.AttributeServiceVersion:
712+
agentVersion = attr.Value.GetStringValue()
713+
}
714+
}
685715

686-
// By default the Collector should report its name and version
687-
// from the component.BuildInfo struct built into the Collector
688-
// binary.
689-
return agentName == command && agentVersion == version
690-
}, 5*time.Second, 250*time.Millisecond)
716+
// By default the Collector should report its name and version
717+
// from the component.BuildInfo struct built into the Collector
718+
// binary.
719+
return agentName == command && agentVersion == version
720+
}, 5*time.Second, 250*time.Millisecond)
721+
})
722+
}
691723
}
692724

693725
func TestSupervisorBootstrapsCollectorAvailableComponents(t *testing.T) {

cmd/opampsupervisor/supervisor/commander/commander.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,108 @@ func (c *Commander) watch() {
201201
c.exitCh <- struct{}{}
202202
}
203203

204+
// StartOneShot starts the Collector with the expectation that it will immediately
205+
// exit after it finishes a quick operation. This is useful for situations like reading stdout/sterr
206+
// to e.g. check the feature gate the Collector supports.
207+
func (c *Commander) StartOneShot() ([]byte, []byte, error) {
208+
stdout := []byte{}
209+
stderr := []byte{}
210+
ctx := context.Background()
211+
212+
cmd := exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204
213+
cmd.Env = common.EnvVarMapToEnvMapSlice(c.cfg.Env)
214+
cmd.SysProcAttr = sysProcAttrs()
215+
// grab cmd pipes
216+
stdoutPipe, err := cmd.StdoutPipe()
217+
if err != nil {
218+
return nil, nil, fmt.Errorf("stdoutPipe: %w", err)
219+
}
220+
stderrPipe, err := cmd.StderrPipe()
221+
if err != nil {
222+
return nil, nil, fmt.Errorf("stderrPipe: %w", err)
223+
}
224+
225+
// start agent
226+
if err := cmd.Start(); err != nil {
227+
return nil, nil, fmt.Errorf("start: %w", err)
228+
}
229+
// capture agent output
230+
go func() {
231+
scanner := bufio.NewScanner(stdoutPipe)
232+
for scanner.Scan() {
233+
stdout = append(stdout, scanner.Bytes()...)
234+
stdout = append(stdout, byte('\n'))
235+
}
236+
if err := scanner.Err(); err != nil {
237+
c.logger.Error("Error reading agent stdout: %w", zap.Error(err))
238+
}
239+
}()
240+
go func() {
241+
scanner := bufio.NewScanner(stderrPipe)
242+
for scanner.Scan() {
243+
stderr = append(stderr, scanner.Bytes()...)
244+
stderr = append(stderr, byte('\n'))
245+
}
246+
if err := scanner.Err(); err != nil {
247+
c.logger.Error("Error reading agent stderr: %w", zap.Error(err))
248+
}
249+
}()
250+
251+
c.logger.Debug("Agent process started", zap.Int("pid", cmd.Process.Pid))
252+
253+
doneCh := make(chan struct{}, 1)
254+
255+
go func() {
256+
err := cmd.Wait()
257+
if err != nil {
258+
c.logger.Error("One-shot Collector encountered an error during execution", zap.Error(err))
259+
}
260+
doneCh <- struct{}{}
261+
}()
262+
263+
waitCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
264+
265+
defer cancel()
266+
267+
select {
268+
case <-doneCh:
269+
case <-waitCtx.Done():
270+
pid := cmd.Process.Pid
271+
c.logger.Debug("Stopping agent process", zap.Int("pid", pid))
272+
273+
// Gracefully signal process to stop.
274+
if err := sendShutdownSignal(cmd.Process); err != nil {
275+
return nil, nil, err
276+
}
277+
278+
innerWaitCtx, innerCancel := context.WithTimeout(ctx, 10*time.Second)
279+
280+
// Setup a goroutine to wait a while for process to finish and send kill signal
281+
// to the process if it doesn't finish.
282+
var innerErr error
283+
go func() {
284+
<-innerWaitCtx.Done()
285+
286+
if !errors.Is(innerWaitCtx.Err(), context.DeadlineExceeded) {
287+
c.logger.Debug("Agent process successfully stopped.", zap.Int("pid", pid))
288+
return
289+
}
290+
291+
// Time is out. Kill the process.
292+
c.logger.Debug(
293+
"Agent process is not responding to SIGTERM. Sending SIGKILL to kill forcibly.",
294+
zap.Int("pid", pid))
295+
if innerErr = cmd.Process.Signal(os.Kill); innerErr != nil {
296+
return
297+
}
298+
}()
299+
300+
innerCancel()
301+
}
302+
303+
return stdout, stderr, nil
304+
}
305+
204306
// Exited returns a channel that will send a signal when the Agent process exits.
205307
func (c *Commander) Exited() <-chan struct{} {
206308
return c.exitCh

0 commit comments

Comments
 (0)