Commit 6784895

merging masseyke:sampling-service-shell

2 parents: ca4c964 + 342cbef

9 files changed: +199 -53 lines

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java
Lines changed: 11 additions & 8 deletions

@@ -90,7 +90,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
     protected final Executor systemCoordinationExecutor;
     private final ActionType<BulkResponse> bulkAction;
     protected final FeatureService featureService;
-    private final SamplingService samplingService;
+    protected final SamplingService samplingService;

     public TransportAbstractBulkAction(
         ActionType<BulkResponse> action,
@@ -207,7 +207,7 @@ private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executo
         executor.execute(new ActionRunnable<>(releasingListener) {
             @Override
             protected void doRun() throws IOException {
-                applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener, true);
+                applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener, false);
             }
         });
     }
@@ -217,7 +217,7 @@ private boolean applyPipelines(
         BulkRequest bulkRequest,
         Executor executor,
         ActionListener<BulkResponse> listener,
-        boolean firstTime
+        boolean haveRunIngestService
     ) throws IOException {
         boolean hasIndexRequestsWithPipelines = false;
         ClusterState state = clusterService.state();
@@ -311,8 +311,11 @@ private boolean applyPipelines(
                 }
             });
             return true;
-        } else if (samplingService != null && firstTime) {
-            // else sample, but only if this is the first time through. Otherwise we had pipelines and sampled in IngestService
+        } else if (haveRunIngestService == false && samplingService != null && samplingService.atLeastOneSampleConfigured()) {
+            /*
+             * Else sample only if this request has not passed through IngestService::executeBulkRequest. Otherwise, some request within the
+             * bulk had pipelines and we sampled in IngestService already.
+             */
             for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
                 if (actionRequest instanceof IndexRequest ir) {
                     samplingService.maybeSample(project, ir);
@@ -353,7 +356,7 @@ private void processBulkIndexIngestRequest(
         ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
             @Override
             protected void doRun() throws IOException {
-                applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener, false);
+                applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener, true);
             }

             @Override
@@ -432,7 +435,7 @@ private void applyPipelinesAndDoInternalExecute(
         BulkRequest bulkRequest,
         Executor executor,
         ActionListener<BulkResponse> listener,
-        boolean firstTime
+        boolean haveRunIngestService
     ) throws IOException {
         final long relativeStartTimeNanos = relativeTimeNanos();

@@ -450,7 +453,7 @@ private void applyPipelinesAndDoInternalExecute(
         var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);

-        if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, firstTime) == false) {
+        if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, haveRunIngestService) == false) {
            doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
        }
    }
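
Note: the flag rename above (firstTime -> haveRunIngestService, with the booleans at the two call sites flipped accordingly) encodes a single invariant: each document is sampled exactly once, either inside IngestService after its pipelines have run, or here in the bulk action when no pipeline is involved. A minimal, self-contained sketch of that routing decision, using plain Java stand-ins rather than the real BulkRequest/IndexRequest/SamplingService types:

// Sketch only: models the sample-exactly-once decision from the diff above with stand-in types.
import java.util.List;

class SamplingRoutingSketch {
    interface Sampler {
        boolean atLeastOneSampleConfigured();
        void maybeSample(String rawDocument);
    }

    // haveRunIngestService is false on the direct bulk path (no pipelines resolved) and true when
    // IngestService::executeBulkRequest has already processed, and therefore sampled, the documents.
    static void afterPipelineResolution(List<String> rawDocuments, boolean haveRunIngestService, Sampler sampler) {
        if (haveRunIngestService == false && sampler != null && sampler.atLeastOneSampleConfigured()) {
            for (String doc : rawDocuments) {
                sampler.maybeSample(doc); // sample the raw request; there is no pipeline output to capture
            }
        }
        // else: IngestService already sampled each document together with its pipeline output
    }
}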

server/src/main/java/org/elasticsearch/ingest/IngestService.java
Lines changed: 41 additions & 27 deletions

@@ -83,7 +83,6 @@
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;

-import java.io.IOException;
 import java.time.Instant;
 import java.time.InstantSource;
 import java.util.ArrayList;
@@ -981,6 +980,7 @@ protected void doRun() {
                 Pipeline firstPipeline = pipelines.peekFirst();
                 if (pipelines.hasNext() == false) {
                     i++;
+                    samplingService.maybeSample(state.metadata().projects().get(pipelines.projectId()), indexRequest);
                     continue;
                 }

@@ -1199,11 +1199,11 @@ private void executePipelines(
             }
         };
         AtomicBoolean haveAttemptedSampling = new AtomicBoolean(false);
+        final var project = state.metadata().projects().get(pipelines.projectId());
         try {
             if (pipeline == null) {
                 throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
             }
-            final var project = state.metadata().projects().get(pipelines.projectId());
             if (project == null) {
                 throw new IllegalArgumentException("project with id [" + pipelines.projectId() + "] does not exist");
             }
@@ -1355,37 +1355,22 @@ private void executePipelines(
                 if (newPipelines.hasNext()) {
                     executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener, originalDocumentMetadata);
                 } else {
-                    try {
-                        /*
-                         * At this point, all pipelines have been executed, and we are about to overwrite ingestDocument with the results.
-                         * We need both the original document and the fully updated document for sampling, so we make a copy of the original
-                         * before overwriting it here. We can discard it after sampling.
-                         */
-                        haveAttemptedSampling.set(true);
-                        IndexRequest original = copyIndexRequest(indexRequest);
-                        updateIndexRequestMetadata(original, originalDocumentMetadata);
-                        samplingService.maybeSample(project, original, ingestDocument);
-                    } catch (IOException ex) {
-                        logger.warn("unable to sample data");
-                    }
-
+                    /*
+                     * At this point, all pipelines have been executed, and we are about to overwrite ingestDocument with the results.
+                     * This is our chance to sample with both the original document and all changes.
+                     */
+                    haveAttemptedSampling.set(true);
+                    attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata);
                     updateIndexRequestSource(indexRequest, ingestDocument);
                     cacheRawTimestamp(indexRequest, ingestDocument);
                     listener.onResponse(IngestPipelinesExecutionResult.SUCCESSFUL_RESULT); // document succeeded!
                 }
             });
         } catch (Exception e) {
-            try {
-                if (haveAttemptedSampling.get() == false) {
-                    IndexRequest original = copyIndexRequest(indexRequest);
-                    updateIndexRequestMetadata(original, originalDocumentMetadata);
-                    samplingService.maybeSample(state.projectState(projectResolver.getProjectId()).metadata(), original, ingestDocument);
-                }
-            } catch (IOException ex) {
-                logger.warn("unable to sample data");
+            if (haveAttemptedSampling.get() == false) {
+                // It is possible that an exception happened after we sampled. We do not want to sample the same document twice.
+                attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata);
             }
-            // Maybe also sample here? Or put it in the exceptionHandler? We want to make sure the exception didn't come of out the
-            // listener though.
             logger.debug(
                 () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()),
                 e
@@ -1394,20 +1379,49 @@ private void executePipelines(
         }
     }

-    private IndexRequest copyIndexRequest(IndexRequest original) throws IOException {
+    private void attemptToSampleData(
+        ProjectMetadata projectMetadata,
+        IndexRequest indexRequest,
+        IngestDocument ingestDocument,
+        Metadata originalDocumentMetadata
+    ) {
+        if (samplingService != null && samplingService.atLeastOneSampleConfigured()) {
+            /*
+             * We need both the original document and the fully updated document for sampling, so we make a copy of the original
+             * before overwriting it here. We can discard it after sampling.
+             */
+            samplingService.maybeSample(projectMetadata, indexRequest.index(), () -> {
+                IndexRequest original = copyIndexRequestForSampling(indexRequest);
+                updateIndexRequestMetadata(original, originalDocumentMetadata);
+                return original;
+            }, ingestDocument);
+
+        }
+    }
+
+    /**
+     * Creates a copy of an IndexRequest to be used by random sampling.
+     * @param original The IndexRequest to be copied
+     * @return A copy of the IndexRequest
+     */
+    private IndexRequest copyIndexRequestForSampling(IndexRequest original) {
         IndexRequest clonedRequest = new IndexRequest(original.index());
         clonedRequest.id(original.id());
         clonedRequest.routing(original.routing());
         clonedRequest.version(original.version());
         clonedRequest.versionType(original.versionType());
         clonedRequest.setPipeline(original.getPipeline());
+        clonedRequest.setFinalPipeline(original.getFinalPipeline());
         clonedRequest.setIfSeqNo(original.ifSeqNo());
         clonedRequest.setIfPrimaryTerm(original.ifPrimaryTerm());
         clonedRequest.setRefreshPolicy(original.getRefreshPolicy());
         clonedRequest.waitForActiveShards(original.waitForActiveShards());
         clonedRequest.timeout(original.timeout());
         clonedRequest.opType(original.opType());
         clonedRequest.setParentTask(original.getParentTask());
+        clonedRequest.setRequireDataStream(original.isRequireDataStream());
+        clonedRequest.setRequireAlias(original.isRequireAlias());
+        clonedRequest.setIncludeSourceOnError(original.getIncludeSourceOnError());
         BytesReference source = original.source();
         if (source != null) {
             clonedRequest.source(source, original.getContentType());
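
Note: the refactor above replaces an eager copyIndexRequest call, which copied every request whether or not it was ultimately sampled, with a Supplier<IndexRequest> that SamplingService invokes only once the sampling condition has matched. A small self-contained illustration of that deferred-copy pattern (generic Java, not the real IndexRequest API):

import java.util.function.Supplier;

class DeferredCopySketch {
    // Only pay for the copy when a sample is actually taken.
    static <T> void maybeSample(boolean conditionMatches, Supplier<T> expensiveCopy) {
        if (conditionMatches) {
            T copy = expensiveCopy.get(); // the copy is built here, after the condition check
            System.out.println("sampled: " + copy);
        }
        // if the condition does not match, expensiveCopy.get() is never called and no copy is made
    }

    public static void main(String[] args) {
        maybeSample(false, () -> { System.out.println("copying doc-1"); return "doc-1"; }); // prints nothing
        maybeSample(true, () -> { System.out.println("copying doc-2"); return "doc-2"; });  // copies, then samples
    }
}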

server/src/main/java/org/elasticsearch/ingest/SamplingService.java
Lines changed: 34 additions & 7 deletions

@@ -184,8 +184,13 @@ public void deleteSampleConfiguration(ProjectId projectId, String index) {
         );
     }

+    /**
+     * Potentially samples the given indexRequest, depending on the existing sampling configuration.
+     * @param projectMetadata Used to get the sampling configuration
+     * @param indexRequest The raw request to potentially sample
+     */
     public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest) {
-        maybeSample(projectMetadata, indexRequest, () -> {
+        maybeSample(projectMetadata, indexRequest.index(), () -> indexRequest, () -> {
             Map<String, Object> sourceAsMap;
             try {
                 sourceAsMap = indexRequest.sourceAsMap();
@@ -204,19 +209,20 @@ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexReque
         });
     }

-    public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) {
-        maybeSample(projectMetadata, indexRequest, () -> ingestDocument);
-    }
-
-    private void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest, Supplier<IngestDocument> ingestDocumentSupplier) {
+    private void maybeSample(
+        ProjectMetadata projectMetadata,
+        String indexName,
+        Supplier<IndexRequest> indexRequestSupplier,
+        Supplier<IngestDocument> ingestDocumentSupplier
+    ) {
         long startTime = relativeNanoTimeSupplier.getAsLong();
         TransportPutSampleConfigAction.SamplingConfigCustomMetadata samplingConfig = projectMetadata.custom(
             TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME
         );
         ProjectId projectId = projectMetadata.id();
         if (samplingConfig != null) {
             String samplingIndex = samplingConfig.indexName;
-            if (samplingIndex.equals(indexRequest.index())) {
+            if (samplingIndex.equals(indexName)) {
                 SoftReference<SampleInfo> sampleInfoReference = samples.compute(
                     new ProjectIndex(projectId, samplingIndex),
                     (k, v) -> v == null || v.get() == null
@@ -266,6 +272,7 @@ private void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequ
                         sampleInfo.stats
                     )) {
                         stats.timeEvaluatingCondition.add((relativeNanoTimeSupplier.getAsLong() - conditionStartTime));
+                        IndexRequest indexRequest = indexRequestSupplier.get();
                         indexRequest.incRef();
                         if (indexRequest.source() instanceof ReleasableBytesReference releaseableSource) {
                             releaseableSource.incRef();
@@ -371,6 +378,25 @@ private boolean isClusterServiceStoppedOrClosed() {
         return state == Lifecycle.State.STOPPED || state == Lifecycle.State.CLOSED;
     }

+    /**
+     *
+     * @param projectMetadata Used to get the sampling configuration
+     * @param indexRequestSupplier A supplier for the raw request to potentially sample
+     * @param ingestDocument The IngestDocument used for evaluating any conditionals that are part of the sample configuration
+     */
+    public void maybeSample(
+        ProjectMetadata projectMetadata,
+        String indexName,
+        Supplier<IndexRequest> indexRequestSupplier,
+        IngestDocument ingestDocument
+    ) {
+        maybeSample(projectMetadata, indexName, indexRequestSupplier, () -> ingestDocument);
+    }
+
+    public boolean atLeastOneSampleConfigured() {
+        return false; // TODO Return true if there is at least one sample in the cluster state
+    }
+
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
         final boolean prevIsMaster = this.isMaster;
@@ -609,4 +635,5 @@ static class DeleteSampleConfigTask extends AckedBatchedClusterStateUpdateTask {
     }

     record ProjectIndex(ProjectId projectId, String indexName) {};
+
 }
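
Note: atLeastOneSampleConfigured() is still a stub that returns false; its TODO says it should eventually report whether any sampling configuration exists in the cluster state, so that callers such as TransportAbstractBulkAction and IngestService can skip request copies and supplier allocation entirely when nothing is being sampled. One hypothetical shape for that guard, using plain collections instead of the real ClusterState/ProjectMetadata APIs (an assumption, not the committed implementation):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SampleConfigRegistrySketch {
    // Hypothetical per-project registry, kept up to date from cluster state change events.
    private final Map<String, String> samplingIndexByProject = new ConcurrentHashMap<>();

    void onSamplingConfigChanged(String projectId, String samplingIndexOrNull) {
        if (samplingIndexOrNull == null) {
            samplingIndexByProject.remove(projectId); // configuration deleted
        } else {
            samplingIndexByProject.put(projectId, samplingIndexOrNull); // configuration added or updated
        }
    }

    // The cheap guard callers check before copying requests or building suppliers.
    boolean atLeastOneSampleConfigured() {
        return samplingIndexByProject.isEmpty() == false;
    }
}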

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
Lines changed: 7 additions & 1 deletion

@@ -174,10 +174,16 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
                     return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
                 }
             },
-            mock(SamplingService.class)
+            initializeSamplingService()
         );
     }

+    private static SamplingService initializeSamplingService() {
+        SamplingService samplingService = mock(SamplingService.class);
+        when(samplingService.atLeastOneSampleConfigured()).thenReturn(true);
+        return samplingService;
+    }
+
     @Override
     void executeBulk(
         Task task,

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
Lines changed: 23 additions & 1 deletion

@@ -82,6 +82,7 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;

@@ -93,6 +94,8 @@
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

 public class TransportBulkActionTests extends ESTestCase {
@@ -154,10 +157,16 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
                     return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
                 }
             },
-            mock(SamplingService.class)
+            initializeSamplingService()
         );
     }

+    private static SamplingService initializeSamplingService() {
+        SamplingService samplingService = mock(SamplingService.class);
+        when(samplingService.atLeastOneSampleConfigured()).thenReturn(true);
+        return samplingService;
+    }
+
     @Override
     void createIndex(CreateIndexRequest createIndexRequest, ActionListener<CreateIndexResponse> listener) {
         indexCreated = true;
@@ -735,6 +744,19 @@ public void testFailuresDuringPrerequisiteActions() throws InterruptedException
         assertNull(bulkRequest.requests.get(2));
     }

+    public void testSampling() throws ExecutionException, InterruptedException {
+        // This test makes sure that the sampling service is called once per IndexRequest
+        BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id1").source(Collections.emptyMap()))
+            .add(new IndexRequest("index").id("id2").source(Collections.emptyMap()))
+            .add(new DeleteRequest("index2").id("id3"));
+        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+        ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
+        future.get();
+        assertTrue(bulkAction.indexCreated);
+        // We expect 2 sampling calls since there are 2 index requests:
+        verify(bulkAction.samplingService, times(2)).maybeSample(any(), any());
+    }
+
     private BulkRequest buildBulkRequest(List<String> indices) {
         BulkRequest request = new BulkRequest();
         for (String index : indices) {

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java
Lines changed: 3 additions & 1 deletion

@@ -39,6 +39,7 @@
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.indices.EmptySystemIndices;
+import org.elasticsearch.ingest.SamplingService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
@@ -66,6 +67,7 @@
 import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.Mockito.mock;

 public class TransportBulkActionTookTests extends ESTestCase {

@@ -268,7 +270,7 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
                     return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature);
                 }
             },
-            null
+            mock(SamplingService.class)
         );
     }
 }
