Add auto force merge functionality to DLM #95204
Conversation
Hi @masseyke, I've created a changelog YAML for you.
…asticsearch into feature/DLM-auto-force-merge
Pinging @elastic/es-data-management (Team:Data Management)
Thanks for working on this, Keith. I've left a few comments; I'm curious what you think about them.
```java
    }
};

setForceMergeMarkerBegin(targetIndex, forceMergeRunningListener);
```
I'd like to propose we do NOT use a begin marker.
I think this will simplify the design tremendously (i.e. we would not need the unsetForceMergeMarker method at all), and I don't think we need the marker.
The begin marker is meant to avoid a double execution of forcemerge, but we already have the resultDeduplicator to achieve this (we can just run the same forcemerge request through it on each DLM run, and while one is in progress the request will not be (re)executed).
As long as a node stays as master, channelling the forcemerge request through the deduplicator will make sure we don't begin the forcemerge request multiple times for the same index.
On master failover we'll trigger forcemerge again (as we'll have an empty result deduplicator). I think this is fine given we do not forcemerge to 1 segment.
Even if we keep the begin forcemerge cluster marker, on master failover we'll need to re-trigger forcemerge anyway, as we wouldn't have any information on whether the forcemerge operation was actually started (or completed) by the previous master node (we'd just know that there's a marker in the cluster state).
My proposal here is to simply channel the forcemerge request through the result deduplicator.
I'd say we could keep writing the end-of-forcemerge timestamp to the cluster state (this will be useful in the future when we add new operations after forcemerge), but at this stage even that could be avoided. If we do decide to keep it, I think we should only remove the forcemerge request from the deduplicator after we update the cluster state to include the end-of-forcemerge timestamp.
What do you think?
@dakrone I'd be interested what you think on this as well
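To make the proposal concrete, here is a minimal, self-contained sketch of the dedup idea (illustrative only; the class and method names are hypothetical and this is not Elasticsearch's actual ResultDeduplicator). It depends on the request type having correct equals/hashCode, which is also why the equals/hashCode fixes mentioned further down matter:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Illustrative only: while a forcemerge request for an index is in flight, re-submitting an
// equal request on a later DLM run is a no-op; once it completes, the entry is removed and
// the next run may submit it again (e.g. after a master failover or a failure).
final class ForceMergeDeduplicator<R> {
    private final Map<R, Boolean> inFlight = new ConcurrentHashMap<>();

    void executeOnce(R request, Runnable onComplete, BiConsumer<R, Runnable> executor) {
        // Only the first caller with an equal request actually starts the work,
        // which is why the request type needs correct equals/hashCode.
        if (inFlight.putIfAbsent(request, Boolean.TRUE) == null) {
            executor.accept(request, () -> {
                inFlight.remove(request); // released only once the whole operation has finished
                onComplete.run();
            });
        }
    }
}
```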
Putting in the begin marker came out of my paranoia from seeing some very long-running (multi-day) force merges in production. Those were merges to a single segment, and this one is not. But I wasn't really sure how the performance would be with the updated force merge that @jpountz is planning. If it's always going to be minutes or less, then what you're saying makes sense to me. If there's a chance it could be hours or days, then I'm not as sure.
@masseyke I don't think the initial cluster state update to configure the marker would help us more than the deduplicator already does. After a master failover, merely detecting a cluster state marker from a previous master node that ran DLM would not tell us whether forcemerge completed (just as with the deduplicator), so we would have to trigger the forcemerge again anyway via the DLM service running on the new master, just as we would if we had only used the deduplicator.
We definitely have to have some marker in the cluster state, right? Unlike a delete (where the index is just gone), there's no way to tell whether an index has been previously force merged. Although maybe there's no harm in force merging, on master restart, an index that has already been force merged? It would essentially be a no-op, and it might be only a little slower than a cluster state read. I'm leaning toward leaving it, though.
I can remove the begin timestamp -- we can always add it back in the future if we need to.
++ the end of operation marker (as a timestamp) makes sense.
However, we should not remove the forcemerge request from the deduplicator until the cluster state update that sets the end of forcemerge timestamp is completed.
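A small sketch of that ordering constraint (names are hypothetical, not the PR's actual code): the deduplicator entry is released only after the cluster state update that records the completion timestamp has itself completed, so there is no window in which another DLM run could re-submit the same forcemerge.

```java
// Hypothetical sketch of the ordering only; the real cluster state plumbing lives in DLM.
final class ForceMergeCompletionHandler {
    private final Runnable releaseDedupEntry;

    ForceMergeCompletionHandler(Runnable releaseDedupEntry) {
        this.releaseDedupEntry = releaseDedupEntry;
    }

    void onForceMergeSucceeded() {
        // First persist the end-of-forcemerge timestamp in the cluster state ...
        submitCompletedTimestampUpdate(() -> {
            // ... and only once that update has been applied, allow the request to be submitted again.
            releaseDedupEntry.run();
        });
    }

    // Stand-in for the PR's cluster state update task; calls back once the update is applied.
    private void submitCompletedTimestampUpdate(Runnable onApplied) {
        onApplied.run();
    }
}
```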
OK that's all done now.
I changed it so that it's a counter that's only incremented and written to the cluster state on exception (rather than at the beginning), so that we have fewer cluster state writes (only the completion timestamp in the normal case).
I added a dedup test that fails without the equals/hashCode fixes.
Thanks for iterating on this Keith.
Left a few suggestions
```java
@Override
public void onFailure(Exception e) {
    if (e instanceof SnapshotInProgressException) {
```
I don't think snapshotting an index causes forcemerge to fail (nor vice-versa). Is there a particular failure you have in mind here?
I was blindly copying the delete code. Now that you mention it, it makes no sense. Oops. I'll remove that.
```java
 * process. And it writes another marker in the cluster state upon completion of the force merge.
 */
private void maybeExecuteForceMerge(ClusterState state, DataStream dataStream, Set<Index> indicesToExclude) {
    List<Index> readOnlyIndices = dataStream.getIndices().stream().filter(index -> indicesToExclude.contains(index) == false).toList();
```
read only indices would imply they have read_only: true. Should we rename this to something else? Maybe just backingIndices?
We could even remove some of the responsibilities of this method and have it receive just the indices that it attempts to forcemerge?
It makes sense to me in that it's shorthand for "indices that we don't think will be getting any more writes and also aren't in the process of being deleted". I'll shorten it further to indices. :)
```java
public static final Setting<Integer> DLM_MAX_FORCEMERGE_ERRORS_SETTING = Setting.intSetting(
    "indices.dlm.max_forcemerge_errors",
    5,
    1,
    Setting.Property.Dynamic,
    Setting.Property.NodeScope
);
```
I'd suggest we don't add any error tracking here, as we should design a potential back-off on its own rather than piggybacking it on this PR (which is already quite large). I don't really think it should be persisted either (it might depend on what strategy we end up with, but DLM should resume once the environmental problems that blocked forcemerge are resolved, without the need to bump the value of a setting).
I'd argue we might want to track in-flight requests as opposed to errors, but as I said IMO we should do that separately.
@dakrone would you agree?
OK I've removed all of that and changed it so that it doesn't log an error if you get the same forcemerge exception repeatedly on an index. It will continue retrying forcemerge on an index indefinitely.
Left some more comments, but I like this behavior better, thanks for working on this Keith!
```java
public void onResponse(ForceMergeResponse forceMergeResponse) {
    logger.info("DLM successfully force merged index [{}]", targetIndex);
    setForceMergeCompletedTimestamp(targetIndex, listener);
}
```
I don't think we can ignore the forceMergeResponse here. It includes the number of total, successful, and failed shards, as well as a list of shard failure exceptions. Perhaps this should only be considered successful if total == successful for the shard counts in the response?
Yeah that's a good point.
```java
listener.onFailure(e);
// To avoid spamming our logs, we only want to log the error once.
if (previousError == null || previousError.equals(errorStore.getError(targetIndex)) == false) {
    logger.error(
```
I think this can go at warn level personally, since it will be retried.
```java
new DataLifecycleClusterStateUpdateTask(listener) {
    @Override
    public ClusterState execute(ClusterState currentState) {
        logger.trace("Updating cluster state with force merge complete marker for {}", targetIndex);
        IndexMetadata indexMetadata = currentState.metadata().index(targetIndex);
        Map<String, String> customMetadata = indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
        Map<String, String> newCustomMetadata = new HashMap<>();
        if (customMetadata != null) {
            newCustomMetadata.putAll(customMetadata);
        }
        newCustomMetadata.put(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, Long.toString(threadPool.absoluteTimeInMillis()));
        IndexMetadata updatededIndexMetadata = new IndexMetadata.Builder(indexMetadata).putCustom(
            DLM_CUSTOM_INDEX_METADATA_KEY,
            newCustomMetadata
        ).build();
        Metadata metadata = Metadata.builder(currentState.metadata()).put(updatededIndexMetadata, true).build();
        return ClusterState.builder(currentState).metadata(metadata).build();
    }
},
```
This only has one implementer, so I'm not sure it really needs to be an abstract class. What about making this class concrete, and passing in any pieces that are required (like the targetIndex)? We could rename it to something like UpdateForceMergeCompleteTask to indicate exactly what it does instead of leaving it for generic behavior. What do you think?
That would also mean it could be independently unit tested at some point.
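For illustration, here is a rough sketch of what the concrete task could look like, based on the execute(...) body quoted above (the constructor shape and the task-listener interface are assumptions, not the PR's final code; it would sit inside DataLifecycleService and reuse its existing imports and constants):

```java
// Hypothetical sketch: a concrete task that records the force merge completion timestamp
// for one target index, so it can be constructed with its inputs and unit tested directly.
static class UpdateForceMergeCompleteTask implements ClusterStateTaskListener {
    private final Index targetIndex;
    private final ActionListener<Void> listener;

    UpdateForceMergeCompleteTask(Index targetIndex, ActionListener<Void> listener) {
        this.targetIndex = targetIndex;
        this.listener = listener;
    }

    ClusterState execute(ClusterState currentState, long completedAtMillis) {
        IndexMetadata indexMetadata = currentState.metadata().index(targetIndex);
        Map<String, String> newCustomMetadata = new HashMap<>();
        Map<String, String> existing = indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
        if (existing != null) {
            newCustomMetadata.putAll(existing);
        }
        newCustomMetadata.put(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, Long.toString(completedAtMillis));
        IndexMetadata updatedIndexMetadata = new IndexMetadata.Builder(indexMetadata).putCustom(
            DLM_CUSTOM_INDEX_METADATA_KEY,
            newCustomMetadata
        ).build();
        Metadata metadata = Metadata.builder(currentState.metadata()).put(updatedIndexMetadata, true).build();
        return ClusterState.builder(currentState).metadata(metadata).build();
    }

    @Override
    public void onFailure(Exception e) {
        listener.onFailure(e);
    }
}
```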
Yeah I can change it. It had two implementations until just recently.
Thanks for iterating on this Keith
This is shaping up really nicely 🚀
I've left a final set of (hopefully) simplifying comments
```java
long dlmEnabledTime = System.nanoTime();
int currentBackingIndexCount = currentGeneration;
assertBusy(() -> {
    if (TimeValue.timeValueNanos(System.nanoTime()).millis() - TimeValue.timeValueNanos(dlmEnabledTime)
        .millis() > dlmPollInterval.millis()) {
        /*
         * We want to disable DLM immediately after it has run the first time so that it doesn't roll over the latest
         * generation or force merge the second one.
         */
        disableDLM();
    } else {
        throw new AssertionError("DLM hasn't run yet so no point in going on");
    }
```
I do wonder if this time math would yield some test flakiness?
Could we keep DLM disabled throughout the test and manually run iterations instead?
e.g.
```java
DataLifecycleService dataLifecycleService = internalCluster().getInstance(DataLifecycleService.class, internalCluster().getMasterName());
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
// run DLM once
dataLifecycleService.run(clusterService.state());
```
Hmm yeah good point. I'd been trying to let DLM run "naturally" but there's really no reason for that -- it's not like what I was doing was a lot more realistic than just explicitly calling it.
```java
private final MasterServiceTaskQueue<UpdateForceMergeCompleteTask> clusterStateUpdateTaskQueue;

private static final SimpleBatchedExecutor<UpdateForceMergeCompleteTask, Void> STATE_UPDATE_TASK_EXECUTOR =
```
These instances are specialised for forcemerge, but their names imply generality. Should we rename them to point out that they're meant to execute an update related to forcemerge only?
Yeah, originally they were not specialized but became so over time. I'll update the names.
```java
this.pollInterval = DLM_POLL_INTERVAL_SETTING.get(settings);
this.rolloverConfiguration = clusterService.getClusterSettings().get(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING);
this.clusterStateUpdateTaskQueue = clusterService.createTaskQueue(
    "dlm-clusterstate-update",
```
| "dlm-clusterstate-update", | |
| "dlm-forcemerge-state-update", |
```java
@Override
public void taskSucceeded(UpdateForceMergeCompleteTask task, Void unused) {
    logger.trace("Updated cluster state for force merge");
```
Should we include the target index in this log statement?
```java
 * This method force merges the given indices in the datastream that that have not been force merged previously. Before force
 * merging, it writes custom metadata as a marker in the cluster state so that we know a force merge is in
 * process. And it writes another marker in the cluster state upon completion of the force merge.
```
```diff
- * This method force merges the given indices in the datastream that that have not been force merged previously. Before force
- * merging, it writes custom metadata as a marker in the cluster state so that we know a force merge is in
- * process. And it writes another marker in the cluster state upon completion of the force merge.
+ * This method force merges the given indices in the datastream. It writes a timestamp in the cluster state upon completion of the force merge.
```
```java
listener.onFailure(new ElasticsearchException(message));
logger.warn(message);
```
nit: This might be a personal preference, but should we log the message before we notify the listeners? (To avoid the cognitive context switch of "leaving this method".) Equally, if listener.onFailure fails with an exception, we'd never have the logging happen.
I think this could be simplified by doing something like:
```java
if (response.getFailedShards() == 0) {
    logger.info("DLM successfully force merged index [{}]", targetIndex);
    setForceMergeCompletedTimestamp(targetIndex, listener);
} else {
    // log a warn message that has the shard failures
    DefaultShardOperationFailedException[] failures = response.getShardFailures();
    String errorMessage = String.format(
        Locale.ROOT,
        "DLM failed to forcemerge all shards for index [%s] due to failures [%s]",
        indexName,
        failures == null
            ? "n/a"
            : Strings.collectionToDelimitedString(
                Arrays.stream(failures).map(Strings::toString).collect(Collectors.toList()),
                ","
            )
    );
    logger.warn(errorMessage);
    // report failure and we'll retry next time DLM runs
    listener.onFailure(new ElasticsearchException(errorMessage));
}
```
Or something along those lines? What do you think?
That would work if getFailedShards() always returns something > 0 when there was a problem; I am not sure that it does, though. I can move the log message to before the onFailure call, and I can include the DefaultShardOperationFailedExceptions. Or, if you can show me that we're guaranteed getTotalShards() == getSuccessfulShards() whenever getFailedShards() == 0, then I'd be happy to merge the two blocks.
I think I can get rid of the status check though, since getStatus() is just a check of the number of failed shards:
```java
public RestStatus getStatus() {
    if (failedShards > 0) {
        return shardFailures[0].status();
    } else {
        return RestStatus.OK;
    }
}
```
I'm pretty sure this invariant holds
Or if you can show me that we're guaranteed that getTotalShards() == getSuccessfulShards() if getFailedShards() == 0 then I'd be happy to merge the two blocks.
It seems odd that the response has total, successful, and failed if total is always equal to successful + failed. That was my reasoning behind separating that out. I definitely didn't go into the lucene code to see what really happens.
Coincidentally, ExplainLifecycleDataIT.testExplainLifecycleForIndicesWithErrors() just exposed a case where the number of failed shards is 0, but the total number of shards was not equal to the number of successful shards. I think in that case it is because shards are being allocated. I think in this case we do want it to fail so that we run forcemerge again next time to merge the segments in the shards we missed.
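Putting this thread together, here is a hedged sketch of the stricter success check inside the response listener (it reuses the helper names from the snippets quoted above and is illustrative, not the PR's final code): only record completion when every shard reports success, and fail otherwise so the next DLM run retries.

```java
// Illustrative sketch: treat the force merge as successful only if no shard failed AND every
// shard was actually merged (total == successful); otherwise report a failure so that DLM
// retries the force merge on its next run (e.g. when shards were still being allocated).
@Override
public void onResponse(ForceMergeResponse forceMergeResponse) {
    if (forceMergeResponse.getFailedShards() == 0
        && forceMergeResponse.getTotalShards() == forceMergeResponse.getSuccessfulShards()) {
        logger.info("DLM successfully force merged index [{}]", targetIndex);
        setForceMergeCompletedTimestamp(targetIndex, listener);
    } else {
        String message = "Force merge of index ["
            + targetIndex
            + "] was not fully successful: "
            + forceMergeResponse.getSuccessfulShards()
            + " of "
            + forceMergeResponse.getTotalShards()
            + " shards succeeded, "
            + forceMergeResponse.getFailedShards()
            + " failed";
        logger.warn(message);
        // route through the enclosing listener's onFailure so the error is recorded and logged only once
        onFailure(new ElasticsearchException(message));
    }
}
```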
```java
listener.onFailure(e);
// To avoid spamming our logs, we only want to log the error once.
if (previousError == null || previousError.equals(errorStore.getError(targetIndex)) == false) {
    logger.warn(
        () -> Strings.format(
            "DLM encountered an error trying to force merge index [%s]. DLM will attempt to force merge the index on its "
                + "next run.",
            targetIndex
        ),
        e
    );
}
}
```
Like above, maybe a nit, but should we do the logic that pertains to this method before we call listener.onFailure(e)? (Equally, if listener.onFailure fails with an exception, we'd never have the logging happen.)
Thanks for working on this Keith 🚀
LGTM
I left a minor comment, but otherwise LGTM also, thanks for iterating on this Keith!
```java
logger.warn(message);
listener.onFailure(new ElasticsearchException(message));
```
I think we could replace this with:
```diff
-    logger.warn(message);
-    listener.onFailure(new ElasticsearchException(message));
+    this.onFailure(new ElasticsearchException(message));
```
And it will do the logging only once, instead of (potentially) spamming with the warning multiple times.
I don't see the listener doing any logging at all -- it's going to ErrorRecordingActionListener which records the error, but I don't see it actually sending it to a logger, right? I also don't see multiple log messages coming out of DataLifecycleServiceTests::testForceMergeRetries.
Looking at the code:
```java
public void onFailure(Exception e) {
    String previousError = errorStore.getError(targetIndex);
    // To avoid spamming our logs, we only want to log the error once.
    if (previousError == null || previousError.equals(errorStore.getError(targetIndex)) == false) {
        logger.warn(
            () -> Strings.format(
                "DLM encountered an error trying to force merge index [%s]. DLM will attempt to force merge the index on its "
                    + "next run.",
                targetIndex
            ),
            e
        );
    }
    listener.onFailure(e);
}
```
It will log it as a warning if the previousError is null or if it isn't the same.
I don't see the listener doing any logging at all
Note that my version is calling this.onFailure(...) not listener.onFailure(...), so it invokes the onFailure that does the only-once logging (and then calls listener.onFailure(...))
Oh! I had missed that you were changing which onFailure was being called! :)
```java
logger.warn(message);
listener.onFailure(new ElasticsearchException(message));
```
Same here with replacing with:
```diff
-    logger.warn(message);
-    listener.onFailure(new ElasticsearchException(message));
+    this.onFailure(new ElasticsearchException(message));
```
…ervice.java Co-authored-by: Lee Hinman <dakrone@users.noreply.github.com>
…asticsearch into feature/DLM-auto-force-merge
This adds automatic force merging for indices in data streams that are being managed by DLM. The calls to force merge happen if an index (1) is not the current write index, (2) has not been rolled over in this DLM run, (3) and has not been previously force merged. The second requirement is so that we don't force merge an index that is still likely to be writing some data.
We are not specifying a number of segments for the force merge, so the force merge does not actually do a lot right now (because lucene will have been merging segments all along). But it puts us in position to take advantage of planned future work in force merge.
There is no configuration required, and this call to force merge happens automatically every time DLM runs.
Note that right now we are writing a "begin" marker in the cluster state before force merging an index, and a "complete" marker once it is complete. We use the begin marker so that we don't try to force merge an index more than once, since sometimes a force merge can take a significant amount of time. The complete marker is currently unused, but I have put it in because it seems like it could be useful for future alerting (for example, if we detect that some index began force merging X days ago but never completed it, we probably need to alert someone that manual intervention is needed).
Relates to #93596
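For readers skimming the description, here is a hedged sketch of the selection logic it describes (the helper name is an assumption and the constants mirror the snippets quoted in the review; per the review discussion, only the completion timestamp, not a begin marker, ends up being consulted):

```java
// Illustrative sketch of which backing indices get force merged on a DLM run:
// (1) not the current write index, (2) not rolled over or otherwise excluded in this run,
// (3) not already force merged (no completion timestamp in the DLM custom index metadata).
private List<Index> indicesToForceMerge(ClusterState state, DataStream dataStream, Set<Index> indicesToExclude) {
    Index writeIndex = dataStream.getWriteIndex();
    return dataStream.getIndices()
        .stream()
        .filter(index -> index.equals(writeIndex) == false)
        .filter(index -> indicesToExclude.contains(index) == false)
        .filter(index -> {
            IndexMetadata indexMetadata = state.metadata().index(index);
            Map<String, String> custom = indexMetadata == null ? null : indexMetadata.getCustomData(DLM_CUSTOM_INDEX_METADATA_KEY);
            return custom == null || custom.containsKey(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY) == false;
        })
        .toList();
}
```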