Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/87881.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 87881
summary: Make CBE message creation more robust
area: Infra/Circuit Breakers
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -77,33 +77,43 @@ public double getOverhead() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name.toLowerCase(Locale.ROOT));
builder.field(Fields.LIMIT, limit);
builder.field(Fields.LIMIT_HUMAN, new ByteSizeValue(limit));
builder.field(Fields.ESTIMATED, estimated);
builder.field(Fields.ESTIMATED_HUMAN, new ByteSizeValue(estimated));
addBytesFieldsSafe(builder, limit, Fields.LIMIT, Fields.LIMIT_HUMAN);
addBytesFieldsSafe(builder, estimated, Fields.ESTIMATED, Fields.ESTIMATED_HUMAN);
builder.field(Fields.OVERHEAD, overhead);
builder.field(Fields.TRIPPED_COUNT, trippedCount);
builder.endObject();
return builder;
}

private void addBytesFieldsSafe(XContentBuilder builder, long bytes, String rawFieldName, String humanFieldName) throws IOException {
builder.field(rawFieldName, bytes);
if (-1L <= bytes) {
builder.field(humanFieldName, new ByteSizeValue(bytes));
} else {
// Something's definitely wrong, maybe a breaker was freed twice? Still, we're just writing out stats here, so we should keep
// going if we're running in production.
assert HierarchyCircuitBreakerService.permitNegativeValues : this;
// noinspection ResultOfMethodCallIgnored - we call toString() to log a warning
toString();
builder.field(humanFieldName, "");
}
}

@Override
public String toString() {
return "["
+ this.name
+ ",limit="
+ this.limit
+ "/"
+ new ByteSizeValue(this.limit)
+ ",estimated="
+ this.estimated
+ "/"
+ new ByteSizeValue(this.estimated)
+ ",overhead="
+ this.overhead
+ ",tripped="
+ this.trippedCount
+ "]";
final var stringBuilder = new StringBuilder();
stringBuilder.append("[");
stringBuilder.append(this.name);
stringBuilder.append(",limit=");
HierarchyCircuitBreakerService.appendBytesSafe(stringBuilder, this.limit);
stringBuilder.append(",estimated=");
HierarchyCircuitBreakerService.appendBytesSafe(stringBuilder, this.estimated);
stringBuilder.append(",overhead=");
stringBuilder.append(this.overhead);
stringBuilder.append(",tripped=");
stringBuilder.append(this.trippedCount);
stringBuilder.append("]");
return stringBuilder.toString();
}

static final class Fields {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.indices.breaker.BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING;
Expand Down Expand Up @@ -397,47 +397,78 @@ public void checkParentLimit(long newBytesReserved, String label) throws Circuit
long parentLimit = this.parentSettings.getLimit();
if (memoryUsed.totalUsage > parentLimit && overLimitStrategy.overLimit(memoryUsed).totalUsage > parentLimit) {
this.parentTripCount.incrementAndGet();
final StringBuilder message = new StringBuilder(
"[parent] Data too large, data for ["
+ label
+ "]"
+ " would be ["
+ memoryUsed.totalUsage
+ "/"
+ new ByteSizeValue(memoryUsed.totalUsage)
+ "]"
+ ", which is larger than the limit of ["
+ parentLimit
+ "/"
+ new ByteSizeValue(parentLimit)
+ "]"
final String messageString = buildParentTripMessage(
newBytesReserved,
label,
memoryUsed,
parentLimit,
this.trackRealMemoryUsage,
this.breakers
);
if (this.trackRealMemoryUsage) {
final long realUsage = memoryUsed.baseUsage;
message.append(", real usage: [");
message.append(realUsage);
message.append("/");
message.append(new ByteSizeValue(realUsage));
message.append("], new bytes reserved: [");
message.append(newBytesReserved);
message.append("/");
message.append(new ByteSizeValue(newBytesReserved));
message.append("]");
}
message.append(", usages [");
message.append(this.breakers.entrySet().stream().map(e -> {
final CircuitBreaker breaker = e.getValue();
final long breakerUsed = (long) (breaker.getUsed() * breaker.getOverhead());
return e.getKey() + "=" + breakerUsed + "/" + new ByteSizeValue(breakerUsed);
}).collect(Collectors.joining(", ")));
message.append("]");
// derive durability of a tripped parent breaker depending on whether the majority of memory tracked by
// child circuit breakers is categorized as transient or permanent.
CircuitBreaker.Durability durability = memoryUsed.transientChildUsage >= memoryUsed.permanentChildUsage
? CircuitBreaker.Durability.TRANSIENT
: CircuitBreaker.Durability.PERMANENT;
logger.debug(() -> format("%s", message.toString()));
throw new CircuitBreakingException(message.toString(), memoryUsed.totalUsage, parentLimit, durability);
logger.debug(() -> format("%s", messageString));
throw new CircuitBreakingException(messageString, memoryUsed.totalUsage, parentLimit, durability);
}
}

// exposed for tests
static String buildParentTripMessage(
long newBytesReserved,
String label,
MemoryUsage memoryUsed,
long parentLimit,
boolean trackRealMemoryUsage,
Map<String, CircuitBreaker> breakers
) {
final var message = new StringBuilder();
message.append("[parent] Data too large, data for [");
message.append(label);
message.append("] would be [");
appendBytesSafe(message, memoryUsed.totalUsage);
message.append("], which is larger than the limit of [");
appendBytesSafe(message, parentLimit);
message.append("]");
if (trackRealMemoryUsage) {
final long realUsage = memoryUsed.baseUsage;
message.append(", real usage: [");
appendBytesSafe(message, realUsage);
message.append("], new bytes reserved: [");
appendBytesSafe(message, newBytesReserved);
message.append("]");
}
message.append(", usages [");
breakers.forEach(new BiConsumer<>() {
private boolean first = true;

@Override
public void accept(String key, CircuitBreaker breaker) {
if (first) {
first = false;
} else {
message.append(", ");
}
message.append(key).append("=");
appendBytesSafe(message, (long) (breaker.getUsed() * breaker.getOverhead()));
}
});
message.append("]");
return message.toString();
}

static void appendBytesSafe(StringBuilder stringBuilder, long bytes) {
stringBuilder.append(bytes);
if (-1L <= bytes) {
stringBuilder.append("/");
stringBuilder.append(new ByteSizeValue(bytes));
} else {
// Something's definitely wrong, maybe a breaker was freed twice? Still, we're just creating an exception message here, so we
// should keep going if we're running in production.
logger.error("negative value in circuit breaker: {}", stringBuilder);
assert permitNegativeValues : stringBuilder.toString();
}
}

Expand Down Expand Up @@ -636,4 +667,7 @@ TimeValue getLockTimeout() {
return lockTimeout;
}
}

