43
43
import java .util .concurrent .atomic .AtomicBoolean ;
44
44
import java .util .concurrent .atomic .AtomicInteger ;
45
45
import java .util .concurrent .atomic .AtomicReference ;
46
- import java .util .function .Consumer ;
47
46
48
47
import static org .hamcrest .Matchers .anEmptyMap ;
49
48
import static org .hamcrest .Matchers .containsString ;
50
49
import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
51
50
import static org .hamcrest .Matchers .not ;
52
51
import static org .mockito .Mockito .mock ;
53
-
54
52
import static org .mockito .Mockito .times ;
55
53
import static org .mockito .Mockito .verify ;
56
54
import static org .mockito .Mockito .when ;
57
55
58
56
public class TransportGetAllocationStatsActionTests extends ESTestCase {
59
57
60
58
private long startTimeMillis ;
61
- private TimeValue cacheTTL ;
59
+ private TimeValue allocationStatsCacheTTL ;
62
60
private ControlledRelativeTimeThreadPool threadPool ;
63
61
private ClusterService clusterService ;
64
62
private TransportService transportService ;
@@ -71,12 +69,14 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
71
69
public void setUp () throws Exception {
72
70
super .setUp ();
73
71
startTimeMillis = 0L ;
74
- cacheTTL = TimeValue .timeValueMinutes (1 );
72
+ allocationStatsCacheTTL = TimeValue .timeValueMinutes (1 );
75
73
threadPool = new ControlledRelativeTimeThreadPool (TransportClusterAllocationExplainActionTests .class .getName (), startTimeMillis );
76
74
clusterService = ClusterServiceUtils .createClusterService (
77
75
threadPool ,
78
76
new ClusterSettings (
79
- Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), cacheTTL .toString ()).build (),
77
+ Settings .builder ()
78
+ .put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), allocationStatsCacheTTL .toString ())
79
+ .build (),
80
80
ClusterSettings .BUILT_IN_CLUSTER_SETTINGS
81
81
)
82
82
);
@@ -107,7 +107,17 @@ public void tearDown() throws Exception {
107
107
transportService .close ();
108
108
}
109
109
110
+ private void disableAllocationStatsCache () {
111
+ setAllocationStatsCacheTTL (TimeValue .ZERO );
112
+ }
113
+
114
+ private void setAllocationStatsCacheTTL (TimeValue ttl ) {
115
+ clusterService .getClusterSettings ()
116
+ .applySettings (Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), ttl .toString ()).build ());
117
+ };
118
+
110
119
public void testReturnsOnlyRequestedStats () throws Exception {
120
+ disableAllocationStatsCache ();
111
121
int expectedNumberOfStatsServiceCalls = 0 ;
112
122
113
123
for (final var metrics : List .of (
@@ -149,6 +159,7 @@ public void testReturnsOnlyRequestedStats() throws Exception {
149
159
}
150
160
151
161
public void testDeduplicatesStatsComputations () throws InterruptedException {
162
+ disableAllocationStatsCache ();
152
163
final var requestCounter = new AtomicInteger ();
153
164
final var isExecuting = new AtomicBoolean ();
154
165
when (allocationStatsService .stats ()).thenAnswer (invocation -> {
@@ -205,13 +216,6 @@ public void testGetStatsWithCachingEnabled() throws Exception {
205
216
when (allocationStatsService .stats ()).thenReturn (stats );
206
217
};
207
218
208
- final Consumer <TimeValue > setCacheTTL = (ttl ) -> {
209
- clusterService .getClusterSettings ()
210
- .applySettings (
211
- Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), ttl .toString ()).build ()
212
- );
213
- };
214
-
215
219
final CheckedConsumer <ActionListener <Void >, Exception > threadTask = l -> {
216
220
final var request = new TransportGetAllocationStatsAction .Request (
217
221
TEST_REQUEST_TIMEOUT ,
@@ -233,22 +237,22 @@ public void testGetStatsWithCachingEnabled() throws Exception {
233
237
verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
234
238
235
239
// Force the cached stats to expire.
236
- threadPool .setCurrentTimeInMillis (startTimeMillis + (2 * cacheTTL .getMillis ()));
240
+ threadPool .setCurrentTimeInMillis (startTimeMillis + (2 * allocationStatsCacheTTL .getMillis ()));
237
241
238
242
// Expect a single call to the stats service on the cache miss.
239
243
resetExpectedAllocationStats .run ();
240
244
ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
241
245
verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
242
246
243
247
// Update the TTL setting to disable the cache, we expect a service call each time.
244
- setCacheTTL . accept (TimeValue .ZERO );
248
+ setAllocationStatsCacheTTL (TimeValue .ZERO );
245
249
threadTask .accept (ActionListener .noop ());
246
250
threadTask .accept (ActionListener .noop ());
247
251
numExpectedAllocationStatsServiceCalls += 2 ;
248
252
verify (allocationStatsService , times (numExpectedAllocationStatsServiceCalls )).stats ();
249
253
250
254
// Re-enable the cache, only one thread should call the stats service.
251
- setCacheTTL . accept (TimeValue .timeValueMinutes (5 ));
255
+ setAllocationStatsCacheTTL (TimeValue .timeValueMinutes (5 ));
252
256
resetExpectedAllocationStats .run ();
253
257
ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
254
258
verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
0 commit comments