Skip to content

Commit 2cd8b85

Browse files
fix: Numerous publish path performance issues (#998)
* fix: Numerous publish path performance issues All of the uses of DirectExecutor are performing simple data-only transformations and are in the publisher hotpath * 🦉 Updates from OwlBot See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent a9e0870 commit 2cd8b85

File tree

10 files changed

+37
-32
lines changed

10 files changed

+37
-32
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
3232
If you are using Gradle without BOM, add this to your dependencies
3333

3434
```Groovy
35-
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.5'
35+
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.6'
3636
```
3737

3838
If you are using SBT, add this to your dependencies
3939

4040
```Scala
41-
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.5"
41+
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.6"
4242
```
4343

4444
## Authentication

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.cloud.pubsublite.internal.ProxyService;
3030
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
3131
import com.google.common.flogger.GoogleLogger;
32+
import com.google.common.util.concurrent.MoreExecutors;
3233
import com.google.pubsub.v1.PubsubMessage;
3334

3435
// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
@@ -74,6 +75,6 @@ public ApiFuture<String> publish(PubsubMessage message) {
7475
return ApiFutures.transform(
7576
wirePublisher.publish(wireMessage),
7677
MessageMetadata::encode,
77-
SystemExecutors.getFuturesExecutor());
78+
MoreExecutors.directExecutor());
7879
}
7980
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CheckedApiPreconditions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,24 @@ public static void checkArgument(boolean test, String description) throws Checke
3030
if (!test) throw new CheckedApiException(description, Code.INVALID_ARGUMENT);
3131
}
3232

33+
public static void checkArgument(boolean test, String descriptionFormat, Object... args)
34+
throws CheckedApiException {
35+
if (!test)
36+
throw new CheckedApiException(String.format(descriptionFormat, args), Code.INVALID_ARGUMENT);
37+
}
38+
3339
public static void checkState(boolean test) throws CheckedApiException {
3440
checkState(test, "");
3541
}
3642

3743
public static void checkState(boolean test, String description) throws CheckedApiException {
3844
if (!test) throw new CheckedApiException(description, Code.FAILED_PRECONDITION);
3945
}
46+
47+
public static void checkState(boolean test, String descriptionFormat, Object... args)
48+
throws CheckedApiException {
49+
if (!test)
50+
throw new CheckedApiException(
51+
String.format(descriptionFormat, args), Code.FAILED_PRECONDITION);
52+
}
4053
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,27 @@
2222
import com.google.cloud.pubsublite.Partition;
2323
import com.google.common.hash.HashCode;
2424
import com.google.common.hash.Hashing;
25-
import com.google.errorprone.annotations.concurrent.GuardedBy;
2625
import com.google.protobuf.ByteString;
2726
import java.math.BigInteger;
28-
import java.util.Random;
2927
import java.util.concurrent.ThreadLocalRandom;
28+
import java.util.concurrent.atomic.AtomicLong;
3029

3130
public class DefaultRoutingPolicy implements RoutingPolicy {
3231
private final long numPartitions;
33-
private final CloseableMonitor monitor = new CloseableMonitor();
3432

35-
@GuardedBy("monitor.monitor")
36-
private long nextWithoutKeyPartition;
33+
// An incrementing counter, when taken mod(numPartitions), gives the partition choice.
34+
private final AtomicLong withoutKeyCounter;
3735

3836
public DefaultRoutingPolicy(long numPartitions) throws ApiException {
3937
checkArgument(numPartitions > 0, "Must have a positive number of partitions.");
4038
this.numPartitions = numPartitions;
41-
this.nextWithoutKeyPartition = ThreadLocalRandom.current().nextLong(numPartitions);
42-
this.nextWithoutKeyPartition = new Random().longs(1, 0, numPartitions).findFirst().getAsLong();
39+
this.withoutKeyCounter = new AtomicLong(ThreadLocalRandom.current().nextLong(numPartitions));
4340
}
4441

4542
@Override
4643
public Partition routeWithoutKey() throws ApiException {
47-
try (CloseableMonitor.Hold h = monitor.enter()) {
48-
Partition toReturn = Partition.of(nextWithoutKeyPartition);
49-
long next = nextWithoutKeyPartition + 1;
50-
next = next % numPartitions;
51-
nextWithoutKeyPartition = next;
52-
return toReturn;
53-
}
44+
long index = withoutKeyCounter.incrementAndGet();
45+
return Partition.of(index % numPartitions);
5446
}
5547

5648
@Override

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.api.gax.rpc.ApiException;
2222
import com.google.api.gax.rpc.StatusCode.Code;
2323
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
24+
import com.google.common.util.concurrent.MoreExecutors;
2425
import java.util.Optional;
2526
import java.util.concurrent.ExecutionException;
2627
import java.util.function.BiConsumer;
@@ -53,7 +54,7 @@ public static <T> ApiFuture<T> toClientFuture(ApiFuture<T> source) {
5354
source,
5455
Throwable.class,
5556
t -> ApiFutures.immediateFailedFuture(toCanonical(t).underlying),
56-
SystemExecutors.getFuturesExecutor());
57+
MoreExecutors.directExecutor());
5758
}
5859

