Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/121240.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121240
summary: Implement runtime skip_unavailable=true
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

public class FailingFieldPlugin extends Plugin implements ScriptPlugin {

public static final String FAILING_FIELD_LANG = "failing_field";

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "failing_field";
return FAILING_FIELD_LANG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() {
}

public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) {
return localCluster(remoteCluster, true);
}

public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) {
return ElasticsearchCluster.local()
.name(LOCAL_CLUSTER_NAME)
.distribution(DistributionType.DEFAULT)
Expand All @@ -41,6 +45,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
.setting("node.roles", "[data,ingest,master,remote_cluster_client]")
.setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
.setting("cluster.remote.connections_per_cluster", "1")
.setting("cluster.remote." + REMOTE_CLUSTER_NAME + ".skip_unavailable", skipUnavailable.toString())
.shared(true)
.setting("cluster.routing.rebalance.enable", "none")
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.ccq;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

// Duplicate of EsqlRestValidationIT test where skip_unavailable is set to false
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class EsqlRestValidationSkipUnFalseIT extends EsqlRestValidationIT {
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false);

@ClassRule
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);

@Override
protected String getTestRestCluster() {
return localCluster.getHttpAddresses();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.FailingFieldPlugin;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.XContentBuilder;
Expand Down Expand Up @@ -63,6 +64,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
plugins.add(CrossClusterAsyncQueryIT.InternalExchangePlugin.class);
plugins.add(SimplePauseFieldPlugin.class);
plugins.add(FailingPauseFieldPlugin.class);
plugins.add(FailingFieldPlugin.class);
plugins.add(CrossClusterAsyncQueryIT.CountingPauseFieldPlugin.class);
return plugins;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,17 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertTrue(executionInfo.isCrossClusterSearch());

boolean hasPartials = false;
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(cluster.getTook().millis(), lessThanOrEqualTo(executionInfo.overallTook().millis()));
if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL
|| cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
hasPartials = true;
}
}
assertThat(executionInfo.isPartial(), equalTo(hasPartials));
}

private void setSkipUnavailable(String clusterAlias, boolean skip) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.test.FailingFieldPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

Expand Down Expand Up @@ -433,6 +436,7 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
Set<String> expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet());
assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases));

boolean hasSkipped = false;
for (ExpectedCluster expectedCluster : expected) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias());
String msg = cluster.getClusterAlias();
Expand All @@ -451,10 +455,12 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class));
String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]";
assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg));
hasSkipped = true;
}
// currently failed shards is always zero - change this once we start allowing partial data for individual shard failures
assertThat(msg, cluster.getFailedShards(), equalTo(0));
}
assertThat(executionInfo.isPartial(), equalTo(hasSkipped));
}

public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws Exception {
Expand Down Expand Up @@ -500,6 +506,7 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws
assertThat(executionInfo.isCrossClusterSearch(), is(true));
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -556,6 +563,7 @@ public void testCCSExecutionOnSearchesWithLimit0() throws Exception {
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(false));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
Expand Down Expand Up @@ -604,6 +612,7 @@ public void testMetadataIndex() throws Exception {
assertThat(executionInfo.isCrossClusterSearch(), is(true));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.isPartial(), equalTo(false));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs*"));
Expand Down Expand Up @@ -799,6 +808,17 @@ public void testWarnings() throws Exception {
assertTrue(latch.await(30, TimeUnit.SECONDS));
}

// Non-disconnect remote failures still fail the request even if skip_unavailable is true
public void testRemoteFailureSkipUnavailableTrue() throws IOException {
Map<String, Object> testClusterInfo = setupFailClusters();
String localIndex = (String) testClusterInfo.get("local.index");
String remote1Index = (String) testClusterInfo.get("remote.index");
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
assertThat(e.getMessage(), containsString("Accessing failing field"));
}

private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
try {
final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
Expand Down Expand Up @@ -925,4 +945,46 @@ Map<String, String> createEmptyIndicesWithNoMappings(int numClusters) {

return clusterToEmptyIndexMap;
}

Map<String, Object> setupFailClusters() throws IOException {
int numShardsLocal = randomIntBetween(1, 3);
populateLocalIndices(LOCAL_INDEX, numShardsLocal);

int numShardsRemote = randomIntBetween(1, 3);
populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);

Map<String, Object> clusterInfo = new HashMap<>();
clusterInfo.put("local.num_shards", numShardsLocal);
clusterInfo.put("local.index", LOCAL_INDEX);
clusterInfo.put("remote.num_shards", numShardsRemote);
clusterInfo.put("remote.index", REMOTE_INDEX);
setSkipUnavailable(REMOTE_CLUSTER_1, true);
return clusterInfo;
}

void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException {
Client remoteClient = client(clusterAlias);
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
mapping.startObject("runtime");
{
mapping.startObject("fail_me");
{
mapping.field("type", "long");
mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject();
}
mapping.endObject();
}
mapping.endObject();
assertAcked(
remoteClient.admin()
.indices()
.prepareCreate(indexName)
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
.setMapping(mapping.endObject())
);

remoteClient.prepareIndex(indexName).setSource("id", 0).get();
remoteClient.admin().indices().prepareRefresh(indexName).get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -109,6 +110,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -161,6 +163,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -233,6 +236,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1)));

Expand Down Expand Up @@ -275,6 +279,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));

Expand Down
Loading