Skip to content

Commit 9fe8b96

Browse files
Handling exceptions on watcher reload (#105442)
1 parent 8ed115a commit 9fe8b96

File tree

5 files changed

+134
-11
lines changed

5 files changed

+134
-11
lines changed

docs/changelog/105442.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 105442
2+
summary: Handling exceptions on watcher reload
3+
area: Watcher
4+
type: bug
5+
issues:
6+
- 69842

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,9 @@ public void clusterChanged(ClusterChangedEvent event) {
166166
if (watcherService.validate(event.state())) {
167167
previousShardRoutings.set(localAffectedShardRoutings);
168168
if (state.get() == WatcherState.STARTED) {
169-
watcherService.reload(event.state(), "new local watcher shard allocation ids");
169+
watcherService.reload(event.state(), "new local watcher shard allocation ids", (exception) -> {
170+
clearAllocationIds(); // will cause reload again
171+
});
170172
} else if (isStoppedOrStopping) {
171173
this.state.set(WatcherState.STARTING);
172174
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED), (exception) -> {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ void stopExecutor() {
201201
* Reload the watcher service. This does not switch the state from stopped to started; it just keeps going.
202202
* @param state cluster state, which is needed to find out about local shards
203203
*/
204-
void reload(ClusterState state, String reason) {
204+
void reload(ClusterState state, String reason, Consumer<Exception> exceptionConsumer) {
205205
boolean hasValidWatcherTemplates = WatcherIndexTemplateRegistry.validate(state);
206206
if (hasValidWatcherTemplates == false) {
207207
logger.warn("missing watcher index templates");
@@ -221,7 +221,10 @@ void reload(ClusterState state, String reason) {
221221
int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {});
222222
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
223223

224-
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> logger.error("error reloading watcher", e)));
224+
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> {
225+
logger.error("error reloading watcher", e);
226+
exceptionConsumer.accept(e);
227+
}));
225228
}
226229

227230
/**

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,91 @@ public void testExceptionOnStart() {
258258
assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STARTED));
259259
}
260260

261+
public void testReloadWithIdenticalRoutingTable() {
262+
/*
263+
* This tests that an identical routing table causes a reload only once.
264+
*/
265+
startWatcher();
266+
267+
ClusterChangedEvent[] events = masterChangeScenario();
268+
assertThat(events[1].previousState(), equalTo(events[0].state()));
269+
assertFalse(events[1].routingTableChanged());
270+
271+
for (ClusterChangedEvent event : events) {
272+
when(watcherService.validate(event.state())).thenReturn(true);
273+
lifeCycleService.clusterChanged(event);
274+
}
275+
// reload should occur on the first event
276+
verify(watcherService).reload(eq(events[0].state()), anyString(), any());
277+
// but it shouldn't on the second event unless routing table changes
278+
verify(watcherService, never()).reload(eq(events[1].state()), anyString(), any());
279+
}
280+
281+
public void testReloadWithIdenticalRoutingTableAfterException() {
282+
/*
283+
* This tests that even an identical routing table causes a reload again if an exception (for example, a timeout while loading
284+
* watches) interrupted the previous one.
285+
*/
286+
startWatcher();
287+
288+
ClusterChangedEvent[] events = masterChangeScenario();
289+
assertThat(events[1].previousState(), equalTo(events[0].state()));
290+
assertFalse(events[1].routingTableChanged());
291+
292+
// simulate exception on the first event
293+
doAnswer(invocation -> {
294+
Consumer<Exception> exceptionConsumer = invocation.getArgument(2);
295+
exceptionConsumer.accept(new ElasticsearchTimeoutException(new TimeoutException("Artificial timeout")));
296+
return null;
297+
}).when(watcherService).reload(eq(events[0].state()), anyString(), any());
298+
299+
for (ClusterChangedEvent event : events) {
300+
when(watcherService.validate(event.state())).thenReturn(true);
301+
lifeCycleService.clusterChanged(event);
302+
}
303+
// reload should occur on the first event but it fails
304+
verify(watcherService).reload(eq(events[0].state()), anyString(), any());
305+
// reload should occur again on the second event because the previous one failed
306+
verify(watcherService).reload(eq(events[1].state()), anyString(), any());
307+
}
308+
309+
private ClusterChangedEvent[] masterChangeScenario() {
310+
DiscoveryNodes nodes = new DiscoveryNodes.Builder().localNodeId("node_1").add(newNode("node_1")).add(newNode("node_2")).build();
311+
312+
Index index = new Index(Watch.INDEX, "uuid");
313+
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
314+
indexRoutingTableBuilder.addShard(
315+
TestShardRouting.newShardRouting(new ShardId(index, 0), "node_1", true, ShardRoutingState.STARTED)
316+
);
317+
RoutingTable routingTable = RoutingTable.builder().add(indexRoutingTableBuilder.build()).build();
318+
319+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(Watch.INDEX)
320+
.settings(settings(IndexVersion.current()).put(IndexMetadata.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format,
321+
// required
322+
.numberOfShards(1)
323+
.numberOfReplicas(0);
324+
Metadata metadata = Metadata.builder()
325+
.put(IndexTemplateMetadata.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
326+
.put(indexMetadataBuilder)
327+
.build();
328+
329+
ClusterState emptyState = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).metadata(metadata).build();
330+
ClusterState stateWithMasterNode1 = ClusterState.builder(new ClusterName("my-cluster"))
331+
.nodes(nodes.withMasterNodeId("node_1"))
332+
.metadata(metadata)
333+
.routingTable(routingTable)
334+
.build();
335+
ClusterState stateWithMasterNode2 = ClusterState.builder(new ClusterName("my-cluster"))
336+
.nodes(nodes.withMasterNodeId("node_2"))
337+
.metadata(metadata)
338+
.routingTable(routingTable)
339+
.build();
340+
341+
return new ClusterChangedEvent[] {
342+
new ClusterChangedEvent("any", stateWithMasterNode1, emptyState),
343+
new ClusterChangedEvent("any", stateWithMasterNode2, stateWithMasterNode1) };
344+
}
345+
261346
public void testNoLocalShards() {
262347
Index watchIndex = new Index(Watch.INDEX, "foo");
263348
ShardId shardId = new ShardId(watchIndex, 0);
@@ -301,7 +386,7 @@ public void testNoLocalShards() {
301386
when(watcherService.validate(eq(clusterStateWithLocalShards))).thenReturn(true);
302387
when(watcherService.validate(eq(clusterStateWithoutLocalShards))).thenReturn(false);
303388
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithLocalShards, clusterStateWithoutLocalShards));
304-
verify(watcherService, times(1)).reload(eq(clusterStateWithLocalShards), eq("new local watcher shard allocation ids"));
389+
verify(watcherService, times(1)).reload(eq(clusterStateWithLocalShards), eq("new local watcher shard allocation ids"), any());
305390
verify(watcherService, times(1)).validate(eq(clusterStateWithLocalShards));
306391
verifyNoMoreInteractions(watcherService);
307392

@@ -380,12 +465,12 @@ public void testReplicaWasAddedOrRemoved() {
380465

381466
when(watcherService.validate(eq(firstEvent.state()))).thenReturn(true);
382467
lifeCycleService.clusterChanged(firstEvent);
383-
verify(watcherService).reload(eq(firstEvent.state()), anyString());
468+
verify(watcherService).reload(eq(firstEvent.state()), anyString(), any());
384469

385470
reset(watcherService);
386471
when(watcherService.validate(eq(secondEvent.state()))).thenReturn(true);
387472
lifeCycleService.clusterChanged(secondEvent);
388-
verify(watcherService).reload(eq(secondEvent.state()), anyString());
473+
verify(watcherService).reload(eq(secondEvent.state()), anyString(), any());
389474
}
390475

391476
// make sure that cluster state changes can be processed on nodes that do not hold data
@@ -425,7 +510,7 @@ public void testNonDataNode() {
425510

426511
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", currentState, previousState));
427512
verify(watcherService, times(0)).pauseExecution(any());
428-
verify(watcherService, times(0)).reload(any(), any());
513+
verify(watcherService, times(0)).reload(any(), any(), any());
429514
}
430515

431516
public void testThatMissingWatcherIndexMetadataOnlyResetsOnce() {
@@ -452,7 +537,7 @@ public void testThatMissingWatcherIndexMetadataOnlyResetsOnce() {
452537

453538
// first add the shard allocation ids, by going from empty cs to CS with watcher index
454539
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithWatcherIndex, clusterStateWithoutWatcherIndex));
455-
verify(watcherService).reload(eq(clusterStateWithWatcherIndex), anyString());
540+
verify(watcherService).reload(eq(clusterStateWithWatcherIndex), anyString(), any());
456541

457542
// now remove watches index, and ensure that pausing is only called once, no matter how often called (i.e. each CS update)
458543
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutWatcherIndex, clusterStateWithWatcherIndex));
@@ -577,7 +662,7 @@ public void testWatcherReloadsOnNodeOutageWithWatcherShard() {
577662
when(watcherService.validate(any())).thenReturn(true);
578663
ClusterChangedEvent event = new ClusterChangedEvent("whatever", currentState, previousState);
579664
lifeCycleService.clusterChanged(event);
580-
verify(watcherService).reload(eq(event.state()), anyString());
665+
verify(watcherService).reload(eq(event.state()), anyString(), any());
581666
}
582667

583668
private void startWatcher() {
@@ -609,7 +694,7 @@ private void startWatcher() {
609694

610695
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
611696
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
612-
verify(watcherService, times(1)).reload(eq(state), anyString());
697+
verify(watcherService, times(1)).reload(eq(state), anyString(), any());
613698
assertThat(lifeCycleService.shardRoutings(), hasSize(1));
614699

615700
// reset the mock, the user has to mock everything themselves again

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import static org.mockito.Mockito.doAnswer;
7878
import static org.mockito.Mockito.mock;
7979
import static org.mockito.Mockito.never;
80+
import static org.mockito.Mockito.spy;
8081
import static org.mockito.Mockito.verify;
8182
import static org.mockito.Mockito.when;
8283

@@ -349,12 +350,38 @@ void stopExecutor() {}
349350
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
350351
csBuilder.metadata(Metadata.builder());
351352

352-
service.reload(csBuilder.build(), "whatever");
353+
service.reload(csBuilder.build(), "whatever", exception -> {});
353354
verify(executionService).clearExecutionsAndQueue(any());
354355
verify(executionService, never()).pause(any());
355356
verify(triggerService).pauseExecution();
356357
}
357358

359+
// the trigger service should not start unless watches are loaded successfully
360+
public void testReloadingWatcherDoesNotStartTriggerServiceIfFailingToLoadWatches() {
361+
ExecutionService executionService = mock(ExecutionService.class);
362+
TriggerService triggerService = mock(TriggerService.class);
363+
WatcherService service = new WatcherService(
364+
Settings.EMPTY,
365+
triggerService,
366+
mock(TriggeredWatchStore.class),
367+
executionService,
368+
mock(WatchParser.class),
369+
client,
370+
EsExecutors.DIRECT_EXECUTOR_SERVICE
371+
) {
372+
@Override
373+
void stopExecutor() {}
374+
};
375+
376+
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
377+
Metadata metadata = spy(Metadata.builder().build());
378+
when(metadata.getIndicesLookup()).thenThrow(RuntimeException.class); // simulate exception in WatcherService's private loadWatches()
379+
380+
service.reload(csBuilder.metadata(metadata).build(), "whatever", exception -> {});
381+
verify(triggerService).pauseExecution();
382+
verify(triggerService, never()).start(any());
383+
}
384+
358385
private static DiscoveryNode newNode() {
359386
return DiscoveryNodeUtils.create("node");
360387
}

0 commit comments

Comments
 (0)