Skip to content

Commit c244de0

Browse files
authored
[Logs+] Default pipeline for logs data streams (#95971)
1 parent 3f5dc96 commit c244de0

File tree

8 files changed

+200
-8
lines changed

8 files changed

+200
-8
lines changed

docs/changelog/95971.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 95971
2+
summary: "Set `@timestamp` for documents in logs data streams if missing and add support for custom pipeline"
3+
area: Data streams
4+
type: enhancement
5+
issues:
6+
- 95537
7+
- 95551
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
---
2+
Test default logs-*-* pipeline:
3+
- do:
4+
# setting up a custom field mapping, to test custom pipeline
5+
cluster.put_component_template:
6+
name: logs@custom
7+
body:
8+
template:
9+
mappings:
10+
properties:
11+
custom_timestamp:
12+
type: date
13+
14+
- do:
15+
ingest.put_pipeline:
16+
# testing custom pipeline - setting a custom timestamp with the same value used to set the `@timestamp` field when missing
17+
id: "logs@custom"
18+
body: >
19+
{
20+
"processors": [
21+
{
22+
"set" : {
23+
"field": "custom_timestamp",
24+
"copy_from": "_ingest.timestamp"
25+
}
26+
}
27+
]
28+
}
29+
30+
- do:
31+
indices.create_data_stream:
32+
name: logs-generic-default
33+
- is_true: acknowledged
34+
35+
- do:
36+
indices.get_data_stream:
37+
name: logs-generic-default
38+
- set: { data_streams.0.indices.0.index_name: idx0name }
39+
40+
- do:
41+
indices.get_mapping:
42+
index: logs-generic-default
43+
- match: { .$idx0name.mappings.properties.@timestamp.type: "date" }
44+
45+
- do:
46+
index:
47+
index: logs-generic-default
48+
refresh: true
49+
body:
50+
# no timestamp - testing default pipeline's @timestamp set processor
51+
message: 'no_timestamp'
52+
- match: {result: "created"}
53+
54+
- do:
55+
search:
56+
index: logs-generic-default
57+
body:
58+
query:
59+
term:
60+
message:
61+
value: 'no_timestamp'
62+
fields:
63+
- field: '@timestamp'
64+
- field: 'custom_timestamp'
65+
- length: { hits.hits: 1 }
66+
- match: { hits.hits.0._source.@timestamp: '/[0-9-]+T[0-9:.]+Z/' }
67+
- set: {hits.hits.0._source.custom_timestamp: custom_timestamp_source }
68+
- match: { hits.hits.0._source.@timestamp: $custom_timestamp_source }
69+
- match: { hits.hits.0.fields.@timestamp.0: '/[0-9-]+T[0-9:.]+Z/' }
70+
- set: {hits.hits.0.fields.custom_timestamp.0: custom_timestamp_field }
71+
- match: { hits.hits.0.fields.@timestamp.0: $custom_timestamp_field }
72+
73+
# verify that when a document is ingested with a timestamp, it does not get overridden
74+
- do:
75+
index:
76+
index: logs-generic-default
77+
refresh: true
78+
body:
79+
'@timestamp': '2023-05-10'
80+
message: 'with_timestamp'
81+
- match: {result: "created"}
82+
83+
- do:
84+
search:
85+
index: logs-generic-default
86+
body:
87+
query:
88+
term:
89+
message:
90+
value: 'with_timestamp'
91+
fields:
92+
- field: '@timestamp'
93+
- length: { hits.hits: 1 }
94+
- match: { hits.hits.0.fields.@timestamp.0: '2023-05-10T00:00:00.000Z' }
95+
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"processors": [
3+
{
4+
"set": {
5+
"description": "If '@timestamp' is missing, set it with the ingest timestamp",
6+
"field": "@timestamp",
7+
"override": false,
8+
"copy_from": "_ingest.timestamp"
9+
}
10+
},
11+
{
12+
"pipeline" : {
13+
"name": "logs@custom",
14+
"ignore_missing_pipeline": true,
15+
"description": "A custom pipeline for logs data streams, which does not exist by default, but can be added if additional processing is required"
16+
}
17+
}
18+
],
19+
"_meta": {
20+
"description": "default pipeline for the logs index template installed by x-pack",
21+
"managed": true
22+
},
23+
"version": ${xpack.stack.template.version}
24+
}

x-pack/plugin/core/src/main/resources/logs-settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
},
1212
"mapping": {
1313
"ignore_malformed": true
14-
}
14+
},
15+
"default_pipeline": "logs-default-pipeline"
1516
}
1617
}
1718
},

x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
2424
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
2525
import org.elasticsearch.xpack.core.template.IndexTemplateRegistry;
26+
import org.elasticsearch.xpack.core.template.IngestPipelineConfig;
2627
import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig;
2728

