Allows SparseFileTracker to progressively execute listeners during Gap processing #58477
| Pinging @elastic/es-distributed (:Distributed/Snapshot/Restore) |
DaveCTurner left a comment
Looks good, I suggested mainly a few simplifications to the API.
| * potentially triggers the execution of one or more listeners that are waiting for the progress | ||
| * to reach a value lower than the one just updated. | ||
| * | ||
| * @param value the new progress value |
I think there might be an off-by-one error lurking here. IMO we should report progress of value when the range ⟨start, value⟩ is available, noting that our ranges are inclusive at the start and exclusive at the end. This means I think we can start with progress == start (not null) indicating that the available range is empty, and require start < value below.
Probably a good idea to document this here too.
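To make the convention above concrete, here is a minimal sketch of the half-open semantics, assuming progress starts at start and is always an exclusive upper bound. The class and method names are hypothetical and are not part of SparseFileTracker.

```java
/**
 * Hypothetical helper illustrating half-open progress semantics:
 * a progress value p means that bytes in [start, p) are available.
 */
final class HalfOpenProgress {

    private HalfOpenProgress() {}

    /** Number of bytes available; zero when progress == start (empty range). */
    static long availableBytes(long start, long progress) {
        return progress - start;
    }

    /** Progress to report once the byte at lastWrittenIndex has been written. */
    static long progressAfterWriting(long lastWrittenIndex) {
        // The end of the range is exclusive, so report the index just past the byte.
        return Math.addExact(lastWrittenIndex, 1L);
    }

    /** Whether the byte at position can be read given the reported progress. */
    static boolean isReadable(long start, long progress, long position) {
        return start <= position && position < progress;
    }
}
```

With this convention, reporting progress == start means nothing is available yet, and a writer that has just filled index i reports i + 1.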
I agree; I've been a bit back and forth here.
Should we rename the parameter progress to align it with the {@code progress} in the paragraph above?
| public List<Gap> waitForRange(final long start, final long end, final ActionListener<Void> listener) { | ||
| public List<Gap> waitForRange( | ||
| final Tuple<Long, Long> range, | ||
| @Nullable final Tuple<Long, Long> subRange, |
Let's require the subrange to be non-null, it's only null in tests AFAICT.
| final PlainListenableActionFuture<Void> completionListener; | ||
| final ProgressListenableActionFuture completionListener; | ||
| | ||
| Range(long start, long end) { |
I expected this constructor to make a completed range whereas in fact we use the other one and pass null as the listener. There's only a couple of call-sites, I'd prefer to inline this to avoid that confusion.
| @Override | ||
| public void onProgress(long value) { | ||
| if (value < start || end < value) { | ||
| throw new IllegalArgumentException("Cannot update progress [" + value + "] for gap [" + start + '-' + end + ']'); |
I think this shouldn't happen, so should be an assertion. Maybe just remove it since we assert it in onGapProgress anyway.
| assert invariant(); | ||
| | ||
| final Range range = new Range(start, end, null); | ||
| final SortedSet<Range> existingRanges = ranges.tailSet(range); |
Seems strange to have to look up the range corresponding with the gap here, maybe Gap should keep hold of the corresponding Range so it can call the listener directly.
The onSuccess and onFailure methods would also benefit from keeping a reference to the range; the lookup should not be necessary there either, as a pending Range should not be completed or failed outside of the corresponding Gap. I'm tempted to address this in a follow-up PR.
Sure, a followup is fine. The difference with onSuccess and onFailure is that they also adjust ranges, but they do indeed start with the same kind of lookup as we do here.
I opened #58587 for this.
| * @param listener the {@link ActionListener} to add | ||
| */ | ||
| @Override | ||
| public void addListener(final ActionListener<Long> listener) { |
Can we drop this (and the implements ListenableActionFuture<Long> that requires it)? I think callers should always specify their target endpoint.
| assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); | ||
| fileContents[Math.toIntExact(i)] = AVAILABLE; | ||
| assertTrue(wasNotified.get()); | ||
| gap.onProgress(i); |
Here's the off-by-one error: when fileContents[i] is available we should do this:
| gap.onProgress(i); | |
| gap.onProgress(i + 1); |
Thanks David! It's becoming a tradition (sadly)
| @DaveCTurner Thanks for your review. I've updated the code, let me know what you think please. |
DaveCTurner left a comment
Production code looks good; I suggested some extra tests.
| * potentially triggers the execution of one or more listeners that are waiting for the progress | ||
| * to reach a value lower than the one just updated. | ||
| * | ||
| * @param value the new progress value |
Should we rename the parameter progress to align it with the {@code progress} in the paragraph above?
| assert completed == false || listeners == null; | ||
| assert start <= progress : start + " <= " + progress; | ||
| assert progress <= end : progress + " <= " + end; | ||
| assert listeners == null || listeners.stream().allMatch(listener -> progress < listener.v1()); |
Should we require completed == false || progress == end too?
I don't think so: completed indicates that the future is done, either with success or failure. In case of failure it could be completed before the progress reached end.
Ah yes so it does. Can we assert that successful completion only happens with progress == end?
"Can we assert that successful completion only happens with progress == end?"
That means changing the done() method to pass along the completion state of the future (success or failure/cancel) so that we can later check successful completion against the progress and end values. It also means that the progress must be updated more strictly before completing the future. I gave it a try in a5b29d5, let me know what you think.
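As a rough illustration of the invariant under discussion (not the code from a5b29d5, which is not shown here), the check could look like the sketch below. The State enum and the method signature are assumptions made for this example only.

```java
/** Hypothetical sketch of the completion invariant discussed in this thread. */
final class CompletionInvariant {

    /** Assumed completion states; the real future may track these differently. */
    enum State { PENDING, SUCCEEDED, FAILED }

    private CompletionInvariant() {}

    static boolean invariant(long start, long end, long progress, State state) {
        // Progress always stays within the bounds of the range.
        if (progress < start || end < progress) {
            return false;
        }
        // Successful completion is only allowed once progress has reached end.
        if (state == State.SUCCEEDED && progress != end) {
            return false;
        }
        // A failed (or cancelled) future may stop anywhere between start and end.
        return true;
    }
}
```

The point of tracking the completion state separately is that completed alone cannot distinguish a failure at partial progress (legal) from a success at partial progress (a bug).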
| assertThat(ise.getMessage(), containsString("Future is already completed")); | ||
| } | ||
| | ||
| public void testProgressUpdatesCallsListeners() throws Exception { |
Should we also have a test that partially completes and then fails and verify that some listeners completed successfully and others failed?
I added testPartialProgressionThenFailure
| assertThat(listenerFailure.get(), nullValue()); | ||
| } | ||
| | ||
| public void testListenerCalledImmediatelyAfterFailure() { |
Maybe also a test that we call the listener immediately if its progress target is already reached?
Sure, I added testListenerCalledImmediatelyWhenProgressReached
...s/src/test/java/org/elasticsearch/index/store/cache/ProgressListenableActionFutureTests.java
| } | ||
| | ||
| private static ProgressListenableActionFuture randomFuture() { | ||
| final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(randomIntBetween(1, 10))); |
Don't think we need a random limit on the randomness here; also 1TB should be enough for anyone :)
| final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(randomIntBetween(1, 10))); | |
| final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(1)); |
Agreed :)
| Thanks for your suggestions David. I've added more tests. |
DaveCTurner left a comment
LGTM
| Thanks David! |
…p processing (elastic#58477) Today SparseFileTracker allows to wait for a range to become available before executing a given listener. In the case of searchable snapshot, we'd like to be able to wait for a large range to be filled (ie, downloaded and written to disk) while being able to execute the listener as soon as a smaller range is available. This pull request is an extract from elastic#58164 which introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. The progressive listenable future allows to register listeners attached to SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap progress reaches a given progress value. This progress value is defined when the tracker.waitForRange() method is called; this method has been modified to accept a range and another listener's range to operate on.
…p processing (#58477) (#58585) Today SparseFileTracker allows to wait for a range to become available before executing a given listener. In the case of searchable snapshot, we'd like to be able to wait for a large range to be filled (ie, downloaded and written to disk) while being able to execute the listener as soon as a smaller range is available. This pull request is an extract from #58164 which introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. The progressive listenable future allows to register listeners attached to SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap progress reaches a given progress value. This progress value is defined when the tracker.waitForRange() method is called; this method has been modified to accept a range and another listener's range to operate on.
…p processing (#58477) (#58584) Today SparseFileTracker allows to wait for a range to become available before executing a given listener. In the case of searchable snapshot, we'd like to be able to wait for a large range to be filled (ie, downloaded and written to disk) while being able to execute the listener as soon as a smaller range is available. This pull request is an extract from #58164 which introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. The progressive listenable future allows to register listeners attached to SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap progress reaches a given progress value. This progress value is defined when the tracker.waitForRange() method is called; this method has been modified to accept a range and another listener's range to operate on. Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
…nge (elastic#58587) SparseFileTracker.Gap can keep a reference to the corresponding range it is about to fill, it does not need to resolve the range each time onSuccess/onProgress/onFailure are called. Relates elastic#58477
…o be written in cache (#58728) This commit changes CacheFile and CachedBlobContainerIndexInput so that the read operations made by these classes are now progressively executed and do not wait for full range to be written in cache. It relies on the change introduced in #58477 and it is the last change extracted from #58164. Relates #58164
…o be written in cache (#58728) (#58829) This commit changes CacheFile and CachedBlobContainerIndexInput so that the read operations made by these classes are now progressively executed and do not wait for full range to be written in cache. It relies on the change introduced in #58477 and it is the last change extracted from #58164. Relates #58164
…o be written in cache (#58728) (#58835) This commit changes CacheFile and CachedBlobContainerIndexInput so that the read operations made by these classes are now progressively executed and do not wait for full range to be written in cache. It relies on the change introduced in #58477 and it is the last change extracted from #58164. Relates #58164
Today SparseFileTracker lets callers wait for a range to become available before executing a given listener. For searchable snapshots, we'd like to be able to wait for a large range to be filled (i.e., downloaded and written to disk) while executing the listener as soon as a smaller range is available.

This pull request is an extract from #58164. It introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. This future allows registering listeners attached to a SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap's progress reaches a given value. This progress value is defined when the tracker.waitForRange() method is called; that method has been modified to accept the range to fill together with a second range, the listener's range, that the listener actually waits for.

This pull request does not modify how CacheFile requests ranges from the SparseFileTracker; that should be done in another pull request. Therefore CacheFile uses a listener's range that is equal to the range to be written, and a //TODO has been added.
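
For readers who want a feel for the mechanism, the following is a minimal, single-threaded sketch of a future whose listeners each wait for their own progress target. It is a simplified stand-in written for this description, not the ProgressListenableActionFuture added by the PR: the class name, the Consumer-based callback (null on success, the exception on failure) and the lack of synchronization are all assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical, simplified sketch; bytes in [start, progress) are considered available. */
final class ProgressFutureSketch {

    /** A callback together with the progress value it is waiting for. */
    private static final class Waiter {
        final long target;
        final Consumer<Exception> callback; // receives null on success, the failure otherwise

        Waiter(long target, Consumer<Exception> callback) {
            this.target = target;
            this.callback = callback;
        }
    }

    private final long start;
    private final long end;
    private long progress;
    private boolean completed;
    private Exception failure;
    private final List<Waiter> waiters = new ArrayList<>();

    ProgressFutureSketch(long start, long end) {
        if (start > end) {
            throw new IllegalArgumentException("Invalid range [" + start + '-' + end + ']');
        }
        this.start = start;
        this.end = end;
        this.progress = start; // nothing is available yet
    }

    /** Runs the callback now if the target is already reached or the future failed, else queues it. */
    void addListener(long target, Consumer<Exception> callback) {
        if (target < start || end < target) {
            throw new IllegalArgumentException("Target [" + target + "] is outside of the range");
        }
        if (target <= progress) {
            callback.accept(null);
        } else if (completed) {
            callback.accept(failure); // only a failure can complete before reaching the target
        } else {
            waiters.add(new Waiter(target, callback));
        }
    }

    /** Advances the progress and notifies every waiter whose target is now reached. */
    void onProgress(long value) {
        ensureNotCompleted();
        if (value <= progress || end < value) {
            throw new IllegalArgumentException("Cannot update progress to [" + value + ']');
        }
        progress = value;
        drain(false, null);
    }

    /** Completes successfully; in this sketch the progress simply jumps to end. */
    void onResponse() {
        ensureNotCompleted();
        progress = end;
        completed = true;
        drain(true, null);
    }

    /** Completes with a failure; waiters whose target was not reached receive the exception. */
    void onFailure(Exception e) {
        ensureNotCompleted();
        failure = Objects.requireNonNull(e);
        completed = true;
        drain(true, e);
    }

    private void ensureNotCompleted() {
        if (completed) {
            throw new IllegalStateException("Future is already completed");
        }
    }

    private void drain(boolean all, Exception result) {
        final List<Waiter> ready = new ArrayList<>();
        for (Iterator<Waiter> it = waiters.iterator(); it.hasNext();) {
            final Waiter waiter = it.next();
            if (all || waiter.target <= progress) {
                ready.add(waiter);
                it.remove();
            }
        }
        for (Waiter waiter : ready) {
            waiter.callback.accept(result);
        }
    }
}
```

A listener registered with a small target therefore runs as soon as the writer reports enough progress, while a listener waiting for the full range only runs on completion. The real class must also be safe under concurrent calls, which this sketch deliberately ignores.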