Skip to content

Commit c3086d9

Browse files
tengzhongerTeng Zhong
andauthored
feat: Change CDC related APIs to return ByteStringRange instead of Ro… (#1355)
* feat: Change CDC related APIs to return ByteStringRange instead of RowRange 1. GenerateInitialChangeStreamPartitions 2. ChangeStreamContinuationToken::GetRowRange * fix: Fix tests * fix: Address comments Co-authored-by: Teng Zhong <tengzhong@google.com>
1 parent 2a4e786 commit c3086d9

13 files changed

+134
-91
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
import com.google.api.gax.rpc.ServerStream;
3030
import com.google.api.gax.rpc.ServerStreamingCallable;
3131
import com.google.api.gax.rpc.UnaryCallable;
32-
import com.google.bigtable.v2.RowRange;
3332
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
3433
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
3534
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
3635
import com.google.cloud.bigtable.data.v2.models.Filters;
3736
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
3837
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
3938
import com.google.cloud.bigtable.data.v2.models.Query;
39+
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
4040
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
4141
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
4242
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -1503,11 +1503,11 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
15031503
* String tableId = "[TABLE]";
15041504
*
15051505
* try {
1506-
* ServerStream<RowRange> stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId);
1506+
* ServerStream<ByteStringRange> stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId);
15071507
* int count = 0;
15081508
*
15091509
* // Iterator style
1510-
* for (RowRange partition : stream) {
1510+
* for (ByteStringRange partition : stream) {
15111511
* if (++count > 10) {
15121512
* stream.cancel();
15131513
* break;
@@ -1525,7 +1525,7 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
15251525
* @see ServerStreamingCallable For call styles.
15261526
*/
15271527
@InternalApi("Used in Changestream beam pipeline.")
1528-
public ServerStream<RowRange> generateInitialChangeStreamPartitions(String tableId) {
1528+
public ServerStream<ByteStringRange> generateInitialChangeStreamPartitions(String tableId) {
15291529
return generateInitialChangeStreamPartitionsCallable().call(tableId);
15301530
}
15311531

@@ -1545,7 +1545,7 @@ public ServerStream<RowRange> generateInitialChangeStreamPartitions(String table
15451545
* public void onStart(StreamController controller) {
15461546
* this.controller = controller;
15471547
* }
1548-
* public void onResponse(RowRange partition) {
1548+
* public void onResponse(ByteStringRange partition) {
15491549
* if (++count > 10) {
15501550
* controller.cancel();
15511551
* return;
@@ -1568,7 +1568,7 @@ public ServerStream<RowRange> generateInitialChangeStreamPartitions(String table
15681568
*/
15691569
@InternalApi("Used in Changestream beam pipeline.")
15701570
public void generateInitialChangeStreamPartitionsAsync(
1571-
String tableId, ResponseObserver<RowRange> observer) {
1571+
String tableId, ResponseObserver<ByteStringRange> observer) {
15721572
generateInitialChangeStreamPartitionsCallable().call(tableId, observer);
15731573
}
15741574

@@ -1584,7 +1584,7 @@ public void generateInitialChangeStreamPartitionsAsync(
15841584
*
15851585
* // Iterator style
15861586
* try {
1587-
* for(RowRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) {
1587+
* for(ByteStringRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) {
15881588
* // Do something with partition
15891589
* }
15901590
* } catch (NotFoundException e) {
@@ -1595,18 +1595,18 @@ public void generateInitialChangeStreamPartitionsAsync(
15951595
*
15961596
* // Sync style
15971597
* try {
1598-
* List<RowRange> partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
1598+
* List<ByteStringRange> partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
15991599
* } catch (NotFoundException e) {
16001600
* System.out.println("Tried to read a non-existent table");
16011601
* } catch (RuntimeException e) {
16021602
* e.printStackTrace();
16031603
* }
16041604
*
16051605
* // Point look up
1606-
* ApiFuture<RowRange> partitionFuture =
1606+
* ApiFuture<ByteStringRange> partitionFuture =
16071607
* bigtableDataClient.generateInitialChangeStreamPartitionsCallable().first().futureCall(tableId);
16081608
*
1609-
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<RowRange>() {
1609+
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<ByteStringRange>() {
16101610
* public void onFailure(Throwable t) {
16111611
* if (t instanceof NotFoundException) {
16121612
* System.out.println("Tried to read a non-existent table");
@@ -1626,7 +1626,8 @@ public void generateInitialChangeStreamPartitionsAsync(
16261626
* @see ServerStreamingCallable For call styles.
16271627
*/
16281628
@InternalApi("Used in Changestream beam pipeline.")
1629-
public ServerStreamingCallable<String, RowRange> generateInitialChangeStreamPartitionsCallable() {
1629+
public ServerStreamingCallable<String, ByteStringRange>
1630+
generateInitialChangeStreamPartitionsCallable() {
16301631
return stub.generateInitialChangeStreamPartitionsCallable();
16311632
}
16321633

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,13 @@ public ChangeStreamContinuationToken(
5454
.build();
5555
}
5656

57-
// TODO: Change this to return ByteStringRange.
58-
public RowRange getRowRange() {
59-
return this.tokenProto.getPartition().getRowRange();
57+
/**
58+
* Get the partition of the current continuation token, represented by a {@link ByteStringRange}.
59+
*/
60+
public ByteStringRange getPartition() {
61+
return ByteStringRange.create(
62+
this.tokenProto.getPartition().getRowRange().getStartKeyClosed(),
63+
this.tokenProto.getPartition().getRowRange().getEndKeyOpen());
6064
}
6165

6266
public String getToken() {
@@ -95,19 +99,19 @@ public boolean equals(Object o) {
9599
return false;
96100
}
97101
ChangeStreamContinuationToken otherToken = (ChangeStreamContinuationToken) o;
98-
return Objects.equal(getRowRange(), otherToken.getRowRange())
102+
return Objects.equal(getPartition(), otherToken.getPartition())
99103
&& Objects.equal(getToken(), otherToken.getToken());
100104
}
101105

102106
@Override
103107
public int hashCode() {
104-
return Objects.hashCode(getRowRange(), getToken());
108+
return Objects.hashCode(getPartition(), getToken());
105109
}
106110

107111
@Override
108112
public String toString() {
109113
return MoreObjects.toStringHelper(this)
110-
.add("rowRange", getRowRange())
114+
.add("partition", getPartition())
111115
.add("token", getToken())
112116
.toString();
113117
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.models;
1717

18+
import com.google.api.core.InternalApi;
1819
import com.google.api.core.InternalExtensionOnly;
20+
import com.google.bigtable.v2.RowRange;
1921
import com.google.common.base.Objects;
2022
import com.google.common.base.Preconditions;
2123
import com.google.protobuf.ByteString;
24+
import com.google.protobuf.InvalidProtocolBufferException;
2225
import java.io.IOException;
2326
import java.io.ObjectInputStream;
2427
import java.io.ObjectOutputStream;
@@ -395,6 +398,22 @@ private void writeObject(ObjectOutputStream output) throws IOException {
395398
output.defaultWriteObject();
396399
}
397400

401+
@InternalApi("Used in Changestream beam pipeline.")
402+
public static ByteString toByteString(ByteStringRange byteStringRange) {
403+
return RowRange.newBuilder()
404+
.setStartKeyClosed(byteStringRange.getStart())
405+
.setEndKeyOpen(byteStringRange.getEnd())
406+
.build()
407+
.toByteString();
408+
}
409+
410+
@InternalApi("Used in Changestream beam pipeline.")
411+
public static ByteStringRange toByteStringRange(ByteString byteString)
412+
throws InvalidProtocolBufferException {
413+
RowRange rowRange = RowRange.newBuilder().mergeFrom(byteString).build();
414+
return ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen());
415+
}
416+
398417
@Override
399418
public boolean equals(Object o) {
400419
if (this == o) {

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
7575
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
7676
import com.google.cloud.bigtable.data.v2.models.Query;
77+
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
7778
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
7879
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
7980
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -155,7 +156,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
155156
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
156157
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
157158

158-
private final ServerStreamingCallable<String, RowRange>
159+
private final ServerStreamingCallable<String, ByteStringRange>
159160
generateInitialChangeStreamPartitionsCallable;
160161

161162
private final ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecord>
@@ -833,7 +834,7 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
833834
* RowRange}.
834835
* </ul>
835836
*/
836-
private ServerStreamingCallable<String, RowRange>
837+
private ServerStreamingCallable<String, ByteStringRange>
837838
createGenerateInitialChangeStreamPartitionsCallable() {
838839
ServerStreamingCallable<
839840
GenerateInitialChangeStreamPartitionsRequest,
@@ -862,22 +863,22 @@ public Map<String, String> extract(
862863
.build(),
863864
settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes());
864865

865-
ServerStreamingCallable<String, RowRange> userCallable =
866+
ServerStreamingCallable<String, ByteStringRange> userCallable =
866867
new GenerateInitialChangeStreamPartitionsUserCallable(base, requestContext);
867868

868-
ServerStreamingCallable<String, RowRange> withStatsHeaders =
869+
ServerStreamingCallable<String, ByteStringRange> withStatsHeaders =
869870
new StatsHeadersServerStreamingCallable<>(userCallable);
870871

871872
// Sometimes GenerateInitialChangeStreamPartitions connections are disconnected via an RST
872873
// frame. This error is transient and should be treated similar to UNAVAILABLE. However, this
873874
// exception has an INTERNAL error code which by default is not retryable. Convert the exception
874875
// so it can be retried in the client.
875-
ServerStreamingCallable<String, RowRange> convertException =
876+
ServerStreamingCallable<String, ByteStringRange> convertException =
876877
new ConvertStreamExceptionCallable<>(withStatsHeaders);
877878

878879
// Copy idle timeout settings for watchdog.
879-
ServerStreamingCallSettings<String, RowRange> innerSettings =
880-
ServerStreamingCallSettings.<String, RowRange>newBuilder()
880+
ServerStreamingCallSettings<String, ByteStringRange> innerSettings =
881+
ServerStreamingCallSettings.<String, ByteStringRange>newBuilder()
881882
.setRetryableCodes(
882883
settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes())
883884
.setRetrySettings(
@@ -886,17 +887,17 @@ public Map<String, String> extract(
886887
settings.generateInitialChangeStreamPartitionsSettings().getIdleTimeout())
887888
.build();
888889

889-
ServerStreamingCallable<String, RowRange> watched =
890+
ServerStreamingCallable<String, ByteStringRange> watched =
890891
Callables.watched(convertException, innerSettings, clientContext);
891892

892-
ServerStreamingCallable<String, RowRange> withBigtableTracer =
893+
ServerStreamingCallable<String, ByteStringRange> withBigtableTracer =
893894
new BigtableTracerStreamingCallable<>(watched);
894895

895-
ServerStreamingCallable<String, RowRange> retrying =
896+
ServerStreamingCallable<String, ByteStringRange> retrying =
896897
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
897898

898899
SpanName span = getSpanName("GenerateInitialChangeStreamPartitions");
899-
ServerStreamingCallable<String, RowRange> traced =
900+
ServerStreamingCallable<String, ByteStringRange> traced =
900901
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);
901902

902903
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
@@ -1039,7 +1040,8 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
10391040
}
10401041

10411042
/** Returns a streaming generate initial change stream partitions callable */
1042-
public ServerStreamingCallable<String, RowRange> generateInitialChangeStreamPartitionsCallable() {
1043+
public ServerStreamingCallable<String, ByteStringRange>
1044+
generateInitialChangeStreamPartitionsCallable() {
10431045
return generateInitialChangeStreamPartitionsCallable;
10441046
}
10451047

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@
3333
import com.google.api.gax.rpc.TransportChannelProvider;
3434
import com.google.api.gax.rpc.UnaryCallSettings;
3535
import com.google.auth.Credentials;
36-
import com.google.bigtable.v2.RowRange;
3736
import com.google.cloud.bigtable.Version;
3837
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
3938
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
4039
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
4140
import com.google.cloud.bigtable.data.v2.models.Query;
41+
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
4242
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
4343
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
4444
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -212,7 +212,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
212212
private final BigtableBulkReadRowsCallSettings bulkReadRowsSettings;
213213
private final UnaryCallSettings<ConditionalRowMutation, Boolean> checkAndMutateRowSettings;
214214
private final UnaryCallSettings<ReadModifyWriteRow, Row> readModifyWriteRowSettings;
215-
private final ServerStreamingCallSettings<String, RowRange>
215+
private final ServerStreamingCallSettings<String, ByteStringRange>
216216
generateInitialChangeStreamPartitionsSettings;
217217
private final ServerStreamingCallSettings<ReadChangeStreamQuery, ChangeStreamRecord>
218218
readChangeStreamSettings;
@@ -537,7 +537,7 @@ public UnaryCallSettings<ReadModifyWriteRow, Row> readModifyWriteRowSettings() {
537537
return readModifyWriteRowSettings;
538538
}
539539

540-
public ServerStreamingCallSettings<String, RowRange>
540+
public ServerStreamingCallSettings<String, ByteStringRange>
541541
generateInitialChangeStreamPartitionsSettings() {
542542
return generateInitialChangeStreamPartitionsSettings;
543543
}
@@ -571,7 +571,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
571571
private final UnaryCallSettings.Builder<ConditionalRowMutation, Boolean>
572572
checkAndMutateRowSettings;
573573
private final UnaryCallSettings.Builder<ReadModifyWriteRow, Row> readModifyWriteRowSettings;
574-
private final ServerStreamingCallSettings.Builder<String, RowRange>
574+
private final ServerStreamingCallSettings.Builder<String, ByteStringRange>
575575
generateInitialChangeStreamPartitionsSettings;
576576
private final ServerStreamingCallSettings.Builder<ReadChangeStreamQuery, ChangeStreamRecord>
577577
readChangeStreamSettings;

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallable.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121
import com.google.api.gax.rpc.StreamController;
2222
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest;
2323
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse;
24-
import com.google.bigtable.v2.RowRange;
2524
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
2625
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
26+
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
2727

2828
/**
2929
* Simple wrapper for GenerateInitialChangeStreamPartitions to wrap the request and response
3030
* protobufs.
3131
*/
3232
public class GenerateInitialChangeStreamPartitionsUserCallable
33-
extends ServerStreamingCallable<String, RowRange> {
33+
extends ServerStreamingCallable<String, ByteStringRange> {
3434
private final RequestContext requestContext;
3535
private final ServerStreamingCallable<
3636
GenerateInitialChangeStreamPartitionsRequest,
@@ -49,7 +49,7 @@ public GenerateInitialChangeStreamPartitionsUserCallable(
4949

5050
@Override
5151
public void call(
52-
String tableId, ResponseObserver<RowRange> responseObserver, ApiCallContext context) {
52+
String tableId, ResponseObserver<ByteStringRange> responseObserver, ApiCallContext context) {
5353
String tableName =
5454
NameUtil.formatTableName(
5555
requestContext.getProjectId(), requestContext.getInstanceId(), tableId);
@@ -62,12 +62,12 @@ public void call(
6262
inner.call(request, new ConvertPartitionToRangeObserver(responseObserver), context);
6363
}
6464

65-
private class ConvertPartitionToRangeObserver
65+
private static class ConvertPartitionToRangeObserver
6666
implements ResponseObserver<GenerateInitialChangeStreamPartitionsResponse> {
6767

68-
private final ResponseObserver<RowRange> outerObserver;
68+
private final ResponseObserver<ByteStringRange> outerObserver;
6969

70-
ConvertPartitionToRangeObserver(ResponseObserver<RowRange> observer) {
70+
ConvertPartitionToRangeObserver(ResponseObserver<ByteStringRange> observer) {
7171
this.outerObserver = observer;
7272
}
7373

@@ -78,12 +78,11 @@ public void onStart(final StreamController controller) {
7878

7979
@Override
8080
public void onResponse(GenerateInitialChangeStreamPartitionsResponse response) {
81-
RowRange rowRange =
82-
RowRange.newBuilder()
83-
.setStartKeyClosed(response.getPartition().getRowRange().getStartKeyClosed())
84-
.setEndKeyOpen(response.getPartition().getRowRange().getEndKeyOpen())
85-
.build();
86-
outerObserver.onResponse(rowRange);
81+
ByteStringRange byteStringRange =
82+
ByteStringRange.create(
83+
response.getPartition().getRowRange().getStartKeyClosed(),
84+
response.getPartition().getRowRange().getEndKeyOpen());
85+
outerObserver.onResponse(byteStringRange);
8786
}
8887

8988
@Override

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
import com.google.api.gax.rpc.ResponseObserver;
2525
import com.google.api.gax.rpc.ServerStreamingCallable;
2626
import com.google.api.gax.rpc.UnaryCallable;
27-
import com.google.bigtable.v2.RowRange;
2827
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
2928
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
3029
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
3130
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
3231
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
3332
import com.google.cloud.bigtable.data.v2.models.Mutation;
3433
import com.google.cloud.bigtable.data.v2.models.Query;
34+
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
3535
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
3636
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
3737
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -83,7 +83,7 @@ public class BigtableDataClientTests {
8383
@Mock private Batcher<ByteString, Row> mockBulkReadRowsBatcher;
8484

8585
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
86-
private ServerStreamingCallable<String, RowRange>
86+
private ServerStreamingCallable<String, ByteStringRange>
8787
mockGenerateInitialChangeStreamPartitionsCallable;
8888

8989
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
@@ -342,7 +342,7 @@ public void proxyGenerateInitialChangeStreamPartitionsAsyncTest() {
342342
.thenReturn(mockGenerateInitialChangeStreamPartitionsCallable);
343343

344344
@SuppressWarnings("unchecked")
345-
ResponseObserver<RowRange> mockObserver = Mockito.mock(ResponseObserver.class);
345+
ResponseObserver<ByteStringRange> mockObserver = Mockito.mock(ResponseObserver.class);
346346
bigtableDataClient.generateInitialChangeStreamPartitionsAsync("fake-table", mockObserver);
347347

348348
Mockito.verify(mockGenerateInitialChangeStreamPartitionsCallable)

0 commit comments

Comments
 (0)