2829
import java.io.IOException;
@@ -221,6 +222,15 @@ protected Map<String, ComposableIndexTemplate> getComposableTemplateConfigs() {
221222
}
222223
}
223224

225+
private static final List<IngestPipelineConfig> INGEST_PIPELINE_CONFIGS = List.of(
226+
new IngestPipelineConfig("logs-default-pipeline", "/logs-default-pipeline.json", REGISTRY_VERSION, TEMPLATE_VERSION_VARIABLE)
227+
);
228+
229+
@Override
230+
protected List<IngestPipelineConfig> getIngestPipelines() {
231+
return INGEST_PIPELINE_CONFIGS;
232+
}
233+
224234
@Override
225235
protected String getOrigin() {
226236
return ClientHelper.STACK_ORIGIN;

x-pack/plugin/stack/src/test/java/org/elasticsearch/xpack/stack/StackTemplateRegistryTests.java

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.ActionType;
1414
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
1515
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
16+
import org.elasticsearch.action.ingest.PutPipelineAction;
1617
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1718
import org.elasticsearch.cluster.ClusterChangedEvent;
1819
import org.elasticsearch.cluster.ClusterName;
@@ -27,6 +28,8 @@
2728
import org.elasticsearch.cluster.service.ClusterService;
2829
import org.elasticsearch.common.TriFunction;
2930
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.ingest.IngestMetadata;
32+
import org.elasticsearch.ingest.PipelineConfiguration;
3033
import org.elasticsearch.test.ClusterServiceUtils;
3134
import org.elasticsearch.test.ESTestCase;
3235
import org.elasticsearch.test.client.NoOpClient;
@@ -45,6 +48,7 @@
4548
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
4649
import org.elasticsearch.xpack.core.ilm.OperationMode;
4750
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction;
51+
import org.elasticsearch.xpack.core.template.IngestPipelineConfig;
4852
import org.junit.After;
4953
import org.junit.Before;
5054

@@ -190,10 +194,40 @@ public void testPolicyAlreadyExists() {
190194
return null;
191195
});
192196

193-
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), policyMap, nodes);
197+
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), policyMap, nodes, true);
194198
registry.clusterChanged(event);
195199
}
196200

