Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
04be99a
Prototype of random sampling of original documents
masseyke Sep 5, 2025
fdbc862
merging main
masseyke Sep 5, 2025
7e4f30d
reverting accidental change
masseyke Sep 5, 2025
d6e40e3
adding files
masseyke Sep 5, 2025
7133846
making the transport action a TransportNodesAction
masseyke Sep 5, 2025
2f1122f
removing accidental change
masseyke Sep 5, 2025
9b3fa2a
Avoiding compiling scripts multiple times, adding stats
masseyke Sep 8, 2025
a8bf002
moved cluster state update logic into SamplingService
masseyke Sep 8, 2025
ca393b9
fixing tests
masseyke Sep 8, 2025
ca24196
enforcing TTL
masseyke Sep 9, 2025
3832467
[CI] Auto commit changes from spotless
Sep 9, 2025
f1ac65a
adding stats
masseyke Sep 9, 2025
ca84960
incorporating projetId, limiting results
masseyke Sep 9, 2025
ee01fba
merging main
masseyke Sep 9, 2025
c12ed1b
Merge branch 'sampling-prototype' of github.com:masseyke/elasticsearc…
masseyke Sep 9, 2025
f906bf5
fixing merge
masseyke Sep 9, 2025
2961cc5
removing some dead code
masseyke Sep 9, 2025
7ab1188
fixing ingestion
masseyke Sep 9, 2025
47da336
fixing build
masseyke Sep 9, 2025
917b9c6
Fixing IndexRequest cloning
masseyke Sep 10, 2025
7d0db72
fixing OperatorPrivilegesIT
masseyke Sep 10, 2025
0ef634c
using SoftReferences
masseyke Sep 10, 2025
c7e9487
handling bad source
masseyke Sep 11, 2025
518615b
using less of ConditionalProcessor
masseyke Sep 11, 2025
519b2d6
merging main
masseyke Sep 12, 2025
6335456
fixing compilation error after merge
masseyke Sep 12, 2025
ef920e5
moving TTL logic to master node only
masseyke Sep 12, 2025
6918c58
removing accidental commit
masseyke Sep 12, 2025
0185a1f
merging main
masseyke Sep 12, 2025
42b3b8a
Sampling on pipeline exception
masseyke Sep 12, 2025
5349303
fixing a bad merge
masseyke Sep 12, 2025
43cbc59
Merge branch 'main' into sampling-prototype
masseyke Sep 15, 2025
255f45b
performance improvements
masseyke Sep 15, 2025
8da7c26
Adding the shell of the SamplingService
masseyke Sep 15, 2025
79c77fa
fixing tests
masseyke Sep 16, 2025
b16f5c9
Merge branch 'main' into sampling-service-shell
masseyke Sep 16, 2025
042dc8c
Merge branch 'main' into sampling-service-shell
masseyke Sep 16, 2025
44bdca4
do not sample twice on exception
masseyke Sep 16, 2025
7af6e27
renaming parameter
masseyke Sep 17, 2025
f50c1f6
Merge branch 'main' into sampling-service-shell
masseyke Sep 17, 2025
6ba15dd
reducing logging
masseyke Sep 17, 2025
342cbef
do not copy an IndexRequest unless necessary
masseyke Sep 17, 2025
ca4c964
merging main
masseyke Sep 18, 2025
6784895
merging masseyke:sampling-service-shell
masseyke Sep 18, 2025
17e4d0f
merging main
masseyke Sep 18, 2025
6e524bf
fixing bad merge
masseyke Sep 18, 2025
bc68e3d
getting the prototype to work again
masseyke Sep 19, 2025
86171a8
avoiding storing IndexRequest
masseyke Sep 19, 2025
dc6ff3f
[CI] Update transport version definitions
Oct 2, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = def(9_163_0_00);
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED = def(9_164_0_00);
public static final TransportVersion SEARCH_SOURCE_EXCLUDE_INFERENCE_FIELDS_PARAM = def(9_165_0_00);
public static final TransportVersion RANDOM_SAMPLING = def(9_166_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,15 @@
import org.elasticsearch.rest.action.synonyms.RestGetSynonymsSetsAction;
import org.elasticsearch.rest.action.synonyms.RestPutSynonymRuleAction;
import org.elasticsearch.rest.action.synonyms.RestPutSynonymsAction;
import org.elasticsearch.sample.GetSampleAction;
import org.elasticsearch.sample.GetSampleStatsAction;
import org.elasticsearch.sample.PutSampleConfigAction;
import org.elasticsearch.sample.RestGetSampleAction;
import org.elasticsearch.sample.RestGetSampleStatsAction;
import org.elasticsearch.sample.RestPutSampleConfigAction;
import org.elasticsearch.sample.TransportGetSampleAction;
import org.elasticsearch.sample.TransportGetSampleStatsAction;
import org.elasticsearch.sample.TransportPutSampleConfigAction;
import org.elasticsearch.snapshots.TransportUpdateSnapshotStatusAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.TelemetryProvider;
Expand Down Expand Up @@ -813,6 +822,10 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(GetSynonymRuleAction.INSTANCE, TransportGetSynonymRuleAction.class);
actions.register(DeleteSynonymRuleAction.INSTANCE, TransportDeleteSynonymRuleAction.class);

actions.register(PutSampleConfigAction.INSTANCE, TransportPutSampleConfigAction.class);
actions.register(GetSampleAction.INSTANCE, TransportGetSampleAction.class);
actions.register(GetSampleStatsAction.INSTANCE, TransportGetSampleStatsAction.class);

return unmodifiableMap(actions.getRegistry());
}

