- Notifications
You must be signed in to change notification settings - Fork 25.6k
Add heap usage estimate to ClusterInfo #128723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
2d60021 542c256 356beb5 81fd063 bc0682c 747b5a2 13d1de8 f4d9db5 bf51e85 c47c0ca 23eb8e6 887bcaf 7275acb 85fd019 f112a3b 3a1ada2 8fa587f 6d4b204 2c42a82 58402bd 63bbea8 0cacdc7 dd73d37 765ade8 e26b62f 55637b6 f4b90b5 0789fef 2d475c8 f56f00e 7b5bf95 08a5ca3 09ca9dc cd6b7e9 5e2cb9f c529194 d831194 b8387bb 26dba4d f15fca2 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -36,7 +36,7 @@ | |
| import static org.elasticsearch.cluster.routing.ShardRouting.newUnassigned; | ||
| import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.REINITIALIZED; | ||
| import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk; | ||
| import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endArray; | ||
| import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endObject; | ||
| import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.startObject; | ||
| | ||
| /** | ||
| | @@ -57,9 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { | |
| final Map<ShardId, Long> shardDataSetSizes; | ||
| final Map<NodeAndShard, String> dataPath; | ||
| final Map<NodeAndPath, ReservedSpace> reservedSpace; | ||
| final Map<String, HeapUsage> nodesHeapUsage; | ||
| | ||
| protected ClusterInfo() { | ||
| this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); | ||
| this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); | ||
| } | ||
| | ||
| /** | ||
| | @@ -71,6 +72,7 @@ protected ClusterInfo() { | |
| * @param shardDataSetSizes a shard id to data set size in bytes mapping per shard | ||
| * @param dataPath the shard routing to datapath mapping | ||
| * @param reservedSpace reserved space per shard broken down by node and data path | ||
| * @param nodesHeapUsage heap usage broken down by node | ||
| * @see #shardIdentifierFromRouting | ||
| */ | ||
| public ClusterInfo( | ||
| | @@ -79,14 +81,16 @@ public ClusterInfo( | |
| Map<String, Long> shardSizes, | ||
| Map<ShardId, Long> shardDataSetSizes, | ||
| Map<NodeAndShard, String> dataPath, | ||
| Map<NodeAndPath, ReservedSpace> reservedSpace | ||
| Map<NodeAndPath, ReservedSpace> reservedSpace, | ||
| Map<String, HeapUsage> nodesHeapUsage | ||
| ) { | ||
| this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); | ||
| this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage); | ||
| this.shardSizes = Map.copyOf(shardSizes); | ||
| this.shardDataSetSizes = Map.copyOf(shardDataSetSizes); | ||
| this.dataPath = Map.copyOf(dataPath); | ||
| this.reservedSpace = Map.copyOf(reservedSpace); | ||
| this.nodesHeapUsage = Map.copyOf(nodesHeapUsage); | ||
| } | ||
| | ||
| public ClusterInfo(StreamInput in) throws IOException { | ||
| | @@ -98,6 +102,11 @@ public ClusterInfo(StreamInput in) throws IOException { | |
| ? in.readImmutableMap(NodeAndShard::new, StreamInput::readString) | ||
| : in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString); | ||
| this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new); | ||
| if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) { | ||
| this.nodesHeapUsage = in.readImmutableMap(HeapUsage::new); | ||
| } else { | ||
| this.nodesHeapUsage = Map.of(); | ||
| } | ||
| } | ||
| | ||
| @Override | ||
| | @@ -112,6 +121,9 @@ public void writeTo(StreamOutput out) throws IOException { | |
| out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString); | ||
| } | ||
| out.writeMap(this.reservedSpace); | ||
| if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) { | ||
| out.writeMap(this.nodesHeapUsage, StreamOutput::writeWriteable); | ||
| } | ||
| } | ||
| | ||
| /** | ||
| | @@ -191,10 +203,31 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params | |
| } | ||
| return builder.endObject(); // NodeAndPath | ||
| }), | ||
| endArray() // end "reserved_sizes" | ||
| chunk( | ||
| (builder, p) -> builder.endArray() // end "reserved_sizes" | ||
| .startObject("heap_usage") | ||
| ), | ||
| Iterators.map(nodesHeapUsage.entrySet().iterator(), c -> (builder, p) -> { | ||
| builder.startObject(c.getKey()); | ||
| c.getValue().toShortXContent(builder); | ||
| builder.endObject(); | ||
| return builder; | ||
| }), | ||
| endObject() // end "heap_usage" | ||
| ||
| ); | ||
| } | ||
| | ||
| /** | ||
| * Returns a node id to estimated heap usage mapping for all nodes that we have such data for. | ||
| * Note that these estimates should be considered minimums. They may be used to determine whether | ||
| * there IS NOT capacity to do something, but not to determine that there IS capacity to do something. | ||
| * Also note that the map may not be complete, it may contain none, or a subset of the nodes in | ||
| * the cluster at any time. It may also contain entries for nodes that have since left the cluster. | ||
| */ | ||
| public Map<String, HeapUsage> getNodesHeapUsage() { | ||
| return nodesHeapUsage; | ||
| } | ||
| | ||
| /** | ||
| * Returns a node id to disk usage mapping for the path that has the least available space on the node. | ||
| * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space. | ||
| | @@ -301,7 +334,8 @@ public String toString() { | |
| | ||
| // exposed for tests, computed here rather than exposing all the collections separately | ||
| int getChunkCount() { | ||
| return leastAvailableSpaceUsage.size() + shardSizes.size() + shardDataSetSizes.size() + dataPath.size() + reservedSpace.size() + 6; | ||
| return leastAvailableSpaceUsage.size() + shardSizes.size() + shardDataSetSizes.size() + dataPath.size() + reservedSpace.size() | ||
| + nodesHeapUsage.size() + 7; | ||
| } | ||
| | ||
| public record NodeAndShard(String nodeId, ShardId shardId) implements Writeable { | ||
| | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
| | ||
| package org.elasticsearch.cluster; | ||
| | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
| import org.elasticsearch.common.unit.ByteSizeValue; | ||
| import org.elasticsearch.xcontent.ToXContentFragment; | ||
| import org.elasticsearch.xcontent.XContentBuilder; | ||
| | ||
| import java.io.IOException; | ||
| | ||
| /** | ||
| * Record representing the heap usage for a single cluster node | ||
| */ | ||
| public record HeapUsage(String nodeId, String nodeName, long totalBytes, long freeBytes) implements ToXContentFragment, Writeable { | ||
| ||
| | ||
| public HeapUsage(StreamInput in) throws IOException { | ||
| this(in.readString(), in.readString(), in.readVLong(), in.readVLong()); | ||
| } | ||
| | ||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeString(this.nodeId); | ||
| out.writeString(this.nodeName); | ||
| out.writeVLong(this.totalBytes); | ||
| out.writeVLong(this.freeBytes); | ||
| } | ||
| | ||
| public XContentBuilder toShortXContent(XContentBuilder builder) throws IOException { | ||
| builder.field("node_name", this.nodeName); | ||
| builder.humanReadableField("total_heap_bytes", "total", ByteSizeValue.ofBytes(this.totalBytes)); | ||
| builder.humanReadableField("used_heap_bytes", "used", ByteSizeValue.ofBytes(this.usedBytes())); | ||
| builder.humanReadableField("free_heap_bytes", "free", ByteSizeValue.ofBytes(this.freeBytes)); | ||
| builder.field("free_heap_percent", truncatePercent(this.freeHeapAsPercentage())); | ||
| builder.field("used_heap_percent", truncatePercent(this.usedHeapAsPercentage())); | ||
| return builder; | ||
| } | ||
| | ||
| @Override | ||
| public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
| builder.field("node_id", this.nodeId); | ||
| toShortXContent(builder); | ||
| return builder; | ||
| } | ||
| | ||
| public double freeHeapAsPercentage() { | ||
| return 100.0 * freeBytes / (double) totalBytes; | ||
| } | ||
| | ||
| public double usedHeapAsPercentage() { | ||
| return 100.0 - freeHeapAsPercentage(); | ||
| } | ||
| | ||
| public long usedBytes() { | ||
| return totalBytes - freeBytes; | ||
| } | ||
| | ||
| private static double truncatePercent(double pct) { | ||
| return Math.round(pct * 10.0) / 10.0; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
| | ||
| package org.elasticsearch.cluster; | ||
| | ||
| import org.elasticsearch.action.ActionListener; | ||
| | ||
| import java.util.Map; | ||
| | ||
| public interface HeapUsageSupplier { | ||
| | ||
| /** | ||
| * This will be used when there are no heap usage suppliers available | ||
| */ | ||
| HeapUsageSupplier EMPTY = listener -> listener.onResponse(Map.of()); | ||
| | ||
| /** | ||
| * Get the heap usage for every node in the cluster | ||
| * | ||
| * @param listener The listener which will receive the results | ||
| */ | ||
| void getClusterHeapUsage(ActionListener<Map<String, HeapUsage>> listener); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment to say this field is deliberately ignored in
`toXContentChunked`, so that another reader knows this is intentional and not a bug. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 63bbea8