Skip to content

Commit 9ce0a83

Browse files
Fix filesource provider with kubernetes symlink (#11050) (#11151)
* Add a unit tests to validate kubernetes symlink behavior works with the filesource provider. * Fix filesource to work on Linux. * Remove the extra test. * Skip test on Windows. * Add changelog entry. * Improvement from code review. (cherry picked from commit 0d50584) Co-authored-by: Blake Rouse <blake.rouse@elastic.co>
1 parent a4e98f6 commit 9ce0a83

File tree

3 files changed

+271
-9
lines changed

3 files changed

+271
-9
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: bug-fix
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: fix filesource provider to work with kubernetes secret mounts
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: elastic-agent
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
# issue: https://github.com/owner/repo/1234

internal/pkg/composable/providers/filesource/filesource.go

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,40 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider
6363
}
6464
defer watcher.Close()
6565

66-
// invert the mapping to map paths to source names
66+
// Build mapping from paths (including intermediate symlinks) to source names
67+
// and keep track of the original source paths for reading
68+
// For Kubernetes secrets: both "token" and "..data/" will map to the same source,
69+
// but we always read from the original "token" path
6770
inverted := make(map[string][]string, len(c.cfg.Sources))
71+
sourcePaths := make(map[string]string, len(c.cfg.Sources)) // sourceName -> original path
72+
6873
for sourceName, sourceCfg := range c.cfg.Sources {
74+
// Store the original path for this source
75+
sourcePaths[sourceName] = sourceCfg.Path
76+
77+
// Add the direct path
6978
sources, ok := inverted[sourceCfg.Path]
7079
if !ok {
7180
sources = []string{sourceName}
7281
} else {
7382
sources = append(sources, sourceName)
7483
}
7584
inverted[sourceCfg.Path] = sources
85+
86+
// Also add any intermediate symlinks in the resolution chain
87+
// This handles Kubernetes secrets where token -> ..data/token -> ..2024_01_01/token
88+
intermediates := resolveSymlinkChain(sourceCfg.Path)
89+
for _, intermediate := range intermediates {
90+
sources, ok := inverted[intermediate]
91+
if !ok {
92+
sources = []string{sourceName}
93+
} else {
94+
if !slices.Contains(sources, sourceName) {
95+
sources = append(sources, sourceName)
96+
}
97+
}
98+
inverted[intermediate] = sources
99+
}
76100
}
77101

