Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -161,7 +160,7 @@ public DissectParser(String pattern, String appendSeparator) {
}

referenceCount = referenceGroupings.size() * 2;
this.matchPairs = Collections.unmodifiableList(dissectPairs);
this.matchPairs = List.copyOf(dissectPairs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -264,11 +262,11 @@ public KeyValueProcessor create(
Set<String> excludeKeys = null;
List<String> includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
if (includeKeysList != null) {
includeKeys = Collections.unmodifiableSet(Sets.newHashSet(includeKeysList));
includeKeys = Set.copyOf(includeKeysList);
}
List<String> excludeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "exclude_keys");
if (excludeKeysList != null) {
excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList));
excludeKeys = Set.copyOf(excludeKeysList);
}
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new KeyValueProcessor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand Down Expand Up @@ -126,7 +125,7 @@ private String getDirection(IngestDocument d) throws Exception {
}
networks.addAll(stringList);
} else {
networks = internalNetworks.stream().map(network -> d.renderTemplate(network)).collect(Collectors.toList());
networks = internalNetworks.stream().map(network -> d.renderTemplate(network)).toList();
}

String sourceIpAddrString = d.getFieldValue(sourceIpField, String.class, ignoreMissing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

Expand All @@ -41,8 +40,8 @@ public final class RemoveProcessor extends AbstractProcessor {
boolean ignoreMissing
) {
super(tag, description);
this.fieldsToRemove = new ArrayList<>(fieldsToRemove);
this.fieldsToKeep = new ArrayList<>(fieldsToKeep);
this.fieldsToRemove = List.copyOf(fieldsToRemove);
this.fieldsToKeep = List.copyOf(fieldsToKeep);
this.ignoreMissing = ignoreMissing;
}

Expand Down Expand Up @@ -124,7 +123,7 @@ public RemoveProcessor create(
private List<TemplateScript.Factory> getTemplates(String processorTag, Map<String, Object> config, String propertyName) {
return getFields(processorTag, config, propertyName).stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, propertyName, f, scriptService))
.collect(Collectors.toList());
.toList();
}

private static List<String> getFields(String processorTag, Map<String, Object> config, String propertyName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void testCreate() throws Exception {
assertThat(setProcessor.getField().newInstance(Map.of()).execute(), equalTo("field1"));
assertThat(setProcessor.getValue().copyAndResolve(Map.of()), equalTo("value1"));
assertThat(setProcessor.isOverrideEnabled(), equalTo(true));
assertThat(setProcessor.isIgnoreEmptyValue(), equalTo(false));
}

public void testCreateWithOverride() throws Exception {
Expand All @@ -57,6 +58,20 @@ public void testCreateWithOverride() throws Exception {
assertThat(setProcessor.isOverrideEnabled(), equalTo(overrideEnabled));
}

public void testCreateWithIgnoreEmptyValue() throws Exception {
boolean ignoreEmptyValueEnabled = randomBoolean();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("value", "value1");
config.put("ignore_empty_value", ignoreEmptyValueEnabled);
String processorTag = randomAlphaOfLength(10);
SetProcessor setProcessor = factory.create(null, processorTag, null, config);
assertThat(setProcessor.getTag(), equalTo(processorTag));
assertThat(setProcessor.getField().newInstance(Map.of()).execute(), equalTo("field1"));
assertThat(setProcessor.getValue().copyAndResolve(Map.of()), equalTo("value1"));
assertThat(setProcessor.isIgnoreEmptyValue(), equalTo(ignoreEmptyValueEnabled));
}

public void testCreateNoFieldPresent() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("value", "value1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -388,7 +387,7 @@ public GeoIpDatabase get() throws IOException {
}

public static final class Factory implements Processor.Factory {
static final Set<Property> DEFAULT_CITY_PROPERTIES = Collections.unmodifiableSet(
static final Set<Property> DEFAULT_CITY_PROPERTIES = Set.copyOf(
EnumSet.of(
Property.CONTINENT_NAME,
Property.COUNTRY_NAME,
Expand All @@ -399,10 +398,10 @@ public static final class Factory implements Processor.Factory {
Property.LOCATION
)
);
static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = Collections.unmodifiableSet(
static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = Set.copyOf(
EnumSet.of(Property.CONTINENT_NAME, Property.COUNTRY_NAME, Property.COUNTRY_ISO_CODE)
);
static final Set<Property> DEFAULT_ASN_PROPERTIES = Collections.unmodifiableSet(
static final Set<Property> DEFAULT_ASN_PROPERTIES = Set.copyOf(
EnumSet.of(Property.IP, Property.ASN, Property.ORGANIZATION_NAME, Property.NETWORK)
);

Expand Down Expand Up @@ -455,7 +454,7 @@ public Processor create(
throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage());
}
}
properties = Collections.unmodifiableSet(modifiableProperties);
properties = Set.copyOf(modifiableProperties);
} else {
if (databaseType.endsWith(CITY_DB_SUFFIX)) {
properties = DEFAULT_CITY_PROPERTIES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,28 @@

public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
public void testHasAtLeastOneGeoipProcessor() {
Map<String, PipelineConfiguration> configs = new HashMap<>();
IngestMetadata ingestMetadata = new IngestMetadata(configs);
final IngestMetadata[] ingestMetadata = new IngestMetadata[1];
ClusterState clusterState = mock(ClusterState.class);
Metadata metadata = mock(Metadata.class);
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
when(metadata.custom(IngestMetadata.TYPE)).thenAnswer(invocationOnmock -> ingestMetadata[0]);
when(clusterState.getMetadata()).thenReturn(metadata);
List<String> expectHitsInputs = getPipelinesWithGeoIpProcessors();
List<String> expectMissesInputs = getPipelinesWithoutGeoIpProcessors();
{
// Test that hasAtLeastOneGeoipProcessor returns true for any pipeline with a geoip processor:
for (String pipeline : expectHitsInputs) {
configs.clear();
configs.put("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON));
ingestMetadata[0] = new IngestMetadata(
Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON))
);
assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
}
}
{
// Test that hasAtLeastOneGeoipProcessor returns false for any pipeline without a geoip processor:
for (String pipeline : expectMissesInputs) {
configs.clear();
configs.put("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON));
ingestMetadata[0] = new IngestMetadata(
Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON))
);
assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
}
}
Expand All @@ -54,7 +55,7 @@ public void testHasAtLeastOneGeoipProcessor() {
* Now test that hasAtLeastOneGeoipProcessor returns true for a mix of pipelines, some which have geoip processors and some
* which do not:
*/
configs.clear();
Map<String, PipelineConfiguration> configs = new HashMap<>();
for (String pipeline : expectHitsInputs) {
String id = randomAlphaOfLength(20);
configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON));
Expand All @@ -63,6 +64,7 @@ public void testHasAtLeastOneGeoipProcessor() {
String id = randomAlphaOfLength(20);
configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON));
}
ingestMetadata[0] = new IngestMetadata(configs);
assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List
this.processors = List.copyOf(processors);
this.onFailureProcessors = List.copyOf(onFailureProcessors);
this.relativeTimeProvider = relativeTimeProvider;
this.processorsWithMetrics = processors.stream().map(p -> new Tuple<>(p, new IngestMetric())).toList();
this.processorsWithMetrics = List.copyOf(processors.stream().map(p -> new Tuple<>(p, new IngestMetric())).toList());
this.isAsync = flattenProcessors().stream().anyMatch(Processor::isAsync);
}

Expand Down
32 changes: 6 additions & 26 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,22 +428,6 @@ public void appendFieldValue(String path, Object value, boolean allowDuplicates)
setFieldValue(path, value, true, allowDuplicates);
}

/**
* Appends the provided value to the provided path in the document.
* Any non existing path element will be created.
* If the path identifies a list, the value will be appended to the existing list.
* If the path identifies a scalar, the scalar will be converted to a list and
* the provided value will be added to the newly created list.
* Supports multiple values too provided in forms of list, in that case all the values will be appended to the
* existing (or newly created) list.
* @param fieldPathTemplate Resolves to the path with dot-notation within the document
* @param valueSource The value source that will produce the value or values to append to the existing ones
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource valueSource) {
appendFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), valueSource.copyAndResolve(templateModel));
}

/**
* Appends the provided value to the provided path in the document.
* Any non existing path element will be created.
Expand Down Expand Up @@ -476,7 +460,7 @@ public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSour
* item identified by the provided path.
*/
public void setFieldValue(String path, Object value) {
setFieldValue(path, value, false);
setFieldValue(path, value, false, true);
}

/**
Expand All @@ -489,7 +473,7 @@ public void setFieldValue(String path, Object value) {
* item identified by the provided path.
*/
public void setFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource valueSource) {
setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), valueSource.copyAndResolve(templateModel), false);
setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), valueSource.copyAndResolve(templateModel));
}

