5 changes: 5 additions & 0 deletions docs/changelog/128914.yaml
@@ -0,0 +1,5 @@
pr: 128914
summary: Make Adaptive Allocations Scale to Zero configurable and set default to 24h
area: Machine Learning
type: enhancement
issues: []
MachineLearning.java
@@ -739,6 +739,33 @@ public void loadExtensions(ExtensionLoader loader) {
Setting.Property.NodeScope
);

/**
 * The time that must pass after scaling up before scaling down is allowed.
 * Note that ML autoscaling has its own cooldown time before it releases the hardware.
 */
public static final Setting<TimeValue> SCALE_UP_COOLDOWN_TIME = Setting.timeSetting(
"xpack.ml.trained_models.adaptive_allocations.scale_up_cooldown_time",
TimeValue.timeValueMinutes(5),
TimeValue.timeValueMinutes(1),
Property.Dynamic,
Setting.Property.NodeScope
);

/**
 * The time interval without any requests that must pass before scaling down
 * to zero allocations (when min_allocations = 0). After this interval without
 * requests, the number of allocations is set to zero. Until this interval has
 * passed, the minimum number of allocations is always larger than zero.
 */
public static final Setting<TimeValue> SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME = Setting.timeSetting(
"xpack.ml.trained_models.adaptive_allocations.scale_to_zero_time",
TimeValue.timeValueHours(24),
TimeValue.timeValueMinutes(1),
Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Each model deployment results in one or more entries in the cluster state
* for the model allocations. In order to prevent the cluster state from
@@ -814,7 +841,9 @@ public List<Setting<?>> getSettings() {
MAX_ML_NODE_SIZE,
DELAYED_DATA_CHECK_FREQ,
DUMMY_ENTITY_MEMORY,
DUMMY_ENTITY_PROCESSORS
DUMMY_ENTITY_PROCESSORS,
SCALE_UP_COOLDOWN_TIME,
SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME
);
}

@@ -1300,7 +1329,8 @@ public Collection<?> createComponents(PluginServices services) {
client,
inferenceAuditor,
telemetryProvider.getMeterRegistry(),
machineLearningExtension.get().isNlpEnabled()
machineLearningExtension.get().isNlpEnabled(),
settings
);

MlInitializationService mlInitializationService = new MlInitializationService(
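The MachineLearning.java changes above replace two hard-coded timeouts with dynamic, node-scope cluster settings, register them in getSettings(), and raise the scale-to-zero default to 24h. Below is a minimal, plain-JDK sketch (hypothetical class and variable names, not part of this change) of the propagation pattern the PR relies on: a settings-update consumer writes the new value into an AtomicLong, and downstream components read it through a Supplier, so they pick up changes without being rebuilt.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Illustrative sketch only: shows how a dynamically updated value propagates when
// it is held in an AtomicLong and consumed through a Supplier.
public class DynamicTimeoutSketch {
    public static void main(String[] args) {
        AtomicLong scaleToZeroSeconds = new AtomicLong(24 * 60 * 60); // default: 24h

        // A consumer only holds the Supplier, so it always observes the latest value.
        Supplier<Long> supplier = scaleToZeroSeconds::get;
        System.out.println("before update: " + supplier.get()); // 86400

        // This is effectively what the settings-update consumer does on a change.
        scaleToZeroSeconds.set(60 * 60); // an operator lowers the interval to 1h
        System.out.println("after update: " + supplier.get()); // 3600
    }
}
```

Handing components a Supplier (or the AtomicLong itself) rather than a captured long is what lets running deployments honor an updated scale_to_zero_time or scale_up_cooldown_time without a restart.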
AdaptiveAllocationsScaler.java
@@ -11,6 +11,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;

import java.util.function.Supplier;

/**
* Processes measured requests counts and inference times and decides whether
* the number of allocations should be scaled up or down.
@@ -33,7 +35,7 @@ public class AdaptiveAllocationsScaler {
private final String deploymentId;
private final KalmanFilter1d requestRateEstimator;
private final KalmanFilter1d inferenceTimeEstimator;
private final long scaleToZeroAfterNoRequestsSeconds;
private final Supplier<Long> scaleToZeroAfterNoRequestsSeconds;
private double timeWithoutRequestsSeconds;

private int numberOfAllocations;
@@ -46,7 +48,7 @@ public class AdaptiveAllocationsScaler {
private Double lastMeasuredInferenceTime;
private Long lastMeasuredQueueSize;

AdaptiveAllocationsScaler(String deploymentId, int numberOfAllocations, long scaleToZeroAfterNoRequestsSeconds) {
AdaptiveAllocationsScaler(String deploymentId, int numberOfAllocations, Supplier<Long> scaleToZeroAfterNoRequestsSeconds) {
this.deploymentId = deploymentId;
this.scaleToZeroAfterNoRequestsSeconds = scaleToZeroAfterNoRequestsSeconds;

@@ -173,7 +175,7 @@ Integer scale() {
}

if ((minNumberOfAllocations == null || minNumberOfAllocations == 0)
&& timeWithoutRequestsSeconds > scaleToZeroAfterNoRequestsSeconds) {
&& timeWithoutRequestsSeconds > scaleToZeroAfterNoRequestsSeconds.get()) {

if (oldNumberOfAllocations != 0) {
// avoid logging this message if there is no change
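The AdaptiveAllocationsScaler change above swaps the long field for a Supplier<Long>, so scale() re-reads the threshold on every evaluation instead of freezing the value at construction time. A hedged, self-contained sketch of that scale-to-zero condition (illustrative names, not the real class):

```java
import java.util.function.Supplier;

// Illustrative sketch of the scale-to-zero decision: zero allocations are only
// allowed when the deployment permits them and the idle time exceeds the
// (dynamically supplied) threshold.
public class ScaleToZeroSketch {
    static boolean shouldScaleToZero(Integer minNumberOfAllocations,
                                     double timeWithoutRequestsSeconds,
                                     Supplier<Long> scaleToZeroAfterNoRequestsSeconds) {
        return (minNumberOfAllocations == null || minNumberOfAllocations == 0)
            && timeWithoutRequestsSeconds > scaleToZeroAfterNoRequestsSeconds.get();
    }

    public static void main(String[] args) {
        Supplier<Long> threshold = () -> 24L * 60 * 60; // 24h default
        System.out.println(shouldScaleToZero(0, 100.0, threshold));    // false: idle for only 100s
        System.out.println(shouldScaleToZero(0, 90_000.0, threshold)); // true: idle for more than 24h
        System.out.println(shouldScaleToZero(1, 90_000.0, threshold)); // false: min allocations > 0
    }
}
```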
AdaptiveAllocationsScalerService.java
@@ -18,6 +18,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
@@ -47,6 +48,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -182,25 +184,11 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
/**
* The time interval between the adaptive allocations triggers.
*/
private static final int DEFAULT_TIME_INTERVAL_SECONDS = 10;
/**
* The time that has to pass after scaling up, before scaling down is allowed.
* Note that the ML autoscaling has its own cooldown time to release the hardware.
*/
private static final long SCALE_UP_COOLDOWN_TIME_MILLIS = TimeValue.timeValueMinutes(5).getMillis();

/**
* The time interval without any requests that has to pass, before scaling down
* to zero allocations (in case min_allocations = 0). After this time interval
* without requests, the number of allocations is set to zero. When this time
* interval hasn't passed, the minimum number of allocations will always be
* larger than zero.
*/
private static final long SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS = TimeValue.timeValueMinutes(15).getSeconds();
private static final long DEFAULT_TIME_INTERVAL_SECONDS = 10;

private static final Logger logger = LogManager.getLogger(AdaptiveAllocationsScalerService.class);

private final int timeIntervalSeconds;
private final long timeIntervalSeconds;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final Client client;
@@ -214,8 +202,8 @@ Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScale
private final Map<String, Long> lastScaleUpTimesMillis;
private volatile Scheduler.Cancellable cancellable;
private final AtomicBoolean busy;
private final long scaleToZeroAfterNoRequestsSeconds;
private final long scaleUpCooldownTimeMillis;
private final AtomicLong scaleToZeroAfterNoRequestsSeconds;
private final AtomicLong scaleUpCooldownTimeMillis;
private final Set<String> deploymentIdsWithInFlightScaleFromZeroRequests = new ConcurrentSkipListSet<>();
private final Map<String, String> lastWarningMessages = new ConcurrentHashMap<>();

@@ -225,7 +213,8 @@ public AdaptiveAllocationsScalerService(
Client client,
InferenceAuditor inferenceAuditor,
MeterRegistry meterRegistry,
boolean isNlpEnabled
boolean isNlpEnabled,
Settings settings
) {
this(
threadPool,
@@ -235,9 +224,19 @@
meterRegistry,
isNlpEnabled,
DEFAULT_TIME_INTERVAL_SECONDS,
SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME_SECONDS,
SCALE_UP_COOLDOWN_TIME_MILLIS
new AtomicLong(MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME.get(settings).getSeconds()),
new AtomicLong(MachineLearning.SCALE_UP_COOLDOWN_TIME.get(settings).getMillis())
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
MachineLearning.SCALE_TO_ZERO_AFTER_NO_REQUESTS_TIME,
timeInterval -> this.scaleToZeroAfterNoRequestsSeconds.set(timeInterval.getSeconds())
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
MachineLearning.SCALE_UP_COOLDOWN_TIME,
timeInterval -> this.scaleUpCooldownTimeMillis.set(timeInterval.getMillis())
);
}

// visible for testing
@@ -248,9 +247,9 @@ public AdaptiveAllocationsScalerService(
InferenceAuditor inferenceAuditor,
MeterRegistry meterRegistry,
boolean isNlpEnabled,
int timeIntervalSeconds,
long scaleToZeroAfterNoRequestsSeconds,
long scaleUpCooldownTimeMillis
long timeIntervalSeconds,
AtomicLong scaleToZeroAfterNoRequestsSeconds,
AtomicLong scaleUpCooldownTimeMillis
) {
this.threadPool = threadPool;
this.clusterService = clusterService;
@@ -314,7 +313,7 @@ private synchronized void updateAutoscalers(ClusterState state) {
key -> new AdaptiveAllocationsScaler(
assignment.getDeploymentId(),
assignment.totalTargetAllocations(),
scaleToZeroAfterNoRequestsSeconds
scaleToZeroAfterNoRequestsSeconds::get
)
);
adaptiveAllocationsScaler.setMinMaxNumberOfAllocations(
@@ -331,7 +330,7 @@ private synchronized void updateAutoscalers(ClusterState state) {

private synchronized void startScheduling() {
if (cancellable == null) {
logger.debug("Starting ML adaptive allocations scaler");
logger.debug("Starting ML adaptive allocations scaler at interval [{}].", timeIntervalSeconds);
try {
cancellable = threadPool.scheduleWithFixedDelay(
this::trigger,
@@ -425,7 +424,7 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
if (nodeStats.getRoutingState() != null && nodeStats.getRoutingState().getState() == RoutingState.STARTING) {
hasRecentObservedScaleUp.add(deploymentId);
}
if (nodeStats.getStartTime() != null && now < nodeStats.getStartTime().toEpochMilli() + scaleUpCooldownTimeMillis) {
if (nodeStats.getStartTime() != null && now < nodeStats.getStartTime().toEpochMilli() + scaleUpCooldownTimeMillis.get()) {
hasRecentObservedScaleUp.add(deploymentId);
}
}
@@ -446,7 +445,7 @@ private void processDeploymentStats(GetDeploymentStatsAction.Response statsRespo
// hasRecentScaleUp indicates whether this service has recently scaled up the deployment.
// hasRecentObservedScaleUp indicates whether a deployment has recently started,
// potentially triggered by another node.
boolean hasRecentScaleUp = lastScaleUpTimeMillis != null && now < lastScaleUpTimeMillis + scaleUpCooldownTimeMillis;
boolean hasRecentScaleUp = lastScaleUpTimeMillis != null && now < lastScaleUpTimeMillis + scaleUpCooldownTimeMillis.get();
if (newNumberOfAllocations < numberOfAllocations.get(deploymentId)
&& (hasRecentScaleUp || hasRecentObservedScaleUp.contains(deploymentId))) {
logger.debug("adaptive allocations scaler: skipping scaling down [{}] because of recent scaleup.", deploymentId);
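In the service, both timeouts now live in AtomicLongs that the settings-update consumers keep current, and every comparison calls .get() at decision time. A small illustrative sketch (hypothetical names, not the production code) of the scale-up cooldown guard that prevents scaling down too soon after a scale-up:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch: a deployment that scaled up less than cooldownMillis ago
// must not be scaled down yet.
public class ScaleUpCooldownSketch {
    static boolean withinCooldown(Long lastScaleUpTimeMillis, long nowMillis, AtomicLong cooldownMillis) {
        return lastScaleUpTimeMillis != null && nowMillis < lastScaleUpTimeMillis + cooldownMillis.get();
    }

    public static void main(String[] args) {
        AtomicLong cooldown = new AtomicLong(5 * 60 * 1000); // 5 minutes, matching the default
        long now = System.currentTimeMillis();
        System.out.println(withinCooldown(now - 60_000, now, cooldown));      // true: scaled up 1 minute ago
        System.out.println(withinCooldown(now - 10 * 60_000, now, cooldown)); // false: 10 minutes ago
        System.out.println(withinCooldown(null, now, cooldown));              // false: never scaled up here
    }
}
```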
AdaptiveAllocationsScalerServiceTests.java
@@ -44,6 +44,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -58,6 +59,11 @@

public class AdaptiveAllocationsScalerServiceTests extends ESTestCase {

private static final long ONE_SECOND = 1L;
private static final AtomicLong ATOMIC_SECOND = new AtomicLong(1);
private static final AtomicLong SIXTY_SECONDS = new AtomicLong(60);
private static final AtomicLong TWO_THOUSAND_MILLISECONDS = new AtomicLong(2_000);
private static final AtomicLong SIXTY_THOUSAND_MILLISECONDS = new AtomicLong(60_000);
private TestThreadPool threadPool;
private ClusterService clusterService;
private Client client;
@@ -175,9 +181,9 @@ public void test_scaleUp() {
inferenceAuditor,
meterRegistry,
true,
1,
60,
60_000
ONE_SECOND,
SIXTY_SECONDS,
SIXTY_THOUSAND_MILLISECONDS
);
service.start();

@@ -269,9 +275,9 @@ public void test_scaleDownToZero_whenNoRequests() {
inferenceAuditor,
meterRegistry,
true,
1,
1,
2_000
ONE_SECOND,
ATOMIC_SECOND,
TWO_THOUSAND_MILLISECONDS
);
service.start();

@@ -336,9 +342,9 @@ public void test_dontScale_whenNotStarted() {
inferenceAuditor,
meterRegistry,
true,
1,
1,
2_000
ONE_SECOND,
ATOMIC_SECOND,
TWO_THOUSAND_MILLISECONDS
);
service.start();

@@ -392,9 +398,9 @@ public void test_noScaleDownToZero_whenRecentlyScaledUpByOtherNode() {
inferenceAuditor,
meterRegistry,
true,
1,
1,
2_000
ONE_SECOND,
ATOMIC_SECOND,
TWO_THOUSAND_MILLISECONDS
);
service.start();

@@ -477,9 +483,9 @@ public void testMaybeStartAllocation() {
inferenceAuditor,
meterRegistry,
true,
1,
60,
60_000
ONE_SECOND,
SIXTY_SECONDS,
TWO_THOUSAND_MILLISECONDS
);

when(client.threadPool()).thenReturn(threadPool);
@@ -512,9 +518,9 @@ public void testMaybeStartAllocation_BlocksMultipleRequests() throws Exception {
inferenceAuditor,
meterRegistry,
true,
1,
60,
60_000
ONE_SECOND,
SIXTY_SECONDS,
SIXTY_THOUSAND_MILLISECONDS
);

var latch = new CountDownLatch(1);