Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ CompletableFuture<Integer> fetchRange(
}
ensureOpen();
final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
start,
end,
Tuple.tuple(start, end),
Tuple.tuple(start, end), // TODO use progressive sub range to trigger read operations sooner
ActionListener.wrap(
rangeReady -> future.complete(onRangeAvailable.apply(start, end)),
rangeFailure -> future.completeExceptionally(rangeFailure)
Expand All @@ -285,8 +285,8 @@ CompletableFuture<Integer> fetchRange(
for (SparseFileTracker.Gap gap : gaps) {
try {
ensureOpen();
onRangeMissing.accept(gap.start, gap.end);
gap.onResponse(null);
onRangeMissing.accept(gap.start, gap.end); // TODO update progress
gap.onCompletion();
} catch (Exception e) {
gap.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.index.store.cache;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.AdapterActionFuture;
import org.elasticsearch.common.collect.Tuple;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
* An {@link ActionFuture} that listeners can be attached to. Listeners are executed when the future is completed
* or when a given progress is reached. Progression is updated using the {@link #onProgress(long)} method.
*
* Listeners are executed within the thread that triggers the completion, the failure or the progress update and
* the progress value passed to the listeners on execution is the last updated value.
*/
class ProgressListenableActionFuture extends AdapterActionFuture<Long, Long> {

protected final long start;
protected final long end;

// modified under 'this' mutex
private volatile List<Tuple<Long, ActionListener<Long>>> listeners;
protected volatile long progress;
private volatile boolean completed;

/**
* Creates a {@link ProgressListenableActionFuture} that accepts the progression
* to be within {@code start} (inclusive) and {@code end} (exclusive) values.
*
* @param start the start (inclusive)
* @param end the end (exclusive)
*/
ProgressListenableActionFuture(long start, long end) {
super();
this.start = start;
this.end = end;
this.progress = start;
this.completed = false;
assert invariant();
}

private boolean invariant() {
assert start < end : start + " < " + end;
synchronized (this) {
assert completed == false || listeners == null;
assert start <= progress : start + " <= " + progress;
assert progress <= end : progress + " <= " + end;
assert listeners == null || listeners.stream().allMatch(listener -> progress < listener.v1());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we require completed == false || progress == end too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so: completed indicates that the future is done, either with success or failure. In case of failure it could be completed before the progress reached end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes so it does. Can we assert that successful completion only happens with progress == end?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert that successful completion only happens with progress == end?

It means to change the done() method to pass around the completion state (success or failure/cancel) of the future so that we can later compare the successful completion plus the progress/end values. It also means that the progress must be updated in a more strictly manner before completing the future. I gave it a try in a5b29d5, let me know what you think.

}
return true;
}

/**
* Updates the progress of the current {@link ActionFuture} with the given value, indicating that the range from {@code start}
* (inclusive) to {@code progress} (exclusive) is available. Calling this method potentially triggers the execution of one or
* more listeners that are waiting for the progress to reach a value lower than the one just updated.
*
* @param value the new progress value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be an off-by-one error lurking here. IMO we should report progress of value when the range ⟨start, value⟩ is available, noting that our ranges are inclusive at the start and exclusive at the end. This means I think we can start with progress == start (not null) indicating that the available range is empty, and require start < value below.

Probably a good idea to document this here too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree; I've been a bit back and forth here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename the parameter progress to align it with the {@code progress} in the paragraph above?

*/
public void onProgress(final long value) {
ensureNotCompleted();

if (value <= start) {
assert false : value + " <= " + start;
throw new IllegalArgumentException("Cannot update progress with a value less than [start=" + start + ']');
}
if (end < value) {
assert false : end + " < " + value;
throw new IllegalArgumentException("Cannot update progress with a value greater than [end=" + end + ']');
}

List<ActionListener<Long>> listenersToExecute = null;
synchronized (this) {
assert progress < value : progress + " < " + value;
this.progress = value;

final List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners;
if (listeners != null) {
List<Tuple<Long, ActionListener<Long>>> listenersToKeep = null;
for (Tuple<Long, ActionListener<Long>> listener : listeners) {
if (value < listener.v1()) {
if (listenersToKeep == null) {
listenersToKeep = new ArrayList<>();
}
listenersToKeep.add(listener);
} else {
if (listenersToExecute == null) {
listenersToExecute = new ArrayList<>();
}
listenersToExecute.add(listener.v2());
}
}
this.listeners = listenersToKeep;
}
}
if (listenersToExecute != null) {
listenersToExecute.forEach(listener -> executeListener(listener, () -> value));
}
assert invariant();
}

@Override
public void onResponse(Long result) {
ensureNotCompleted();
super.onResponse(result);
}

@Override
public void onFailure(Exception e) {
ensureNotCompleted();
super.onFailure(e);
}

private void ensureNotCompleted() {
if (completed) {
throw new IllegalStateException("Future is already completed");
}
}

@Override
protected void done() {
super.done();
final List<Tuple<Long, ActionListener<Long>>> listenersToExecute;
synchronized (this) {
completed = true;
listenersToExecute = this.listeners;
listeners = null;
}
if (listenersToExecute != null) {
listenersToExecute.stream().map(Tuple::v2).forEach(listener -> executeListener(listener, () -> actionGet(0L)));
}
assert invariant();
}

/**
* Attach a {@link ActionListener} to the current future. The listener will be executed once the future is completed or once the
* progress reaches the given {@code value}, whichever comes first.
*
* @param listener the {@link ActionListener} to add
* @param value the value
*/
public void addListener(ActionListener<Long> listener, long value) {
boolean executeImmediate = false;
synchronized (this) {
final Long current = progress;
if (completed || (current != null && value <= current)) {
executeImmediate = true;
} else {
List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners;
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(Tuple.tuple(value, listener));
this.listeners = listeners;
}
}
if (executeImmediate) {
executeListener(listener, completed ? () -> actionGet(0L) : () -> value);
}
assert invariant();
}

private void executeListener(final ActionListener<Long> listener, final Supplier<Long> result) {
try {
listener.onResponse(result.get());
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
protected Long convert(Long response) {
if (response == null || response < start || end < response) {
assert false : start + " < " + response + " < " + end;
throw new IllegalArgumentException("Invalid completion value [start=" + start + ",end=" + end + ",response=" + response + ']');
}
return response;
}

@Override
public String toString() {
return "ProgressListenableActionFuture[start="
+ start
+ ", end="
+ end
+ ", progress="
+ progress
+ ", completed="
+ completed
+ ", listeners="
+ (listeners != null ? listeners.size() : 0)
+ ']';
}
}
Loading