Contributor

@fcofdez fcofdez commented Jan 22, 2025

This update postpones unpromotable refreshes for indices with an active INDEX_REFRESH_BLOCK until the block is cleared.
This ensures refresh operations proceed only when the index is no longer blocked.

To avoid indefinite delays, refreshes triggered by bulk requests wait at most for the bulk request timeout, whereas explicit refreshes
have no such limit and rely on the block being removed eventually.
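
The behaviour described above can be pictured with a minimal, self-contained sketch. It is not the code in this PR: `BlockedRefreshSketch`, `Outcome`, `clearBlock` and `timeoutElapsed` are hypothetical names, and the timeout is modelled as an explicit call rather than a real timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the behaviour described in this PR; not Elasticsearch code.
final class BlockedRefreshSketch {
    /** Outcome reported to the caller of a deferred refresh. */
    enum Outcome { REFRESHED, TIMED_OUT }

    // Models the INDEX_REFRESH_BLOCK being present on the index.
    private boolean blocked = true;
    private final List<Consumer<Outcome>> parked = new ArrayList<>();

    /** Runs the refresh immediately if the index is unblocked, otherwise parks it. */
    synchronized void refresh(Consumer<Outcome> listener) {
        if (blocked == false) {
            listener.accept(Outcome.REFRESHED);
        } else {
            parked.add(listener);
        }
    }

    /** Models the block being removed: every parked refresh proceeds. */
    synchronized void clearBlock() {
        blocked = false;
        drain(Outcome.REFRESHED);
    }

    /** Models a caller timeout (for example the bulk timeout) elapsing first. */
    synchronized void timeoutElapsed() {
        drain(Outcome.TIMED_OUT);
    }

    private void drain(Outcome outcome) {
        List<Consumer<Outcome>> toNotify = new ArrayList<>(parked);
        parked.clear();
        toNotify.forEach(listener -> listener.accept(outcome));
    }
}
```

An explicit refresh in this model would simply never call `timeoutElapsed()` and would stay parked until `clearBlock()` runs.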

Closes ES-10134

…ared This update postpones unpromotable refreshes for indices with an active INDEX_REFRESH_BLOCK until the block is cleared. This ensures refresh operations proceed only when the index is no longer blocked. To avoid indefinite delays, the maximum wait time is governed by the stateless.indices.refresh.max_wait_time_for_unblock setting. Closes ES-10134
@fcofdez fcofdez added >enhancement :Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. Team:Distributed Indexing Meta label for Distributed Indexing team labels Jan 22, 2025
@fcofdez fcofdez requested a review from a team as a code owner January 22, 2025 15:56
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@elasticsearchmachine
Collaborator

Hi @fcofdez, I've created a changelog YAML for you.


@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(indexLevelBlockException);
Contributor Author

I was wondering if we should just rely on the fact that the blocks would be cleared eventually instead of failing the refresh request, wdyt?

Contributor

I could see us do both. But in the case of bulks it would be nice to use the timeout from the bulk instead.

Also, I think throwing the refresh block exception is confusing, I'd prefer to throw a timeout exception instead.

But we could also forego it, expecting the refresh unblock to happen automatically after a separate timeout.

Member

For refresh triggered by writes we can maybe reuse the postWriteRefreshTimeout and for explicit refreshes reuse the timeout from the original request, and then rely on the block to be cleared automatically in case we wait indefinitely? And get rid of the new setting?

Contributor Author

That makes more sense, I used the postWriteRefreshTimeout instead of the setting in 9a22ad6

Contributor

@henningandersen henningandersen left a comment

Left a few smaller initial comments.

return;
}

clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
Contributor

I think we'd normally pass the predicate here instead, avoiding the explicit retries here.

I think that also removes the need for the runnable?

Contributor Author

That's a good simplification, done in 9a22ad6


}

protected void beforeDispatchingRequestToUnpromotableShards(Request request, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> null);
Contributor

Is this not just listener.onResponse(null)?

Contributor Author

I just got used to always using the safe completeWith util, but it's true that listener.onResponse(null) should be enough. Tackled in 9a22ad6.
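
For readers following this thread, here is a minimal sketch of why the two calls behave the same here. `SketchListener` is a hypothetical, simplified listener that only mirrors the general shape of `ActionListener`; in particular it takes a plain `Supplier`, which is an assumption made to keep the example self-contained.

```java
import java.util.function.Supplier;

// Hypothetical, simplified listener; it only mirrors the general shape of ActionListener.
interface SketchListener<T> {
    void onResponse(T value);

    void onFailure(Exception e);

    /** Completes the listener with the supplier's result, routing any exception to onFailure. */
    static <T> void completeWith(SketchListener<T> listener, Supplier<T> supplier) {
        T value;
        try {
            value = supplier.get();
        } catch (Exception e) {
            // This branch only matters when the supplier can actually throw.
            listener.onFailure(e);
            return;
        }
        listener.onResponse(value);
    }
}
```

Since `() -> null` cannot throw, the failure branch is unreachable for that supplier and `completeWith(listener, () -> null)` reduces to `listener.onResponse(null)`.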

@elasticsearchmachine elasticsearchmachine added the serverless-linked Added by automation, don't add manually label Jan 23, 2025
Member

@tlrx tlrx left a comment

Looks nice, I made a quick review and left some comments.


@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
beforeDispatchingRequestToUnpromotableShards(
Member

nit: maybe add this in TransportUnpromotableShardRefreshAction directly, unless it is needed for tests?

Member

Is it required by tests after all?


Contributor

@henningandersen henningandersen left a comment

Looks good, I hope Tanguy will do a more thorough review.

}

var clusterStateObserver = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext());

Contributor

It is slightly annoying that the observer does not support it but we need to have something like:

if (isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterStateObserver.setAndGetObservedState()) == false) {
    listener.onResponse(null);
    return;
}
Contributor Author

That's right, and it's why I slightly prefer the Runnable approach. Fixed in 179074d

Contributor

I still find this simpler than the runnable way, which takes over more responsibility from the observer. One can trick the observer by passing in initial version -1, but it is a bit ugly. We should probably add a method to it to simplify this very common case.

Contributor Author

Yes, I was planning on doing that. I'll open a PR shortly
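
The pattern discussed in this thread (test the already-observed state first, and only then wait with a predicate) can be sketched as follows. `MiniObserver` and `waitForState` are hypothetical; the class is a toy observer over an integer "number of blocks", not the real `ClusterStateObserver`, and it only models the limitation as described in the comments above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical miniature observer; the state is the number of blocks on the index.
final class MiniObserver {
    private static final class Waiter {
        final Predicate<Integer> predicate;
        final Runnable onMatch;

        Waiter(Predicate<Integer> predicate, Runnable onMatch) {
            this.predicate = predicate;
            this.onMatch = onMatch;
        }
    }

    private int state;
    private final List<Waiter> waiters = new ArrayList<>();

    MiniObserver(int initialState) {
        this.state = initialState;
    }

    /** Models the limitation from the thread: only states published later are tested. */
    synchronized void waitForNextChange(Predicate<Integer> predicate, Runnable onMatch) {
        waiters.add(new Waiter(predicate, onMatch));
    }

    /** Hypothetical convenience: test the current state first, then fall back to waiting. */
    synchronized void waitForState(Predicate<Integer> predicate, Runnable onMatch) {
        if (predicate.test(state)) {
            onMatch.run();
            return;
        }
        waitForNextChange(predicate, onMatch);
    }

    /** Publishes a new state and notifies, exactly once, every waiter whose predicate matches. */
    synchronized void publish(int newState) {
        state = newState;
        List<Waiter> matched = new ArrayList<>();
        for (Waiter waiter : waiters) {
            if (waiter.predicate.test(newState)) {
                matched.add(waiter);
            }
        }
        waiters.removeAll(matched);
        matched.forEach(waiter -> waiter.onMatch.run());
    }
}
```

With a helper like `waitForState`, callers would not need the explicit pre-check before registering the listener.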

@fcofdez fcofdez requested a review from tlrx January 23, 2025 14:24
Member

@tlrx tlrx left a comment

LGTM, I left only minor comments that you can feel free to address or not.

}, clusterState -> isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterState) == false);
}

private boolean isIndexBlockedForRefresh(String index, ClusterState state) {
Member

nit: can be static


@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new ElasticsearchTimeoutException("index refresh blocked, waiting for shard(s) to be started"));
Member

I think "waiting for shard to be started" is implicit; the timeout is caused by the block not being removed within the expected delay.

Maybe something like this?

listener.onFailure(
    new ElasticsearchTimeoutException(
        "shard refresh timed out waiting for index block to be removed",
        new ClusterBlockException(Map.of(request.shardId().getIndexName(), Set.of(INDEX_REFRESH_BLOCK)))
    )
);
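
The idea behind this suggestion (report a timeout to the caller while keeping the block as the underlying cause) can be illustrated with plain JDK types. This is only a sketch: `TimeoutWithCauseSketch` and `RefreshBlockedException` are hypothetical stand-ins, not the Elasticsearch exception classes named above.

```java
import java.util.concurrent.TimeoutException;

final class TimeoutWithCauseSketch {
    /** Hypothetical stand-in for the block exception named in the suggestion. */
    static final class RefreshBlockedException extends RuntimeException {
        RefreshBlockedException(String index) {
            super("index [" + index + "] blocked for refresh");
        }
    }

    /** Builds a timeout whose cause records which index was still blocked. */
    static TimeoutException timedOut(String index) {
        TimeoutException timeout = new TimeoutException("shard refresh timed out waiting for index block to be removed");
        // The JDK TimeoutException has no cause-taking constructor, so the cause is attached afterwards.
        timeout.initCause(new RefreshBlockedException(index));
        return timeout;
    }
}
```

Callers see a timeout at the top level, and the block details remain available through `getCause()`.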

List<ShardRouting> assignedShards = new ArrayList<>();
List<ShardRouting> unpromotableShards = new ArrayList<>();
List<ShardRouting> allInitializingShards = new ArrayList<>();
List<ShardRouting> allUnpromotableShards = new ArrayList<>();
Member

I wonder if allUnpromotableShards should be named unpromotableShards and the existing unpromotableShards renamed to assignedUnpromotableShards?

Member

(can be a follow up though)

this.activeShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(activeShards);
this.assignedShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(assignedShards);
this.unpromotableShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(unpromotableShards);
this.allUnpromotableShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(allUnpromotableShards);
Member

Should we assert that all unpromotableShards are contained in allUnpromotableShards? Can be costly though.
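
A rough sketch of what such an assertion could look like, and of one way to limit its cost. `SubsetAssertionSketch` and its string-based lists are hypothetical; the real lists hold `ShardRouting` instances, and whether hashing them is appropriate is an assumption here.

```java
import java.util.HashSet;
import java.util.List;

final class SubsetAssertionSketch {
    /** Returns true when every element of subset also appears in superset. */
    static <T> boolean containsAll(List<T> superset, List<T> subset) {
        // Hashing the larger list once keeps the check roughly linear instead of quadratic.
        return new HashSet<>(superset).containsAll(subset);
    }

    static void check(List<String> unpromotableShards, List<String> allUnpromotableShards) {
        // Evaluated only when the JVM runs with -ea, so production builds skip the cost.
        assert containsAll(allUnpromotableShards, unpromotableShards)
            : "assigned unpromotable shards must be a subset of all unpromotable shards";
    }
}
```

Because the check sits behind `assert`, its cost would be paid in tests only, which may make the concern about expense less pressing.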

});
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
- if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
+ if (indexShard.getReplicationGroup().getRoutingTable().allUnpromotableShards().size() > 0) {
Member

👍

final var shardId = new ShardId(new Index(randomIdentifier(), randomUUID()), between(0, 3));
final var shardRouting = TestShardRouting.newShardRouting(shardId, randomUUID(), true, ShardRoutingState.STARTED);
final var indexShardRoutingTable = new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build();
final var indexShardRoutingTable = createShardRoutingTableWithPrimaryAndSearchShards(shardId);
Member

Now it always tests with a search shard, maybe we should randomize that (i.e., randomly add a search shard or not)?

UnpromotableShardRefreshRequest request,
ActionListener<ActionResponse.Empty> responseListener
) {
assert false : "Unexpected call";
Member

maybe also throw an AssertionError?
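
The reasoning behind this suggestion, sketched with a hypothetical `UnexpectedCallSketch` class: a bare `assert false` does nothing when the JVM runs without `-ea`, so following it with an explicit throw makes the failure unconditional.

```java
final class UnexpectedCallSketch {
    static void unexpectedCall() {
        // Does nothing unless the JVM was started with assertions enabled (-ea).
        assert false : "Unexpected call";
        // Fails regardless of whether assertions are enabled.
        throw new AssertionError("Unexpected call");
    }
}
```

Either way the caller sees an `AssertionError` with the same message.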


if (randomBoolean()) {
setState(clusterService, ClusterState.builder(clusterService.state()).version(clusterService.state().version() + 1));
assertThat(countDownLatch.getCount(), is(equalTo(1L)));
Member

I wonder if we should remove this? If CI is super slow, we're at risk that the transport message times out before this instruction is executed?

@fcofdez
Contributor Author

fcofdez commented Jan 27, 2025

@elasticmachine update branch

@fcofdez
Contributor Author

fcofdez commented Jan 28, 2025

@elasticmachine update branch

@fcofdez fcofdez merged commit 2ebbad4 into elastic:main Jan 28, 2025
16 checks passed

Labels

:Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. >enhancement serverless-linked Added by automation, don't add manually Team:Distributed Indexing Meta label for Distributed Indexing team v9.0.0

5 participants