Skip to content

Commit d3d21ad

Browse files
authored
Handle structured log messages (#131027)
1 parent 714baea commit d3d21ad

File tree

8 files changed

+460
-1
lines changed

8 files changed

+460
-1
lines changed

docs/changelog/131027.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131027
2+
summary: Handle structured log messages
3+
area: Ingest Node
4+
type: feature
5+
issues:
6+
- 130333

docs/reference/enrich-processor/normalize-for-stream.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,87 @@ will be normalized into the following form:
153153
"trace_id": "abcdef1234567890abcdef1234567890"
154154
}
155155
```
156+
## Structured `message` field
157+
158+
If the `message` field in the ingested document is a string containing a JSON object, the
159+
processor will determine whether it is in ECS format or not, based on the
160+
existence or absence of the `@timestamp` field. If the `@timestamp` field is
161+
present, the `message` field will be considered to be in ECS format, and its
162+
contents will be merged into the root of the document and then normalized as
163+
described above. The `@timestamp` from the `message` field will override the
164+
root `@timestamp` field in the resulting document.
165+
If the `@timestamp` field is absent, the parsed `message` object will be moved to
166+
the `body.structured` field as is, without any further normalization.
167+
168+
For example, if the `message` field contains ECS-formatted JSON, as follows:
169+
170+
```json
171+
{
172+
"@timestamp": "2023-10-01T12:00:00Z",
173+
"message": "{\"@timestamp\":\"2023-10-01T12:01:00Z\",\"log.level\":\"INFO\",\"service.name\":\"my-service\",\"message\":\"The actual log message\",\"http\":{\"method\":\"GET\",\"url\":{\"path\":\"/api/v1/resource\"}}}"
174+
175+
}
176+
```
177+
it will be normalized into the following form:
178+
179+
```json
180+
{
181+
"@timestamp": "2023-10-01T12:01:00Z",
182+
"severity_text": "INFO",
183+
"body": {
184+
"text": "The actual log message"
185+
},
186+
"resource": {
187+
"attributes": {
188+
"service.name": "my-service"
189+
}
190+
},
191+
"attributes": {
192+
"http.method": "GET",
193+
"http.url.path": "/api/v1/resource"
194+
}
195+
}
196+
```
197+
198+
However, if the `message` field is not recognized as ECS format, as follows:
199+
200+
```json
201+
{
202+
"@timestamp": "2023-10-01T12:00:00Z",
203+
"log": {
204+
"level": "INFO"
205+
},
206+
"service": {
207+
"name": "my-service"
208+
},
209+
"tags": ["user-action", "api-call"],
210+
"message": "{\"root_cause\":\"Network error\",\"http\":{\"method\":\"GET\",\"url\":{\"path\":\"/api/v1/resource\"}}}"
211+
}
212+
```
213+
it will be normalized into the following form:
214+
215+
```json
216+
{
217+
"@timestamp": "2023-10-01T12:00:00Z",
218+
"severity_text": "INFO",
219+
"resource": {
220+
"attributes": {
221+
"service.name": "my-service"
222+
}
223+
},
224+
"attributes": {
225+
"tags": ["user-action", "api-call"]
226+
},
227+
"body": {
228+
"structured": {
229+
"root_cause": "Network error",
230+
"http": {
231+
"method": "GET",
232+
"url": {
233+
"path": "/api/v1/resource"
234+
}
235+
}
236+
}
237+
}
238+
}
239+
```

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.ingest.PipelineProcessor;
2222
import org.elasticsearch.ingest.Processor;
2323
import org.elasticsearch.plugins.ActionPlugin;
24+
import org.elasticsearch.plugins.ExtensiblePlugin;
2425
import org.elasticsearch.plugins.IngestPlugin;
2526
import org.elasticsearch.plugins.Plugin;
2627
import org.elasticsearch.rest.RestController;
@@ -33,7 +34,7 @@
3334

3435
import static java.util.Map.entry;
3536

36-
public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin {
37+
public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin, ExtensiblePlugin {
3738

3839
public IngestCommonPlugin() {}
3940

modules/ingest-otel/build.gradle

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ apply plugin: 'elasticsearch.internal-yaml-rest-test'
1212
esplugin {
1313
description = 'Ingest processor that normalizes ECS documents to OpenTelemetry-compatible namespaces'
1414
classname ='org.elasticsearch.ingest.otel.NormalizeForStreamPlugin'
15+
extendedPlugins = ['ingest-common']
16+
}
17+
18+
dependencies {
19+
compileOnly(project(':modules:ingest-common'))
20+
compileOnly project(':modules:lang-painless:spi')
21+
clusterModules project(':modules:ingest-common')
22+
clusterModules project(':modules:lang-painless')
1523
}
1624

1725
restResources {

modules/ingest-otel/src/main/java/module-info.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@
1010
module org.elasticsearch.ingest.otel {
1111
requires org.elasticsearch.base;
1212
requires org.elasticsearch.server;
13+
requires org.apache.logging.log4j;
14+
requires org.elasticsearch.ingest.common;
1315
}

modules/ingest-otel/src/main/java/org/elasticsearch/ingest/otel/NormalizeForStreamProcessor.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99

1010
package org.elasticsearch.ingest.otel;
1111

12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
1214
import org.elasticsearch.cluster.metadata.ProjectId;
1315
import org.elasticsearch.common.util.Maps;
1416
import org.elasticsearch.ingest.AbstractProcessor;
1517
import org.elasticsearch.ingest.IngestDocument;
1618
import org.elasticsearch.ingest.Processor;
19+
import org.elasticsearch.ingest.common.JsonProcessor;
1720

1821
import java.util.HashMap;
1922
import java.util.HashSet;
@@ -60,6 +63,8 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
6063
* OpenTelemetry-compatible fields that are renamed by the processor.
6164
*/
6265
private static final Set<String> KEEP_KEYS;
66+
private static final Logger log = LogManager.getLogger(NormalizeForStreamProcessor.class);
67+
6368
static {
6469
Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource"));
6570
Set<String> renamedTopLevelFields = new HashSet<>();
@@ -103,6 +108,41 @@ public IngestDocument execute(IngestDocument document) {
103108

104109
// non-OTel document
105110

111+
// handling structured messages
112+
Map<String, Object> body = null;
113+
try {
114+
String message = document.getFieldValue("message", String.class, true);
115+
if (message != null) {
116+
message = message.trim();
117+
if (message.startsWith("{") && message.endsWith("}")) {
118+
// if the message is a JSON object, we assume it is a structured log
119+
Object parsedMessage = JsonProcessor.apply(message, true, true);
120+
if (parsedMessage instanceof Map) {
121+
@SuppressWarnings("unchecked")
122+
Map<String, Object> messageMap = (Map<String, Object>) parsedMessage;
123+
if (messageMap.containsKey("@timestamp")) {
124+
log.debug(
125+
"Handling structured message with @timestamp field, assuming ECS-JSON format, merging into root document"
126+
);
127+
source.remove("message");
128+
JsonProcessor.recursiveMerge(source, messageMap);
129+
} else {
130+
log.debug(
131+
"Handling structured message without @timestamp field, assuming non-ECS format, moving to 'body.structured'"
132+
);
133+
body = new HashMap<>();
134+
body.put(STRUCTURED_KEY, messageMap);
135+
source.remove("message");
136+
}
137+
} else {
138+
log.debug("Structured message is not a JSON object, keeping it as a string in 'body.text' field: {}", message);
139+
}
140+
}
141+
}
142+
} catch (Exception e) {
143+
log.warn("Failed to parse structured message, keeping it as a string in 'body.text' field: {}", e.getMessage());
144+
}
145+
106146
Map<String, Object> newAttributes = new HashMap<>();
107147
// The keep keys indicate the fields that should be kept at the top level later on when applying the namespacing.
108148
// However, at this point we need to move their original values (if they exist) to the one of the new attributes namespaces, except
@@ -117,6 +157,11 @@ public IngestDocument execute(IngestDocument document) {
117157
}
118158
}
119159

160+
// if the body is not null, it means we have a structured log that we need to move to the body.structured field.
161+
if (body != null) {
162+
source.put(BODY_KEY, body);
163+
}
164+
120165
source.put(ATTRIBUTES_KEY, newAttributes);
121166

122167
renameSpecialKeys(document);

0 commit comments

Comments
 (0)