- Notifications
You must be signed in to change notification settings - Fork 25.6k
Allows SparseFileTracker to progressively execute listeners during Gap processing #58477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,203 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
| | ||
| package org.elasticsearch.index.store.cache; | ||
| | ||
| import org.elasticsearch.action.ActionFuture; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.support.AdapterActionFuture; | ||
| import org.elasticsearch.common.collect.Tuple; | ||
| | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.function.Supplier; | ||
| | ||
| /** | ||
| * An {@link ActionFuture} that listeners can be attached to. Listeners are executed when the future is completed | ||
| * or when a given progress is reached. Progression is updated using the {@link #onProgress(long)} method. | ||
| * | ||
| * Listeners are executed within the thread that triggers the completion, the failure or the progress update and | ||
| * the progress value passed to the listeners on execution is the last updated value. | ||
| */ | ||
| class ProgressListenableActionFuture extends AdapterActionFuture<Long, Long> { | ||
| | ||
| protected final long start; | ||
| protected final long end; | ||
| | ||
| // modified under 'this' mutex | ||
| private volatile List<Tuple<Long, ActionListener<Long>>> listeners; | ||
| protected volatile long progress; | ||
| private volatile boolean completed; | ||
| | ||
| /** | ||
| * Creates a {@link ProgressListenableActionFuture} that accepts the progression | ||
| * to be within {@code start} (inclusive) and {@code end} (exclusive) values. | ||
| * | ||
| * @param start the start (inclusive) | ||
| * @param end the end (exclusive) | ||
| */ | ||
| ProgressListenableActionFuture(long start, long end) { | ||
| super(); | ||
| this.start = start; | ||
| this.end = end; | ||
| this.progress = start; | ||
| this.completed = false; | ||
| assert invariant(); | ||
| } | ||
| | ||
| private boolean invariant() { | ||
| assert start < end : start + " < " + end; | ||
| synchronized (this) { | ||
| assert completed == false || listeners == null; | ||
| assert start <= progress : start + " <= " + progress; | ||
| assert progress <= end : progress + " <= " + end; | ||
| assert listeners == null || listeners.stream().allMatch(listener -> progress < listener.v1()); | ||
| } | ||
| return true; | ||
| } | ||
| | ||
| /** | ||
| * Updates the progress of the current {@link ActionFuture} with the given value, indicating that the range from {@code start} | ||
| * (inclusive) to {@code progress} (exclusive) is available. Calling this method potentially triggers the execution of one or | ||
| * more listeners that are waiting for the progress to reach a value lower than the one just updated. | ||
| * | ||
| * @param value the new progress value | ||
| ||
| */ | ||
| public void onProgress(final long value) { | ||
| ensureNotCompleted(); | ||
| | ||
| if (value <= start) { | ||
| assert false : value + " <= " + start; | ||
| throw new IllegalArgumentException("Cannot update progress with a value less than [start=" + start + ']'); | ||
| } | ||
| if (end < value) { | ||
| assert false : end + " < " + value; | ||
| throw new IllegalArgumentException("Cannot update progress with a value greater than [end=" + end + ']'); | ||
| } | ||
| | ||
| List<ActionListener<Long>> listenersToExecute = null; | ||
| synchronized (this) { | ||
| assert progress < value : progress + " < " + value; | ||
| this.progress = value; | ||
| | ||
| final List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners; | ||
| if (listeners != null) { | ||
| List<Tuple<Long, ActionListener<Long>>> listenersToKeep = null; | ||
| for (Tuple<Long, ActionListener<Long>> listener : listeners) { | ||
| if (value < listener.v1()) { | ||
| if (listenersToKeep == null) { | ||
| listenersToKeep = new ArrayList<>(); | ||
| } | ||
| listenersToKeep.add(listener); | ||
| } else { | ||
| if (listenersToExecute == null) { | ||
| listenersToExecute = new ArrayList<>(); | ||
| } | ||
| listenersToExecute.add(listener.v2()); | ||
| } | ||
| } | ||
| this.listeners = listenersToKeep; | ||
| } | ||
| } | ||
| if (listenersToExecute != null) { | ||
| listenersToExecute.forEach(listener -> executeListener(listener, () -> value)); | ||
| } | ||
| assert invariant(); | ||
| } | ||
| | ||
| @Override | ||
| public void onResponse(Long result) { | ||
| ensureNotCompleted(); | ||
| super.onResponse(result); | ||
| } | ||
| | ||
| @Override | ||
| public void onFailure(Exception e) { | ||
| ensureNotCompleted(); | ||
| super.onFailure(e); | ||
| } | ||
| | ||
| private void ensureNotCompleted() { | ||
| if (completed) { | ||
| throw new IllegalStateException("Future is already completed"); | ||
| } | ||
| } | ||
| | ||
| @Override | ||
| protected void done() { | ||
| super.done(); | ||
| final List<Tuple<Long, ActionListener<Long>>> listenersToExecute; | ||
| synchronized (this) { | ||
| completed = true; | ||
| listenersToExecute = this.listeners; | ||
| listeners = null; | ||
| } | ||
| if (listenersToExecute != null) { | ||
| listenersToExecute.stream().map(Tuple::v2).forEach(listener -> executeListener(listener, () -> actionGet(0L))); | ||
| } | ||
| assert invariant(); | ||
| } | ||
| | ||
| /** | ||
| * Attach a {@link ActionListener} to the current future. The listener will be executed once the future is completed or once the | ||
| * progress reaches the given {@code value}, whichever comes first. | ||
| * | ||
| * @param listener the {@link ActionListener} to add | ||
| * @param value the value | ||
| */ | ||
| public void addListener(ActionListener<Long> listener, long value) { | ||
| boolean executeImmediate = false; | ||
| synchronized (this) { | ||
| final Long current = progress; | ||
| if (completed || (current != null && value <= current)) { | ||
| executeImmediate = true; | ||
| } else { | ||
| List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners; | ||
| if (listeners == null) { | ||
| listeners = new ArrayList<>(); | ||
| } | ||
| listeners.add(Tuple.tuple(value, listener)); | ||
| this.listeners = listeners; | ||
| } | ||
| } | ||
| if (executeImmediate) { | ||
| executeListener(listener, completed ? () -> actionGet(0L) : () -> value); | ||
| } | ||
| assert invariant(); | ||
| } | ||
| | ||
| private void executeListener(final ActionListener<Long> listener, final Supplier<Long> result) { | ||
| try { | ||
| listener.onResponse(result.get()); | ||
| } catch (Exception e) { | ||
| listener.onFailure(e); | ||
| } | ||
| } | ||
| | ||
| @Override | ||
| protected Long convert(Long response) { | ||
| if (response == null || response < start || end < response) { | ||
| assert false : start + " < " + response + " < " + end; | ||
| throw new IllegalArgumentException("Invalid completion value [start=" + start + ",end=" + end + ",response=" + response + ']'); | ||
| } | ||
| return response; | ||
| } | ||
| | ||
| @Override | ||
| public String toString() { | ||
| return "ProgressListenableActionFuture[start=" | ||
| + start | ||
| + ", end=" | ||
| + end | ||
| + ", progress=" | ||
| + progress | ||
| + ", completed=" | ||
| + completed | ||
| + ", listeners=" | ||
| + (listeners != null ? listeners.size() : 0) | ||
| + ']'; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we require
completed == false || progress == endtoo?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so:
completedindicates that the future is done, either with success or failure. In case of failure it could be completed before theprogressreachedend.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes so it does. Can we assert that successful completion only happens with
progress == end?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means to change the
done()method to pass around the completion state (success or failure/cancel) of the future so that we can later compare the successful completion plus theprogress/endvalues. It also means that the progress must be updated in a more strictly manner before completing the future. I gave it a try in a5b29d5, let me know what you think.