/**
Expand All @@ -514,7 +498,7 @@ public void setFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource
}
}

setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), value, false);
setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), value);
}

/**
Expand All @@ -539,11 +523,7 @@ public void setFieldValue(TemplateScript.Factory fieldPathTemplate, Object value
}
}

setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), value, false);
}

private void setFieldValue(String path, Object value, boolean append) {
setFieldValue(path, value, append, true);
setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), value);
}

private void setFieldValue(String path, Object value, boolean append, boolean allowDuplicates) {
Expand Down Expand Up @@ -969,11 +949,11 @@ private FieldPath(String path) {
String newPath;
if (path.startsWith(INGEST_KEY_PREFIX)) {
initialContext = ingestMetadata;
newPath = path.substring(INGEST_KEY_PREFIX.length(), path.length());
newPath = path.substring(INGEST_KEY_PREFIX.length());
} else {
initialContext = ctxMap;
if (path.startsWith(SOURCE_PREFIX)) {
newPath = path.substring(SOURCE_PREFIX.length(), path.length());
newPath = path.substring(SOURCE_PREFIX.length());
} else {
newPath = path;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -52,7 +51,7 @@ public final class IngestMetadata implements Metadata.Custom {
private final Map<String, PipelineConfiguration> pipelines;

public IngestMetadata(Map<String, PipelineConfiguration> pipelines) {
this.pipelines = Collections.unmodifiableMap(pipelines);
this.pipelines = Map.copyOf(pipelines);
}

@Override
Expand All @@ -76,7 +75,7 @@ public IngestMetadata(StreamInput in) throws IOException {
PipelineConfiguration pipeline = PipelineConfiguration.readFrom(in);
pipelines.put(pipeline.getId(), pipeline);
}
this.pipelines = Collections.unmodifiableMap(pipelines);
this.pipelines = Map.copyOf(pipelines);
}

@Override
Expand Down
6 changes: 0 additions & 6 deletions server/src/main/java/org/elasticsearch/ingest/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,6 @@ public static Pipeline create(
*/
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
/*
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
metrics.preIngest();
compoundProcessor.execute(ingestDocument, (result, e) -> {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
Expand Down