Skip to content

Commit 6e2ecfa

Browse files
feat: The TopicConfigWatcher polls the topic config and calls a handler whenever it changes
1 parent 1b1e14c commit 6e2ecfa

File tree

4 files changed

+271
-1
lines changed

4 files changed

+271
-1
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.internal.wire;
17+
18+
import com.google.api.core.ApiService;
19+
import com.google.cloud.pubsublite.proto.Topic;
20+
import java.time.Duration;
21+
import java.util.function.Consumer;
22+
23+
public interface TopicConfigWatcher extends ApiService {
24+
interface Factory {
25+
TopicConfigWatcher New(Consumer<Topic> receiver, Duration period);
26+
}
27+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.internal.wire;
17+
18+
import com.google.api.core.AbstractApiService;
19+
import com.google.cloud.pubsublite.AdminClient;
20+
import com.google.cloud.pubsublite.TopicPath;
21+
import com.google.cloud.pubsublite.internal.ExtractStatus;
22+
import com.google.cloud.pubsublite.proto.Topic;
23+
import com.google.common.flogger.GoogleLogger;
24+
import java.time.Duration;
25+
import java.util.concurrent.*;
26+
import java.util.function.Consumer;
27+
28+
public class TopicConfigWatcherImpl extends AbstractApiService implements TopicConfigWatcher {
29+
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
30+
private final Duration period;
31+
private final TopicPath topicPath;
32+
private final AdminClient adminClient;
33+
private final ScheduledExecutorService executorService;
34+
private final Consumer<Topic> configReceiver;
35+
36+
private ScheduledFuture<?> topicConfigPoll;
37+
private Topic currentConfig = null;
38+
39+
public static class Factory implements TopicConfigWatcher.Factory {
40+
private final TopicPath topicPath;
41+
private final AdminClient adminClient;
42+
43+
public Factory(TopicPath topicPath, AdminClient adminClient) {
44+
this.topicPath = topicPath;
45+
this.adminClient = adminClient;
46+
}
47+
48+
@Override
49+
public TopicConfigWatcher New(Consumer<Topic> receiver, Duration period) {
50+
return new TopicConfigWatcherImpl(topicPath, adminClient, receiver, period);
51+
}
52+
}
53+
54+
private TopicConfigWatcherImpl(
55+
TopicPath topicPath, AdminClient adminClient, Consumer<Topic> receiver, Duration period) {
56+
this.period = period;
57+
this.topicPath = topicPath;
58+
this.adminClient = adminClient;
59+
this.configReceiver = receiver;
60+
this.executorService = Executors.newSingleThreadScheduledExecutor();
61+
}
62+
63+
private void pollTopicConfig() {
64+
Topic topic;
65+
try {
66+
topic = adminClient.getTopic(topicPath).get();
67+
} catch (InterruptedException | ExecutionException e) {
68+
// If we encounter an exception on our first topic config poll, then fail. We need to fetch
69+
// the topic
70+
// config at least once to start up properly.
71+
if (currentConfig == null) {
72+
notifyFailed(ExtractStatus.toCanonical(e.getCause()));
73+
stop();
74+
}
75+
log.atWarning().withCause(e).log("Failed to refresh topic config");
76+
return;
77+
}
78+
if (currentConfig != null && currentConfig.equals(topic)) {
79+
return;
80+
}
81+
configReceiver.accept(topic);
82+
// Notify started after we successfully receive the config once.
83+
if (currentConfig == null) {
84+
notifyStarted();
85+
}
86+
currentConfig = topic;
87+
}
88+
89+
private void stop() {
90+
topicConfigPoll.cancel(true);
91+
adminClient.close();
92+
}
93+
94+
@Override
95+
protected void doStart() {
96+
topicConfigPoll =
97+
executorService.scheduleAtFixedRate(
98+
this::pollTopicConfig, 0, period.toMillis(), TimeUnit.MILLISECONDS);
99+
}
100+
101+
@Override
102+
protected void doStop() {
103+
try {
104+
stop();
105+
notifyStopped();
106+
} catch (Exception e) {
107+
notifyFailed(e);
108+
}
109+
}
110+
}

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/ApiExceptionMatcher.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ public boolean matches(Throwable argument) {
5959

6060
public static void assertFutureThrowsCode(Future<?> f, Code code) {
6161
ExecutionException exception = assertThrows(ExecutionException.class, f::get);
62-
Optional<CheckedApiException> statusOr = ExtractStatus.extract(exception.getCause());
62+
assertThrowableMatches(exception.getCause(), code);
63+
}
64+
65+
public static void assertThrowableMatches(Throwable t, Code code) {
66+
Optional<CheckedApiException> statusOr = ExtractStatus.extract(t);
6367
assertThat(statusOr).isPresent();
6468
assertThat(statusOr.get().code()).isEqualTo(code);
6569
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.internal.wire;
17+
18+
import static org.junit.Assert.assertThrows;
19+
import static org.mockito.Mockito.*;
20+
import static org.mockito.MockitoAnnotations.initMocks;
21+
22+
import com.google.api.core.ApiFutures;
23+
import com.google.api.gax.rpc.StatusCode;
24+
import com.google.cloud.pubsublite.*;
25+
import com.google.cloud.pubsublite.internal.ApiExceptionMatcher;
26+
import com.google.cloud.pubsublite.internal.CheckedApiException;
27+
import com.google.cloud.pubsublite.proto.Topic;
28+
import java.time.Duration;
29+
import java.util.function.Consumer;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.junit.runner.RunWith;
33+
import org.junit.runners.JUnit4;
34+
import org.mockito.Mock;
35+
import org.mockito.Mockito;
36+
37+
@RunWith(JUnit4.class)
38+
public class TopicConfigWatcherImplTest {
39+
private static final CloudRegion REGION = CloudRegion.of("us-east1");
40+
41+
private static TopicPath path() {
42+
return TopicPath.newBuilder()
43+
.setName(TopicName.of("a"))
44+
.setProject(ProjectNumber.of(4))
45+
.setLocation(CloudZone.of(REGION, 'a'))
46+
.build();
47+
}
48+
49+
TopicConfigWatcher.Factory watcherFactory;
50+
@Mock AdminClient mockClient;
51+
@Mock Consumer<Topic> mockConsumer;
52+
53+
@Before
54+
public void setUp() {
55+
initMocks(this);
56+
watcherFactory = new TopicConfigWatcherImpl.Factory(path(), mockClient);
57+
}
58+
59+
@Test
60+
public void testFirstCallFails() {
61+
when(mockClient.getTopic(path()))
62+
.thenReturn(
63+
ApiFutures.immediateFailedFuture(
64+
new CheckedApiException(StatusCode.Code.FAILED_PRECONDITION).underlying));
65+
TopicConfigWatcher watcher = watcherFactory.New(mockConsumer, Duration.ofMinutes(1));
66+
watcher.startAsync();
67+
assertThrows(IllegalStateException.class, watcher::awaitTerminated);
68+
ApiExceptionMatcher.assertThrowableMatches(
69+
watcher.failureCause(), StatusCode.Code.FAILED_PRECONDITION);
70+
verify(mockClient, times(1)).getTopic(path());
71+
}
72+
73+
@Test
74+
public void testCallsHandlerOnStart() {
75+
Topic topic = Topic.getDefaultInstance();
76+
when(mockClient.getTopic(path())).thenReturn(ApiFutures.immediateFuture(topic));
77+
TopicConfigWatcher watcher = watcherFactory.New(mockConsumer, Duration.ofMinutes(1));
78+
watcher.startAsync();
79+
verify(mockConsumer, after(1000)).accept(topic);
80+
verifyNoMoreInteractions(mockConsumer);
81+
}
82+
83+
@Test
84+
public void testHandlerCalledOnUpdates() {
85+
Topic topic1 = Topic.getDefaultInstance();
86+
Topic topic2 = Topic.newBuilder().setName("foo").build();
87+
when(mockClient.getTopic(path()))
88+
.thenReturn(ApiFutures.immediateFuture(topic1))
89+
.thenReturn(ApiFutures.immediateFuture(topic1))
90+
.thenReturn(ApiFutures.immediateFuture(topic2));
91+
TopicConfigWatcher watcher = watcherFactory.New(mockConsumer, Duration.ofMillis(10));
92+
watcher.startAsync();
93+
verify(mockClient, after(1000).atLeast(3)).getTopic(path());
94+
verify(mockConsumer, after(1000)).accept(topic1);
95+
verify(mockConsumer, after(1000)).accept(topic2);
96+
verifyNoMoreInteractions(mockConsumer);
97+
}
98+
99+
@Test
100+
public void testFailuresAfterFirstSuccessIgnored() {
101+
Topic topic1 = Topic.getDefaultInstance();
102+
Topic topic2 = Topic.newBuilder().setName("foo").build();
103+
when(mockClient.getTopic(path()))
104+
.thenReturn(ApiFutures.immediateFuture(topic1))
105+
.thenReturn(
106+
ApiFutures.immediateFailedFuture(
107+
new CheckedApiException(StatusCode.Code.FAILED_PRECONDITION)))
108+
.thenReturn(ApiFutures.immediateFuture(topic2));
109+
TopicConfigWatcher watcher = watcherFactory.New(mockConsumer, Duration.ofMillis(10));
110+
watcher.startAsync();
111+
verify(mockClient, after(1000).atLeast(3)).getTopic(path());
112+
verify(mockConsumer, after(1000)).accept(topic1);
113+
verify(mockConsumer, after(1000)).accept(topic2);
114+
verifyNoMoreInteractions(mockConsumer);
115+
}
116+
117+
@Test
118+
public void testStopPreventsFutureCalls() {
119+
Topic topic1 = Topic.getDefaultInstance();
120+
when(mockClient.getTopic(path())).thenReturn(ApiFutures.immediateFuture(topic1));
121+
TopicConfigWatcher watcher = watcherFactory.New(mockConsumer, Duration.ofMillis(10));
122+
watcher.startAsync();
123+
watcher.stopAsync();
124+
watcher.awaitTerminated();
125+
verify(mockClient, after(1000).atLeast(1)).getTopic(path());
126+
Mockito.reset(mockClient);
127+
verify(mockClient, after(20).never()).getTopic(any());
128+
}
129+
}

0 commit comments

Comments
 (0)