Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120642.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120642
summary: Defer unpromotable shard refreshes until index refresh blocks are cleared
area: Engine
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ public TransportShardRefreshAction(
ReplicaActionExecution.SubjectToCircuitBreaker
);
// registers the unpromotable version of shard refresh action
new TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);
new TransportUnpromotableShardRefreshAction(
clusterService,
transportService,
shardStateAction,
actionFilters,
indicesService,
threadPool
);
this.refreshExecutor = transportService.getThreadPool().executor(ThreadPool.Names.REFRESH);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,31 @@

package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.broadcast.unpromotable.TransportBroadcastUnpromotableAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK;

public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
UnpromotableShardRefreshRequest,
Expand All @@ -36,14 +47,37 @@ public class TransportUnpromotableShardRefreshAction extends TransportBroadcastU
}

private final IndicesService indicesService;
private final ThreadPool threadPool;
private final boolean useRefreshBlock;

@Inject
public TransportUnpromotableShardRefreshAction(
ClusterService clusterService,
TransportService transportService,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndicesService indicesService
IndicesService indicesService,
ThreadPool threadPool
) {
this(
clusterService,
transportService,
shardStateAction,
actionFilters,
indicesService,
threadPool,
MetadataCreateIndexService.useRefreshBlock(clusterService.getSettings())
);
}

public TransportUnpromotableShardRefreshAction(
ClusterService clusterService,
TransportService transportService,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndicesService indicesService,
ThreadPool threadPool,
boolean useRefreshBlock
) {
super(
NAME,
Expand All @@ -55,6 +89,53 @@ public TransportUnpromotableShardRefreshAction(
transportService.getThreadPool().executor(ThreadPool.Names.REFRESH)
);
this.indicesService = indicesService;
this.threadPool = threadPool;
this.useRefreshBlock = useRefreshBlock;
}

/**
 * Runs {@link #beforeDispatchingRequestToUnpromotableShards} first and, once that step responds,
 * hands the request over to the superclass implementation of {@code doExecute}.
 *
 * @param task     the task this request is executed under
 * @param request  the unpromotable shard refresh request
 * @param listener notified with the final outcome of the action
 */
@Override
protected void doExecute(Task task, UnpromotableShardRefreshRequest request, ActionListener<ActionResponse.Empty> listener) {
    final ActionListener<Void> dispatchAfterPreCheck = listener.delegateFailure(
        (delegate, ignored) -> super.doExecute(task, request, delegate)
    );
    beforeDispatchingRequestToUnpromotableShards(request, dispatchAfterPreCheck);
}

private void beforeDispatchingRequestToUnpromotableShards(UnpromotableShardRefreshRequest request, ActionListener<Void> listener) {
if (useRefreshBlock == false) {
listener.onResponse(null);
return;
}

var clusterStateObserver = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is slightly annoying that the observer does not support it but we need to have something like:

if (isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterStateObserver.setAndGetObservedState()) == false) { listener.onResponse(null); return; } 
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, and it's why I slightly prefer the Runnable approach. Fixed in 179074d

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still find this simpler than the runnable way which takes over more responsibility from the observer. One can trick the observer by passing in initial version -1, but it is a bit ugly. We should probably add a method to it to simplify this very common case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was planning on doing that. I'll open a PR shortly

if (isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterStateObserver.setAndGetObservedState()) == false) {
listener.onResponse(null);
return;
}

clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(null);
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(
new ElasticsearchTimeoutException(
"shard refresh timed out waiting for index block to be removed",
new ClusterBlockException(Map.of(request.shardId().getIndexName(), Set.of(INDEX_REFRESH_BLOCK)))
)
);
}
}, clusterState -> isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterState) == false);
}

/**
 * Tells whether the given cluster state carries {@code INDEX_REFRESH_BLOCK} for the named index.
 *
 * @param index the index name to look up
 * @param state the cluster state whose blocks are inspected
 * @return {@code true} if the index has the refresh block in {@code state}
 */
