Skip to content

Commit a4e98f6

Browse files
otel: add env variable to set agent.monitoring runtime (#11018) (#11148)
* otel: add env variables to enable beats receivers in container * remove ENABLE_BEATS_RECEIVERS, add AGENT_MONITORING_RUNTIME_EXPERIMENTAL * remove policy update from the test * changing test to use local config, read env when fetching default config * add tests for policy override * update test description * update docs (cherry picked from commit d4ec611) Co-authored-by: Mauri de Souza Meneguzzo <mauri870@gmail.com>
1 parent 06af457 commit a4e98f6

File tree

4 files changed

+327
-1
lines changed

4 files changed

+327
-1
lines changed

docs/hybrid-agent-beats-receivers.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ https://github.com/elastic/kibana/issues/233186 is implemented. Before that chan
3636
overrides API can be used to add `_runtime_experimental: "otel"` to the `agent.monitoring` section of the policy.
3737
See https://support.elastic.dev/knowledge/view/06b69893 for details on the policy overrides API.
3838

39+
For the Elastic Agent container images, the `AGENT_MONITORING_RUNTIME_EXPERIMENTAL` environment variable can be set to either `process` or `otel` to override the default runtime used for agent monitoring.
40+
3941
Executing the `elastic-agent diagnostics` command in this mode will now produce an `otel-final.yml` file showing the generated
4042
collector configuration used to run the Beat receivers.
4143

internal/pkg/agent/cmd/container.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ be used when the same credentials will be used across all the possible actions a
148148
KIBANA_CA - path to certificate authority to use with communicate with Kibana [$ELASTICSEARCH_CA]
149149
ELASTIC_AGENT_TAGS - user provided tags for the agent [linux,staging]
150150
151+
* Beats Receivers
152+
The following experimental environment variables can be set to enable using Beats Receivers.
153+
154+
AGENT_MONITORING_RUNTIME_EXPERIMENTAL - Set to either "process" or "otel" to use the respective runtime for the monitoring components.
151155
152156
* Elastic-Agent event logging
153157
If EVENTS_TO_STDERR is set to true log entries containing event data or whole raw events will be logged to stderr alongside

internal/pkg/core/monitoring/config/config.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package config
66

77
import (
8+
"os"
89
"strings"
910
"time"
1011

@@ -111,6 +112,13 @@ type BufferConfig struct {
111112

112113
// DefaultConfig creates a config with pre-set default values.
113114
func DefaultConfig() *MonitoringConfig {
115+
monRuntimeManager := DefaultRuntimeManager
116+
monRuntimeEnv := os.Getenv("AGENT_MONITORING_RUNTIME_EXPERIMENTAL")
117+
switch monRuntimeEnv {
118+
case ProcessRuntimeManager, OtelRuntimeManager:
119+
monRuntimeManager = monRuntimeEnv
120+
}
121+
114122
return &MonitoringConfig{
115123
Enabled: true,
116124
MonitorLogs: true,
@@ -125,7 +133,7 @@ func DefaultConfig() *MonitoringConfig {
125133
Namespace: defaultNamespace,
126134
APM: defaultAPMConfig(),
127135
Diagnostics: defaultDiagnostics(),
128-
RuntimeManager: DefaultRuntimeManager,
136+
RuntimeManager: monRuntimeManager,
129137
}
130138
}
131139

testing/integration/ess/container_cmd_test.go

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/stretchr/testify/require"
2828

2929
"github.com/elastic/elastic-agent-libs/kibana"
30+
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
31+
"github.com/elastic/elastic-agent/pkg/component"
3032
"github.com/elastic/elastic-agent/pkg/core/process"
3133
atesting "github.com/elastic/elastic-agent/pkg/testing"
3234
"github.com/elastic/elastic-agent/pkg/testing/define"
@@ -444,6 +446,246 @@ func createMockESOutput(t *testing.T, info *define.Info, percentDuplicate, perce
444446
return mockesURL, outputResp.Item.ID
445447
}
446448

449+
// TestContainerCMDAgentMonitoringRuntimeExperimental tests that when
450+
// AGENT_MONITORING_RUNTIME_EXPERIMENTAL is set, Elastic Agent uses the
451+
// respective runtime to run the agent.monitoring components from the
452+
// local configuration.
453+
func TestContainerCMDAgentMonitoringRuntimeExperimental(t *testing.T) {
454+
define.Require(t, define.Requirements{
455+
Stack: &define.Stack{},
456+
Local: false,
457+
Sudo: true,
458+
OS: []define.OS{
459+
{Type: define.Linux},
460+
},
461+
Group: "container",
462+
})
463+
464+
testCases := []struct {
465+
name string
466+
agentMonitoringRuntimeEnv string
467+
expectedRuntimeName string
468+
}{
469+
{
470+
name: "var set to otel",
471+
agentMonitoringRuntimeEnv: monitoringCfg.OtelRuntimeManager,
472+
expectedRuntimeName: string(monitoringCfg.OtelRuntimeManager),
473+
},
474+
{
475+
name: "var set to process",
476+
agentMonitoringRuntimeEnv: monitoringCfg.ProcessRuntimeManager,
477+
expectedRuntimeName: string(monitoringCfg.ProcessRuntimeManager),
478+
},
479+
{
480+
name: "var set to invalid value",
481+
agentMonitoringRuntimeEnv: "invalid",
482+
expectedRuntimeName: string(monitoringCfg.DefaultRuntimeManager),
483+
},
484+
{
485+
name: "var not set",
486+
agentMonitoringRuntimeEnv: "",
487+
expectedRuntimeName: string(monitoringCfg.DefaultRuntimeManager),
488+
},
489+
}
490+
491+
for _, tc := range testCases {
492+
t.Run(tc.name, func(t *testing.T) {
493+
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
494+
defer cancel()
495+
496+
agentFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
497+
require.NoError(t, err)
498+
499+
err = agentFixture.Prepare(ctx)
500+
require.NoError(t, err)
501+
502+
mockesURL := integration.StartMockES(t, 0, 0, 0, 0)
503+
504+
// Create a local agent config file with monitoring enabled
505+
agentConfig := createSimpleAgentMonitoringConfig(t, agentFixture.WorkDir(), mockesURL)
506+
507+
env := []string{
508+
"STATE_PATH=" + agentFixture.WorkDir(),
509+
}
510+
511+
// Set environment variable if specified
512+
if tc.agentMonitoringRuntimeEnv != "" {
513+
env = append(env, "AGENT_MONITORING_RUNTIME_EXPERIMENTAL="+tc.agentMonitoringRuntimeEnv)
514+
}
515+
516+
cmd, agentOutput := prepareAgentCMD(t, ctx, agentFixture, []string{"container", "-c", agentConfig}, env)
517+
t.Logf(">> running binary with: %v", cmd.Args)
518+
if err := cmd.Start(); err != nil {
519+
t.Fatalf("error running container cmd: %s", err)
520+
}
521+
522+
require.EventuallyWithT(t, func(ct *assert.CollectT) {
523+
err = agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env)))
524+
require.NoError(ct, err)
525+
},
526+
2*time.Minute, time.Second,
527+
"Elastic-Agent did not report healthy. Agent status error: \"%v\", Agent logs\n%s",
528+
err, agentOutput,
529+
)
530+
531+
// Verify that components are using the expected runtime
532+
require.EventuallyWithTf(t, func(ct *assert.CollectT) {
533+
status, err := agentFixture.ExecStatus(ctx, atesting.WithCmdOptions(withEnv(env)))
534+
require.NoErrorf(t, err, "error getting agent status")
535+
536+
expectedComponentCount := 4 // process runtime
537+
if tc.expectedRuntimeName == string(monitoringCfg.OtelRuntimeManager) {
538+
expectedComponentCount = 5
539+
}
540+
541+
require.Len(ct, status.Components, expectedComponentCount, "expected right number of components in agent status")
542+
543+
for _, comp := range status.Components {
544+
var compRuntime string
545+
switch comp.VersionInfo.Name {
546+
case "beats-receiver":
547+
compRuntime = string(component.OtelRuntimeManager)
548+
case "beat-v2-client":
549+
compRuntime = string(component.ProcessRuntimeManager)
550+
}
551+
t.Logf("Component ID: %s, version info: %s, runtime: %s", comp.ID, comp.VersionInfo.Name, compRuntime)
552+
switch comp.ID {
553+
case "beat/metrics-monitoring", "filestream-monitoring", "http/metrics-monitoring", "prometheus/metrics-monitoring":
554+
// Monitoring components should use the expected runtime
555+
assert.Equalf(t, tc.expectedRuntimeName, compRuntime, "expected correct runtime name for monitoring component %s with id %s", comp.Name, comp.ID)
556+
default:
557+
// Non-monitoring components should use the default runtime
558+
assert.Equalf(t, string(component.DefaultRuntimeManager), compRuntime, "expected default runtime for non-monitoring component %s with id %s", comp.Name, comp.ID)
559+
}
560+
}
561+
}, 1*time.Minute, 1*time.Second,
562+
"components did not use expected runtime",
563+
)
564+
})
565+
}
566+
}
567+
568+
// TestContainerCMDAgentMonitoringRuntimeExperimentalPolicy tests that when
569+
// AGENT_MONITORING_RUNTIME_EXPERIMENTAL is set, the agent.monitoring
570+
// from the fleet policy takes precedence over the environment variable.
571+
func TestContainerCMDAgentMonitoringRuntimeExperimentalPolicy(t *testing.T) {
572+
info := define.Require(t, define.Requirements{
573+
Stack: &define.Stack{},
574+
Local: false,
575+
Sudo: true,
576+
OS: []define.OS{
577+
{Type: define.Linux},
578+
},
579+
Group: "container",
580+
})
581+
582+
testCases := []struct {
583+
name string
584+
agentMonitoringRuntimeEnv string
585+
expectedRuntimeName string
586+
}{
587+
{
588+
name: "var set to otel",
589+
agentMonitoringRuntimeEnv: monitoringCfg.OtelRuntimeManager,
590+
expectedRuntimeName: string(monitoringCfg.ProcessRuntimeManager), // set by policy
591+
},
592+
}
593+
594+
for _, tc := range testCases {
595+
t.Run(tc.name, func(t *testing.T) {
596+
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
597+
defer cancel()
598+
599+
agentFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
600+
require.NoError(t, err)
601+
602+
err = agentFixture.Prepare(ctx)
603+
require.NoError(t, err)
604+
605+
fleetURL, err := fleettools.DefaultURL(ctx, info.KibanaClient)
606+
if err != nil {
607+
t.Fatalf("could not get Fleet URL: %s", err)
608+
}
609+
610+
policyName := fmt.Sprintf("test-beats-receivers-monitoring-%s-%s", tc.name, uuid.Must(uuid.NewV4()).String())
611+
policyID, enrollmentToken := createPolicy(
612+
t,
613+
ctx,
614+
agentFixture,
615+
info,
616+
policyName,
617+
"")
618+
619+
addLogIntegration(t, info, policyID, "/tmp/beats-receivers-test.log")
620+
integration.GenerateLogFile(t, "/tmp/beats-receivers-test.log", time.Second/2, 50)
621+
622+
// set monitoring runtime to process via policy
623+
setAgentMonitoringRuntime(t, info, policyID, policyName, monitoringCfg.ProcessRuntimeManager)
624+
625+
env := []string{
626+
"FLEET_ENROLL=1",
627+
"FLEET_URL=" + fleetURL,
628+
"FLEET_ENROLLMENT_TOKEN=" + enrollmentToken,
629+
"STATE_PATH=" + agentFixture.WorkDir(),
630+
}
631+
632+
// Set environment variable if specified
633+
if tc.agentMonitoringRuntimeEnv != "" {
634+
env = append(env, "AGENT_MONITORING_RUNTIME_EXPERIMENTAL="+tc.agentMonitoringRuntimeEnv)
635+
}
636+
637+
cmd, agentOutput := prepareAgentCMD(t, ctx, agentFixture, []string{"container"}, env)
638+
t.Logf(">> running binary with: %v", cmd.Args)
639+
if err := cmd.Start(); err != nil {
640+
t.Fatalf("error running container cmd: %s", err)
641+
}
642+
643+
require.EventuallyWithT(t, func(ct *assert.CollectT) {
644+
err = agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env)))
645+
require.NoError(ct, err)
646+
},
647+
2*time.Minute, time.Second,
648+
"Elastic-Agent did not report healthy. Agent status error: \"%v\", Agent logs\n%s",
649+
err, agentOutput,
650+
)
651+
652+
// Verify that components are using the expected runtime
653+
require.EventuallyWithTf(t, func(ct *assert.CollectT) {
654+
status, err := agentFixture.ExecStatus(ctx, atesting.WithCmdOptions(withEnv(env)))
655+
require.NoErrorf(t, err, "error getting agent status")
656+
657+
expectedComponentCount := 4 // process runtime
658+
if tc.expectedRuntimeName == string(monitoringCfg.OtelRuntimeManager) {
659+
expectedComponentCount = 5
660+
}
661+
662+
require.Len(ct, status.Components, expectedComponentCount, "expected right number of components in agent status")
663+
664+
for _, comp := range status.Components {
665+
var compRuntime string
666+
switch comp.VersionInfo.Name {
667+
case "beats-receiver":
668+
compRuntime = string(component.OtelRuntimeManager)
669+
case "beat-v2-client":
670+
compRuntime = string(component.ProcessRuntimeManager)
671+
}
672+
t.Logf("Component ID: %s, version info: %s, runtime: %s", comp.ID, comp.VersionInfo.Name, compRuntime)
673+
switch comp.ID {
674+
case "beat/metrics-monitoring", "filestream-monitoring", "http/metrics-monitoring", "prometheus/metrics-monitoring":
675+
// Monitoring components should use the expected runtime
676+
assert.Equalf(t, tc.expectedRuntimeName, compRuntime, "unexpected runtime name for monitoring component %s with id %s", comp.Name, comp.ID)
677+
default:
678+
// Non-monitoring components should use the default runtime
679+
assert.Equalf(t, string(component.DefaultRuntimeManager), compRuntime, "expected default runtime for non-monitoring component %s with id %s", comp.Name, comp.ID)
680+
}
681+
}
682+
}, 1*time.Minute, 1*time.Second,
683+
"components did not use expected runtime",
684+
)
685+
})
686+
}
687+
}
688+
447689
func addLogIntegration(t *testing.T, info *define.Info, policyID, logFilePath string) {
448690
agentPolicyBuilder := strings.Builder{}
449691
tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(integration.PolicyJSON)
@@ -495,3 +737,73 @@ func addLogIntegration(t *testing.T, info *define.Info, policyID, logFilePath st
495737
t.FailNow()
496738
}
497739
}
740+
741+
// createSimpleAgentMonitoringConfig creates a simple agent configuration file with monitoring enabled
742+
func createSimpleAgentMonitoringConfig(t *testing.T, workDir string, esAddr string) string {
743+
configTemplate := `
744+
outputs:
745+
default:
746+
type: elasticsearch
747+
hosts:
748+
- %s
749+
750+
agent:
751+
logging:
752+
level: debug
753+
monitoring:
754+
enabled: true
755+
metrics: true
756+
757+
inputs:
758+
- id: system-metrics
759+
type: system/metrics
760+
use_output: default
761+
streams:
762+
- metricsets:
763+
- cpu
764+
data_stream.dataset: system.cpu
765+
- metricsets:
766+
- memory
767+
data_stream.dataset: system.memory
768+
`
769+
770+
config := fmt.Sprintf(configTemplate, esAddr)
771+
configPath := filepath.Join(workDir, "elastic-agent.yml")
772+
err := os.WriteFile(configPath, []byte(config), 0644)
773+
if err != nil {
774+
t.Fatalf("failed to write agent config file: %s", err)
775+
}
776+
777+
return configPath
778+
}
779+
780+
func setAgentMonitoringRuntime(t *testing.T, info *define.Info, policyID string, policyName string, runtime string) {
781+
reqBody := fmt.Sprintf(`
782+
{
783+
"name": "%s",
784+
"namespace": "default",
785+
"overrides": {
786+
"agent": {
787+
"monitoring": {
788+
"_runtime_experimental": "%s"
789+
}
790+
}
791+
}
792+
}
793+
`, policyName, runtime)
794+
795+
status, result, err := info.KibanaClient.Request(
796+
http.MethodPut,
797+
fmt.Sprintf("/api/fleet/agent_policies/%s", policyID),
798+
nil,
799+
nil,
800+
bytes.NewBufferString(reqBody))
801+
if err != nil {
802+
t.Fatalf("could not execute request to update policy: %s", err)
803+
}
804+
if status != http.StatusOK {
805+
t.Fatalf("updating policy failed. Status code %d, response:\n%s", status, string(result))
806+
}
807+
808+
t.Logf("Successfully set monitoring to process runtime for policy %s", policyID)
809+
}

0 commit comments

Comments
 (0)