Skip to content

Commit ce80b92

Browse files
committed
[CCR] Add total fetch time leader stat (elastic#34577)
Add total fetch time leader stat, that keeps track how much time was spent on fetches from the leader cluster perspective.
1 parent cdf3a7c commit ce80b92

File tree

11 files changed

+80
-21
lines changed

11 files changed

+80
-21
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Arrays;
4242
import java.util.List;
4343
import java.util.Objects;
44+
import java.util.concurrent.TimeUnit;
4445
import java.util.concurrent.TimeoutException;
4546

4647
import static org.elasticsearch.action.ValidateActions.addValidationError;
@@ -74,6 +75,8 @@ public static class Request extends SingleShardRequest<Request> {
7475
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT;
7576
private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE;
7677

78+
private long relativeStartNanos;
79+
7780
public Request(ShardId shardId, String expectedHistoryUUID) {
7881
super(shardId.getIndexName());
7982
this.shardId = shardId;
@@ -149,6 +152,9 @@ public void readFrom(StreamInput in) throws IOException {
149152
expectedHistoryUUID = in.readString();
150153
pollTimeout = in.readTimeValue();
151154
maxBatchSize = new ByteSizeValue(in);
155+
156+
// Starting the clock in order to know how much time is spent on fetching operations:
157+
relativeStartNanos = System.nanoTime();
152158
}
153159

154160
@Override
@@ -227,6 +233,12 @@ public Translog.Operation[] getOperations() {
227233
return operations;
228234
}
229235

236+
private long tookInMillis;
237+
238+
public long getTookInMillis() {
239+
return tookInMillis;
240+
}
241+
230242
Response() {
231243
}
232244

@@ -235,13 +247,15 @@ public Translog.Operation[] getOperations() {
235247
final long globalCheckpoint,
236248
final long maxSeqNo,
237249
final long maxSeqNoOfUpdatesOrDeletes,
238-
final Translog.Operation[] operations) {
250+
final Translog.Operation[] operations,
251+
final long tookInMillis) {
239252

240253
this.mappingVersion = mappingVersion;
241254
this.globalCheckpoint = globalCheckpoint;
242255
this.maxSeqNo = maxSeqNo;
243256
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
244257
this.operations = operations;
258+
this.tookInMillis = tookInMillis;
245259
}
246260

247261
@Override
@@ -252,6 +266,7 @@ public void readFrom(final StreamInput in) throws IOException {
252266
maxSeqNo = in.readZLong();
253267
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
254268
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
269+
tookInMillis = in.readVLong();
255270
}
256271

257272
@Override
@@ -262,6 +277,7 @@ public void writeTo(final StreamOutput out) throws IOException {
262277
out.writeZLong(maxSeqNo);
263278
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
264279
out.writeArray(Translog.Operation::writeOperation, operations);
280+
out.writeVLong(tookInMillis);
265281
}
266282

267283
@Override
@@ -273,12 +289,14 @@ public boolean equals(final Object o) {
273289
globalCheckpoint == that.globalCheckpoint &&
274290
maxSeqNo == that.maxSeqNo &&
275291
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
276-
Arrays.equals(operations, that.operations);
292+
Arrays.equals(operations, that.operations) &&
293+
tookInMillis == that.tookInMillis;
277294
}
278295

279296
@Override
280297
public int hashCode() {
281-
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations));
298+
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes,
299+
Arrays.hashCode(operations), tookInMillis);
282300
}
283301
}
284302

@@ -322,7 +340,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
322340
request.getMaxBatchSize());
323341
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
324342
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
325-
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations);
343+
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos);
326344
}
327345

328346
@Override
@@ -387,7 +405,8 @@ private void globalCheckpointAdvancementFailure(
387405
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
388406
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
389407
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
390-
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY));
408+
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY,
409+
request.relativeStartNanos));
391410
} catch (final Exception caught) {
392411
caught.addSuppressed(e);
393412
listener.onFailure(caught);
@@ -473,8 +492,11 @@ static Translog.Operation[] getOperations(
473492
}
474493

475494
static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
476-
final long maxSeqNoOfUpdates, final Translog.Operation[] operations) {
477-
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations);
495+
final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) {
496+
long tookInNanos = System.nanoTime() - relativeStartNanos;
497+
long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos);
498+
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates,
499+
operations, tookInMillis);
478500
}
479501

