Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
8934c9e
Adding ability to auto-install ingest pipelines through index templates
eyalkoren May 3, 2023
4aaa1ff
Merge remote-tracking branch 'upstream/main' into ingest-pipeline-reg…
eyalkoren May 3, 2023
8d09963
Update docs/changelog/95782.yaml
eyalkoren May 3, 2023
0240b07
Update changelog summary
eyalkoren May 3, 2023
5354b7c
Guarding from nulls
eyalkoren May 3, 2023
ddef98e
Avoid using forbidden API
eyalkoren May 4, 2023
7ee1694
Fixing AnalyticsTemplateRegistryTests to pass index template validation
eyalkoren May 4, 2023
e82e135
Merge remote-tracking branch 'upstream/main' into ingest-pipeline-reg…
eyalkoren May 4, 2023
481f132
Fixing validation when IngestMetadata is null in cluster state
eyalkoren May 4, 2023
3f89462
Merge remote-tracking branch 'upstream/main' into ingest-pipeline-reg…
eyalkoren May 7, 2023
0688915
Merge remote-tracking branch 'upstream/main' into default-timestamp-f…
eyalkoren May 10, 2023
e361eb8
[Logs+] adding default pipeline for logs data streams
eyalkoren May 10, 2023
4cd492b
Update docs/changelog/95971.yaml
eyalkoren May 10, 2023
1b9afc1
Fix StackTemplateRegistryTests and verify that StackTemplateRegistry …
eyalkoren May 10, 2023
ba72cdb
Merge remote-tracking branch 'eyalkoren/default-timestamp-for-logs' i…
eyalkoren May 10, 2023
0e8400c
Update changelog summary
eyalkoren May 10, 2023
2eef78e
Fixing CoreWithSecurityClientYamlTestSuiteIT
eyalkoren May 10, 2023
011f627
Verify that valid timestamp is not being overridden
eyalkoren May 10, 2023
b711cb1
Use _ingest.timestamp field directly in set processor
eyalkoren May 10, 2023
e4a3fc5
Add log events JSON parser to logs default pipeline
eyalkoren May 14, 2023
18fe484
Merge remote-tracking branch 'upstream/main' into json-parse-logs
eyalkoren May 14, 2023
c901d5d
Update docs/changelog/96083.yaml
eyalkoren May 14, 2023
12eed7d
Restore custom pipeline description lost in merge
eyalkoren May 14, 2023
3aec692
Merge remote-tracking branch 'eyalkoren/json-parse-logs' into json-pa…
eyalkoren May 14, 2023
8e09c78
Improve changelog summary
eyalkoren May 14, 2023
ab8f69a
Fix StackTemplateRegistryTests
eyalkoren May 14, 2023
4da9477
Fix Painless condition
eyalkoren May 15, 2023
2be9526
Changing method name as proposed in review
eyalkoren May 16, 2023
14e9be9
Do not enforce pipeline dependency list
eyalkoren May 16, 2023
96e1f00
Merge remote-tracking branch 'upstream/main' into json-parse-logs
eyalkoren May 16, 2023
02e5c49
Make spotless happy
eyalkoren May 16, 2023
a3a56b3
Make condition safe for non-string message field
eyalkoren May 16, 2023
99a1c32
Remove explicit comparisons to true
eyalkoren May 16, 2023
75f37aa
No null check for ctx
eyalkoren May 17, 2023
b8e001c
No null check for ctx
eyalkoren May 17, 2023
8f4b7c1
Merge remote-tracking branch 'upstream/main' into json-parse-logs
eyalkoren May 17, 2023
3b91268
Making JSON parsing pipeline opt-in
eyalkoren May 17, 2023
bb67bd6
Merge remote-tracking branch 'eyalkoren/json-parse-logs' into json-pa…
eyalkoren May 17, 2023
c963143
Merge remote-tracking branch 'upstream/main' into json-parse-logs
eyalkoren May 17, 2023
0c766ff
Merge remote-tracking branch 'upstream/main' into json-parse-logs
eyalkoren May 22, 2023
d16f9bd
Changing pipeline name
eyalkoren May 22, 2023
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/96083.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 96083
summary: Automatically parse log events in logs data streams, if their `message` field contains JSON content
area: Data streams
type: enhancement
issues:
- 95522
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,3 @@ Test default logs-*-* pipeline:
- field: '@timestamp'
- length: { hits.hits: 1 }
- match: { hits.hits.0.fields.@timestamp.0: '2023-05-10T00:00:00.000Z' }

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
---
Test log message JSON-parsing pipeline:
- do:
ingest.put_pipeline:
# opting in to use the JSON parsing pipeline for message field
id: "logs@custom"
body: >
{
"processors": [
{
"pipeline" : {
"name": "logs@json-message",
"description": "A pipeline that automatically parses JSON log events into top-level fields if they are such"
}
}
]
}

