Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3667eec
Init exploratory coding commit
quux00 Dec 3, 2024
fdc6352
Init version (two impls) both seem to work. Going to follow the CCR m…
quux00 Dec 3, 2024
f110c13
Improvements - using the EsqlLicenseChecker; still doing manual testi…
quux00 Dec 3, 2024
c7f5a6a
Added license check tests to EsqlSessionCCSUtilsTests; fixed bug in E…
quux00 Dec 4, 2024
0d98ff2
Added trial license start code to CrossClustersQueryIT but it fails t…
quux00 Dec 4, 2024
858a8f9
Attempted to get CrossClustersQueryIT working with a license, but eve…
quux00 Dec 5, 2024
018169e
Created EsqlPluginWithTrialLicense and added to the CCS ESQL IT tests…
quux00 Dec 5, 2024
8189a3e
Fixed CrossClustersCancellationIT and tried to fix MultiClusterSearch…
quux00 Dec 5, 2024
5953d21
Changed legacy-with-basic-license ESQL qa tests to expect 400 Bad Req…
quux00 Dec 6, 2024
3fdac19
Renamed EsqlPluginWithTrialLicense to EsqlPluginWithEnterpriseOrTrial…
quux00 Dec 6, 2024
4aba9ce
Added new AbstractEnrichBasedCrossClusterTestCase and all enrich base…
quux00 Dec 6, 2024
8c7fd08
Code cleanup and added another test to CrossClustersQueriesWithInvali…
quux00 Dec 6, 2024
e77c119
Changed EsqlLicenseChecker to track esql-ccs usage when checking the …
quux00 Dec 9, 2024
ab3341a
Removed warnings from EsqlSessionCCSUtilsTests test for active licenses
quux00 Dec 9, 2024
f16478c
Added check for esql-ccs license_usage in RemoteClusterSecurityEsqlIT
quux00 Dec 9, 2024
5609595
Minor code cleanup for PR review
quux00 Dec 10, 2024
165c3eb
Fixed flaky RemoteClusterSecurityEsqlIT test related to checking last…
quux00 Dec 10, 2024
3d91e03
Update docs/changelog/118102.yaml
quux00 Dec 10, 2024
588a2ce
Merge remote-tracking branch 'elastic/main' into esql-ccs/enterprise-…
quux00 Dec 11, 2024
6e2b916
Adjusted license downgrade ack message and added tests to XPackLicens…
quux00 Dec 11, 2024
5d9a153
Added javadoc requested in PR review
quux00 Dec 11, 2024
02cbf71
Merge remote-tracking branch 'elastic/main' into esql-ccs/enterprise-…
quux00 Dec 11, 2024
726c747
Minor tweaks
quux00 Dec 11, 2024
f91128f
Merge remote-tracking branch 'elastic/main' into esql-ccs/enterprise-…
quux00 Dec 11, 2024
124056a
Fix issue in changelog summary
quux00 Dec 11, 2024
43c8120
Merge remote-tracking branch 'elastic/main' into esql-ccs/enterprise-…
quux00 Dec 11, 2024
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/118102.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118102
summary: "ESQL: Enterprise license enforcement for CCS"
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public boolean isNeedsActive() {
return needsActive;
}

