Skip to content

Commit cb7b455

Browse files
tengzhongerTeng Zhong
andauthored
feat: Add ChangeStreamRecordAdapter and ChangeStreamStateMachine (#1334)
* Add ChangeStreamRecordAdapter and ChangeStreamStateMachine These will be used later for ChangeStreamMergingCallable. * fix: Fix styles and add some tests. * fix: Address comments * fix: Update comments Co-authored-by: Teng Zhong <tengzhong@google.com>
1 parent 53dd0f0 commit cb7b455

File tree

8 files changed

+1457
-22
lines changed

8 files changed

+1457
-22
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,14 +321,28 @@ public boolean equals(Object o) {
321321
if (o == null || getClass() != o.getClass()) {
322322
return false;
323323
}
324-
ChangeStreamMutation otherChangeStreamMutation = (ChangeStreamMutation) o;
325-
return Objects.equal(this.hashCode(), otherChangeStreamMutation.hashCode());
324+
ChangeStreamMutation other = (ChangeStreamMutation) o;
325+
return Objects.equal(this.rowKey, other.rowKey)
326+
&& Objects.equal(this.type, other.type)
327+
&& Objects.equal(this.sourceClusterId, other.sourceClusterId)
328+
&& Objects.equal(this.commitTimestamp, other.commitTimestamp)
329+
&& Objects.equal(this.tieBreaker, other.tieBreaker)
330+
&& Objects.equal(this.token, other.token)
331+
&& Objects.equal(this.lowWatermark, other.lowWatermark)
332+
&& Objects.equal(this.entries.build(), other.entries.build());
326333
}
327334

328335
@Override
329336
public int hashCode() {
330337
return Objects.hashCode(
331-
rowKey, type, sourceClusterId, commitTimestamp, tieBreaker, token, lowWatermark, entries);
338+
rowKey,
339+
type,
340+
sourceClusterId,
341+
commitTimestamp,
342+
tieBreaker,
343+
token,
344+
lowWatermark,
345+
entries.build());
332346
}
333347