- do:
indices.create_data_stream:
name: logs-generic-default
- is_true: acknowledged

- do:
index:
index: logs-generic-default
refresh: true
body:
'@timestamp': '2023-05-10'
message: |-
{
"@timestamp":"2023-05-09T16:48:34.135Z",
"message":"json",
"log.level": "INFO",
"ecs.version": "1.6.0",
"service.name":"my-app",
"event.dataset":"my-app.RollingFile",
"process.thread.name":"main",
"log.logger":"root.pkg.MyApp"
}
- match: {result: "created"}

- do:
search:
index: logs-generic-default
body:
query:
term:
message:
value: 'json'
fields:
- field: 'message'
- length: { hits.hits: 1 }
# root field parsed from JSON should win
- match: { hits.hits.0._source.@timestamp: '2023-05-09T16:48:34.135Z' }
- match: { hits.hits.0._source.message: 'json' }
- match: { hits.hits.0.fields.message.0: 'json' }
# successful access to subfields verifies that dot expansion is part of the pipeline
- match: { hits.hits.0._source.log.level: 'INFO' }
- match: { hits.hits.0._source.ecs.version: '1.6.0' }
- match: { hits.hits.0._source.service.name: 'my-app' }
- match: { hits.hits.0._source.event.dataset: 'my-app.RollingFile' }
- match: { hits.hits.0._source.process.thread.name: 'main' }
- match: { hits.hits.0._source.log.logger: 'root.pkg.MyApp' }
# _tmp_json_message should be removed by the pipeline
- match: { hits.hits.0._source._tmp_json_message: null }

# test malformed-JSON parsing - parsing error should be ignored and the document should be indexed with original message
- do:
index:
index: logs-generic-default
refresh: true
body:
'@timestamp': '2023-05-10'
test: 'malformed_json'
message: '{"@timestamp":"2023-05-09T16:48:34.135Z", "message":"malformed_json"}}'
- match: {result: "created"}

- do:
search:
index: logs-generic-default
body:
query:
term:
test:
value: 'malformed_json'
- length: { hits.hits: 1 }
- match: { hits.hits.0._source.@timestamp: '2023-05-10' }
- match: { hits.hits.0._source.message: '{"@timestamp":"2023-05-09T16:48:34.135Z", "message":"malformed_json"}}' }
- match: { hits.hits.0._source._tmp_json_message: null }

# test non-string message field
- do:
index:
index: logs-generic-default
refresh: true
body:
test: 'numeric_message'
message: 42
- match: {result: "created"}

- do:
search:
index: logs-generic-default
body:
query:
term:
test:
value: 'numeric_message'
fields:
- field: 'message'
- length: { hits.hits: 1 }
- match: { hits.hits.0._source.message: 42 }
- match: { hits.hits.0.fields.message.0: '42' }
Original file line number Diff line number Diff line change
Expand Up @@ -562,34 +562,57 @@ private void addIngestPipelinesIfMissing(ClusterState state) {
);

if (creationCheck.compareAndSet(false, true)) {
PipelineConfiguration existingPipeline = findInstalledPipeline(state, requiredPipeline.getId());
if (existingPipeline != null) {
Integer existingPipelineVersion = existingPipeline.getVersion();
if (existingPipelineVersion == null || existingPipelineVersion < requiredPipeline.getVersion()) {
logger.info(
"upgrading ingest pipeline [{}] for [{}] from version [{}] to version [{}]",
requiredPipeline.getId(),
getOrigin(),
existingPipelineVersion,
requiredPipeline.getVersion()
);
putIngestPipeline(requiredPipeline, creationCheck);
List<String> pipelineDependencies = requiredPipeline.getPipelineDependencies();
if (pipelineDependencies != null && pipelineDependenciesExist(state, pipelineDependencies) == false) {
creationCheck.set(false);
logger.trace(
"not adding ingest pipeline [{}] for [{}] because its dependencies do not exist",
requiredPipeline.getId(),
getOrigin()
);
} else {
PipelineConfiguration existingPipeline = findInstalledPipeline(state, requiredPipeline.getId());
if (existingPipeline != null) {
Integer existingPipelineVersion = existingPipeline.getVersion();
if (existingPipelineVersion == null || existingPipelineVersion < requiredPipeline.getVersion()) {
logger.info(
"upgrading ingest pipeline [{}] for [{}] from version [{}] to version [{}]",
requiredPipeline.getId(),
getOrigin(),
existingPipelineVersion,
requiredPipeline.getVersion()
);
putIngestPipeline(requiredPipeline, creationCheck);
} else {
creationCheck.set(false);
logger.debug(
"not adding ingest pipeline [{}] for [{}], because it already exists",
requiredPipeline.getId(),
getOrigin()
);
}
} else {
logger.debug(
"not adding ingest pipeline [{}] for [{}], because it already exists",
"adding ingest pipeline [{}] for [{}], because it doesn't exist",
requiredPipeline.getId(),
getOrigin()
);
creationCheck.set(false);
putIngestPipeline(requiredPipeline, creationCheck);
}
} else {
logger.debug("adding ingest pipeline [{}] for [{}], because it doesn't exist", requiredPipeline.getId(), getOrigin());
putIngestPipeline(requiredPipeline, creationCheck);
}
}
}
}

