Skip to content

Commit c526235

Browse files
feat: Adding ability to create a subscription at HEAD (#545)
* feat: added ability to create subscriptions at head * fix: change CursorLocation to StartingOffset * fix: update docs * fix: default is end * fix: test name * feat: update starting offset to offset location * fix: update docs * fix: requested changes * fix: changing offset location to backlog location
1 parent c2b5680 commit c526235

File tree

4 files changed

+74
-2
lines changed

4 files changed

+74
-2
lines changed

google-cloud-pubsublite/clirr-ignored-differences.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
<className>com/google/cloud/pubsublite/PublishMetadata</className>
88
<method>*</method>
99
</difference>
10+
<difference>
11+
<differenceType>7012</differenceType>
12+
<className>com/google/cloud/pubsublite/AdminClient</className>
13+
<method>*</method>
14+
</difference>
1015
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
1116
<difference>
1217
<differenceType>7013</differenceType>

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/AdminClient.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ static AdminClient create(AdminClientSettings settings) throws ApiException {
3030
return settings.instantiate();
3131
}
3232

33+
/**
34+
* BacklogLoction refers to a location with respect to the message backlog.
35+
*
36+
* <p>BEGINNING refers to the location of the oldest retained message. END refers to the location
37+
* past all currently published messages, skipping the entire message backlog.
38+
*/
39+
public enum BacklogLocation {
40+
BEGINNING,
41+
END
42+
}
43+
3344
/** The Google Cloud region this client operates on. */
3445
CloudRegion region();
3546

@@ -102,11 +113,27 @@ static AdminClient create(AdminClientSettings settings) throws ApiException {
102113
/**
103114
* Create the provided subscription if it does not yet exist.
104115
*
116+
* <p>By default, a new subscription will only receive messages published after the subscription
117+
* was created.
118+
*
119+
* @param subscription The subscription to create.
120+
* @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
121+
* the subscription on success.
122+
*/
123+
default ApiFuture<Subscription> createSubscription(Subscription subscription) {
124+
return createSubscription(subscription, BacklogLocation.END);
125+
}
126+
127+
/**
128+
* Create the provided subscription at the given starting offset if it does not yet exist.
129+
*
105130
* @param subscription The subscription to create.
131+
* @param startingOffset The offset at which the new subscription will start receiving messages.
106132
* @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
107133
* the subscription on success.
108134
*/
109-
ApiFuture<Subscription> createSubscription(Subscription subscription);
135+
ApiFuture<Subscription> createSubscription(
136+
Subscription subscription, BacklogLocation startingOffset);
110137

111138
/**
112139
* Get the subscription with id {@code id} if it exists.

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/AdminClientImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.ApiFutures;
2121
import com.google.cloud.pubsublite.AdminClient;
22+
import com.google.cloud.pubsublite.AdminClient.BacklogLocation;
2223
import com.google.cloud.pubsublite.CloudRegion;
2324
import com.google.cloud.pubsublite.LocationPath;
2425
import com.google.cloud.pubsublite.SubscriptionPath;
@@ -136,7 +137,8 @@ public ApiFuture<List<SubscriptionPath>> listTopicSubscriptions(TopicPath path)
136137
}
137138

138139
@Override
139-
public ApiFuture<Subscription> createSubscription(Subscription subscription) {
140+
public ApiFuture<Subscription> createSubscription(
141+
Subscription subscription, BacklogLocation startingOffset) {
140142
SubscriptionPath path = SubscriptionPath.parse(subscription.getName());
141143
return serviceClient
142144
.createSubscriptionCallable()
@@ -145,6 +147,7 @@ public ApiFuture<Subscription> createSubscription(Subscription subscription) {
145147
.setParent(path.locationPath().toString())
146148
.setSubscription(subscription)
147149
.setSubscriptionId(path.name().toString())
150+
.setSkipBacklog(startingOffset == BacklogLocation.END)
148151
.build());
149152
}
150153

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/AdminClientImplTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.api.core.ApiFutures;
3030
import com.google.api.gax.rpc.StatusCode.Code;
3131
import com.google.api.gax.rpc.UnaryCallable;
32+
import com.google.cloud.pubsublite.AdminClient.BacklogLocation;
3233
import com.google.cloud.pubsublite.CloudRegion;
3334
import com.google.cloud.pubsublite.CloudZone;
3435
import com.google.cloud.pubsublite.LocationPath;
@@ -372,6 +373,7 @@ public void createSubscription_Ok() throws Exception {
372373
.setParent(subscriptionPath().locationPath().toString())
373374
.setSubscription(SUBSCRIPTION)
374375
.setSubscriptionId(subscriptionName().value())
376+
.setSkipBacklog(true)
375377
.build();
376378

377379
when(createSubscriptionCallable.futureCall(request))
@@ -387,13 +389,48 @@ public void createSubscription_Error() {
387389
.setParent(subscriptionPath().locationPath().toString())
388390
.setSubscription(SUBSCRIPTION)
389391
.setSubscriptionId(subscriptionName().value())
392+
.setSkipBacklog(true)
390393
.build();
391394

392395
when(createSubscriptionCallable.futureCall(request)).thenReturn(failedPreconditionFuture());
393396

394397
assertFutureThrowsCode(client.createSubscription(SUBSCRIPTION), Code.FAILED_PRECONDITION);
395398
}
396399

400+
@Test
401+
public void createSubscriptionAtBeginning_Ok() throws Exception {
402+
CreateSubscriptionRequest request =
403+
CreateSubscriptionRequest.newBuilder()
404+
.setParent(subscriptionPath().locationPath().toString())
405+
.setSubscription(SUBSCRIPTION)
406+
.setSubscriptionId(subscriptionName().value())
407+
.setSkipBacklog(false)
408+
.build();
409+
410+
when(createSubscriptionCallable.futureCall(request))
411+
.thenReturn(immediateFuture(SUBSCRIPTION_2));
412+
413+
assertThat(client.createSubscription(SUBSCRIPTION, BacklogLocation.BEGINNING).get())
414+
.isEqualTo(SUBSCRIPTION_2);
415+
}
416+
417+
@Test
418+
public void createSubscriptionAtBeginning_Error() throws Exception {
419+
CreateSubscriptionRequest request =
420+
CreateSubscriptionRequest.newBuilder()
421+
.setParent(subscriptionPath().locationPath().toString())
422+
.setSubscription(SUBSCRIPTION)
423+
.setSubscriptionId(subscriptionName().value())
424+
.setSkipBacklog(false)
425+
.build();
426+
427+
when(createSubscriptionCallable.futureCall(request)).thenReturn(failedPreconditionFuture());
428+
429+
assertFutureThrowsCode(
430+
client.createSubscription(SUBSCRIPTION, BacklogLocation.BEGINNING),
431+
Code.FAILED_PRECONDITION);
432+
}
433+
397434
@Test
398435
public void updateSubscription_Ok() throws Exception {
399436
UpdateSubscriptionRequest request =

0 commit comments

Comments
 (0)