Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/134524.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 134524
summary: Add support for flexible access pattern to `NormalizeForStreamProcessor`
area: Ingest Node
type: bug
issues: []
2 changes: 1 addition & 1 deletion modules/ingest-otel/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ dependencies {

restResources {
restApi {
include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest'
include '_common', 'indices', 'index', 'cluster', 'nodes', 'get', 'ingest', 'capabilities'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
/**
* Mapping of ECS field names to their corresponding OpenTelemetry-compatible counterparts.
*/
private static final Map<String, String> RENAME_KEYS = Map.ofEntries(
static final Map<String, String> RENAME_KEYS = Map.ofEntries(
entry("span.id", "span_id"),
entry("message", "body.text"),
entry("log.level", "severity_text"),
Expand All @@ -69,12 +69,12 @@ public class NormalizeForStreamProcessor extends AbstractProcessor {
Set<String> keepKeys = new HashSet<>(Set.of("@timestamp", "attributes", "resource"));
Set<String> renamedTopLevelFields = new HashSet<>();
for (String value : RENAME_KEYS.values()) {
// if the renamed field is nested, we only need to know the top level field
int dotIndex = value.indexOf('.');
if (dotIndex != -1) {
renamedTopLevelFields.add(value.substring(0, dotIndex));
} else {
renamedTopLevelFields.add(value);
// if the renamed field is nested, generate the full list of paths that it could be rooted under
String workingKey = null;
String[] values = value.split("\\.");
for (String part : values) {
workingKey = workingKey == null ? part : workingKey + "." + part;
renamedTopLevelFields.add(workingKey);
}
}
keepKeys.addAll(renamedTopLevelFields);
Expand Down Expand Up @@ -244,7 +244,29 @@ static boolean isOTelDocument(Map<String, Object> source) {
}

/**
 * Renames the ECS keys listed in {@code RENAME_KEYS} to their OpenTelemetry-compatible counterparts, choosing the
 * implementation that matches the access pattern currently in effect on the given document.
 *
 * <p>The access pattern is read from {@code document.getCurrentAccessPatternSafe()}:
 * <ul>
 * <li>{@code CLASSIC} delegates to {@code renameSpecialKeysClassic(IngestDocument)}.</li>
 * <li>{@code FLEXIBLE} delegates to {@code renameSpecialKeysFlexible(IngestDocument)}.</li>
 * </ul>
 * Both delegates move the value of each existing source key to its renamed target and remove parent objects that
 * were left empty by the move.
 *
 * @param document the document to process; it is modified in place
 */
static void renameSpecialKeys(IngestDocument document) {
    final var accessPattern = document.getCurrentAccessPatternSafe();
    switch (accessPattern) {
        case CLASSIC -> renameSpecialKeysClassic(document);
        case FLEXIBLE -> renameSpecialKeysFlexible(document);
    }
}

/**
* Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the
* {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#CLASSIC} access pattern and based on the {@code RENAME_KEYS} map.
*
* <p>This method performs the following operations:
* <ul>
Expand All @@ -257,7 +279,7 @@ static boolean isOTelDocument(Map<String, Object> source) {
*
* @param document the document to process
*/
static void renameSpecialKeys(IngestDocument document) {
static void renameSpecialKeysClassic(IngestDocument document) {
RENAME_KEYS.forEach((nonOtelName, otelName) -> {
boolean fieldExists = false;
Object value = null;
Expand All @@ -272,7 +294,7 @@ static void renameSpecialKeys(IngestDocument document) {
String parentName = nonOtelName.substring(0, lastDot);
// parent should never be null and must be a map if we are here
@SuppressWarnings("unchecked")
Map<String, Object> parent = (Map<String, Object>) document.getFieldValue(parentName, Map.class);
Map<String, Object> parent = document.getFieldValue(parentName, Map.class);
if (parent.isEmpty()) {
document.removeField(parentName);
} else {
Expand All @@ -294,6 +316,76 @@ static void renameSpecialKeys(IngestDocument document) {
});
}

/**
 * Renames specific ECS keys in the given document to their OpenTelemetry-compatible counterparts using logic compatible with the
 * {@link org.elasticsearch.ingest.IngestPipelineFieldAccessPattern#FLEXIBLE} access pattern and based on the {@code RENAME_KEYS} map.
 *
 * <p>For each entry in {@code RENAME_KEYS} whose source field exists in the document, this method:
 * <ul>
 * <li>reads the current value and removes the source field,</li>
 * <li>walks up the dotted source name and removes every parent object that is now empty, stopping at the first parent
 * that still holds something,</li>
 * <li>writes the value under the target name directly into the document source, creating one nested object per path
 * segment instead of a single key that contains dots.</li>
 * </ul>
 * Entries whose source field is absent are skipped.
 *
 * @param document the document to process; it is modified in place
 */
static void renameSpecialKeysFlexible(IngestDocument document) {
    RENAME_KEYS.forEach((nonOtelName, otelName) -> {
        if (document.hasField(nonOtelName) == false) {
            return;
        }
        // Dotted fields are treated the same as normalized fields in flexible mode
        Object value = document.getFieldValue(nonOtelName, Object.class, true);
        document.removeField(nonOtelName);
        pruneEmptyParentsFlexible(document, nonOtelName);
        // Flexible mode creates dotted field names when parent fields are not present. We expect the rename keys to be
        // normalized after processing, so the target is written as nested objects straight into the source map.
        putAsNestedObjects(document.getSource(), otelName, value);
    });
}

/**
 * Removes, from the innermost outwards, every parent object of {@code removedField} that is empty.
 *
 * <p>In flexible mode, dotted field names can be removed. Parent paths may not exist since they might be included
 * by the dotted field removal (e.g. for the doc {a:{b.c:1}}, removing a.b.c will not leave an a.b field because
 * there is no a.b field to start with). Missing parents are therefore skipped, and the walk continues with the next
 * ancestor.
 *
 * @param document the document the field was removed from
 * @param removedField the dotted name of the field that has just been removed
 */
private static void pruneEmptyParentsFlexible(IngestDocument document, String removedField) {
    int lastDot = removedField.lastIndexOf('.');
    while (lastDot > 0) {
        String parentName = removedField.substring(0, lastDot);
        // Looked up as Object rather than Map: an ancestor that exists but is not an object cannot be "empty", so it
        // ends the cleanup like any other non-empty parent instead of depending on a typed lookup succeeding.
        Object parent = document.getFieldValue(parentName, Object.class, true);
        if (parent instanceof Map<?, ?> parentMap) {
            if (parentMap.isEmpty()) {
                document.removeField(parentName);
            } else {
                break;
            }
        } else if (parent != null) {
            break;
        }
        lastDot = parentName.lastIndexOf('.');
    }
}

/**
 * Stores {@code value} under {@code dottedPath} below {@code root}, emulating classic mode by building out each
 * parent object.
 *
 * <p>Every segment before the last dot is resolved to a nested map. An existing map is reused; anything else found
 * at that key (including nothing) is replaced by a new empty map. The final segment is then put into the innermost map.
 *
 * @param root the map to start from
 * @param dottedPath the target field name, with dots separating the nesting levels
 * @param value the value to store under the final path segment
 */
private static void putAsNestedObjects(Map<String, Object> root, String dottedPath, Object value) {
    Map<String, Object> current = root;
    String remainingPath = dottedPath;
    int dot = remainingPath.indexOf('.');
    while (dot > 0) {
        String fieldName = remainingPath.substring(0, dot);
        remainingPath = remainingPath.substring(dot + 1);
        Object existingParent = current.get(fieldName);
        if (existingParent instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Object> existingMap = (Map<String, Object>) existingParent;
            current = existingMap;
        } else {
            Map<String, Object> created = new HashMap<>();
            current.put(fieldName, created);
            current = created;
        }
        dot = remainingPath.indexOf('.');
    }
    current.put(remainingPath, value);
}

private static void moveResourceAttributes(Map<String, Object> attributes, Map<String, Object> resourceAttributes) {
Set<String> ecsResourceFields = EcsOTelResourceAttributes.LATEST;
Iterator<Map.Entry<String, Object>> attributeIterator = attributes.entrySet().iterator();
Expand Down
Loading