Skip to content

Commit 6ab2754

Browse files
fix: Fail early when publishing to already failed publishers (#934)
Also log a warning by default when a publisher client fails. Also make dependencies.sh include googleapis/synthtool#1266
1 parent 5801212 commit 6ab2754

File tree

3 files changed

+33
-6
lines changed

3 files changed

+33
-6
lines changed

.kokoro/dependencies.sh

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,13 @@ function determineMavenOpts() {
3838
| sed -E 's/^(1\.[0-9]\.0).*$/\1/g'
3939
)
4040

41-
case $javaVersion in
42-
"17")
41+
if [[ $javaVersion == 17* ]]
42+
then
4343
# MaxPermSize is no longer supported as of jdk 17
4444
echo -n "-Xmx1024m"
45-
;;
46-
*)
45+
else
4746
echo -n "-Xmx1024m -XX:MaxPermSize=128m"
48-
;;
49-
esac
47+
fi
5048
}
5149

5250
export MAVEN_OPTS=$(determineMavenOpts)

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828
import com.google.cloud.pubsublite.internal.CheckedApiException;
2929
import com.google.cloud.pubsublite.internal.ProxyService;
3030
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
31+
import com.google.common.flogger.GoogleLogger;
3132
import com.google.pubsub.v1.PubsubMessage;
3233

3334
// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
3435
// publisher. It encodes a MessageMetadata object in the response string.
3536
public class WrappingPublisher extends ProxyService implements Publisher {
37+
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
38+
3639
private final com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher;
3740
private final MessageTransformer<PubsubMessage, Message> transformer;
3841

@@ -43,11 +46,23 @@ public WrappingPublisher(
4346
super(wirePublisher);
4447
this.wirePublisher = wirePublisher;
4548
this.transformer = transformer;
49+
addListener(
50+
new Listener() {
51+
@Override
52+
public void failed(State from, Throwable failure) {
53+
logger.atWarning().withCause(failure).log(
54+
"Publisher client failed with permanent error.");
55+
}
56+
},
57+
SystemExecutors.getFuturesExecutor());
4658
}
4759

4860
// Publisher implementation.
4961
@Override
5062
public ApiFuture<String> publish(PubsubMessage message) {
63+
if (state().equals(State.FAILED)) {
64+
return ApiFutures.immediateFailedFuture(toCanonical(failureCause()).underlying);
65+
}
5166
Message wireMessage;
5267
try {
5368
wireMessage = transformer.transform(message);

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package com.google.cloud.pubsublite.cloudpubsub.internal;
1818

19+
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
1920
import static com.google.common.truth.Truth.assertThat;
21+
import static org.junit.Assert.assertThrows;
2022
import static org.mockito.ArgumentMatchers.any;
2123
import static org.mockito.Mockito.times;
2224
import static org.mockito.Mockito.verify;
@@ -38,6 +40,7 @@
3840
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
3941
import com.google.protobuf.ByteString;
4042
import com.google.pubsub.v1.PubsubMessage;
43+
import java.util.concurrent.ExecutionException;
4144
import org.junit.After;
4245
import org.junit.Before;
4346
import org.junit.Test;
@@ -101,4 +104,15 @@ public void badTimestampCannotBeTransformed() {
101104
ApiExceptionMatcher.assertFutureThrowsCode(published, Code.INVALID_ARGUMENT);
102105
assertThat(publisher.isRunning()).isFalse();
103106
}
107+
108+
@Test
109+
public void publishAfterFailureFailedImmediately() throws Exception {
110+
underlying.fail(new CheckedApiException(Code.FAILED_PRECONDITION));
111+
assertThrows(Throwable.class, publisher::awaitTerminated);
112+
113+
PubsubMessage message = PubsubMessage.newBuilder().setOrderingKey("abc").build();
114+
ApiFuture<String> published = publisher.publish(message);
115+
ExecutionException e = assertThrows(ExecutionException.class, published::get);
116+
assertThat(toCanonical(e).code()).isEqualTo(Code.FAILED_PRECONDITION);
117+
}
104118
}

0 commit comments

Comments
 (0)