
Commit 36874e8

Prevent work starvation bug if using scaling EsThreadPoolExecutor with core pool size = 0 (#124732)
When `ExecutorScalingQueue` rejects work to make the worker pool scale up while the pool is already at max pool size (so that no new worker can be added), the available workers might time out at just about the same time as the task is force-queued by `ForceQueuePolicy`. This has caused starvation of work, as observed for `masterService#updateTask` in #124667, where a max pool size of 1 is used; that configuration is the most likely to expose the bug.

This PR changes `EsExecutors.newScaling` to not use `ExecutorScalingQueue` if the max pool size is 1 (and the core pool size is 0); a regular `LinkedTransferQueue` works perfectly fine in that case. If the max pool size is greater than 1, a probing approach is used to ensure the worker pool is scaled to at least one worker after force-queueing work in `ForceQueuePolicy`.

Fixes #124667
Relates to #18613
1 parent 3c129e7 commit 36874e8
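
To make the race and the fix concrete, here is a minimal self-contained sketch of the probing idea. It is not the Elasticsearch implementation: the class name, the `PROBE` sentinel and the handler wiring are illustrative, but the mechanics mirror what the commit does with `WORKER_PROBE` and `ForceQueuePolicy` on a pool with core size 0 and max size 1.

    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ForceQueueProbeSketch {

        // sentinel no-op task; an anonymous class so it has a unique identity
        static final Runnable PROBE = new Runnable() {
            @Override
            public void run() {}
        };

        public static void main(String[] args) throws Exception {
            LinkedTransferQueue<Runnable> queue = new LinkedTransferQueue<>() {
                @Override
                public boolean offer(Runnable task) {
                    // always queue the probe; hand regular tasks only to an idle worker,
                    // otherwise report rejection so the executor tries to add a worker
                    return task == PROBE ? super.offer(task) : tryTransfer(task);
                }
            };
            RejectedExecutionHandler forceQueue = (task, executor) -> {
                if (task == PROBE) {
                    return; // pool already at max; a live worker will drain the queue
                }
                queue.put(task); // force-queue the rejected task (unbounded, never blocks)
                if (task == queue.peek()) {
                    // the queue was likely empty before this put, so the last idle worker
                    // may have timed out concurrently; queueing the no-op probe makes the
                    // executor re-check the pool and add a worker if none is alive
                    executor.execute(PROBE);
                }
            };
            ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 1, 100, TimeUnit.MILLISECONDS, queue, forceQueue);
            pool.execute(() -> System.out.println("task ran without starving"));
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

The sketch leans on documented `ThreadPoolExecutor` behavior: after a task is accepted by the work queue, `execute` re-checks the pool and adds a worker if none is alive, so successfully queueing the probe is enough to revive an empty pool.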

File tree: 9 files changed, +305, -45 lines


docs/changelog/124732.yaml

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+pr: 124732
+summary: Prevent rare starvation bug when using scaling `EsThreadPoolExecutor` with empty core pool size.
+area: Infra/Core
+type: bug
+issues:
+ - 124667

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 103 additions & 31 deletions

@@ -96,6 +96,21 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(
         return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
     }
 
+    /**
+     * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
+     * <p>
+     * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
+     * (and at least 1) is reached: each time a task is submitted, a new worker is added regardless of whether an idle worker
+     * is available.
+     * <p>
+     * Once the core pool size is reached, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
+     * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool
+     * would never scale beyond the core pool size.
+     * <p>
+     * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task
+     * offer unless it can be immediately transferred to an available idle worker. If no such worker is available, the executor
+     * will add a new worker if capacity remains; otherwise the task is rejected and then appended to the work queue via the
+     * {@link ForceQueuePolicy} rejection handler.
+     */
     public static EsThreadPoolExecutor newScaling(
         String name,
         int min,
@@ -107,10 +122,12 @@ public static EsThreadPoolExecutor newScaling(
         ThreadContext contextHolder,
         TaskTrackingConfig config
     ) {
-        ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
-        EsThreadPoolExecutor executor;
+        LinkedTransferQueue<Runnable> queue = newUnboundedScalingLTQueue(min, max);
+        // Work force-queued via ForceQueuePolicy might starve if no worker is available (if the core pool size is 0);
+        // probing the worker pool prevents this.
+        boolean probeWorkerPool = min == 0 && queue instanceof ExecutorScalingQueue;
         if (config.trackExecutionTime()) {
-            executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
+            return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
                 name,
                 min,
                 max,
@@ -119,27 +136,40 @@ public static EsThreadPoolExecutor newScaling(
                 queue,
                 TimedRunnable::new,
                 threadFactory,
-                new ForceQueuePolicy(rejectAfterShutdown),
+                new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
                 contextHolder,
                 config
             );
         } else {
-            executor = new EsThreadPoolExecutor(
+            return new EsThreadPoolExecutor(
                 name,
                 min,
                 max,
                 keepAliveTime,
                 unit,
                 queue,
                 threadFactory,
-                new ForceQueuePolicy(rejectAfterShutdown),
+                new ForceQueuePolicy(rejectAfterShutdown, probeWorkerPool),
                 contextHolder
             );
         }
-        queue.executor = executor;
-        return executor;
     }
 
+    /**
+     * Creates a scaling {@link EsThreadPoolExecutor} using an unbounded work queue.
+     * <p>
+     * The {@link EsThreadPoolExecutor} scales the same way as a regular {@link ThreadPoolExecutor} until the core pool size
+     * (and at least 1) is reached: each time a task is submitted, a new worker is added regardless of whether an idle worker
+     * is available.
+     * <p>
+     * Once the core pool size is reached, a {@link ThreadPoolExecutor} will only add a new worker if the work queue rejects
+     * a task offer. Typically, using a regular unbounded queue, task offers won't ever be rejected, meaning the worker pool
+     * would never scale beyond the core pool size.
+     * <p>
+     * Scaling {@link EsThreadPoolExecutor}s use a customized unbounded {@link LinkedTransferQueue}, which rejects every task
+     * offer unless it can be immediately transferred to an available idle worker. If no such worker is available, the executor
+     * will add a new worker if capacity remains; otherwise the task is rejected and then appended to the work queue via the
+     * {@link ForceQueuePolicy} rejection handler.
+     */
     public static EsThreadPoolExecutor newScaling(
         String name,
         int min,
@@ -389,32 +419,58 @@ public boolean isSystem() {
      */
     private EsExecutors() {}
 
-    static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
+    private static <E> LinkedTransferQueue<E> newUnboundedScalingLTQueue(int corePoolSize, int maxPoolSize) {
+        if (maxPoolSize == 1 || maxPoolSize == corePoolSize) {
+            // scaling beyond the core pool size (or 1) is not required, use a regular unbounded LinkedTransferQueue
+            return new LinkedTransferQueue<>();
+        }
+        // scaling beyond the core pool size with an unbounded queue requires ExecutorScalingQueue
+        // note, reconfiguration of core / max pool size is not supported in EsThreadPoolExecutor
+        return new ExecutorScalingQueue<>();
+    }
 
-        ThreadPoolExecutor executor;
+    /**
+     * Customized {@link LinkedTransferQueue} that allows a {@link ThreadPoolExecutor} to scale beyond its core pool size
+     * despite having an unbounded queue.
+     * <p>
+     * Note, the use of unbounded work queues is a problem in itself. For one, it makes error-prone customizations necessary
+     * so that thread pools can scale up adequately. Worse, unbounded queues prevent backpressure and impose a high risk of
+     * causing OOM errors. <a href="https://github.com/elastic/elasticsearch/issues/18613">Github #18613</a> captures various
+     * long outstanding, but important improvements to thread pools.
+     * <p>
+     * Once it has reached its core pool size, a {@link ThreadPoolExecutor} will only add more workers if capacity remains and
+     * the task offer is rejected by the work queue. With a regular unbounded queue, that is typically never the case.
+     * <p>
+     * This customized implementation rejects every task offer unless it can be immediately transferred to an available idle
+     * worker. It relies on the {@link ForceQueuePolicy} rejection handler to append the task to the work queue if no
+     * additional worker can be added and the task is rejected by the executor.
+     * <p>
+     * Note, {@link ForceQueuePolicy} cannot guarantee that workers will be available when appending tasks directly to the
+     * queue. For that reason {@link ExecutorScalingQueue} cannot be used with executors with an empty core pool and a max
+     * pool size of 1: the only available worker could time out at just about the same time as the task is appended, see
+     * <a href="https://github.com/elastic/elasticsearch/issues/124667">Github #124667</a> for more details.
+     * <p>
+     * Note, configuring executors with core = max size in combination with {@code allowCoreThreadTimeOut} could be an
+     * alternative to {@link ExecutorScalingQueue}. However, the scaling behavior would be very different: using
+     * {@link ExecutorScalingQueue} we are able to reuse idle workers if available, by means of
+     * {@link ExecutorScalingQueue#tryTransfer(Object)}. If core = max size is set, the executor will add a new worker for
+     * every task submitted until the core/max pool size is reached, even if idle workers are available.
+     */
+    static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
 
         ExecutorScalingQueue() {}
 
         @Override
         public boolean offer(E e) {
-            // first try to transfer to a waiting worker thread
-            if (tryTransfer(e) == false) {
-                // check if there might be spare capacity in the thread
-                // pool executor
-                int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
-                if (left > 0) {
-                    // reject queuing the task to force the thread pool
-                    // executor to add a worker if it can; combined
-                    // with ForceQueuePolicy, this causes the thread
-                    // pool to always scale up to max pool size and we
-                    // only queue when there is no spare capacity
-                    return false;
-                } else {
-                    return super.offer(e);
-                }
-            } else {
-                return true;
+            if (e == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
+                // this probe ensures a worker is available after force queueing a task via ForceQueuePolicy
+                return super.offer(e);
             }
+            // try to transfer to a waiting worker thread
+            // otherwise reject queuing the task to force the thread pool executor to add a worker if it can;
+            // combined with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size
+            // so that we only queue when there is no spare capacity
+            return tryTransfer(e);
         }
 
         // Overridden to workaround a JDK bug introduced in JDK 21.0.2
@@ -456,15 +512,24 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {
          */
         private final boolean rejectAfterShutdown;
 
+        /**
+         * Flag to indicate whether the worker pool needs to be probed after force-queuing a task to guarantee a worker is
+         * available.
+         */
+        private final boolean probeWorkerPool;
+
         /**
         * @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
         */
-        ForceQueuePolicy(boolean rejectAfterShutdown) {
+        ForceQueuePolicy(boolean rejectAfterShutdown, boolean probeWorkerPool) {
             this.rejectAfterShutdown = rejectAfterShutdown;
+            this.probeWorkerPool = probeWorkerPool;
         }
 
         @Override
         public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
+            if (task == EsThreadPoolExecutor.WORKER_PROBE) { // referential equality
+                return;
+            }
             if (rejectAfterShutdown) {
                 if (executor.isShutdown()) {
                     reject(executor, task);
@@ -481,12 +546,19 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
             }
         }
 
-        private static void put(ThreadPoolExecutor executor, Runnable task) {
+        private void put(ThreadPoolExecutor executor, Runnable task) {
            final BlockingQueue<Runnable> queue = executor.getQueue();
-            // force queue policy should only be used with a scaling queue
-            assert queue instanceof ExecutorScalingQueue;
+            // force queue policy should only be used with a scaling queue (ExecutorScalingQueue / LinkedTransferQueue)
+            assert queue instanceof LinkedTransferQueue;
            try {
                queue.put(task);
+                if (probeWorkerPool && task == queue.peek()) { // referential equality
+                    // If the task is at the head of the queue, we can assume the queue was previously empty. In this case,
+                    // available workers might have timed out in the meanwhile. To prevent the task from starving, we submit
+                    // a noop probe to the executor. Note, this deliberately doesn't check getPoolSize() == 0 to avoid
+                    // potential race conditions, as the count in the atomic state (used by workerCountOf) is decremented first.
+                    executor.execute(EsThreadPoolExecutor.WORKER_PROBE);
+                }
            } catch (final InterruptedException e) {
                assert false : "a scaling queue never blocks so a put to it can never be interrupted";
                throw new AssertionError(e);
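
The rewritten `offer` above hinges on `LinkedTransferQueue.tryTransfer`, which succeeds only if a consumer is already blocked waiting on the queue. A small standalone demo of that semantic (class and variable names are illustrative):

    import java.util.concurrent.LinkedTransferQueue;

    public class TryTransferDemo {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

            // no consumer is blocked in take()/poll(timeout) yet, so the handoff fails
            System.out.println(queue.tryTransfer("task")); // false

            Thread consumer = new Thread(() -> {
                try {
                    System.out.println("received: " + queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            Thread.sleep(200); // crude, but lets the consumer block in take() first

            // now a consumer is waiting, so the element is handed over directly
            System.out.println(queue.tryTransfer("task")); // true
            consumer.join();
        }
    }

A `false` return from `offer` is what drives `ThreadPoolExecutor` to add workers up to the max pool size; only when that also fails does `ForceQueuePolicy` actually append the task to the queue.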

server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

Lines changed: 20 additions & 1 deletion

@@ -29,6 +29,15 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
 
     private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class);
 
+    // noop probe to prevent starvation of work in the work queue due to ForceQueuePolicy
+    // https://github.com/elastic/elasticsearch/issues/124667
+    // note, this is intentionally not a lambda, to avoid it ever being turned into a compile-time constant
+    // matching similar lambdas coming from other places
+    static final Runnable WORKER_PROBE = new Runnable() {
+        @Override
+        public void run() {}
+    };
+
     private final ThreadContext contextHolder;
 
     /**
@@ -66,9 +75,19 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
         this.contextHolder = contextHolder;
     }
 
+    @Override
+    public void setCorePoolSize(int corePoolSize) {
+        throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
+    }
+
+    @Override
+    public void setMaximumPoolSize(int maximumPoolSize) {
+        throw new UnsupportedOperationException("reconfiguration at runtime is not supported");
+    }
+
     @Override
     public void execute(Runnable command) {
-        final Runnable wrappedRunnable = wrapRunnable(command);
+        final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE;
         try {
             super.execute(wrappedRunnable);
         } catch (Exception e) {
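
`WORKER_PROBE` is matched by referential equality in `offer`, `rejectedExecution` and `execute`, which is only sound if no other `Runnable` can ever be the same object. A short sketch (illustrative, not from the commit) of why the probe is an anonymous class rather than a lambda:

    public class SentinelIdentity {

        // anonymous class: evaluated exactly once, guaranteed unique identity
        static final Runnable PROBE = new Runnable() {
            @Override
            public void run() {}
        };

        public static void main(String[] args) {
            Runnable noop = () -> {};
            // The identity of lambda instances is deliberately left unspecified
            // (JLS 15.27.4): a runtime may cache and reuse equivalent non-capturing
            // lambdas, so == against a lambda-valued sentinel would be fragile.
            System.out.println(noop == PROBE);  // always false
            System.out.println(PROBE == PROBE); // always true
        }
    }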

server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java

Lines changed: 8 additions & 4 deletions

@@ -105,9 +105,14 @@ public ScalingExecutorBuilder(
         final EsExecutors.TaskTrackingConfig trackingConfig
     ) {
         super(name, false);
-        this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
-        this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
-        this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
+        this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, 0, Setting.Property.NodeScope);
+        this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, 1, Setting.Property.NodeScope);
+        this.keepAliveSetting = Setting.timeSetting(
+            settingsKey(prefix, "keep_alive"),
+            keepAlive,
+            TimeValue.ZERO,
+            Setting.Property.NodeScope
+        );
         this.rejectAfterShutdown = rejectAfterShutdown;
         this.trackingConfig = trackingConfig;
     }
@@ -172,5 +177,4 @@ static class ScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings {
             this.keepAlive = keepAlive;
         }
     }
-
 }
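
The added lower bounds (core >= 0, max >= 1, keep_alive >= 0) guarantee that every configuration reaching `EsExecutors.newScaling` lands in one of the two well-defined queue branches. A hypothetical standalone equivalent of what the settings now enforce (the helper name is made up; the max >= core check is `ThreadPoolExecutor`'s own constructor requirement, not part of this diff):

    public class ScalingSettingsBounds {

        // hypothetical helper mirroring the bounds the builder settings enforce
        static void validateScalingPoolSettings(int core, int max, long keepAliveMillis) {
            if (core < 0) {
                throw new IllegalArgumentException("core must be >= 0, got " + core);
            }
            if (max < 1) {
                throw new IllegalArgumentException("max must be >= 1, got " + max);
            }
            if (max < core) {
                // ThreadPoolExecutor rejects max < core at construction time anyway
                throw new IllegalArgumentException("max must be >= core");
            }
            if (keepAliveMillis < 0) {
                throw new IllegalArgumentException("keep_alive must be >= 0, got " + keepAliveMillis);
            }
        }

        public static void main(String[] args) {
            validateScalingPoolSettings(0, 1, 0);      // the #124667 configuration: valid, and safe after this fix
            validateScalingPoolSettings(1, 4, 30_000); // a typical scaling pool
        }
    }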

server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java

Lines changed: 1 addition & 1 deletion

@@ -100,7 +100,7 @@ public void testAsyncAcquire() throws InterruptedException {
         final var completionLatch = new CountDownLatch(1);
         final var executorService = EsExecutors.newScaling(
             "test",
-            0,
+            1,
             between(1, 10),
             10,
             TimeUnit.SECONDS,
