Defer unpromotable shard refreshes until index refresh blocks are cleared #120642
…ared This update postpones unpromotable refreshes for indices with an active INDEX_REFRESH_BLOCK until the block is cleared. This ensures refresh operations proceed only when the index is no longer blocked. To avoid indefinite delays, the maximum wait time is governed by the stateless.indices.refresh.max_wait_time_for_unblock setting. Closes ES-10134
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)
Hi @fcofdez, I've created a changelog YAML for you.
    @Override
    public void onTimeout(TimeValue timeout) {
        listener.onFailure(indexLevelBlockException);
I was wondering if we should just rely on the fact that the blocks would be cleared eventually instead of failing the refresh request, wdyt?
I could see us do both. But in the case of bulks it would be nice to use the timeout from the bulk instead.
Also, I think throwing the refresh block exception is confusing; I'd prefer to throw a timeout exception instead.
But we could also forgo it, expecting the refresh unblock to happen automatically after a separate timeout.
For refreshes triggered by writes we can maybe reuse the postWriteRefreshTimeout, and for explicit refreshes reuse the timeout from the original request, and then rely on the block being cleared automatically in case we wait indefinitely? And get rid of the new setting?
That makes more sense. I used the postWriteRefreshTimeout instead of the setting in 9a22ad6.
…elasticsearch into wait-for-refresh-block-clearing
Left a few smaller initial comments.
        return;
    }

    clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
I think we'd normally pass the predicate here instead, avoiding the explicit retries here.
I think that also removes the need for the runnable?
That's a good simplification, done in 9a22ad6
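For readers following this thread, the predicate-based wait can be sketched in isolation. This is a toy model, not Elasticsearch's ClusterStateObserver: ToyStateObserver and its publish method are hypothetical names, and timeouts and threading are left out. It only illustrates why passing a predicate removes the need for explicit retries: the listener fires once, on the first published state that matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Toy stand-in for a cluster state observer (hypothetical, single-threaded, no timeout). */
final class ToyStateObserver<S> {
    private static final class Waiter<S> {
        final Consumer<S> listener;
        final Predicate<S> predicate;

        Waiter(Consumer<S> listener, Predicate<S> predicate) {
            this.listener = listener;
            this.predicate = predicate;
        }
    }

    private final List<Waiter<S>> waiters = new ArrayList<>();

    /** Registers a listener that fires once, on the first published state matching the predicate. */
    void waitForNextChange(Consumer<S> listener, Predicate<S> predicate) {
        waiters.add(new Waiter<>(listener, predicate));
    }

    /** Publishes a new state; matching waiters are removed and then notified. */
    void publish(S state) {
        List<Waiter<S>> ready = new ArrayList<>();
        for (Waiter<S> waiter : waiters) {
            if (waiter.predicate.test(state)) {
                ready.add(waiter);
            }
        }
        waiters.removeAll(ready);
        for (Waiter<S> waiter : ready) {
            waiter.listener.accept(state);
        }
    }
}
```

States that do not satisfy the predicate are simply skipped, so the caller never has to re-register itself after an uninteresting change.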
    }

    protected void beforeDispatchingRequestToUnpromotableShards(Request request, ActionListener<Void> listener) {
        ActionListener.completeWith(listener, () -> null);
Is this not just listener.onResponse(null)?
I just got used to always using the safe completeWith util, but it's true that listener.onResponse(null) should be enough. Tackled in 9a22ad6.
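To make the equivalence concrete, here is a small self-contained sketch. ToyListeners and its Listener interface are hypothetical stand-ins rather than the real ActionListener; the assumption is only that completeWith evaluates a supplier and routes either its result or its exception to the listener. With a supplier that always returns null, nothing can throw, so a direct onResponse(null) has the same effect.

```java
import java.util.concurrent.Callable;

/** Toy listener utilities (hypothetical stand-in, not the Elasticsearch ActionListener). */
final class ToyListeners {
    interface Listener<T> {
        void onResponse(T value);

        void onFailure(Exception e);
    }

    /** Evaluates the supplier and routes its result, or the exception it throws, to the listener. */
    static <T> void completeWith(Listener<T> listener, Callable<T> supplier) {
        T value;
        try {
            value = supplier.call();
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(value);
    }
}
```

The wrapper only earns its keep when the supplier can actually fail, which is not the case for a constant.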
Looks nice, I made a quick review and left some comments.
    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        beforeDispatchingRequestToUnpromotableShards(
nit: maybe add this in TransportUnpromotableShardRefreshAction directly, unless it is needed for tests?
Is it required by tests finally?
Looks good, I hope Tanguy will do a more thorough review.
    }

    var clusterStateObserver = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext());
It is slightly annoying that the observer does not support it but we need to have something like:

    if (isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterStateObserver.setAndGetObservedState()) == false) {
        listener.onResponse(null);
        return;
    }
That's right, and it's why I slightly prefer the Runnable approach. Fixed in 179074d
I still find this simpler than the runnable way, which takes over more responsibility from the observer. One can trick the observer by passing in initial version -1, but it is a bit ugly. We should probably add a method to it to simplify this very common case.
Yes, I was planning on doing that. I'll open a PR shortly
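As a rough illustration of the helper discussed in this thread, the "check the observed state first, then wait" step could be folded into one method. Everything here is hypothetical (ObserverHelpers, StateObserver, waitForState); it is not the method that ended up in ClusterStateObserver, and it ignores timeouts and any race between the check and the registration.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Sketch of a check-then-wait helper; all names are illustrative assumptions. */
final class ObserverHelpers {
    /** Minimal observer contract assumed for this sketch. */
    interface StateObserver<S> {
        S observedState();

        void waitForNextChange(Consumer<S> listener, Predicate<S> predicate);
    }

    /** Completes immediately if the observed state already matches, otherwise waits for a matching change. */
    static <S> void waitForState(StateObserver<S> observer, Predicate<S> predicate, Consumer<S> listener) {
        S current = observer.observedState();
        if (predicate.test(current)) {
            listener.accept(current);
            return;
        }
        observer.waitForNextChange(listener, predicate);
    }
}
```

With a helper like this, callers would no longer need the explicit early return shown earlier in the thread.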
LGTM, I left only minor comments that you can feel free to address or not.
        }, clusterState -> isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterState) == false);
    }

    private boolean isIndexBlockedForRefresh(String index, ClusterState state) {
nit: can be static
    @Override
    public void onTimeout(TimeValue timeout) {
        listener.onFailure(new ElasticsearchTimeoutException("index refresh blocked, waiting for shard(s) to be started"));
I think "waiting for shard to be started" is implicit; the timeout is caused by the block not being removed within the expected delay.
Maybe something like this?

    listener.onFailure(
        new ElasticsearchTimeoutException(
            "shard refresh timed out waiting for index block to be removed",
            new ClusterBlockException(Map.of(request.shardId().getIndexName(), Set.of(INDEX_REFRESH_BLOCK)))
        )
    );
    List<ShardRouting> assignedShards = new ArrayList<>();
    List<ShardRouting> unpromotableShards = new ArrayList<>();
    List<ShardRouting> allInitializingShards = new ArrayList<>();
    List<ShardRouting> allUnpromotableShards = new ArrayList<>();
I wonder if allUnpromotableShards should be named unpromotableShards and the existing unpromotableShards renamed to assignedUnpromotableShards?
(can be a follow up though)
    this.activeShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(activeShards);
    this.assignedShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(assignedShards);
    this.unpromotableShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(unpromotableShards);
    this.allUnpromotableShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(allUnpromotableShards);
Should we assert that all unpromotableShards are contained in allUnpromotableShards? Can be costly though.
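One way to keep such a check cheap, sketched under assumptions: ShardListInvariants, isSubset and checkInvariant are hypothetical names, and plain strings stand in for ShardRouting. Building a hash set makes the containment test roughly linear in the two list sizes, and because it sits behind a Java assert it is not evaluated at all when assertions are disabled.

```java
import java.util.HashSet;
import java.util.List;

/** Illustrative invariant check; names and element type are assumptions. */
final class ShardListInvariants {
    /** True when every element of subset also appears in superset; O(n + m) via a hash set. */
    static <T> boolean isSubset(List<T> subset, List<T> superset) {
        return new HashSet<>(superset).containsAll(subset);
    }

    /** Constructor-time style check; the expression is skipped entirely unless the JVM runs with -ea. */
    static void checkInvariant(List<String> unpromotableShards, List<String> allUnpromotableShards) {
        assert isSubset(unpromotableShards, allUnpromotableShards)
            : "unpromotableShards must be contained in allUnpromotableShards";
    }
}
```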
    });
    case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
        if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
        if (indexShard.getReplicationGroup().getRoutingTable().allUnpromotableShards().size() > 0) {
👍
    final var shardId = new ShardId(new Index(randomIdentifier(), randomUUID()), between(0, 3));
    final var shardRouting = TestShardRouting.newShardRouting(shardId, randomUUID(), true, ShardRoutingState.STARTED);
    final var indexShardRoutingTable = new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build();
    final var indexShardRoutingTable = createShardRoutingTableWithPrimaryAndSearchShards(shardId);
Now it always tests with a search shard, maybe we should randomize that (ie, randomly adding a search shard or not)?
        UnpromotableShardRefreshRequest request,
        ActionListener<ActionResponse.Empty> responseListener
    ) {
        assert false : "Unexpected call";
maybe also throw an AssertionError?
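A minimal sketch of what that could look like (UnexpectedCalls and unexpectedCall are hypothetical names). Java only evaluates assert statements when the JVM runs with assertions enabled, so the explicit throw guarantees a failure either way.

```java
/** Illustrative stub for a method that must never be invoked; names are assumptions. */
final class UnexpectedCalls {
    static void unexpectedCall() {
        assert false : "Unexpected call";            // fails only when assertions are enabled (-ea)
        throw new AssertionError("Unexpected call"); // fails regardless of JVM flags
    }
}
```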

    if (randomBoolean()) {
        setState(clusterService, ClusterState.builder(clusterService.state()).version(clusterService.state().version() + 1));
        assertThat(countDownLatch.getCount(), is(equalTo(1L)));
I wonder if we should remove this? If CI is super slow, we're at risk that the transport message times out before this instruction is executed?
@elasticmachine update branch
…elasticsearch into wait-for-refresh-block-clearing
@elasticmachine update branch
This update postpones unpromotable refreshes for indices with an active INDEX_REFRESH_BLOCK until the block is cleared.
This ensures refresh operations proceed only when the index is no longer blocked.
To avoid indefinite delays for refreshes triggered by writes, the maximum wait time is governed by the bulk request timeout;
explicit refreshes rely on the block being removed eventually.
Closes ES-10134
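
The two waiting modes described above can be sketched with plain JDK futures. This is not the code from this PR: DeferredRefresh and refreshWhenUnblocked are hypothetical names, a CompletableFuture stands in for "the refresh block was cleared", and an Optional timeout models the bounded (write-triggered) versus unbounded (explicit refresh) cases.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative model of deferring a refresh until a block is cleared; names are assumptions. */
final class DeferredRefresh {
    /**
     * Runs the "refresh" once {@code unblocked} completes. If {@code maxWait} is present the wait
     * is bounded and fails with a TimeoutException; otherwise it waits until the block is cleared.
     */
    static CompletableFuture<String> refreshWhenUnblocked(CompletableFuture<Void> unblocked, Optional<Duration> maxWait) {
        // Work on a copy so that a timeout never completes the caller's future.
        CompletableFuture<Void> gate = unblocked.copy();
        maxWait.ifPresent(wait -> gate.orTimeout(wait.toMillis(), TimeUnit.MILLISECONDS));
        return gate.thenApply(ignored -> "refreshed");
    }
}
```

In this model a bounded wait surfaces as a timeout failure rather than a block exception, which matches the preference expressed in the review.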