480502
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
7171
private int numConcurrentReads = 0;
7272
private int numConcurrentWrites = 0;
7373
private long currentMappingVersion = 0;
74+
private long totalFetchTookTimeMillis = 0;
7475
private long totalFetchTimeMillis = 0;
7576
private long numberOfSuccessfulFetches = 0;
7677
private long numberOfFailedFetches = 0;
@@ -238,6 +239,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
238239
fetchExceptions.remove(from);
239240
if (response.getOperations().length > 0) {
240241
// do not count polls against fetch stats
242+
totalFetchTookTimeMillis += response.getTookInMillis();
241243
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
242244
numberOfSuccessfulFetches++;
243245
operationsReceived += response.getOperations().length;
@@ -449,6 +451,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
449451
buffer.size(),
450452
currentMappingVersion,
451453
totalFetchTimeMillis,
454+
totalFetchTookTimeMillis,
452455
numberOfSuccessfulFetches,
453456
numberOfFailedFetches,
454457
operationsReceived,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ protected ShardChangesAction.Response createTestInstance() {
2626
leaderGlobalCheckpoint,
2727
leaderMaxSeqNo,
2828
maxSeqNoOfUpdatesOrDeletes,
29-
operations
29+
operations,
30+
randomNonNegativeLong()
3031
);
3132
}
3233

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
158158
final long globalCheckpoint = tracker.getCheckpoint();
159159
final long maxSeqNo = tracker.getMaxSeqNo();
160160
handler.accept(new ShardChangesAction.Response(
161-
0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0]));
161+
0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L));
162162
}
163163
};
164164
threadPool.generic().execute(task);
@@ -233,7 +233,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
233233
nextGlobalCheckPoint,
234234
nextGlobalCheckPoint,
235235
randomNonNegativeLong(),
236-
ops.toArray(EMPTY))
236+
ops.toArray(EMPTY),
237+
randomNonNegativeLong())
237238
)
238239
);
239240
responses.put(prevGlobalCheckpoint, item);
@@ -256,7 +257,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
256257
prevGlobalCheckpoint,
257258
prevGlobalCheckpoint,
258259
randomNonNegativeLong(),
259-
EMPTY
260+
EMPTY,
261+
randomNonNegativeLong()
260262
);
261263
item.add(new TestResponse(null, mappingVersion, response));
262264
}
@@ -273,7 +275,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
273275
localLeaderGCP,
274276
localLeaderGCP,
275277
randomNonNegativeLong(),
276-
ops.toArray(EMPTY)
278+
ops.toArray(EMPTY),
279+
randomNonNegativeLong()
277280
);
278281
item.add(new TestResponse(null, mappingVersion, response));
279282
responses.put(fromSeqNo, Collections.unmodifiableList(item));

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
5656
randomNonNegativeLong(),
5757
randomNonNegativeLong(),
5858
randomNonNegativeLong(),
59+
randomNonNegativeLong(),
5960
randomReadExceptions(),
6061
randomLong(),
6162
randomBoolean() ? new ElasticsearchException("fatal error") : null);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public void testReceiveNothingExpectedSomething() {
439439
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
440440

441441
shardChangesRequests.clear();
442-
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0]));
442+
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L));
443443

444444
assertThat(shardChangesRequests.size(), equalTo(1));
445445
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
@@ -782,7 +782,8 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con
782782
leaderGlobalCheckpoints.poll(),
783783
maxSeqNos.poll(),
784784
randomNonNegativeLong(),
785-
operations
785+
operations,
786+
1L
786787
);
787788
handler.accept(response);
788789
}
@@ -813,7 +814,8 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro
813814
leaderGlobalCheckPoint,
814815
leaderGlobalCheckPoint,
815816
randomNonNegativeLong(),
816-
ops.toArray(new Translog.Operation[0])
817+
ops.toArray(new Translog.Operation[0]),
818+
1L
817819
);
818820
}
819821

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
431431
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
432432
if (from > seqNoStats.getGlobalCheckpoint()) {
433433
handler.accept(ShardChangesAction.getResponse(1L, seqNoStats,
434-
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY));
434+
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L));
435435
return;
436436
}
437437
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
@@ -442,7 +442,8 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
442442
seqNoStats.getGlobalCheckpoint(),
443443
seqNoStats.getMaxSeqNo(),
444444
maxSeqNoOfUpdatesOrDeletes,
445-
ops
445+
ops,
446+
1L
446447
);
447448
handler.accept(response);
448449
return;

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ protected FollowStatsAction.StatsResponses createTestInstance() {
4949
randomNonNegativeLong(),
5050
randomNonNegativeLong(),
5151
randomNonNegativeLong(),
52+
randomNonNegativeLong(),
5253
Collections.emptyNavigableMap(),
5354
randomLong(),
5455
randomBoolean() ? new ElasticsearchException("fatal error") : null);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public void testToXContent() throws IOException {
9494
final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE);
9595
final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE);
9696
final long totalFetchTimeMillis = randomLongBetween(0, 4096);
97+
final long totalFetchTookTimeMillis = randomLongBetween(0, 4096);
9798
final long numberOfSuccessfulFetches = randomNonNegativeLong();
9899
final long numberOfFailedFetches = randomLongBetween(0, 8);
99100
final long operationsReceived = randomNonNegativeLong();
@@ -122,6 +123,7 @@ public void testToXContent() throws IOException {
122123
numberOfQueuedWrites,
123124
mappingVersion,
124125
totalFetchTimeMillis,
126+
totalFetchTookTimeMillis,
125127
numberOfSuccessfulFetches,
126128
numberOfFailedFetches,
127129
operationsReceived,
@@ -166,6 +168,7 @@ public void testToXContent() throws IOException {
166168
+ "\"number_of_queued_writes\":" + numberOfQueuedWrites + ","
167169
+ "\"mapping_version\":" + mappingVersion + ","
168170
+ "\"total_fetch_time_millis\":" + totalFetchTimeMillis + ","
171+
+ "\"total_fetch_leader_time_millis\":" + totalFetchTookTimeMillis + ","
169172
+ "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + ","
170173
+ "\"number_of_failed_fetches\":" + numberOfFailedFetches + ","
171174
+ "\"operations_received\":" + operationsReceived + ","
@@ -208,6 +211,7 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
208211
1,
209212
1,
210213
100,
214+
50,
211215
10,
212216
0,
213217
10,
@@ -226,7 +230,6 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
226230
Map<String, Object> template =
227231
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
228232
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues.extractValue("mappings.doc.properties.ccr_stats.properties", template);
229-
230233
assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
231234
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
232235
String fieldName = entry.getKey();

0 commit comments

Comments
 (0)