78102
// determine the paths to watch (watch is performed on the directories that contain the file)
@@ -97,11 +121,9 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider
97121
// the updated file changes will not be missed
98122
current := make(map[string]interface{}, len(c.cfg.Sources))
99123
readAll := func() error {
100-
for path, sources := range inverted {
124+
for sourceName, path := range sourcePaths {
101125
value := c.readContents(path)
102-
for _, source := range sources {
103-
current[source] = value
104-
}
126+
current[sourceName] = value
105127
}
106128
err = comm.Set(current)
107129
if err != nil {
@@ -153,12 +175,15 @@ func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider
153175
switch {
154176
case e.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove) != 0:
155177
// file was created, updated, or deleted (update the value)
178+
// when an intermediate symlink changes (e.g., ..data), we need to
179+
// re-read all sources that depend on it using their original paths
156180
changed := false
157-
value := c.readContents(path)
158-
for _, source := range sources {
159-
previous := current[source]
181+
for _, sourceName := range sources {
182+
sourcePath := sourcePaths[sourceName]
183+
value := c.readContents(sourcePath)
184+
previous := current[sourceName]
160185
if previous != value {
161-
current[source] = value
186+
current[sourceName] = value
162187
changed = true
163188
}
164189
}
@@ -284,3 +309,64 @@ func drainQueue(e <-chan fsnotify.Event) {
284309
}
285310
}
286311
}
312+
313+
// resolveSymlinkChain resolves a path and returns all intermediate symlink paths
314+
// that should be watched to detect changes. This is critical for Kubernetes secrets
315+
// where the structure is: token -> ..data/token -> ..2024_01_01/token
316+
// When Kubernetes updates the secret, it replaces ..data, so we need to watch it.
317+
func resolveSymlinkChain(path string) []string {
318+
var intermediates []string
319+
dir := filepath.Dir(path)
320+
321+
// Check if the file itself is a symlink
322+
target, err := os.Readlink(path)
323+
if err != nil {
324+
// Not a symlink or doesn't exist, nothing to track
325+
return intermediates
326+
}
327+
328+
// If it's a relative symlink, resolve it relative to the parent directory
329+
if !filepath.IsAbs(target) {
330+
target = filepath.Join(dir, target)
331+
}
332+
333+
// Now walk through the target path and find intermediate symlinks
334+
// For example, if target is "/var/secrets/..data/token", we want to check if "..data" is a symlink
335+
targetDir := filepath.Dir(target)
336+
337+
// Check each component of the target directory path for symlinks
338+
components := strings.Split(filepath.Clean(targetDir), string(filepath.Separator))
339+
currentPath := ""
340+
if filepath.IsAbs(targetDir) {
341+
currentPath = string(filepath.Separator)
342+
}
343+
344+
for _, component := range components {
345+
if component == "" {
346+
continue
347+
}
348+
currentPath = filepath.Join(currentPath, component)
349+
350+
// Check if this component is a symlink
351+
_, err = os.Readlink(currentPath)
352+
if err == nil {
353+
// This is a symlink, add it to our watch list
354+
cleanPath := filepath.Clean(currentPath)
355+
// Windows paths are case-insensitive
356+
if runtime.GOOS == "windows" {
357+
cleanPath = strings.ToLower(cleanPath)
358+
}
359+
intermediates = append(intermediates, cleanPath)
360+
}
361+
}
362+
363+
// Also check if the target file itself is a symlink (nested symlinks)
364+
_, err = os.Readlink(target)
365+
if err == nil {
366+
// The target itself is also a symlink, recursively resolve it
367+
nestedIntermediates := resolveSymlinkChain(target)
368+
intermediates = append(intermediates, nestedIntermediates...)
369+
}
370+
371+
return intermediates
372+
}

internal/pkg/composable/providers/filesource/filesource_test.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,3 +181,134 @@ func TestContextProvider(t *testing.T) {
181181
}
182182
}
183183
}
184+
185+
func TestContextProvider_KubernetesSymlinks(t *testing.T) {
186+
if runtime.GOOS == "windows" {
187+
t.Skip("Skipping Kubernetes symlink test on Windows, because atomic replacing a symlink using os.Rename doesn't work")
188+
}
189+
190+
const testTimeout = 3 * time.Second
191+
192+
// Create directory structure that mimics Kubernetes secrets
193+
tmpDir := t.TempDir()
194+
195+
// Create initial timestamped directory with secret content
196+
dataDir1 := filepath.Join(tmpDir, "..2024_01_01_12_00")
197+
require.NoError(t, os.Mkdir(dataDir1, 0o755))
198+
199+
value1 := "secret-token-v1"
200+
tokenFile1 := filepath.Join(dataDir1, "token")
201+
require.NoError(t, os.WriteFile(tokenFile1, []byte(value1), 0o644))
202+
203+
value2 := "secret-cert-v1"
204+
certFile1 := filepath.Join(dataDir1, "ca.crt")
205+
require.NoError(t, os.WriteFile(certFile1, []byte(value2), 0o644))
206+
207+
// Create ..data symlink pointing to the timestamped directory
208+
dataSymlink := filepath.Join(tmpDir, "..data")
209+
require.NoError(t, os.Symlink(dataDir1, dataSymlink))
210+
211+
// Create top-level symlinks (what the user actually references)
212+
tokenSymlink := filepath.Join(tmpDir, "token")
213+
require.NoError(t, os.Symlink(filepath.Join("..data", "token"), tokenSymlink))
214+
215+
certSymlink := filepath.Join(tmpDir, "ca.crt")
216+
require.NoError(t, os.Symlink(filepath.Join("..data", "ca.crt"), certSymlink))
217+
218+
// Setup logger and provider
219+
log, err := logger.New("filesource_test", false)
220+
require.NoError(t, err)
221+
222+
osPath := func(path string) string {
223+
return path
224+
}
225+
if runtime.GOOS == "windows" {
226+
osPath = func(path string) string {
227+
return strings.ToLower(path)
228+
}
229+
}
230+
231+
c, err := config.NewConfigFrom(map[string]interface{}{
232+
"sources": map[string]interface{}{
233+
"token": map[string]interface{}{
234+
"path": osPath(tokenSymlink),
235+
},
236+
"cert": map[string]interface{}{
237+
"path": osPath(certSymlink),
238+
},
239+
},
240+
})
241+
require.NoError(t, err)
242+
243+
builder, _ := composable.Providers.GetContextProvider("filesource")
244+
provider, err := builder(log, c, true)
245+
require.NoError(t, err)
246+
247+
ctx, cancel := context.WithCancel(context.Background())
248+
defer cancel()
249+
comm := ctesting.NewContextComm(ctx)
250+
setChan := make(chan map[string]interface{})
251+
comm.CallOnSet(func(value map[string]interface{}) {
252+
t.Logf("Set called with: token=%v, cert=%v", value["token"], value["cert"])
253+
setChan <- value
254+
})
255+
256+
go func() {
257+
_ = provider.Run(ctx, comm)
258+
}()
259+
260+
// Wait for initial values
261+
var current map[string]interface{}
262+
select {
263+
case current = <-setChan:
264+
case <-time.After(testTimeout):
265+
require.FailNow(t, "timeout waiting for provider to call Set")
266+
}
267+
268+
require.Equal(t, value1, current["token"], "initial token value should match")
269+
require.Equal(t, value2, current["cert"], "initial cert value should match")
270+
271+
// Simulate Kubernetes secret update:
272+
// 1. Create new timestamped directory with updated content
273+
dataDir2 := filepath.Join(tmpDir, "..2024_01_01_13_00")
274+
require.NoError(t, os.Mkdir(dataDir2, 0o755))
275+
276+
value1Updated := "secret-token-v2"
277+
tokenFile2 := filepath.Join(dataDir2, "token")
278+
require.NoError(t, os.WriteFile(tokenFile2, []byte(value1Updated), 0o644))
279+
280+
value2Updated := "secret-cert-v2"
281+
certFile2 := filepath.Join(dataDir2, "ca.crt")
282+
require.NoError(t, os.WriteFile(certFile2, []byte(value2Updated), 0o644))
283+
284+
// 2. Atomically replace ..data symlink (this is what Kubernetes does)
285+
// Create temporary symlink, then rename it to replace the old one atomically
286+
dataTmpSymlink := filepath.Join(tmpDir, "..data_tmp")
287+
require.NoError(t, os.Symlink(dataDir2, dataTmpSymlink))
288+
require.NoError(t, os.Rename(dataTmpSymlink, dataSymlink))
289+
290+
// Note: The top-level symlinks (token, ca.crt) are NOT modified
291+
// They still point to ..data/token and ..data/ca.crt
292+
// Only the ..data symlink target changed
293+
294+
// Wait for the provider to detect the update
295+
// This should happen because fsnotify should see the ..data symlink change
296+
updateDetected := false
297+
deadline := time.After(testTimeout)
298+
for !updateDetected {
299+
select {
300+
case updated := <-setChan:
301+
// Check if we got the updated values
302+
if updated["token"] == value1Updated && updated["cert"] == value2Updated {
303+
updateDetected = true
304+
t.Log("Successfully detected Kubernetes-style symlink update")
305+
} else {
306+
t.Logf("Got update but values don't match yet: token=%v, cert=%v", updated["token"], updated["cert"])
307+
}
308+
case <-deadline:
309+
require.FailNow(t, "timeout waiting for provider to detect Kubernetes-style symlink update")
310+
}
311+
}
312+
313+
require.True(t, updateDetected, "provider should detect Kubernetes-style symlink updates")
314+
}

0 commit comments

Comments
 (0)