2 changes: 1 addition & 1 deletion modules/ingest-common/build.gradle
@@ -28,7 +28,7 @@ dependencies {

restResources {
restApi {
- include '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget', 'search'
+ include '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget', 'search', 'simulate'
}
}

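Adding the 'simulate' spec to restResources makes the simulate.ingest API visible to this module's YAML REST tests; the new invalid-grok test below calls that endpoint.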
@@ -154,3 +154,45 @@ teardown:
ingest.processor_grok: {}
- length: { patterns: 318 }
- match: { patterns.PATH: "(?:%{UNIXPATH}|%{WINPATH})" }


---
"Test simulate with invalid GROK pattern":
- skip:
features: headers
- do:
catch: bad_request
headers:
Content-Type: application/json
simulate.ingest:
pipeline: "invalid-grok"
body: >
{
"docs": [
{
"_index": "index-1",
"_source": {
"foo": "bar"
}
}
],
"pipeline_substitutions": {
"invalid-grok": {
"description": "invalid grok pattern",
"processors": [
{
"grok": {
"field": "field",
"patterns": [
"%{INVALID_PATTERN:field}"
]
}
}
]
}
}
}
- match: { status: 400 }
- match: { error.reason: "[patterns] Invalid regex pattern found in: [%{INVALID_PATTERN:field}]. Unable to find pattern [INVALID_PATTERN] in Grok's pattern dictionary" }
- match: { error.property_name: "patterns" }
- match: { error.processor_type: "grok" }
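The error.property_name and error.processor_type fields asserted above come from structured metadata that the ingest framework attaches to the ElasticsearchParseException raised for a bad processor configuration. A minimal sketch of that convention, assuming the es.-prefixed metadata keys of ElasticsearchException (the prefix is stripped when the error is rendered); the class and method below are illustrative, not the real grok factory:

import org.elasticsearch.ElasticsearchParseException;

// Illustrative only: shows how a configuration error can carry the structured
// fields checked by the YAML test above. Assumes metadata keys must be
// "es."-prefixed and are rendered without the prefix in the error body.
public class GrokConfigErrorSketch {
    public static ElasticsearchParseException invalidPatternsError() {
        ElasticsearchParseException e = new ElasticsearchParseException(
            "[patterns] Invalid regex pattern found in: [%{INVALID_PATTERN:field}]. "
                + "Unable to find pattern [INVALID_PATTERN] in Grok's pattern dictionary"
        );
        e.addMetadata("es.processor_type", "grok");    // -> error.processor_type
        e.addMetadata("es.property_name", "patterns"); // -> error.property_name
        return e;
    }
}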
@@ -628,7 +628,7 @@ setup:
default_pipeline: "my-pipeline"

- do:
catch: "request"
catch: bad_request
headers:
Content-Type: application/json
simulate.ingest:
@@ -661,7 +661,8 @@
}
}
}
- - match: { status: 500 }
+ - match: { status: 400 }
+ - match: { error.reason: "No processor type exists with name [non-existent-processor]" }
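With the substitution parsing fixed, an unknown processor type in a pipeline substitution is now rejected up front as a 400 bad_request with a precise reason, instead of surfacing as a generic 500.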

---
"Test index in path":
@@ -9,6 +9,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -28,6 +29,8 @@ public SimulateIngestService(IngestService ingestService, BulkRequest request) {
if (request instanceof SimulateBulkRequest simulateBulkRequest) {
try {
pipelineSubstitutions = getPipelineSubstitutions(simulateBulkRequest.getPipelineSubstitutions(), ingestService);
} catch (ElasticsearchException elasticEx) {
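// Rethrow unchanged: an ElasticsearchException carries its own RestStatus
// (400 for the ElasticsearchParseException thrown on an invalid processor
// config), whereas the RuntimeException wrapper below surfaces as a 500.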
throw elasticEx;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -9,6 +9,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.client.internal.Client;
@@ -34,6 +35,8 @@
import static org.elasticsearch.test.LambdaMatchers.transformedMatch;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -46,6 +49,10 @@ private static <K, V> Map<K, V> newHashMap(K key, V value) {
return map;
}

// Returns a mutable map: ingest config parsing consumes (removes) entries from
// processor config maps, so an immutable Map.of() would throw
// UnsupportedOperationException during pipeline creation.
private static <K, V> Map<K, V> emptyMutableHashMap() {
return new HashMap<>(0);
}

public void testGetPipeline() {
PipelineConfiguration pipelineConfiguration = new PipelineConfiguration("pipeline1", new BytesArray("""
{"processors": [{"processor1" : {}}]}"""), XContentType.JSON);
@@ -118,6 +125,31 @@ public void testGetPipeline() {
}
}

public void testRethrowingOfElasticParseExceptionFromProcessors() {
final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration("pipeline1", new BytesArray("""
{"processors": []}"""), XContentType.JSON);
final IngestMetadata ingestMetadata = new IngestMetadata(Map.of("pipeline1", pipelineConfiguration));
final Processor.Factory factoryThatThrowsElasticParseException = (factory, tag, description, config, projectId) -> {
throw new ElasticsearchParseException("house: it's never lupus");
};
final Map<String, Processor.Factory> processors = Map.of("parse_exception_processor", factoryThatThrowsElasticParseException);
final var projectId = randomProjectIdOrDefault();
IngestService ingestService = createWithProcessors(projectId, processors);
ingestService.innerUpdatePipelines(projectId, ingestMetadata);
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(
newHashMap("pipeline1", newHashMap("processors", List.of(Map.of("parse_exception_processor", emptyMutableHashMap())))),
Map.of(),
Map.of(),
Map.of()
);

final ElasticsearchParseException ex = assertThrows(
ElasticsearchParseException.class,
() -> new SimulateIngestService(ingestService, simulateBulkRequest)
);
assertThat(ex.getMessage(), is("house: it's never lupus"));
}
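The stub factory throws directly from Processor.Factory.create, so the assertion pins down the new behaviour: SimulateIngestService's constructor propagates the ElasticsearchParseException as-is instead of wrapping it in a RuntimeException.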

private static IngestService createWithProcessors(ProjectId projectId, Map<String, Processor.Factory> processors) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);