Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public Executable assemble(List<List<Attribute>> listOfKeys, List<PhysicalPlan>
}
}

return new SampleIterator(new PITAwareQueryClient(session), criteria, cfg.fetchSize());
return new SampleIterator(new PITAwareQueryClient(session), criteria, cfg.fetchSize(), session.circuitBreaker());
}

private HitExtractor timestampExtractor(HitExtractor hitExtractor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
Expand All @@ -30,6 +33,7 @@
import org.elasticsearch.xpack.ql.util.ActionListeners;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -47,19 +51,45 @@ public class SampleIterator implements Executable {

private final QueryClient client;
private final List<SampleCriterion> criteria;
private final Stack<Page> stack = new Stack<>();
final Stack<Page> stack = new Stack<>();
private final int maxCriteria;
private final List<Sample> samples;
final List<Sample> samples;
private final int fetchSize;

private long startTime;

public SampleIterator(QueryClient client, List<SampleCriterion> criteria, int fetchSize) {
// ---------- CIRCUIT BREAKER -----------

/**
* Memory consumption will be calculated every CB_STACK_SIZE_PRECISION hits added to the stack
* ie. the sum of sizes of pages added to the stack
* (not considering stack.pop(), so the number of hits added to the stack is different
* from the number of hits currently present in the stack)
*/
protected static final int CB_STACK_SIZE_PRECISION = 1000;
private static final String CB_COMPLETED_LABEL = "sample_completed";
private static final String CB_INFLIGHT_LABEL = "sample_inflight";
private final CircuitBreaker circuitBreaker;
private long samplesRamBytesUsed = 0;
private long stackRamBytesUsed = 0;
private long totalRamBytesUsed = 0;
/**
Comment on lines +73 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this might be a practice by now, with ES's heap pages being locked in RAM, but just xxxMemSize might have been a bit less assuming of where that memory is.

* total number of hits (ie. sum of page sizes) added to the stack
* (not considering stack.pop(), so different from current stack size)
*/
private long totalPageSize = 0;
/**
* total number of hits (ie. sum of page sizes) added to the stack when last memory check was executed
*/
private long previousTotalPageSize = 0;

public SampleIterator(QueryClient client, List<SampleCriterion> criteria, int fetchSize, CircuitBreaker circuitBreaker) {
this.client = client;
this.criteria = criteria;
this.maxCriteria = criteria.size();
this.fetchSize = fetchSize;
this.samples = new ArrayList<>();
this.circuitBreaker = circuitBreaker;
}

@Override
Expand All @@ -69,6 +99,7 @@ public void execute(ActionListener<Payload> listener) {
advance(runAfter(listener, () -> {
stack.clear();
samples.clear();
clearCircuitBreaker();
client.close(listener.delegateFailure((l, r) -> {}));
}));
}
Expand Down Expand Up @@ -114,8 +145,8 @@ private void queryForCompositeAggPage(ActionListener<Payload> listener, final Sa
InternalComposite composite = (InternalComposite) a;
log.trace("Found [{}] composite buckets", composite.getBuckets().size());
Page nextPage = new Page(composite, request);
if (nextPage.size() > 0) {
stack.push(nextPage);
if (nextPage.size > 0) {
pushToStack(nextPage);
advance(listener);
} else {
if (stack.size() > 0) {
Expand All @@ -127,6 +158,15 @@ private void queryForCompositeAggPage(ActionListener<Payload> listener, final Sa
}, listener::onFailure));
}

protected void pushToStack(Page nextPage) {
stack.push(nextPage);
totalPageSize += nextPage.size;
if (totalPageSize - previousTotalPageSize >= CB_STACK_SIZE_PRECISION) {
updateMemoryUsage();
previousTotalPageSize = totalPageSize;
}
}

/*
* Creates a _msearch request containing maxCriteria (number of filters in the query) * number_of_join_keys queries.
* For a query with three filters
Expand Down Expand Up @@ -160,7 +200,6 @@ private void finalStep(ActionListener<Payload> listener) {

int initialSize = samples.size();
client.multiQuery(searches, ActionListener.wrap(r -> {
List<List<SearchHit>> finalSamples = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

List<List<SearchHit>> sample = new ArrayList<>(maxCriteria);
MultiSearchResponse.Item[] response = r.getResponses();
int docGroupsCounter = 1;
Expand All @@ -174,7 +213,6 @@ private void finalStep(ActionListener<Payload> listener) {
if (docGroupsCounter == maxCriteria) {
List<SearchHit> match = matchSample(sample, maxCriteria);
if (match != null) {
finalSamples.add(match);
samples.add(new Sample(sampleKeys.get(responseIndex / maxCriteria), match));
}
docGroupsCounter = 1;
Expand All @@ -187,20 +225,30 @@ private void finalStep(ActionListener<Payload> listener) {
log.trace("Final step... found [{}] new Samples", samples.size() - initialSize);
// if this final page is max_page_size in size it means: either it's the last page and it happens to have max_page_size elements
// or it's just not the last page and we should advance
var next = page.size() == fetchSize ? page : stack.pop();
var next = page.size == fetchSize ? page : stack.pop();
log.trace("Final step... getting next page of the " + (next == page ? "current" : "previous") + " page");
nextPage(listener, next);
}, listener::onFailure));
}

private void updateMemoryUsage() {
long newSamplesRamSize = RamUsageEstimator.sizeOfCollection(samples);
addMemory(newSamplesRamSize - samplesRamBytesUsed, CB_COMPLETED_LABEL);
samplesRamBytesUsed = newSamplesRamSize;

long newStackRamSize = RamUsageEstimator.sizeOfCollection(stack);
addMemory(newStackRamSize - stackRamBytesUsed, CB_INFLIGHT_LABEL);
stackRamBytesUsed = newStackRamSize;
}

/*
* Finds the next set of results using the after_key of the previous set of buckets.
* It can go back on previous page(s) until either there are no more results, or it finds a page with an after_key to use.
*/
private void nextPage(ActionListener<Payload> listener, Page page) {
page.request().nextAfter(page.afterKey());
log.trace("Getting next page for page [{}] with afterkey [{}]", page, page.afterKey());
queryForCompositeAggPage(listener, page.request());
page.request.nextAfter(page.afterKey);
log.trace("Getting next page for page [{}] with afterkey [{}]", page, page.afterKey);
queryForCompositeAggPage(listener, page.request);
}

/*
Expand Down Expand Up @@ -288,19 +336,66 @@ private static boolean match(int currentCriterion, List<List<SearchHit>> hits, L
return false;
}

private void addMemory(long bytes, String label) {
totalRamBytesUsed += bytes;
circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, label);
}

private void clearCircuitBreaker() {
circuitBreaker.addWithoutBreaking(-totalRamBytesUsed);
stackRamBytesUsed = 0;
samplesRamBytesUsed = 0;
totalRamBytesUsed = 0;
totalPageSize = 0;
previousTotalPageSize = 0;
}

private TimeValue timeTook() {
return new TimeValue(System.currentTimeMillis() - startTime);
}

private record Page(
List<InternalComposite.InternalBucket> hits,
int size,
Map<String, Object> afterKey,
List<String> keys,
SampleQueryRequest request
) {
Page(InternalComposite compositeAgg, SampleQueryRequest request) {
this(compositeAgg.getBuckets(), compositeAgg.getBuckets().size(), compositeAgg.afterKey(), request.keys(), request);
protected static class Page implements Accountable {
final List<InternalComposite.InternalBucket> hits;
final int size;
final Map<String, Object> afterKey;
final List<String> keys;
final SampleQueryRequest request;

long ramBytesUsed = 0;

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Page.class);

// for test purposes only
protected Page(int size) {
hits = null;
this.size = size;
afterKey = null;
keys = null;
request = null;
}

protected Page(InternalComposite compositeAgg, SampleQueryRequest request) {
hits = compositeAgg.getBuckets();
size = compositeAgg.getBuckets().size();
afterKey = compositeAgg.afterKey();
keys = request.keys();
this.request = request;
}

@Override
public long ramBytesUsed() {
if (ramBytesUsed == 0) {
ramBytesUsed = SHALLOW_SIZE;
ramBytesUsed += RamUsageEstimator.sizeOfCollection(hits);
ramBytesUsed += RamUsageEstimator.sizeOfCollection(keys);
ramBytesUsed += RamUsageEstimator.sizeOfMap(afterKey);
}
return ramBytesUsed;
}

@Override
public Collection<Accountable> getChildResources() {
return Accountable.super.getChildResources();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchSortValues;
import org.elasticsearch.tasks.TaskId;
Expand All @@ -20,6 +23,7 @@
import org.elasticsearch.xpack.eql.expression.predicate.operator.comparison.InsensitiveNotEquals;
import org.elasticsearch.xpack.eql.expression.predicate.operator.comparison.InsensitiveWildcardEquals;
import org.elasticsearch.xpack.eql.expression.predicate.operator.comparison.InsensitiveWildcardNotEquals;
import org.elasticsearch.xpack.eql.plugin.EqlPlugin;
import org.elasticsearch.xpack.eql.session.EqlConfiguration;
import org.elasticsearch.xpack.ql.expression.Expression;

Expand Down Expand Up @@ -130,4 +134,17 @@ public static SearchSortValues randomSearchLongSortValues() {
}
return new SearchSortValues(values, sortValueFormats);
}

public static BreakerSettings circuitBreakerSettings(Settings settings) {
return BreakerSettings.updateFromSettings(
new BreakerSettings(
EqlPlugin.CIRCUIT_BREAKER_NAME,
EqlPlugin.CIRCUIT_BREAKER_LIMIT,
EqlPlugin.CIRCUIT_BREAKER_OVERHEAD,
CircuitBreaker.Type.MEMORY,
CircuitBreaker.Durability.TRANSIENT
),
settings
);
}
}
Loading