5960
public static void addFailureHandler(

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedSubscriberImpl.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,18 +100,16 @@ protected void handleStreamResponse(SubscribeResponse response) throws CheckedAp
100100
private void onMessages(MessageResponse response) throws CheckedApiException {
101101
checkState(
102102
response.getMessagesCount() > 0,
103-
String.format(
104-
"Received an empty MessageResponse on stream with initial request %s.",
105-
initialRequest));
103+
"Received an empty MessageResponse on stream with initial request %s.",
104+
initialRequest);
106105
List<SequencedMessage> messages =
107106
response.getMessagesList().stream()
108107
.map(SequencedMessage::fromProto)
109108
.collect(Collectors.toList());
110109
checkState(
111110
Predicates.isOrdered(messages),
112-
String.format(
113-
"Received out of order messages on the stream with initial request %s.",
114-
initialRequest));
111+
"Received out of order messages on the stream with initial request %s.",
112+
initialRequest);
115113
sendToClient(messages);
116114
}
117115
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,8 @@ public ApiFuture<MessageMetadata> publish(Message message) throws CheckedApiExce
6363
: routingPolicy.route(message.key());
6464
checkState(
6565
publishers.containsKey(routedPartition),
66-
String.format(
67-
"Routed to partition %s for which there is no publisher available.",
68-
routedPartition));
66+
"Routed to partition %s for which there is no publisher available.",
67+
routedPartition);
6968
return publishers.get(routedPartition).publish(message);
7069
} catch (Throwable t) {
7170
throw toCanonical(t);

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ public ApiFuture<Offset> publish(Message message) {
234234
ApiService.State currentState = state();
235235
checkState(
236236
currentState == ApiService.State.RUNNING,
237-
String.format("Cannot publish when Publisher state is %s.", currentState.name()));
237+
"Cannot publish when Publisher state is %s.",
238+
currentState.name());
238239
return batcher.add(proto);
239240
} catch (CheckedApiException e) {
240241
onPermanentError(e);

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ public ApiFuture<MessageMetadata> publish(Message message) {
5252
message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key());
5353
checkState(
5454
partitionPublishers.containsKey(routedPartition),
55-
String.format(
56-
"Routed to partition %s for which there is no publisher available.",
57-
routedPartition));
55+
"Routed to partition %s for which there is no publisher available.",
56+
routedPartition);
5857
return partitionPublishers.get(routedPartition).publish(message);
5958
} catch (Throwable t) {
6059
CheckedApiException e = toCanonical(t);

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.pubsublite.Partition;
2626
import com.google.cloud.pubsublite.internal.ProxyService;
2727
import com.google.cloud.pubsublite.internal.Publisher;
28+
import com.google.common.util.concurrent.MoreExecutors;
2829
import java.io.IOException;
2930

3031
public class SinglePartitionPublisher extends ProxyService implements Publisher<MessageMetadata> {
@@ -43,7 +44,7 @@ public ApiFuture<MessageMetadata> publish(Message message) {
4344
return ApiFutures.transform(
4445
publisher.publish(message),
4546
offset -> MessageMetadata.of(partition, offset),
46-
SystemExecutors.getFuturesExecutor());
47+
MoreExecutors.directExecutor());
4748
}
4849

4950
@Override

0 commit comments

Comments
 (0)