Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions internal/elasticsearch/ingest/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,18 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
return nil, err
}

c, err = addRerouteProcessors(c, dataStreamPath, path)
cWithRerouteProcessors, err := addRerouteProcessors(c, dataStreamPath, path)
if err != nil {
return nil, err
}

name := filepath.Base(path)
pipelines = append(pipelines, Pipeline{
Path: path,
Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
Format: filepath.Ext(path)[1:],
Content: c,
Path: path,
Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
Format: filepath.Ext(path)[1:],
Content: cWithRerouteProcessors,
ContentOriginal: c,
Comment on lines 107 to +112
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a new field, ContentOriginal, to this struct to keep the original contents as read from the ingest pipeline file defined in the package.

Content is kept holding the same contents as before (including the reroute processors) so the current behaviour does not change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another consequence of adding those reroute processors is that the YAML written afterwards is not the same as in the package:

  • comments are not kept
  • fields in the YAML do not keep the same order
})
}
return pipelines, nil
Expand Down
9 changes: 5 additions & 4 deletions internal/elasticsearch/ingest/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type pipelineIngestedDocument struct {

// Pipeline represents a pipeline resource loaded from a file.
type Pipeline struct {
	Path            string // Path of the file with the pipeline definition.
	Name            string // Name of the pipeline.
	Format          string // Format (extension) of the pipeline.
	Content         []byte // Content is the pipeline file contents with reroute processors if any.
	ContentOriginal []byte // ContentOriginal is the original file contents.
}

// Filename returns the original filename associated with the pipeline.
Expand Down
14 changes: 14 additions & 0 deletions internal/elasticsearch/ingest/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ func (p Pipeline) Processors() (procs []Processor, err error) {
return procs, nil
}

// OriginalProcessors returns the list of processors defined in the original
// ingest pipeline file (ContentOriginal), i.e. before any reroute processors
// were appended to Content.
func (p Pipeline) OriginalProcessors() (procs []Processor, err error) {
	switch p.Format {
	case "yaml", "yml", "json":
		// The YAML parser also accepts "json" input (JSON is a subset of YAML).
		procs, err = processorsFromYAML(p.ContentOriginal)
	default:
		return nil, fmt.Errorf("unsupported pipeline format: %s", p.Format)
	}
	if err != nil {
		return nil, fmt.Errorf("failure processing %s pipeline '%s': %w", p.Format, p.Filename(), err)
	}
	return procs, nil
}

// extract a list of processors from a pipeline definition in YAML format.
func processorsFromYAML(content []byte) (procs []Processor, err error) {
var p struct {
Expand Down
13 changes: 12 additions & 1 deletion internal/testrunner/runners/pipeline/coverage.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func GetPipelineCoverage(options testrunner.TestOptions, pipelines []ingest.Pipe

func pipelineDataForCoverage(pipeline ingest.Pipeline, stats ingest.PipelineStatsMap, basePath, dataStreamPath string) (string, string, []ingest.Processor, ingest.PipelineStats, error) {
// Load the list of main processors from the pipeline source code, annotated with line numbers.
src, err := pipeline.Processors()
src, err := pipeline.OriginalProcessors()
if err != nil {
return "", "", nil, ingest.PipelineStats{}, err
}
Expand All @@ -115,6 +115,17 @@ func pipelineDataForCoverage(pipeline ingest.Pipeline, stats ingest.PipelineStat
return "", "", nil, ingest.PipelineStats{}, fmt.Errorf("pipeline '%s' not installed in Elasticsearch", pipeline.Name)
}

// Remove reroute processors, if any, so the pipeline has the same processors as in the file.
// Reroute processors are added when a routing_rules file is defined.
var processors []ingest.ProcessorStats
for _, proc := range pstats.Processors {
if proc.Type == "reroute" {
continue
}
processors = append(processors, proc)
}
pstats.Processors = processors

// Ensure there is no inconsistency in the list of processors in stats vs obtained from source.
if len(src) != len(pstats.Processors) {
return "", "", nil, ingest.PipelineStats{}, fmt.Errorf("processor count mismatch for %s (src:%d stats:%d)", pipeline.Filename(), len(src), len(pstats.Processors))
Expand Down