/**
 * Checks whether every pipeline in the given dependency list is already installed in the cluster.
 *
 * @param state        the current cluster state in which installed pipelines are looked up
 * @param dependencies pipeline IDs that must all be present
 * @return {@code true} if all dependencies are installed (trivially true for an empty list)
 */
private boolean pipelineDependenciesExist(ClusterState state, List<String> dependencies) {
    return dependencies.stream().allMatch(dependency -> findInstalledPipeline(state, dependency) != null);
}

@Nullable
private static PipelineConfiguration findInstalledPipeline(ClusterState state, String pipelineId) {
Optional<IngestMetadata> maybeMeta = Optional.ofNullable(state.metadata().custom(IngestMetadata.TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
Expand All @@ -21,11 +23,23 @@ public class IngestPipelineConfig {
private final int version;
private final String versionProperty;

/**
* A list of this pipeline's dependencies, for example pipelines referenced through a pipeline processor.
* This list is used to enforce proper ordering of pipeline installation, so that a pipeline gets installed only if all its
* dependencies are already installed.
*/
private final List<String> dependencies;

/**
 * Creates a pipeline configuration that has no dependencies on other pipelines.
 */
public IngestPipelineConfig(String id, String resource, int version, String versionProperty) {
    this(id, resource, version, versionProperty, Collections.emptyList());
}

/**
 * Creates a pipeline configuration.
 *
 * @param id              unique identifier of the pipeline, must not be null
 * @param resource        resource containing the pipeline definition template, must not be null
 * @param version         pipeline version, used to decide whether an installed pipeline needs upgrading
 * @param versionProperty name of the template property that is substituted with the version, must not be null
 * @param dependencies    IDs of pipelines that must be installed before this one; {@code null} is
 *                        treated the same as an empty list (no dependencies)
 */
public IngestPipelineConfig(String id, String resource, int version, String versionProperty, List<String> dependencies) {
    this.id = Objects.requireNonNull(id);
    this.resource = Objects.requireNonNull(resource);
    this.version = version;
    this.versionProperty = Objects.requireNonNull(versionProperty);
    // defensive immutable copy so callers cannot mutate the dependency list after construction;
    // a null argument is normalized to an empty list, which installation logic treats identically
    this.dependencies = dependencies == null ? Collections.emptyList() : List.copyOf(dependencies);
}

public String getId() {
Expand All @@ -40,6 +54,10 @@ public String getVersionProperty() {
return versionProperty;
}

/**
 * Returns the IDs of the pipelines that must be installed before this pipeline is installed.
 * Used by the template registry to enforce installation ordering.
 */
public List<String> getPipelineDependencies() {
    return dependencies;
}

/**
 * Loads the pipeline definition from its resource, substituting the configured
 * version into the version property placeholder.
 */
public BytesReference loadConfig() {
    final String renderedTemplate = TemplateUtils.loadTemplate(resource, Integer.toString(version), versionProperty);
    return new BytesArray(renderedTemplate);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"processors": [
{
"rename": {
"if": "ctx.message instanceof String && ctx.message.startsWith('{') && ctx.message.endsWith('}')",
"field": "message",
"target_field": "_tmp_json_message",
"ignore_missing": true
}
},
{
"json": {
"if": "ctx._tmp_json_message != null",
"field": "_tmp_json_message",
"add_to_root": true,
"add_to_root_conflict_strategy": "merge",
"allow_duplicate_keys": true,
"on_failure": [
{
"rename": {
"field": "_tmp_json_message",
"target_field": "message",
"ignore_missing": true
}
}
]
}
},
{
"dot_expander" : {
"if": "ctx._tmp_json_message != null",
"field": "*",
"override": true
}
},
{
"remove" : {
"field": "_tmp_json_message",
"ignore_missing": true
}
}
],
"_meta": {
"description": "automatic parsing of JSON log messages",
"managed": true
},
"version": ${xpack.stack.template.version}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ public void tearDown() throws Exception {
threadPool.shutdownNow();
}

public void testThatPipelinesAreAddedImmediately() throws Exception {
public void testThatIndependentPipelinesAreAddedImmediately() throws Exception {
DiscoveryNode node = TestDiscoveryNode.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();

AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> {
if (action instanceof PutPipelineAction) {
assertPutPipelineAction(calledTimes, action, request, listener);
assertPutPipelineAction(calledTimes, action, request, listener, "custom-plugin-final_pipeline");
return AcknowledgedResponse.TRUE;
} else {
// the composable template is not expected to be added, as it's dependency is not available in the cluster state
Expand All @@ -102,7 +102,34 @@ public void testThatPipelinesAreAddedImmediately() throws Exception {

ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes);
registry.clusterChanged(event);
assertBusy(() -> assertThat(calledTimes.get(), equalTo(2)));
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
}

/**
 * Verifies that a pipeline which depends on another pipeline is installed when all of its
 * dependencies are already present in the cluster state.
 */
public void testThatDependentPipelinesAreAddedIfDependenciesExist() throws Exception {
    DiscoveryNode node = TestDiscoveryNode.create("node");
    DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();

    AtomicInteger calledTimes = new AtomicInteger(0);
    client.setVerifier((action, request, listener) -> {
        if (action instanceof PutPipelineAction) {
            // only the dependent default pipeline should be installed; its dependency already exists
            assertPutPipelineAction(calledTimes, action, request, listener, "custom-plugin-default_pipeline");
            return AcknowledgedResponse.TRUE;
        } else {
            // the composable template is not expected to be added, as its dependency is not available in the cluster state
            // custom-plugin-settings.json is not expected to be added as it contains a dependency on the default_pipeline
            fail("client called with unexpected request: " + request.toString());
            return null;
        }
    });

    // simulate a cluster state in which the final pipeline (this pipeline's dependency) is already installed at version 3
    ClusterChangedEvent event = createClusterChangedEvent(
        Collections.emptyMap(),
        Collections.emptyMap(),
        Map.of("custom-plugin-final_pipeline", 3),
        nodes
    );
    registry.clusterChanged(event);
    // exactly one put-pipeline call is expected: the dependent default pipeline
    assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
}

public void testThatTemplateIsAddedIfAllDependenciesExist() throws Exception {
Expand Down Expand Up @@ -138,7 +165,7 @@ public void testThatTemplateIsNotAddedIfNotAllDependenciesExist() throws Excepti
AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> {
if (action instanceof PutPipelineAction) {
assertPutPipelineAction(calledTimes, action, request, listener);
assertPutPipelineAction(calledTimes, action, request, listener, "custom-plugin-default_pipeline");
return AcknowledgedResponse.TRUE;
} else {
// the template is not expected to be added, as the final pipeline is missing
Expand All @@ -150,7 +177,7 @@ public void testThatTemplateIsNotAddedIfNotAllDependenciesExist() throws Excepti
ClusterChangedEvent event = createClusterChangedEvent(
Collections.emptyMap(),
Collections.emptyMap(),
Map.of("custom-plugin-default_pipeline", 3),
Map.of("custom-plugin-final_pipeline", 3),
nodes
);
registry.clusterChanged(event);
Expand Down Expand Up @@ -188,7 +215,14 @@ public void testThatTemplatesAreUpgradedWhenNeeded() throws Exception {
AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> {
if (action instanceof PutPipelineAction) {
assertPutPipelineAction(calledTimes, action, request, listener);
assertPutPipelineAction(
calledTimes,
action,
request,
listener,
"custom-plugin-default_pipeline",
"custom-plugin-final_pipeline"
);
return AcknowledgedResponse.TRUE;
} else if (action instanceof PutComponentTemplateAction) {
assertPutComponentTemplate(calledTimes, action, request, listener);
Expand Down Expand Up @@ -277,12 +311,13 @@ private static void assertPutPipelineAction(
AtomicInteger calledTimes,
ActionType<?> action,
ActionRequest request,
ActionListener<?> listener
ActionListener<?> listener,
String... pipelineIds
) {
assertThat(action, instanceOf(PutPipelineAction.class));
assertThat(request, instanceOf(PutPipelineRequest.class));
final PutPipelineRequest putRequest = (PutPipelineRequest) request;
assertThat(putRequest.getId(), oneOf("custom-plugin-default_pipeline", "custom-plugin-final_pipeline"));
assertThat(putRequest.getId(), oneOf(pipelineIds));
PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(
putRequest.getId(),
putRequest.getSource(),
Expand Down
Loading