Conversation

@tlrx
Member

@tlrx tlrx commented Jun 24, 2020

Today, SparseFileTracker lets callers wait for a range to become available before executing a given listener. For searchable snapshots, we'd like to be able to wait for a large range to be filled (i.e., downloaded and written to disk) while still executing the listener as soon as a smaller range is available.

This pull request, extracted from #58164, introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. This progress-aware listenable future lets listeners be registered against a SparseFileTracker.Gap so that they are executed either once the Gap is completed (with success or failure) or as soon as the Gap's progress reaches a given value. That value is defined when tracker.waitForRange() is called; the method now accepts the range to be filled plus a second range, the listener's range, which is the part the listener actually waits on.

This pull request does not change how CacheFile requests ranges from the SparseFileTracker; that should be done in another pull request. For now, CacheFile passes a listener's range equal to the range to be written, and a //TODO has been added.
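
As a rough illustration of the mechanism described above, here is a minimal sketch of a future whose listeners fire either when progress reaches their target or when the future fails. It is not the code from this pull request: the class name ProgressFutureSketch, the BiConsumer-based callback, and the method signatures are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for the idea behind ProgressListenableActionFuture.
// It is NOT the Elasticsearch class: names, signatures and locking are assumptions.
final class ProgressFutureSketch {

    // A registered callback together with the progress value it is waiting for.
    private static final class Waiter {
        final long target;
        final BiConsumer<Long, Exception> callback; // (progress, null) on success, (null, e) on failure

        Waiter(long target, BiConsumer<Long, Exception> callback) {
            this.target = target;
            this.callback = callback;
        }
    }

    private final long start;
    private final long end;
    private long progress;      // the half-open range [start, progress) is available
    private boolean completed;
    private Exception failure;
    private final List<Waiter> waiters = new ArrayList<>();

    ProgressFutureSketch(long start, long end) {
        this.start = start;
        this.end = end;
        this.progress = start;  // nothing available yet
    }

    // Fires the callback once progress reaches target, or right away if it already has.
    void addListener(long target, BiConsumer<Long, Exception> callback) {
        if (target < start || target > end) {
            throw new IllegalArgumentException("Target [" + target + "] outside [" + start + ", " + end + "]");
        }
        final long current;
        final Exception error;
        synchronized (this) {
            if (completed == false && progress < target) {
                waiters.add(new Waiter(target, callback));
                return;
            }
            current = progress;
            error = failure;
        }
        // Callbacks always run outside the lock.
        if (target <= current) {
            callback.accept(current, null);
        } else {
            callback.accept(null, error);
        }
    }

    // Records new progress and notifies every waiter whose target has been reached.
    void onProgress(long value) {
        final List<Waiter> ready = new ArrayList<>();
        synchronized (this) {
            if (completed) {
                throw new IllegalStateException("Future is already completed");
            }
            if (value <= progress || value > end) {
                throw new IllegalArgumentException("Cannot update progress to [" + value + "]");
            }
            progress = value;
            for (Iterator<Waiter> it = waiters.iterator(); it.hasNext();) {
                Waiter waiter = it.next();
                if (waiter.target <= value) {
                    ready.add(waiter);
                    it.remove();
                }
            }
        }
        for (Waiter waiter : ready) {
            waiter.callback.accept(value, null);
        }
    }

    // Fails the future; waiters whose target was never reached receive the exception.
    void onFailure(Exception e) {
        final List<Waiter> remaining;
        synchronized (this) {
            if (completed) {
                throw new IllegalStateException("Future is already completed");
            }
            completed = true;
            failure = e;
            remaining = new ArrayList<>(waiters);
            waiters.clear();
        }
        for (Waiter waiter : remaining) {
            waiter.callback.accept(null, e);
        }
    }
}
```

In this sketch, a listener registered with target 10 on a future covering [0, 100) is notified on the first onProgress call that reports at least 10, while a listener registered with target 100 keeps waiting and receives the exception if onFailure is called first. Successful completion is left out to keep the example short.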

@tlrx tlrx requested a review from DaveCTurner June 24, 2020 08:10
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (:Distributed/Snapshot/Restore)

@elasticmachine elasticmachine added the Team:Distributed (Obsolete) label Jun 24, 2020
Contributor

@DaveCTurner DaveCTurner left a comment

Looks good, I suggested mainly a few simplifications to the API.

* potentially triggers the execution of one or more listeners that are waiting for the progress
* to reach a value lower than the one just updated.
*
* @param value the new progress value
Contributor

I think there might be an off-by-one error lurking here. IMO we should report progress of value when the range ⟨start, value⟩ is available, noting that our ranges are inclusive at the start and exclusive at the end. This means I think we can start with progress == start (not null) indicating that the available range is empty, and require start < value below.

Probably a good idea to document this here too.
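
To make the convention in this comment concrete, here is a small sketch of progress over half-open ranges, where a reported progress of value means that [start, value) is available. The class HalfOpenProgress and its helper methods are hypothetical and exist only for illustration; they are not part of SparseFileTracker.

```java
// Hypothetical helpers illustrating the half-open [start, progress) convention.
final class HalfOpenProgress {

    // Number of bytes available once `progress` has been reported for a gap starting at `start`.
    // progress == start means the available range is empty.
    static long availableBytes(long start, long progress) {
        if (progress < start) {
            throw new IllegalArgumentException("progress [" + progress + "] is before start [" + start + "]");
        }
        return progress - start;
    }

    // True when `position` lies in [start, progress): start is inclusive, progress is exclusive.
    static boolean isAvailable(long start, long progress, long position) {
        return start <= position && position < progress;
    }

    // Fills [start, end) one position at a time and returns the progress reported after each write.
    static long[] fill(boolean[] contents, int start, int end) {
        long[] reported = new long[end - start];
        for (int i = start; i < end; i++) {
            contents[i] = true;
            // Position i is now available, so the exclusive upper bound is i + 1, not i.
            reported[i - start] = i + 1;
        }
        return reported;
    }
}
```

With this convention, reporting i after writing position i would claim one byte fewer than is actually available, which is the kind of off-by-one the comment warns about.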

Member Author

I agree; I've been a bit back and forth here.

Contributor

Should we rename the parameter to progress to align it with the {@code progress} in the paragraph above?

- public List<Gap> waitForRange(final long start, final long end, final ActionListener<Void> listener) {
+ public List<Gap> waitForRange(
+ final Tuple<Long, Long> range,
+ @Nullable final Tuple<Long, Long> subRange,
Contributor

Let's require the subrange to be non-null, it's only null in tests AFAICT.

- final PlainListenableActionFuture<Void> completionListener;
+ final ProgressListenableActionFuture completionListener;

Range(long start, long end) {
Contributor

I expected this constructor to make a completed range whereas in fact we use the other one and pass null as the listener. There's only a couple of call-sites, I'd prefer to inline this to avoid that confusion.

@Override
public void onProgress(long value) {
if (value < start || end < value) {
throw new IllegalArgumentException("Cannot update progress [" + value + "] for gap [" + start + '-' + end + ']');
Contributor

I think this shouldn't happen, so should be an assertion. Maybe just remove it since we assert it in onGapProgress anyway.

assert invariant();

final Range range = new Range(start, end, null);
final SortedSet<Range> existingRanges = ranges.tailSet(range);
Contributor

Seems strange to have to look up the range corresponding with the gap here, maybe Gap should keep hold of the corresponding Range so it can call the listener directly.

Member Author

The onSuccess and onFailure methods would also benefit from keeping a reference to the range; the lookup should not be necessary there either, as a pending Range should not be completed or failed outside of the corresponding Gap. I'm tempted to address this in a follow-up PR.

Contributor

Sure, a followup is fine. The difference with onSuccess and onFailure is that they also adjust ranges, but they do indeed start with the same kind of lookup as we do here.

Member Author

I opened #58587 for this.
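
The trade-off discussed in this thread can be sketched as follows. This is only an illustration of the two approaches, a lookup in the sorted set on every update versus a Gap that holds on to its Range; it is not the code from #58587. The class GapReferenceSketch and its members are hypothetical, and synchronization is omitted for brevity.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch contrasting a per-update lookup with a Gap that keeps its Range.
final class GapReferenceSketch {

    static final class Range {
        final long start;
        final long end;
        long progress;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
            this.progress = start;
        }
    }

    // Ranges are ordered by their start offset.
    private final TreeSet<Range> ranges = new TreeSet<>(Comparator.comparingLong((Range r) -> r.start));

    // Lookup-based variant: build a probe and search the sorted set on every update.
    void onProgressByLookup(long start, long end, long value) {
        final SortedSet<Range> tail = ranges.tailSet(new Range(start, end));
        if (tail.isEmpty()) {
            throw new IllegalStateException("No range found for gap [" + start + '-' + end + ']');
        }
        final Range found = tail.first();
        if (found.start != start || found.end != end) {
            throw new IllegalStateException("Range does not match gap [" + start + '-' + end + ']');
        }
        found.progress = value;
    }

    // Reference-based variant: the Gap keeps the Range it is filling and updates it directly.
    static final class Gap {
        private final Range range;

        Gap(Range range) {
            this.range = range;
        }

        void onProgress(long value) {
            range.progress = value;
        }

        long progress() {
            return range.progress;
        }
    }

    // Registers a pending range and hands back the Gap responsible for filling it.
    Gap addGap(long start, long end) {
        final Range range = new Range(start, end);
        ranges.add(range);
        return new Gap(range);
    }
}
```

The reference-based variant avoids both the probe allocation and the tree search, and it removes the need to re-check that the range found actually matches the gap.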

* @param listener the {@link ActionListener} to add
*/
@Override
public void addListener(final ActionListener<Long> listener) {
Contributor

Can we drop this (and the implements ListenableActionFuture<Long> that requires it)? I think callers should always specify their target endpoint.

assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
fileContents[Math.toIntExact(i)] = AVAILABLE;
assertTrue(wasNotified.get());
gap.onProgress(i);
Contributor

Here's the off-by-one error: when fileContents[i] is available we should do this:

Suggested change
- gap.onProgress(i);
+ gap.onProgress(i + 1);
Member Author

Thanks David! It's becoming a tradition (sadly).

@tlrx
Member Author

tlrx commented Jun 25, 2020

@DaveCTurner Thanks for your review. I've updated the code, let me know what you think please.

@tlrx tlrx requested a review from DaveCTurner June 25, 2020 11:31
Contributor

@DaveCTurner DaveCTurner left a comment

Production code looks good; I suggested some extra tests.

assert completed == false || listeners == null;
assert start <= progress : start + " <= " + progress;
assert progress <= end : progress + " <= " + end;
assert listeners == null || listeners.stream().allMatch(listener -> progress < listener.v1());
Contributor

Should we require completed == false || progress == end too?

Member Author

I don't think so: completed indicates that the future is done, either with success or failure. In case of failure it could be completed before the progress reached end.

Contributor

Ah yes so it does. Can we assert that successful completion only happens with progress == end?

Member Author

Can we assert that successful completion only happens with progress == end?

That means changing the done() method to pass around the completion state (success or failure/cancel) of the future, so that we can later check a successful completion against the progress/end values. It also means that the progress must be updated more strictly before completing the future. I gave it a try in a5b29d5; let me know what you think.
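
The invariant under discussion, that a successful completion implies progress == end while a failure may happen at any progress, can be sketched as follows. This is a hypothetical illustration and not the code from commit a5b29d5: the class CompletionInvariantSketch, its State enum, and the choice to throw rather than assert are assumptions made for the example.

```java
// Hypothetical sketch of tracking the completion state alongside the progress.
final class CompletionInvariantSketch {

    enum State { PENDING, SUCCEEDED, FAILED }

    private final long start;
    private final long end;
    private long progress;
    private State state = State.PENDING;

    CompletionInvariantSketch(long start, long end) {
        this.start = start;
        this.end = end;
        this.progress = start;
    }

    synchronized void onProgress(long value) {
        ensurePending();
        if (value <= progress || value > end) {
            throw new IllegalArgumentException("Cannot update progress to [" + value + "]");
        }
        progress = value;
        assert invariant();
    }

    // Success is only accepted once the whole range has been reported as available.
    synchronized void onSuccess() {
        ensurePending();
        if (progress != end) {
            throw new IllegalStateException("Cannot succeed at progress [" + progress + "], expected [" + end + "]");
        }
        state = State.SUCCEEDED;
        assert invariant();
    }

    // Failure may happen at any progress value.
    synchronized void onFailure() {
        ensurePending();
        state = State.FAILED;
        assert invariant();
    }

    synchronized State state() {
        return state;
    }

    // start <= progress <= end always holds, and SUCCEEDED implies progress == end.
    synchronized boolean invariant() {
        return start <= progress && progress <= end && (state != State.SUCCEEDED || progress == end);
    }

    private void ensurePending() {
        if (state != State.PENDING) {
            throw new IllegalStateException("Future is already completed");
        }
    }
}
```

Keeping the completion state explicit is what allows the invariant to distinguish a failure at partial progress, which is legitimate, from a success at partial progress, which is a bug.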

assertThat(ise.getMessage(), containsString("Future is already completed"));
}

public void testProgressUpdatesCallsListeners() throws Exception {
Contributor

Should we also have a test that partially completes and then fails and verify that some listeners completed successfully and others failed?

Member Author

I added testPartialProgressionThenFailure

assertThat(listenerFailure.get(), nullValue());
}

public void testListenerCalledImmediatelyAfterFailure() {
Contributor

Maybe also a test that we call the listener immediately if its progress target is already reached?

Member Author

Sure, I added testListenerCalledImmediatelyWhenProgressReached

}

private static ProgressListenableActionFuture randomFuture() {
final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(randomIntBetween(1, 10)));
Contributor

Don't think we need a random limit on the randomness here; also 1TB should be enough for anyone :)

Suggested change
- final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(randomIntBetween(1, 10)));
+ final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(1));
Member Author

Agreed :)

@tlrx tlrx requested a review from DaveCTurner June 25, 2020 16:11
@tlrx
Member Author

tlrx commented Jun 25, 2020

Thanks for your suggestions David. I've added more tests.

Contributor

@DaveCTurner DaveCTurner left a comment

LGTM

@tlrx tlrx merged commit b01322e into elastic:master Jun 26, 2020
@tlrx tlrx deleted the add-progressable-listerner branch June 26, 2020 09:54
@tlrx
Member Author

tlrx commented Jun 26, 2020

Thanks David!

tlrx added a commit to tlrx/elasticsearch that referenced this pull request Jun 26, 2020
…p processing (elastic#58477) Today SparseFileTracker allows to wait for a range to become available before executing a given listener. In the case of searchable snapshot, we'd like to be able to wait for a large range to be filled (ie, downloaded and written to disk) while being able to execute the listener as soon as a smaller range is available. This pull request is an extract from elastic#58164 which introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. The progressive listenable future allows to register listeners attached to SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap progress reaches a given progress value. This progress value is defined when the tracker.waitForRange() method is called; this method has been modified to accept a range and another listener's range to operate on.
tlrx added a commit that referenced this pull request Jun 26, 2020
…p processing (#58477) (#58585) Today SparseFileTracker allows to wait for a range to become available before executing a given listener. In the case of searchable snapshot, we'd like to be able to wait for a large range to be filled (ie, downloaded and written to disk) while being able to execute the listener as soon as a smaller range is available. This pull request is an extract from #58164 which introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. The progressive listenable future allows to register listeners attached to SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap progress reaches a given progress value. This progress value is defined when the tracker.waitForRange() method is called; this method has been modified to accept a range and another listener's range to operate on.
tlrx added a commit that referenced this pull request Jun 26, 2020
…p processing (#58477) (#58584) Today SparseFileTracker allows to wait for a range to become available before executing a given listener. In the case of searchable snapshot, we'd like to be able to wait for a large range to be filled (ie, downloaded and written to disk) while being able to execute the listener as soon as a smaller range is available. This pull request is an extract from #58164 which introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. The progressive listenable future allows to register listeners attached to SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap progress reaches a given progress value. This progress value is defined when the tracker.waitForRange() method is called; this method has been modified to accept a range and another listener's range to operate on. Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
tlrx added a commit that referenced this pull request Jun 29, 2020
…nge (#58587) SparseFileTracker.Gap can keep a reference to the corresponding range it is about to fill, it does not need to resolve the range each time onSuccess/onProgress/onFailure are called. Relates #58477
tlrx added a commit to tlrx/elasticsearch that referenced this pull request Jun 29, 2020
…nge (elastic#58587) SparseFileTracker.Gap can keep a reference to the corresponding range it is about to fill, it does not need to resolve the range each time onSuccess/onProgress/onFailure are called. Relates elastic#58477
tlrx added a commit that referenced this pull request Jun 29, 2020
…nge (#58587) (#58666) SparseFileTracker.Gap can keep a reference to the corresponding range it is about to fill, it does not need to resolve the range each time onSuccess/onProgress/onFailure are called. Relates #58477
tlrx added a commit that referenced this pull request Jun 29, 2020
…nge (#58587) (#58665) SparseFileTracker.Gap can keep a reference to the corresponding range it is about to fill, it does not need to resolve the range each time onSuccess/onProgress/onFailure are called. Relates #58477
tlrx added a commit that referenced this pull request Jul 1, 2020
…o be written in cache (#58728) This commit changes CacheFile and CachedBlobContainerIndexInput so that the read operations made by these classes are now progressively executed and do not wait for full range to be written in cache. It relies on the change introduced in #58477 and it is the last change extracted from #58164. Relates #58164
tlrx added a commit to tlrx/elasticsearch that referenced this pull request Jul 1, 2020
…o be written in cache (elastic#58728) This commit changes CacheFile and CachedBlobContainerIndexInput so that the read operations made by these classes are now progressively executed and do not wait for full range to be written in cache. It relies on the change introduced in elastic#58477 and it is the last change extracted from elastic#58164. Relates elastic#58164
tlrx added a commit that referenced this pull request Jul 1, 2020
…o be written in cache (#58728) (#58829) This commit changes CacheFile and CachedBlobContainerIndexInput so that the read operations made by these classes are now progressively executed and do not wait for full range to be written in cache. It relies on the change introduced in #58477 and it is the last change extracted from #58164. Relates #58164
tlrx added a commit that referenced this pull request Jul 1, 2020
…o be written in cache (#58728) (#58835) This commit changes CacheFile and CachedBlobContainerIndexInput so that the read operations made by these classes are now progressively executed and do not wait for full range to be written in cache. It relies on the change introduced in #58477 and it is the last change extracted from #58164. Relates #58164

Labels

:Distributed Coordination/Snapshot/Restore, >enhancement, Team:Distributed (Obsolete), v7.8.1, v7.9.0, v8.0.0-alpha1

4 participants