201+
public void testThatRequiredPipelinesAreAdded() throws Exception {
202+
DiscoveryNode node = TestDiscoveryNode.create("node");
203+
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
204+
205+
AtomicInteger calledTimes = new AtomicInteger(0);
206+
client.setVerifier((action, request, listener) -> {
207+
if (action instanceof PutPipelineAction) {
208+
calledTimes.incrementAndGet();
209+
return AcknowledgedResponse.TRUE;
210+
}
211+
if (action instanceof PutComponentTemplateAction) {
212+
// Ignore this, it's verified in another test
213+
return AcknowledgedResponse.TRUE;
214+
} else if (action instanceof PutLifecycleAction) {
215+
// Ignore this, it's verified in another test
216+
return AcknowledgedResponse.TRUE;
217+
} else if (action instanceof PutComposableIndexTemplateAction) {
218+
// Ignore this, it's verified in another test
219+
return AcknowledgedResponse.TRUE;
220+
} else {
221+
fail("client called with unexpected request: " + request.toString());
222+
}
223+
return null;
224+
});
225+
226+
ClusterChangedEvent event = createInitialClusterChangedEvent(nodes);
227+
registry.clusterChanged(event);
228+
assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelines().size())));
229+
}
230+
197231
public void testPolicyAlreadyExistsButDiffers() throws IOException {
198232
DiscoveryNode node = TestDiscoveryNode.create("node");
199233
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
@@ -235,7 +269,7 @@ public void testPolicyAlreadyExistsButDiffers() throws IOException {
235269
) {
236270
LifecyclePolicy different = LifecyclePolicy.parse(parser, policies.get(0).getName());
237271
policyMap.put(policies.get(0).getName(), different);
238-
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), policyMap, nodes);
272+
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), policyMap, nodes, true);
239273
registry.clusterChanged(event);
240274
}
241275
}
@@ -442,7 +476,6 @@ private ActionResponse verifyComponentTemplateInstalled(
442476
) {
443477
if (action instanceof PutComponentTemplateAction) {
444478
calledTimes.incrementAndGet();
445-
assertThat(action, instanceOf(PutComponentTemplateAction.class));
446479
assertThat(request, instanceOf(PutComponentTemplateAction.Request.class));
447480
final PutComponentTemplateAction.Request putRequest = (PutComponentTemplateAction.Request) request;
448481
assertThat(putRequest.componentTemplate().version(), equalTo((long) StackTemplateRegistry.REGISTRY_VERSION));
@@ -461,15 +494,20 @@ private ActionResponse verifyComponentTemplateInstalled(
461494
}
462495

463496
private ClusterChangedEvent createClusterChangedEvent(Map<String, Integer> existingTemplates, DiscoveryNodes nodes) {
464-
return createClusterChangedEvent(existingTemplates, Collections.emptyMap(), nodes);
497+
return createClusterChangedEvent(existingTemplates, Collections.emptyMap(), nodes, true);
498+
}
499+
500+
private ClusterChangedEvent createInitialClusterChangedEvent(DiscoveryNodes nodes) {
501+
return createClusterChangedEvent(Collections.emptyMap(), Collections.emptyMap(), nodes, false);
465502
}
466503

467504
private ClusterChangedEvent createClusterChangedEvent(
468505
Map<String, Integer> existingTemplates,
469506
Map<String, LifecyclePolicy> existingPolicies,
470-
DiscoveryNodes nodes
507+
DiscoveryNodes nodes,
508+
boolean addRegistryPipelines
471509
) {
472-
ClusterState cs = createClusterState(Settings.EMPTY, existingTemplates, existingPolicies, nodes);
510+
ClusterState cs = createClusterState(Settings.EMPTY, existingTemplates, existingPolicies, nodes, addRegistryPipelines);
473511
ClusterChangedEvent realEvent = new ClusterChangedEvent(
474512
"created-from-test",
475513
cs,
@@ -485,7 +523,8 @@ private ClusterState createClusterState(
485523
Settings nodeSettings,
486524
Map<String, Integer> existingComponentTemplates,
487525
Map<String, LifecyclePolicy> existingPolicies,
488-
DiscoveryNodes nodes
526+
DiscoveryNodes nodes,
527+
boolean addRegistryPipelines
489528
) {
490529
Map<String, ComponentTemplate> componentTemplates = new HashMap<>();
491530
for (Map.Entry<String, Integer> template : existingComponentTemplates.entrySet()) {
@@ -499,12 +538,26 @@ private ClusterState createClusterState(
499538
.collect(Collectors.toMap(Map.Entry::getKey, e -> new LifecyclePolicyMetadata(e.getValue(), Collections.emptyMap(), 1, 1)));
500539
IndexLifecycleMetadata ilmMeta = new IndexLifecycleMetadata(existingILMMeta, OperationMode.RUNNING);
501540

541+
// adding the registry pipelines, as they may be dependencies for index templates
542+
Map<String, PipelineConfiguration> ingestPipelines = new HashMap<>();
543+
if (addRegistryPipelines) {
544+
for (IngestPipelineConfig ingestPipelineConfig : registry.getIngestPipelines()) {
545+
// we cannot mock PipelineConfiguration as it is a final class
546+
ingestPipelines.put(
547+
ingestPipelineConfig.getId(),
548+
new PipelineConfiguration(ingestPipelineConfig.getId(), ingestPipelineConfig.loadConfig(), XContentType.JSON)
549+
);
550+
}
551+
}
552+
IngestMetadata ingestMetadata = new IngestMetadata(ingestPipelines);
553+
502554
return ClusterState.builder(new ClusterName("test"))
503555
.metadata(
504556
Metadata.builder()
505557
.componentTemplates(componentTemplates)
506558
.transientSettings(nodeSettings)
507559
.putCustom(IndexLifecycleMetadata.TYPE, ilmMeta)
560+
.putCustom(IngestMetadata.TYPE, ingestMetadata)
508561
.build()
509562
)
510563
.blocks(new ClusterBlocks.Builder().build())

x-pack/qa/core-rest-tests-with-security/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ dependencies {
44
testImplementation project(':x-pack:qa')
55
clusterModules project(':modules:mapper-extras')
66
clusterModules project(':modules:rank-eval')
7+
clusterModules project(':modules:ingest-common')
78
clusterModules project(xpackModule('stack'))
89
clusterModules project(xpackModule('ilm'))
910
clusterModules project(xpackModule('mapper-constant-keyword'))

x-pack/qa/core-rest-tests-with-security/src/yamlRestTest/java/org/elasticsearch/xpack/security/CoreWithSecurityClientYamlTestSuiteIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class CoreWithSecurityClientYamlTestSuiteIT extends ESClientYamlSuiteTest
3535
.module("rank-eval")
3636
.module("x-pack-ilm")
3737
.module("x-pack-stack")
38+
.module("ingest-common")
3839
.setting("xpack.security.enabled", "true")
3940
.setting("xpack.watcher.enabled", "false")
4041
.setting("xpack.ml.enabled", "false")

0 commit comments

Comments
 (0)