/** Create a momentary feature for hte given license level */
/** Create a momentary feature for the given license level */
public static Momentary momentary(String family, String name, License.OperationMode licenseLevel) {
return new Momentary(family, name, licenseLevel, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public class XPackLicenseState {
messages.put(XPackField.CCR, XPackLicenseState::ccrAcknowledgementMessages);
messages.put(XPackField.ENTERPRISE_SEARCH, XPackLicenseState::enterpriseSearchAcknowledgementMessages);
messages.put(XPackField.REDACT_PROCESSOR, XPackLicenseState::redactProcessorAcknowledgementMessages);
messages.put(XPackField.ESQL, XPackLicenseState::esqlAcknowledgementMessages);
ACKNOWLEDGMENT_MESSAGES = Collections.unmodifiableMap(messages);
}

Expand Down Expand Up @@ -243,6 +244,26 @@ private static String[] enterpriseSearchAcknowledgementMessages(OperationMode cu
return Strings.EMPTY_ARRAY;
}

private static String[] esqlAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
/*
* Provide an acknowledgement warning to customers that downgrade from Trial or Enterprise to a lower
* license level (Basic, Standard, Gold or Premium) that they will no longer be able to do CCS in ES|QL.
*/
switch (newMode) {
case BASIC:
case STANDARD:
case GOLD:
case PLATINUM:
switch (currentMode) {
case TRIAL:
case ENTERPRISE:
return new String[] { "ES|QL cross-cluster search will be disabled." };
}
break;
}
return Strings.EMPTY_ARRAY;
}

private static String[] machineLearningAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
switch (newMode) {
case BASIC:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.xpack.core.XPackField;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -59,6 +60,12 @@ void assertAckMessages(String feature, OperationMode from, OperationMode to, int
assertEquals(expectedMessages, gotMessages.length);
}

void assertAckMessages(String feature, OperationMode from, OperationMode to, Set<String> expectedMessages) {
String[] gotMessages = XPackLicenseState.ACKNOWLEDGMENT_MESSAGES.get(feature).apply(from, to);
Set<String> actualMessages = Arrays.stream(gotMessages).collect(Collectors.toSet());
assertThat(actualMessages, equalTo(expectedMessages));
}

static <T> T randomFrom(T[] values, Predicate<T> filter) {
return randomFrom(Arrays.stream(values).filter(filter).collect(Collectors.toList()));
}
Expand Down Expand Up @@ -143,6 +150,16 @@ public void testCcrAckTrialOrPlatinumToNotTrialOrPlatinum() {
assertAckMessages(XPackField.CCR, randomTrialOrPlatinumMode(), randomBasicStandardOrGold(), 1);
}

public void testEsqlAckToTrialOrPlatinum() {
assertAckMessages(XPackField.ESQL, randomMode(), randomFrom(TRIAL, ENTERPRISE), 0);
}

public void testEsqlAckTrialOrEnterpriseToNotTrialOrEnterprise() {
for (OperationMode to : List.of(BASIC, STANDARD, GOLD, PLATINUM)) {
assertAckMessages(XPackField.ESQL, randomFrom(TRIAL, ENTERPRISE), to, Set.of("ES|QL cross-cluster search will be disabled."));
}
}

public void testExpiredLicense() {
// use standard feature which would normally be allowed at all license levels
LicensedFeature feature = LicensedFeature.momentary("family", "enterpriseFeature", STANDARD);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureResponse;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.EnrichPlugin;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;

public abstract class AbstractEnrichBasedCrossClusterTestCase extends AbstractMultiClustersTestCase {

public static String REMOTE_CLUSTER_1 = "c1";
public static String REMOTE_CLUSTER_2 = "c2";

/**
* subclasses should override if they don't want enrich policies wiped after each test method run
*/
protected boolean tolerateErrorsWhenWipingEnrichPolicies() {
return false;
}

@Override
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
}

protected Collection<String> allClusters() {
return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(CrossClustersEnrichIT.LocalStateEnrich.class);
plugins.add(IngestCommonPlugin.class);
plugins.add(ReindexPlugin.class);
return plugins;
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
}

static final EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os"));
static final EnrichPolicy vendorPolicy = new EnrichPolicy("match", null, List.of("vendors"), "os", List.of("os", "vendor"));

@Before
public void setupHostsEnrich() {
// the hosts policy are identical on every node
Map<String, String> allHosts = Map.of(
"192.168.1.2",
"Windows",
"192.168.1.3",
"MacOS",
"192.168.1.4",
"Linux",
"192.168.1.5",
"Android",
"192.168.1.6",
"iOS",
"192.168.1.7",
"Windows",
"192.168.1.8",
"MacOS",
"192.168.1.9",
"Linux",
"192.168.1.10",
"Linux",
"192.168.1.11",
"Windows"
);
for (String cluster : allClusters()) {
Client client = client(cluster);
client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get();
for (Map.Entry<String, String> h : allHosts.entrySet()) {
client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get();
}
client.admin().indices().prepareRefresh("hosts").get();
client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy))
.actionGet();
client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts"))
.actionGet();
assertAcked(client.admin().indices().prepareDelete("hosts"));
}
}

@Before
public void setupVendorPolicy() {
var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat");
var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse");
var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu");
var vendors = Map.of(LOCAL_CLUSTER, localVendors, REMOTE_CLUSTER_1, c1Vendors, REMOTE_CLUSTER_2, c2Vendors);
for (Map.Entry<String, Map<String, String>> e : vendors.entrySet()) {
Client client = client(e.getKey());
client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get();
for (Map.Entry<String, String> v : e.getValue().entrySet()) {
client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get();
}
client.admin().indices().prepareRefresh("vendors").get();
client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", vendorPolicy))
.actionGet();
client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors"))
.actionGet();
assertAcked(client.admin().indices().prepareDelete("vendors"));
}
}

@Before
public void setupEventsIndices() {
record Event(long timestamp, String user, String host) {

}
List<Event> e0 = List.of(
new Event(1, "matthew", "192.168.1.3"),
new Event(2, "simon", "192.168.1.5"),
new Event(3, "park", "192.168.1.2"),
new Event(4, "andrew", "192.168.1.7"),
new Event(5, "simon", "192.168.1.20"),
new Event(6, "kevin", "192.168.1.2"),
new Event(7, "akio", "192.168.1.5"),
new Event(8, "luke", "192.168.1.2"),
new Event(9, "jack", "192.168.1.4")
);
List<Event> e1 = List.of(
new Event(1, "andres", "192.168.1.2"),
new Event(2, "sergio", "192.168.1.6"),
new Event(3, "kylian", "192.168.1.8"),
new Event(4, "andrew", "192.168.1.9"),
new Event(5, "jack", "192.168.1.3"),
new Event(6, "kevin", "192.168.1.4"),
new Event(7, "akio", "192.168.1.7"),
new Event(8, "kevin", "192.168.1.21"),
new Event(9, "andres", "192.168.1.8")
);
List<Event> e2 = List.of(
new Event(1, "park", "192.168.1.25"),
new Event(2, "akio", "192.168.1.5"),
new Event(3, "park", "192.168.1.2"),
new Event(4, "kevin", "192.168.1.3")
);
for (var c : Map.of(LOCAL_CLUSTER, e0, REMOTE_CLUSTER_1, e1, REMOTE_CLUSTER_2, e2).entrySet()) {
Client client = client(c.getKey());
client.admin()
.indices()
.prepareCreate("events")
.setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip")
.get();
for (var e : c.getValue()) {
client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get();
}
client.admin().indices().prepareRefresh("events").get();
}
}

@After
public void wipeEnrichPolicies() {
for (String cluster : allClusters()) {
cluster(cluster).wipe(Set.of());
for (String policy : List.of("hosts", "vendors")) {
if (tolerateErrorsWhenWipingEnrichPolicies()) {
try {
client(cluster).execute(
DeleteEnrichPolicyAction.INSTANCE,
new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)
);
} catch (Exception e) {
assertThat(e.getMessage(), containsString("Cluster is already closed"));
}

} else {
client(cluster).execute(
DeleteEnrichPolicyAction.INSTANCE,
new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)
);
}
}
}
}