334348
@Override
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.models;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.bigtable.v2.ReadChangeStreamResponse;
20+
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
21+
import com.google.protobuf.ByteString;
22+
import com.google.protobuf.Timestamp;
23+
import javax.annotation.Nonnull;
24+
25+
/**
26+
* An extension point that allows end users to plug in a custom implementation of logical change
27+
* stream records. This is useful in cases where the user would like to apply advanced client side
28+
* filtering (for example, only keep DeleteFamily in the mutations). This adapter acts like a factory
29+
* for a SAX style change stream record builder.
30+
*/
31+
public interface ChangeStreamRecordAdapter<ChangeStreamRecordT> {
32+
/** Creates a new instance of a {@link ChangeStreamRecordBuilder}. */
33+
ChangeStreamRecordBuilder<ChangeStreamRecordT> createChangeStreamRecordBuilder();
34+
35+
/** Checks if the given change stream record is a Heartbeat. */
36+
@InternalApi("Used in Changestream beam pipeline.")
37+
boolean isHeartbeat(ChangeStreamRecordT record);
38+
39+
/**
40+
* Get the token from the given Heartbeat record. If the given record is not a Heartbeat, it will
41+
* throw an Exception.
42+
*/
43+
@InternalApi("Used in Changestream beam pipeline.")
44+
String getTokenFromHeartbeat(ChangeStreamRecordT heartbeatRecord);
45+
46+
/** Checks if the given change stream record is a ChangeStreamMutation. */
47+
@InternalApi("Used in Changestream beam pipeline.")
48+
boolean isChangeStreamMutation(ChangeStreamRecordT record);
49+
50+
/**
51+
* Get the token from the given ChangeStreamMutation record. If the given record is not a
52+
* ChangeStreamMutation, it will throw an Exception.
53+
*/
54+
@InternalApi("Used in Changestream beam pipeline.")
55+
String getTokenFromChangeStreamMutation(ChangeStreamRecordT record);
56+
57+
/**
58+
* A SAX style change stream record factory. It is responsible for creating one of the three types
59+
* of change stream record: heartbeat, close stream, and a change stream mutation.
60+
*
61+
* <p>State management is handled external to the implementation of this class:
62+
*
63+
* <ol>
64+
* Case 1: Heartbeat
65+
* <li>Exactly 1 {@code onHeartbeat}.
66+
* </ol>
67+
*
68+
* <ol>
69+
* Case 2: CloseStream
70+
* <li>Exactly 1 {@code onCloseStream}.
71+
* </ol>
72+
*
73+
* <ol>
74+
* Case 3: ChangeStreamMutation. A change stream mutation consists of one or more mods, where
75+
* the SetCells might be chunked. There are 3 different types of mods that a ReadChangeStream
76+
* response can have:
77+
* <li>DeleteFamily -> Exactly 1 {@code deleteFamily}
78+
* <li>DeleteCell -> Exactly 1 {@code deleteCells}
79+
* <li>SetCell -> Exactly 1 {@code startCell}, at least 1 {@code cellValue}, exactly 1 {@code
80+
* finishCell}.
81+
* </ol>
82+
*
83+
* <p>The whole flow of constructing a ChangeStreamMutation is:
84+
*
85+
* <ol>
86+
* <li>Exactly 1 {@code startUserMutation} or {@code startGcMutation}.
87+
* <li>At least 1 DeleteFamily/DeleteCell/SetCell mod.
88+
* <li>Exactly 1 {@code finishChangeStreamMutation}.
89+
* </ol>
90+
*
91+
* <p>Note: For a non-chunked SetCell, {@code cellValue} is called exactly once. For a chunked
92+
* SetCell, {@code cellValue} is called more than once.
93+
*
94+
* <p>Note: DeleteRow's won't appear in data changes since they'll be converted to multiple
95+
* DeleteFamily's.
96+
*/
97+
interface ChangeStreamRecordBuilder<ChangeStreamRecordT> {
98+
/**
99+
* Called to create a heartbeat. This will be called at most once. If called, the current change
100+
* stream record must not include any data changes or close stream messages.
101+
*/
102+
ChangeStreamRecordT onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat);
103+
104+
/**
105+
* Called to create a close stream message. This will be called at most once. If called, the
106+
* current change stream record must not include any data changes or heartbeats.
107+
*/
108+
ChangeStreamRecordT onCloseStream(ReadChangeStreamResponse.CloseStream closeStream);
109+
110+
/**
111+
* Called to start a new user initiated ChangeStreamMutation. This will be called at most once.
112+
* If called, the current change stream record must not include any close stream message or
113+
* heartbeat.
114+
*/
115+
void startUserMutation(
116+
@Nonnull ByteString rowKey,
117+
@Nonnull String sourceClusterId,
118+
@Nonnull Timestamp commitTimestamp,
119+
int tieBreaker);
120+
121+
/**
122+
* Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most
123+
* once. If called, the current change stream record must not include any close stream message
124+
* or heartbeat.
125+
*/
126+
void startGcMutation(
127+
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker);
128+
129+
/** Called to add a DeleteFamily mod. */
130+
void deleteFamily(@Nonnull String familyName);
131+
132+
/** Called to add a DeleteCell mod. */
133+
void deleteCells(
134+
@Nonnull String familyName,
135+
@Nonnull ByteString qualifier,
136+
@Nonnull TimestampRange timestampRange);
137+
138+
/**
139+
* Called to start a SetCell.
140+
*
141+
* <ol>
142+
* In case of a non-chunked cell, the following order is guaranteed:
143+
* <li>Exactly 1 {@code startCell}.
144+
* <li>Exactly 1 {@code cellValue}.
145+
* <li>Exactly 1 {@code finishCell}.
146+
* </ol>
147+
*
148+
* <ol>
149+
* In case of a chunked cell, the following order is guaranteed:
150+
* <li>Exactly 1 {@code startCell}.
151+
* <li>At least 2 {@code cellValue}.
152+
* <li>Exactly 1 {@code finishCell}.
153+
* </ol>
154+
*/
155+
void startCell(String family, ByteString qualifier, long timestampMicros);
156+
157+
/**
158+
* Called once per non-chunked cell, or at least twice per chunked cell to concatenate the cell
159+
* value.
160+
*/
161+
void cellValue(ByteString value);
162+
163+
/** Called once per cell to signal the end of the value (unless reset). */
164+
void finishCell();
165+
166+
/** Called once per stream record to signal that all mods have been processed (unless reset). */
167+
ChangeStreamRecordT finishChangeStreamMutation(
168+
@Nonnull String token, @Nonnull Timestamp lowWatermark);
169+
170+
/** Called when the current in-progress change stream record should be dropped. */
171+
void reset();
172+
}
173+
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.models;
17+
18+
import com.google.bigtable.v2.ReadChangeStreamResponse;
19+
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
20+
import com.google.common.base.Preconditions;
21+
import com.google.protobuf.ByteString;
22+
import com.google.protobuf.Timestamp;
23+
import javax.annotation.Nonnull;
24+
25+
/**
26+
* Default implementation of a {@link ChangeStreamRecordAdapter} that uses {@link
27+
* ChangeStreamRecord}s to represent change stream records.
28+
*/
29+
public class DefaultChangeStreamRecordAdapter
30+
implements ChangeStreamRecordAdapter<ChangeStreamRecord> {
31+
32+
/** {@inheritDoc} */
33+
@Override
34+
public ChangeStreamRecordBuilder<ChangeStreamRecord> createChangeStreamRecordBuilder() {
35+
return new DefaultChangeStreamRecordBuilder();
36+
}
37+
38+
/** {@inheritDoc} */
39+
@Override
40+
public boolean isHeartbeat(ChangeStreamRecord record) {
41+
return record instanceof Heartbeat;
42+
}
43+
44+
/** {@inheritDoc} */
45+
@Override
46+
public String getTokenFromHeartbeat(ChangeStreamRecord record) {
47+
Preconditions.checkArgument(isHeartbeat(record), "record is not a Heartbeat.");
48+
return ((Heartbeat) record).getChangeStreamContinuationToken().getToken();
49+
}
50+
51+
/** {@inheritDoc} */
52+
@Override
53+
public boolean isChangeStreamMutation(ChangeStreamRecord record) {
54+
return record instanceof ChangeStreamMutation;
55+
}
56+
57+
/** {@inheritDoc} */
58+
@Override
59+
public String getTokenFromChangeStreamMutation(ChangeStreamRecord record) {
60+
Preconditions.checkArgument(
61+
isChangeStreamMutation(record), "record is not a ChangeStreamMutation.");
62+
return ((ChangeStreamMutation) record).getToken();
63+
}
64+
65+
/** {@inheritDoc} */
66+
static class DefaultChangeStreamRecordBuilder
67+
implements ChangeStreamRecordBuilder<ChangeStreamRecord> {
68+
private ChangeStreamMutation.Builder changeStreamMutationBuilder = null;
69+
70+
// For the current SetCell.
71+
private String family;
72+
private ByteString qualifier;
73+
private long timestampMicros;
74+
private ByteString value;
75+
76+
public DefaultChangeStreamRecordBuilder() {
77+
reset();
78+
}
79+
80+
/** {@inheritDoc} */
81+
@Override
82+
public ChangeStreamRecord onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
83+
Preconditions.checkArgument(
84+
this.changeStreamMutationBuilder == null,
85+
"Can not create a Heartbeat when there is an existing ChangeStreamMutation being built.");
86+
return Heartbeat.fromProto(heartbeat);
87+
}
88+
89+
/** {@inheritDoc} */
90+
@Override
91+
public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
92+
Preconditions.checkArgument(
93+
this.changeStreamMutationBuilder == null,
94+
"Can not create a CloseStream when there is an existing ChangeStreamMutation being built.");
95+
return CloseStream.fromProto(closeStream);
96+
}
97+
98+
/** {@inheritDoc} */
99+
@Override
100+
public void startUserMutation(
101+
@Nonnull ByteString rowKey,
102+
@Nonnull String sourceClusterId,
103+
@Nonnull Timestamp commitTimestamp,
104+
int tieBreaker) {
105+
this.changeStreamMutationBuilder =
106+
ChangeStreamMutation.createUserMutation(
107+
rowKey, sourceClusterId, commitTimestamp, tieBreaker);
108+
}
109+
110+
/** {@inheritDoc} */
111+
@Override
112+
public void startGcMutation(
113+
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
114+
this.changeStreamMutationBuilder =
115+
ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker);
116+
}
117+
118+
/** {@inheritDoc} */
119+
@Override
120+
public void deleteFamily(@Nonnull String familyName) {
121+
this.changeStreamMutationBuilder.deleteFamily(familyName);
122+
}
123+
124+
/** {@inheritDoc} */
125+
@Override
126+
public void deleteCells(
127+
@Nonnull String familyName,
128+
@Nonnull ByteString qualifier,
129+
@Nonnull TimestampRange timestampRange) {
130+
this.changeStreamMutationBuilder.deleteCells(familyName, qualifier, timestampRange);
131+
}
132+
133+
/** {@inheritDoc} */
134+
@Override
135+
public void startCell(String family, ByteString qualifier, long timestampMicros) {
136+
this.family = family;
137+
this.qualifier = qualifier;
138+
this.timestampMicros = timestampMicros;
139+
this.value = ByteString.EMPTY;
140+
}
141+
142+
/** {@inheritDoc} */
143+
@Override
144+
public void cellValue(ByteString value) {
145+
this.value = this.value.concat(value);
146+
}
147+
148+
/** {@inheritDoc} */
149+
@Override
150+
public void finishCell() {
151+
this.changeStreamMutationBuilder.setCell(
152+
this.family, this.qualifier, this.timestampMicros, this.value);
153+
}
154+
155+
/** {@inheritDoc} */
156+
@Override
157+
public ChangeStreamRecord finishChangeStreamMutation(
158+
@Nonnull String token, @Nonnull Timestamp lowWatermark) {
159+
this.changeStreamMutationBuilder.setToken(token);
160+
this.changeStreamMutationBuilder.setLowWatermark(lowWatermark);
161+
return this.changeStreamMutationBuilder.build();
162+
}
163+
164+
/** {@inheritDoc} */
165+
@Override
166+
public void reset() {
167+
changeStreamMutationBuilder = null;
168+
169+
family = null;
170+
qualifier = null;
171+
timestampMicros = 0;
172+
value = null;
173+
}
174+
}
175+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
2727
private static final long serialVersionUID = 7316215828353608504L;
2828

29-
public static Heartbeat create(
29+
private static Heartbeat create(
3030
ChangeStreamContinuationToken changeStreamContinuationToken, Timestamp lowWatermark) {
3131
return new AutoValue_Heartbeat(changeStreamContinuationToken, lowWatermark);
3232
}

0 commit comments

Comments
 (0)