
Conversation

@masseyke
Member

@masseyke masseyke commented Apr 12, 2023

This adds automatic force merging for indices in data streams that are being managed by DLM. The calls to force merge happen if an index (1) is not the current write index, (2) has not been rolled over in this DLM run, and (3) has not been previously force merged. The second requirement is so that we don't force merge an index that is still likely to be receiving some writes.
We are not specifying a number of segments for the force merge, so the force merge does not actually do much right now (because Lucene will have been merging segments all along). But it puts us in position to take advantage of planned future work in force merge.
There is no configuration required, and this call to force merge happens automatically every time DLM runs.
Note that right now we are writing a "begin" marker in the cluster state before force merging an index, and a "complete" marker once it is complete. We use the begin marker so that we don't try to force merge an index more than once, since a force merge can sometimes take a significant amount of time. The complete marker is currently unused, but I have put it in because it seems like it could be useful for future alerting (for example, if we detect that some index began force merging X days ago but never completed it, we probably need to alert someone that manual intervention is needed).
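A minimal sketch of that eligibility check (the method and helper names here are illustrative, not the actual ones from this PR):

private List<Index> indicesEligibleForForceMerge(DataStream dataStream, Set<Index> rolledOverThisRun) {
    return dataStream.getIndices().stream()
        // (1) never force merge the current write index
        .filter(index -> index.equals(dataStream.getWriteIndex()) == false)
        // (2) skip indices rolled over during this DLM run, since they may still be receiving writes
        .filter(index -> rolledOverThisRun.contains(index) == false)
        // (3) skip indices whose custom metadata already carries the force-merge-complete marker
        .filter(index -> isForceMergeComplete(index) == false)
        .toList();
}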

Relates to #93596

@elasticsearchmachine
Collaborator

Hi @masseyke, I've created a changelog YAML for you.

@gmarouli gmarouli added v8.9.0 and removed v8.8.0 labels Apr 26, 2023
@masseyke masseyke marked this pull request as ready for review April 26, 2023 22:16
@elasticsearchmachine elasticsearchmachine added the Team:Data Management Meta label for data/management team label Apr 26, 2023
@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@masseyke masseyke requested a review from andreidan April 26, 2023 22:16
@masseyke masseyke changed the title Feature/dlm auto force merge Add auto force merge functionality to DLM Apr 27, 2023
Contributor

@andreidan andreidan left a comment


Thanks for working on this Keith. I've left a few comments; I'm curious what you think about them.

}
};

setForceMergeMarkerBegin(targetIndex, forceMergeRunningListener);
Contributor

I'd like to propose we do NOT use a begin marker.

I think this will simplify the design tremendously (i.e. we would not need the unsetForceMergeMarker method at all) and I don't think we need it.

The begin marker is meant to avoid a double execution of forcemerge, but we already have the resultDeduplicator to achieve this (we can just run the same forcemerge request through it on each DLM run, and whilst one is in progress the request will not be (re)executed).

As long as a node stays as master, channelling the forcemerge request through the deduplicator will make sure we don't begin the forcemerge request multiple times for the same index.
On master failover we'll trigger forcemerge again (as we'll have an empty result deduplicator). I think this is fine given we do not forcemerge to 1 segment.

Even if we keep the begin forcemerge cluster marker, on master failover we'll need to re-trigger forcemerge anyway, as we wouldn't have any information on the forcemerge operation actually being started (or completed) by the previous master node (we'd just know that there's a marker in the cluster state).

My proposal here is to simply channel the forcemerge request through the result deduplicator.

I'd say we could keep configuring the end-of-forcemerge timestamp value in the cluster state (this will be useful in the future when we add new operations after forcemerge), but at this stage even that could be avoided. If we do decide to keep it, I think we should only remove the forcemerge request from the deduplicator after we update the cluster state to include the end-of-forcemerge timestamp.

What do you think?

@dakrone I'd be interested what you think on this as well
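For illustration, a hand-rolled sketch of the deduplication behaviour described above; DLM would use the existing ResultDeduplicator rather than this, and the client field and map here are assumptions of the sketch:

private final Map<ForceMergeRequest, Boolean> inFlightForceMerges = new ConcurrentHashMap<>();

void forceMergeOnce(ForceMergeRequest request, ActionListener<ForceMergeResponse> listener) {
    // an equal request is already in flight; skip it (the real ResultDeduplicator would also
    // attach this listener to the in-flight result rather than dropping it)
    if (inFlightForceMerges.putIfAbsent(request, Boolean.TRUE) != null) {
        return;
    }
    // only clear the in-flight entry once the request (and any follow-up work) has finished
    client.admin().indices().forceMerge(request, ActionListener.runAfter(listener, () -> inFlightForceMerges.remove(request)));
}

This hinges on ForceMergeRequest having sensible equals/hashCode, and, per the note above, in practice the in-flight entry would only be cleared after any follow-up cluster state update completes.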

Member Author

Putting in the begin marker came out of my paranoia from seeing some very long-running (multi-day) force merges in production. Those were merges to a single segment, and this one is not. But I wasn't really sure how the performance would be with the updated force merge that @jpountz is planning. If it's always going to be minutes or less, then what you're saying makes sense to me. If there's a chance it could be hours or days, then I'm not as sure.

Contributor

@andreidan andreidan Apr 28, 2023

@masseyke I don't think the initial cluster state update to configure the marker would help us more than the deduplicator already does. After a master failover, just detecting the cluster state marker from a previous master node that ran DLM would not tell us whether forcemerge completed (just as with the deduplicator), so we would have to trigger the forcemerge again anyway via the DLM service running on the new master, just as we would if we had only used the deduplicator.

Member Author

We definitely have to have some marker in the cluster state right? Unlike a delete (where the index is just gone), there's no way to tell whether an index has been previously force merged. Although maybe there's no harm in force merging an index that has already been force merged on master restart? It would essentially be a no-op, and it might be only a little slower than a cluster state read. I'm leaning toward leaving it though.
I can remove the begin timestamp -- we can always add it back in the future if we need to.
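A sketch of what reading that marker could look like; the metadata keys match the ones introduced later in this PR, but this exact helper is illustrative:

private boolean isForceMergeComplete(ClusterState state, Index index) {
    IndexMetadata indexMetadata = state.metadata().index(index);
    if (indexMetadata == null) {
        return false;
    }
    // the completion timestamp is written into the DLM custom metadata once a force merge finishes
    Map<String, String> customMetadata = indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
    return customMetadata != null && customMetadata.containsKey(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY);
}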

Contributor

@andreidan andreidan May 2, 2023

++ the end of operation marker (as a timestamp) makes sense.
However, we should not remove the forcemerge request from the deduplicator until the cluster state update that sets the end of forcemerge timestamp is completed.

Member Author

OK that's all done now.

@masseyke masseyke requested review from andreidan and dakrone May 3, 2023 17:49
@masseyke
Member Author

masseyke commented May 3, 2023

  • Another possibility would be to keep the "begin" flag, but use it as an "invocation counter" for the number of times the force merge has been invoked

I changed it so that it's a counter that's only incremented and written to the cluster state on exception (rather than at the beginning) so that we have fewer cluster state writes (only the completion timestamp in the normal case).

@masseyke
Member Author

masseyke commented May 3, 2023

  • If we do get rid of the beginning flag, maybe we can mock something up to make sure the deduplicator works from a test

I added a dedup test that fails without the equals/hashCode fixes.
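A hypothetical shape for such a test (not the actual test added here); it relies on ForceMergeRequest defining equals and hashCode, which is what the deduplicator keys on:

public void testEqualForceMergeRequestsAreDeduplicated() {
    ForceMergeRequest first = new ForceMergeRequest(".ds-my-stream-2023.05.03-000001");
    ForceMergeRequest second = new ForceMergeRequest(".ds-my-stream-2023.05.03-000001");
    // equal requests must hash and compare equal so the deduplicator sees them as one in-flight operation
    assertEquals(first, second);
    assertEquals(first.hashCode(), second.hashCode());
}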

Contributor

@andreidan andreidan left a comment

Thanks for iterating on this Keith.

Left a few suggestions


@Override
public void onFailure(Exception e) {
if (e instanceof SnapshotInProgressException) {
Contributor

I don't think snapshotting an index causes forcemerge to fail (nor vice-versa). Is there a particular failure you have in mind here?

Member Author

I was blindly copying the delete code. Now that you mention it, it makes no sense. Oops. I'll remove that.

* process. And it writes another marker in the cluster state upon completion of the force merge.
*/
private void maybeExecuteForceMerge(ClusterState state, DataStream dataStream, Set<Index> indicesToExclude) {
List<Index> readOnlyIndices = dataStream.getIndices().stream().filter(index -> indicesToExclude.contains(index) == false).toList();
Contributor

read only indices would imply they have read_only: true. Should we rename this to something else? Maybe just backingIndices?

We could even remove some of the responsibilities of this method and have it take just the indices that it attempts to forcemerge?

Member Author

It makes sense to me in that it's shorthand for "indices that we don't think will be getting any more writes and also aren't in the process of being deleted". I'll shorten it further to indices. :)

Comment on lines 82 to 88
public static final Setting<Integer> DLM_MAX_FORCEMERGE_ERRORS_SETTING = Setting.intSetting(
    "indices.dlm.max_forcemerge_errors",
    5,
    1,
    Setting.Property.Dynamic,
    Setting.Property.NodeScope
);
Contributor

@andreidan andreidan May 5, 2023

I'd suggest we don't add any error tracking here, as we should design a potential back-off on its own rather than piggybacking it on this PR (which is already quite large). I don't really think it should be persisted either (it might depend on what we end up with in terms of strategy, but DLM should resume once the environmental problems that blocked forcemerge are resolved, without the need to bump the value of a setting).

I'd argue we might want to track in-flight requests as opposed to errors, but as I said IMO we should do that separately.

@dakrone would you agree?

Member Author

OK I've removed all of that and changed it so that it doesn't log an error if you get the same forcemerge exception repeatedly on an index. It will continue retrying forcemerge on an index indefinitely.

@masseyke masseyke requested a review from andreidan May 5, 2023 16:46
Member

@dakrone dakrone left a comment

Left some more comments, but I like this behavior better, thanks for working on this Keith!

Comment on lines 454 to 457
public void onResponse(ForceMergeResponse forceMergeResponse) {
logger.info("DLM successfully force merged index [{}]", targetIndex);
setForceMergeCompletedTimestamp(targetIndex, listener);
}
Member

I don't think we can ignore the forceMergeResponse here. It includes the number of total, successful, and failed shards, as well as a list of shard failure exceptions. Perhaps this should only be considered successful if total == successful for the shard counts in the response?
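A sketch of the stricter handler being suggested, under the assumption that success means every shard reported back successfully (the names mirror the quoted code, but this is illustrative, not the final implementation):

@Override
public void onResponse(ForceMergeResponse forceMergeResponse) {
    if (forceMergeResponse.getFailedShards() == 0
        && forceMergeResponse.getTotalShards() == forceMergeResponse.getSuccessfulShards()) {
        logger.info("DLM successfully force merged index [{}]", targetIndex);
        setForceMergeCompletedTimestamp(targetIndex, listener);
    } else {
        // leave the completion marker unset so the next DLM run retries the force merge
        onFailure(new ElasticsearchException("Force merge of index [" + targetIndex + "] did not succeed on all shards"));
    }
}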

Member Author

Yeah that's a good point.

listener.onFailure(e);
// To avoid spamming our logs, we only want to log the error once.
if (previousError == null || previousError.equals(errorStore.getError(targetIndex)) == false) {
logger.error(
Member

I think this can go at warn level personally, since it will be retried.

Comment on lines 485 to 503
new DataLifecycleClusterStateUpdateTask(listener) {
    @Override
    public ClusterState execute(ClusterState currentState) {
        logger.trace("Updating cluster state with force merge complete marker for {}", targetIndex);
        IndexMetadata indexMetadata = currentState.metadata().index(targetIndex);
        Map<String, String> customMetadata = indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
        Map<String, String> newCustomMetadata = new HashMap<>();
        if (customMetadata != null) {
            newCustomMetadata.putAll(customMetadata);
        }
        newCustomMetadata.put(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, Long.toString(threadPool.absoluteTimeInMillis()));
        IndexMetadata updatededIndexMetadata = new IndexMetadata.Builder(indexMetadata).putCustom(
            DLM_CUSTOM_INDEX_METADATA_KEY,
            newCustomMetadata
        ).build();
        Metadata metadata = Metadata.builder(currentState.metadata()).put(updatededIndexMetadata, true).build();
        return ClusterState.builder(currentState).metadata(metadata).build();
    }
},
Member

This only has one implementer, so I'm not sure it really needs to be an abstract class. What about making this class concrete, and passing in any pieces that are required (like the targetIndex)? We could rename it to something like UpdateForceMergeCompleteTask to indicate exactly what it does instead of leaving it for generic behavior. What do you think?

That would also mean it could be independently unit tested at some point.
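One possible shape for such a concrete task, based on the execute() body quoted above (the constructor parameters and the ClusterStateTaskListener base are assumptions of this sketch):

static class UpdateForceMergeCompleteTask implements ClusterStateTaskListener {
    private final ActionListener<Void> listener;
    private final Index targetIndex;
    private final ThreadPool threadPool;

    UpdateForceMergeCompleteTask(ActionListener<Void> listener, Index targetIndex, ThreadPool threadPool) {
        this.listener = listener;
        this.targetIndex = targetIndex;
        this.threadPool = threadPool;
    }

    ClusterState execute(ClusterState currentState) {
        // copy the existing DLM custom metadata and add the force-merge completion timestamp
        IndexMetadata indexMetadata = currentState.metadata().index(targetIndex);
        Map<String, String> newCustomMetadata = new HashMap<>();
        Map<String, String> existingCustomMetadata = indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
        if (existingCustomMetadata != null) {
            newCustomMetadata.putAll(existingCustomMetadata);
        }
        newCustomMetadata.put(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, Long.toString(threadPool.absoluteTimeInMillis()));
        IndexMetadata updatedIndexMetadata = new IndexMetadata.Builder(indexMetadata)
            .putCustom(DLM_CUSTOM_INDEX_METADATA_KEY, newCustomMetadata)
            .build();
        Metadata metadata = Metadata.builder(currentState.metadata()).put(updatedIndexMetadata, true).build();
        return ClusterState.builder(currentState).metadata(metadata).build();
    }

    @Override
    public void onFailure(Exception e) {
        listener.onFailure(e);
    }
}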

Member Author

Yeah I can change it. It had two implementations until just recently.

@masseyke masseyke requested a review from dakrone May 8, 2023 16:48
Contributor

@andreidan andreidan left a comment

Thanks for iterating on this Keith

This is shaping up really nicely 🚀

I've left a final set of (hopefully) simplifying comments

Comment on lines 323 to 335
long dlmEnabledTime = System.nanoTime();
int currentBackingIndexCount = currentGeneration;
assertBusy(() -> {
    if (TimeValue.timeValueNanos(System.nanoTime()).millis()
        - TimeValue.timeValueNanos(dlmEnabledTime).millis() > dlmPollInterval.millis()) {
        /*
         * We want to disable DLM immediately after it has run the first time so that it doesn't roll over the latest
         * generation or force merge the second one.
         */
        disableDLM();
    } else {
        throw new AssertionError("DLM hasn't run yet so no point in going on");
    }
Contributor

I do wonder if this time math would yield some test flakiness?

Could we keep DLM disabled throughout the test and manually run iterations instead?

e.g.

DataLifecycleService dataLifecycleService = internalCluster().getInstance(
    DataLifecycleService.class,
    internalCluster().getMasterName()
);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
// run DLM once
dataLifecycleService.run(clusterService.state());
Member Author

Hmm yeah good point. I'd been trying to let DLM run "naturally" but there's really no reason for that -- it's not like what I was doing was a lot more realistic than just explicitly calling it.

Comment on lines 105 to 107
private final MasterServiceTaskQueue<UpdateForceMergeCompleteTask> clusterStateUpdateTaskQueue;

private static final SimpleBatchedExecutor<UpdateForceMergeCompleteTask, Void> STATE_UPDATE_TASK_EXECUTOR =
Contributor

These instances are specialised for forcemerge but their names imply generality. Should we rename them to indicate that they're meant to execute an update related to forcemerge only?

Member Author

Yeah, originally they were not specialized but became so over time. I'll update the names.

this.pollInterval = DLM_POLL_INTERVAL_SETTING.get(settings);
this.rolloverConfiguration = clusterService.getClusterSettings().get(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING);
this.clusterStateUpdateTaskQueue = clusterService.createTaskQueue(
"dlm-clusterstate-update",
Contributor

Suggested change
"dlm-clusterstate-update",
"dlm-forcemerge-state-update",

@Override
public void taskSucceeded(UpdateForceMergeCompleteTask task, Void unused) {
logger.trace("Updated cluster state for force merge");
Contributor

Should we include the target index in this log statement?

Comment on lines 338 to 340
* This method force merges the given indices in the datastream that that have not been force merged previously. Before force
* merging, it writes custom metadata as a marker in the cluster state so that we know a force merge is in
* process. And it writes another marker in the cluster state upon completion of the force merge.
Contributor

Suggested change
* This method force merges the given indices in the datastream that that have not been force merged previously. Before force
* merging, it writes custom metadata as a marker in the cluster state so that we know a force merge is in
* process. And it writes another marker in the cluster state upon completion of the force merge.
* This method force merges the given indices in the datastream. It writes a timestamp in the cluster state upon completion of the force merge.
Comment on lines 462 to 463
listener.onFailure(new ElasticsearchException(message));
logger.warn(message);
Contributor

@andreidan andreidan May 9, 2023

nit: This might be a personal preference, but should we log the message before we notify the listeners (to avoid the cognitive context switch of "leaving this method")? Equally, if listener.onFailure fails with an exception we'd never have the logging happen.

I think this could be simplified by doing something like:

if (response.getFailedShards() == 0) {
    logger.info("DLM successfully force merged index [{}]", targetIndex);
    setForceMergeCompletedTimestamp(targetIndex, listener);
} else {
    // log a warn message that has the shard failures
    DefaultShardOperationFailedException[] failures = response.getShardFailures();
    String errorMessage = String.format(
        Locale.ROOT,
        "DLM failed to forcemerge all shards for index [%s] due to failures [%s]",
        indexName,
        failures == null
            ? "n/a"
            : Strings.collectionToDelimitedString(Arrays.stream(failures).map(Strings::toString).collect(Collectors.toList()), ",")
    );
    logger.warn(errorMessage);
    // report failure and we'll retry next time DLM runs
    listener.onFailure(new ElasticsearchException(errorMessage));
}

Or something along those lines? What do you think?

Member Author

That would work if getFailedShards() always returns something > 0 if there was a problem. I am not sure that it does though. I can move the log message to before the onFailure call though. And I can include the DefaultShardOperationFailedExceptions. Or if you can show me that we're guaranteed that getTotalShards() == getSuccessfulShards() if getFailedShards() == 0 then I'd be happy to merge the two blocks.

Member Author

I think I can get rid of the status check though, since getStatus() is just a check of the number of failed shards:

public RestStatus getStatus() {
    if (failedShards > 0) {
        return shardFailures[0].status();
    } else {
        return RestStatus.OK;
    }
}
Contributor

I'm pretty sure this invariant holds

Or if you can show me that we're guaranteed that getTotalShards() == getSuccessfulShards() if getFailedShards() == 0 then I'd be happy to merge the two blocks.

Member Author

It seems odd that the response has total, successful, and failed if total is always equal to successful + failed. That was my reasoning behind separating that out. I definitely didn't go into the Lucene code to see what really happens.

Member Author

Coincidentally, ExplainLifecycleDataIT.testExplainLifecycleForIndicesWithErrors() just exposed a case where the number of failed shards is 0, but the total number of shards was not equal to the number of successful shards. I think in that case it is because shards are being allocated. I think in this case we do want it to fail so that we run forcemerge again next time to merge the segments in the shards we missed.

Comment on lines +486 to +498
listener.onFailure(e);
// To avoid spamming our logs, we only want to log the error once.
if (previousError == null || previousError.equals(errorStore.getError(targetIndex)) == false) {
    logger.warn(
        () -> Strings.format(
            "DLM encountered an error trying to force merge index [%s]. DLM will attempt to force merge the index on its "
                + "next run.",
            targetIndex
        ),
        e
    );
}
}
Contributor

@andreidan andreidan May 9, 2023

Like above, maybe a nit, but should we do the logic that pertains to this method before we call listener.onFailure(e)? (Equally, if listener.onFailure fails with an exception we'd never have the logging happen.)

@masseyke masseyke requested a review from andreidan May 9, 2023 16:13
Contributor

@andreidan andreidan left a comment

Thanks for working on this Keith 🚀

LGTM

Member

@dakrone dakrone left a comment

I left a minor comment, but otherwise LGTM also, thanks for iterating on this Keith!

Comment on lines 466 to 467
logger.warn(message);
listener.onFailure(new ElasticsearchException(message));
Member

I think we could replace this with:

Suggested change
logger.warn(message);
listener.onFailure(new ElasticsearchException(message));
this.onFailure(new ElasticsearchException(message));

And it will do the logging only once, instead of (potentially) spamming with the warning multiple times.

Member Author

I don't see the listener doing any logging at all -- it's going to ErrorRecordingActionListener which records the error, but I don't see it actually sending it to a logger, right? I also don't see multiple log messages coming out of DataLifecycleServiceTests::testForceMergeRetries.

Member

Looking at the code:

public void onFailure(Exception e) {
    String previousError = errorStore.getError(targetIndex);
    // To avoid spamming our logs, we only want to log the error once.
    if (previousError == null || previousError.equals(errorStore.getError(targetIndex)) == false) {
        logger.warn(
            () -> Strings.format(
                "DLM encountered an error trying to force merge index [%s]. DLM will attempt to force merge the index on its "
                    + "next run.",
                targetIndex
            ),
            e
        );
    }
    listener.onFailure(e);

It will log it as a warning if the previousError is null or if it isn't the same

Member

@dakrone dakrone May 9, 2023

I don't see the listener doing any logging at all

Note that my version is calling this.onFailure(...) not listener.onFailure(...), so it invokes the onFailure that does the only-once logging (and then calls listener.onFailure(...))

Member Author

Oh! I had missed that you were changing which onFailure was being called! :)

Comment on lines 474 to 475
logger.warn(message);
listener.onFailure(new ElasticsearchException(message));
Member

Same here with replacing with:

Suggested change
logger.warn(message);
listener.onFailure(new ElasticsearchException(message));
this.onFailure(new ElasticsearchException(message));
@masseyke masseyke merged commit 6eebbf5 into elastic:main May 11, 2023
@masseyke masseyke deleted the feature/DLM-auto-force-merge branch May 11, 2023 16:56

Labels

>enhancement Team:Data Management Meta label for data/management team v8.9.0

5 participants