static String enrichHosts(Enrich.Mode mode) {
return EsqlTestUtils.randomEnrichCommand("hosts", mode, hostPolicy.getMatchField(), hostPolicy.getEnrichFields());
}

static String enrichVendors(Enrich.Mode mode) {
return EsqlTestUtils.randomEnrichCommand("vendors", mode, vendorPolicy.getMatchField(), vendorPolicy.getEnrichFields());
}

protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query(query);
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
if (randomBoolean()) {
request.profile(true);
}
if (ccsMetadataInResponse != null) {
request.includeCCSMetadata(ccsMetadataInResponse);
}
return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
}

public static Tuple<Boolean, Boolean> randomIncludeCCSMetadata() {
return switch (randomIntBetween(1, 3)) {
case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE);
case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE);
case 3 -> new Tuple<>(null, Boolean.FALSE);
default -> throw new AssertionError("should not get here");
};
}

public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {
public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {
super(settings, configPath);

plugins.add(new EnrichPlugin(settings) {
@Override
protected XPackLicenseState getLicenseState() {
return this.getLicenseState();
}
});
}

public static class EnrichTransportXPackInfoAction extends TransportXPackInfoAction {
@Inject
public EnrichTransportXPackInfoAction(
TransportService transportService,
ActionFilters actionFilters,
LicenseService licenseService,
NodeClient client
) {
super(transportService, actionFilters, licenseService, client);
}

@Override
protected List<ActionType<XPackInfoFeatureResponse>> infoActions() {
return Collections.singletonList(XPackInfoFeatureAction.ENRICH);
}
}

@Override
protected Class<? extends TransportAction<XPackInfoRequest, XPackInfoResponse>> getInfoAction() {
return CrossClustersQueriesWithInvalidLicenseIT.LocalStateEnrich.EnrichTransportXPackInfoAction.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -78,7 +77,7 @@ protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(EsqlPlugin.class);
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
plugins.add(InternalExchangePlugin.class);
plugins.add(PauseFieldPlugin.class);
Expand Down
Loading