// exposed for testing
static boolean permitNegativeValues = false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices.breaker;

import org.elasticsearch.common.Strings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContentObject;

import static org.hamcrest.Matchers.equalTo;

public class CircuitBreakerStatsTests extends ESTestCase {

public void testStringRepresentations() {
final var circuitBreakerStats = new CircuitBreakerStats("t", 1L, 2L, 1.0, 3L);
assertThat(circuitBreakerStats.toString(), equalTo("[t,limit=1/1b,estimated=2/2b,overhead=1.0,tripped=3]"));
assertThat(toJson(circuitBreakerStats), equalTo("""
{"t":{"limit_size_in_bytes":1,"limit_size":"1b","estimated_size_in_bytes":2,"estimated_size":"2b","overhead":1.0,"tripped":3}}\
"""));
}

public void testStringRepresentationPermitsNegativeOne() {
final var circuitBreakerStats = new CircuitBreakerStats("t", -1L, -1L, 1.0, 3L);
assertThat(circuitBreakerStats.toString(), equalTo("[t,limit=-1/-1b,estimated=-1/-1b,overhead=1.0,tripped=3]"));
assertThat(toJson(circuitBreakerStats), equalTo("""
{"t":{"limit_size_in_bytes":-1,"limit_size":"-1b","estimated_size_in_bytes":-1,"estimated_size":"-1b",\
"overhead":1.0,"tripped":3}}"""));
}

public void testStringRepresentationsWithNegativeStats() {
try {
HierarchyCircuitBreakerService.permitNegativeValues = true;
final var circuitBreakerStats = new CircuitBreakerStats("t", -2L, -3L, 1.0, 3L);
assertThat(circuitBreakerStats.toString(), equalTo("[t,limit=-2,estimated=-3,overhead=1.0,tripped=3]"));
assertThat(toJson(circuitBreakerStats), equalTo("""
{"t":{"limit_size_in_bytes":-2,"limit_size":"","estimated_size_in_bytes":-3,"estimated_size":"",\
"overhead":1.0,"tripped":3}}"""));
} finally {
HierarchyCircuitBreakerService.permitNegativeValues = false;
}
}

private static String toJson(CircuitBreakerStats circuitBreakerStats) {
return Strings.toString((ToXContentObject) (builder, params) -> {
builder.startObject();
circuitBreakerStats.toXContent(builder, params);
builder.endObject();
return builder;
}, false, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand All @@ -25,6 +26,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand All @@ -46,6 +48,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.sameInstance;

public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
Expand Down Expand Up @@ -873,4 +876,77 @@ public void testApplySettingForUpdatingUseRealMemory() {
);
}
}

public void testBuildParentTripMessage() {
class TestChildCircuitBreaker extends NoopCircuitBreaker {
private final long used;

TestChildCircuitBreaker(long used) {
super("child");
this.used = used;
}

@Override
public long getUsed() {
return used;
}

@Override
public double getOverhead() {
return 1.0;
}
}

assertThat(
HierarchyCircuitBreakerService.buildParentTripMessage(
1L,
"test",
new HierarchyCircuitBreakerService.MemoryUsage(2L, 3L, 4L, 5L),
6L,
false,
Map.of("child", new TestChildCircuitBreaker(7L), "otherChild", new TestChildCircuitBreaker(8L))
),
oneOf(
"[parent] Data too large, data for [test] would be [3/3b], which is larger than the limit of [6/6b], "
+ "usages [child=7/7b, otherChild=8/8b]",
"[parent] Data too large, data for [test] would be [3/3b], which is larger than the limit of [6/6b], "
+ "usages [otherChild=8/8b, child=7/7b]"
)
);

assertThat(
HierarchyCircuitBreakerService.buildParentTripMessage(
1L,
"test",
new HierarchyCircuitBreakerService.MemoryUsage(2L, 3L, 4L, 5L),
6L,
true,
Map.of()
),
equalTo(
"[parent] Data too large, data for [test] would be [3/3b], which is larger than the limit of [6/6b], "
+ "real usage: [2/2b], new bytes reserved: [1/1b], usages []"
)
);

try {
HierarchyCircuitBreakerService.permitNegativeValues = true;
assertThat(
HierarchyCircuitBreakerService.buildParentTripMessage(
-1L,
"test",
new HierarchyCircuitBreakerService.MemoryUsage(-2L, -3L, -4L, -5L),
-6L,
true,
Map.of("child1", new TestChildCircuitBreaker(-7L))
),
equalTo(
"[parent] Data too large, data for [test] would be [-3], which is larger than the limit of [-6], "
+ "real usage: [-2], new bytes reserved: [-1/-1b], usages [child1=-7]"
)
);
} finally {
HierarchyCircuitBreakerService.permitNegativeValues = false;
}
}
}