private static boolean isIndexBlockedForRefresh(String index, ClusterState state) {
    final var clusterBlocks = state.blocks();
    return clusterBlocks.hasIndexBlock(index, INDEX_REFRESH_BLOCK);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.engine.Engine;

import java.io.IOException;
Expand All @@ -26,22 +28,36 @@ public class UnpromotableShardRefreshRequest extends BroadcastUnpromotableReques

private final long primaryTerm;
private final long segmentGeneration;
private final TimeValue timeout;

/**
 * Creates a request without a timeout by delegating to the five-argument constructor
 * with a {@code null} timeout.
 *
 * @param indexShardRoutingTable routing table of the shard being refreshed
 * @param primaryTerm            primary term associated with the refresh
 * @param segmentGeneration      segment generation associated with the refresh
 * @param failShardOnError       forwarded unchanged to the five-argument constructor
 */
public UnpromotableShardRefreshRequest(
IndexShardRoutingTable indexShardRoutingTable,
long primaryTerm,
long segmentGeneration,
boolean failShardOnError
) {
// no timeout supplied by this overload
this(indexShardRoutingTable, primaryTerm, segmentGeneration, failShardOnError, null);
}

/**
 * Creates a request carrying an optional timeout.
 *
 * @param indexShardRoutingTable passed to the superclass constructor
 * @param primaryTerm            primary term stored on this request
 * @param segmentGeneration      segment generation stored on this request
 * @param failShardOnError       passed to the superclass constructor
 * @param timeout                optional timeout, exposed through {@link #getTimeout()}; may be {@code null}
 */
public UnpromotableShardRefreshRequest(
    IndexShardRoutingTable indexShardRoutingTable,
    long primaryTerm,
    long segmentGeneration,
    boolean failShardOnError,
    @Nullable TimeValue timeout
) {
    super(indexShardRoutingTable, failShardOnError);
    this.timeout = timeout;
    this.segmentGeneration = segmentGeneration;
    this.primaryTerm = primaryTerm;
}

/**
 * Reads a request from the wire: the segment generation first, then the primary term when the
 * stream's transport version is on or after {@code V_8_12_0}. For older versions the primary term
 * falls back to {@code Engine.UNKNOWN_PRIMARY_TERM}.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public UnpromotableShardRefreshRequest(StreamInput in) throws IOException {
    super(in);
    // Read order must stay: segment generation precedes the (optional) primary term.
    segmentGeneration = in.readVLong();
    if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
        primaryTerm = in.readVLong();
    } else {
        primaryTerm = Engine.UNKNOWN_PRIMARY_TERM;
    }
    // The timeout is only used by the request sender, therefore we don't write it over the wire
    timeout = null;
}

@Override
Expand Down Expand Up @@ -70,6 +86,11 @@ public long getPrimaryTerm() {
return primaryTerm;
}

/**
 * Returns the timeout given at construction time.
 *
 * @return the timeout, or {@code null} when none was supplied or when this request was read from a stream
 */
@Nullable
public TimeValue getTimeout() {
    return this.timeout;
}

@Override
public String toString() {
return Strings.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void onFailure(Exception e) {
}
});
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
if (indexShard.getReplicationGroup().getRoutingTable().allUnpromotableShards().size() > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
} else {
l.onResponse(true);
Expand Down Expand Up @@ -136,7 +136,8 @@ private void sendUnpromotableRequests(
indexShard.getReplicationGroup().getRoutingTable(),
indexShard.getOperationPrimaryTerm(),
generation,
true
true,
postWriteRefreshTimeout
);
transportService.sendRequest(
transportService.getLocalNode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ public static void validateStoreTypeSetting(Settings indexSettings) {
}
}

private static boolean useRefreshBlock(Settings settings) {
public static boolean useRefreshBlock(Settings settings) {
return DiscoveryNode.isStateless(settings) && settings.getAsBoolean(USE_INDEX_REFRESH_BLOCK_SETTING_NAME, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class IndexShardRoutingTable {
final List<ShardRouting> replicas;
final List<ShardRouting> activeShards;
final List<ShardRouting> assignedShards;
private final List<ShardRouting> assignedUnpromotableShards;
private final List<ShardRouting> unpromotableShards;
/**
* The initializing list, including ones that are initializing on a target node because of relocation.
Expand All @@ -71,8 +72,9 @@ public class IndexShardRoutingTable {
List<ShardRouting> replicas = new ArrayList<>();
List<ShardRouting> activeShards = new ArrayList<>();
List<ShardRouting> assignedShards = new ArrayList<>();
List<ShardRouting> unpromotableShards = new ArrayList<>();
List<ShardRouting> assignedUnpromotableShards = new ArrayList<>();
List<ShardRouting> allInitializingShards = new ArrayList<>();
List<ShardRouting> unpromotableShards = new ArrayList<>();
boolean allShardsStarted = true;
int activeSearchShardCount = 0;
int totalSearchShardCount = 0;
Expand All @@ -95,20 +97,24 @@ public class IndexShardRoutingTable {
if (shard.initializing()) {
allInitializingShards.add(shard);
}
if (shard.isPromotableToPrimary() == false) {
unpromotableShards.add(shard);
}
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(shard.getTargetRelocatingShard());
assert shard.assignedToNode() : "relocating from unassigned " + shard;
assert shard.getTargetRelocatingShard().assignedToNode() : "relocating to unassigned " + shard.getTargetRelocatingShard();
assignedShards.add(shard.getTargetRelocatingShard());
if (shard.getTargetRelocatingShard().isPromotableToPrimary() == false) {
assignedUnpromotableShards.add(shard.getTargetRelocatingShard());
unpromotableShards.add(shard.getTargetRelocatingShard());
}
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
if (shard.isPromotableToPrimary() == false) {
unpromotableShards.add(shard);
assignedUnpromotableShards.add(shard);
}
}
if (shard.state() != ShardRoutingState.STARTED) {
Expand All @@ -117,10 +123,13 @@ public class IndexShardRoutingTable {
}
assert shards.isEmpty() == false : "cannot have an empty shard routing table";
assert primary != null : shards;
assert unpromotableShards.containsAll(assignedUnpromotableShards)
: unpromotableShards + " does not contain all assigned unpromotable shards " + assignedUnpromotableShards;
this.primary = primary;
this.replicas = CollectionUtils.wrapUnmodifiableOrEmptySingleton(replicas);
this.activeShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(activeShards);
this.assignedShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(assignedShards);
this.assignedUnpromotableShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(assignedUnpromotableShards);
this.unpromotableShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(unpromotableShards);
this.allInitializingShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(allInitializingShards);
this.allShardsStarted = allShardsStarted;
Expand Down Expand Up @@ -185,6 +194,15 @@ public List<ShardRouting> assignedShards() {
* @return a {@link List} of shards
*/
public List<ShardRouting> unpromotableShards() {
    // Only the assigned unpromotable shards are exposed here; allUnpromotableShards() also includes unassigned ones.
    return assignedUnpromotableShards;
}

/**
 * Returns every unpromotable shard of this routing table, whether or not it is assigned to a node.
 * Use {@link #unpromotableShards()} for the assigned subset only.
 *
 * @return a {@link List} of all unpromotable shards, including unassigned ones
 */
public List<ShardRouting> allUnpromotableShards() {
    return unpromotableShards;
}

Expand Down
Loading