Expand Down Expand Up @@ -1040,6 +1053,10 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
registerHandler.accept(new RestPutSynonymRuleAction());
registerHandler.accept(new RestGetSynonymRuleAction());
registerHandler.accept(new RestDeleteSynonymRuleAction());

registerHandler.accept(new RestPutSampleConfigAction());
registerHandler.accept(new RestGetSampleAction(projectIdResolver));
registerHandler.accept(new RestGetSampleStatsAction(projectIdResolver));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
*/
public abstract class TransportAbstractBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
private static final Logger logger = LogManager.getLogger(TransportAbstractBulkAction.class);

public static final Set<String> STREAMS_ALLOWED_PARAMS = new HashSet<>(9) {
{
add("error_trace");
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksNodeService;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.sample.TransportPutSampleConfigAction;
import org.elasticsearch.script.ScriptMetadata;
import org.elasticsearch.snapshots.RegisteredPolicySnapshots;
import org.elasticsearch.snapshots.SnapshotsInfoService;
Expand Down Expand Up @@ -277,6 +278,12 @@ public static List<Entry> getNamedWriteables() {
PersistentTasksCustomMetadata::new,
PersistentTasksCustomMetadata::readDiffFrom
);
registerProjectCustom(
entries,
TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME,
TransportPutSampleConfigAction.SamplingConfigCustomMetadata::new,
TransportPutSampleConfigAction.SamplingConfigCustomMetadata::readDiffFrom
);
// Cluster scoped persistent tasks
registerMetadataCustom(
entries,
Expand Down Expand Up @@ -357,6 +364,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
ClusterPersistentTasksCustomMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.ProjectCustom.class,
new ParseField(TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME),
TransportPutSampleConfigAction.SamplingConfigCustomMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.ProjectCustom.class,
Expand Down
38 changes: 1 addition & 37 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -1390,45 +1389,10 @@ private void attemptToSampleData(
* We need both the original document and the fully updated document for sampling, so we make a copy of the original
* before overwriting it here. We can discard it after sampling.
*/
samplingService.maybeSample(projectMetadata, indexRequest.index(), () -> {
IndexRequest original = copyIndexRequestForSampling(indexRequest);
updateIndexRequestMetadata(original, originalDocumentMetadata);
return original;
}, ingestDocument);

samplingService.maybeSample(projectMetadata, originalDocumentMetadata.getIndex(), indexRequest, ingestDocument);
}
}

/**
* Creates a copy of an IndexRequest to be used by random sampling.
* @param original The IndexRequest to be copied
* @return A copy of the IndexRequest
*/
private IndexRequest copyIndexRequestForSampling(IndexRequest original) {
IndexRequest clonedRequest = new IndexRequest(original.index());
clonedRequest.id(original.id());
clonedRequest.routing(original.routing());
clonedRequest.version(original.version());
clonedRequest.versionType(original.versionType());
clonedRequest.setPipeline(original.getPipeline());
clonedRequest.setFinalPipeline(original.getFinalPipeline());
clonedRequest.setIfSeqNo(original.ifSeqNo());
clonedRequest.setIfPrimaryTerm(original.ifPrimaryTerm());
clonedRequest.setRefreshPolicy(original.getRefreshPolicy());
clonedRequest.waitForActiveShards(original.waitForActiveShards());
clonedRequest.timeout(original.timeout());
clonedRequest.opType(original.opType());
clonedRequest.setParentTask(original.getParentTask());
clonedRequest.setRequireDataStream(original.isRequireDataStream());
clonedRequest.setRequireAlias(original.isRequireAlias());
clonedRequest.setIncludeSourceOnError(original.getIncludeSourceOnError());
BytesReference source = original.source();
if (source != null) {
clonedRequest.source(source, original.getContentType());
}
return clonedRequest;
}

private static void executePipeline(
final IngestDocument ingestDocument,
final Pipeline pipeline,
Expand Down
Loading