Skip to content

Commit 657c652

Browse files
committed
clean up tests and formatting
1 parent 9ec5723 commit 657c652

File tree

7 files changed

+41
-67
lines changed

7 files changed

+41
-67
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBatchingCallSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ public Builder setRetrySettings(@Nonnull RetrySettings retrySettings) {
199199
* reach the target rpc latency.
200200
*/
201201
public Builder enableLatencyBasedThrottling(long targetRpcLatency) {
202-
Preconditions.checkArgument(targetRpcLatency > 0,
203-
"target RPC latency must be greater than 0");
202+
Preconditions.checkArgument(
203+
targetRpcLatency > 0, "target RPC latency must be greater than 0");
204204
this.isLatencyBasedThrottlingEnabled = true;
205205
this.targetRpcLatencyMs = targetRpcLatency;
206206
return this;

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlCallable.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public void run() {
9595
dynamicFlowControlStats.updateLatency(timer.elapsed(TimeUnit.MILLISECONDS));
9696
long lastAdjustedTimestamp = dynamicFlowControlStats.getLastAdjustedTimestampMs();
9797
long now = System.currentTimeMillis();
98+
// Avoid adjusting the thresholds too frequently
9899
if (now - lastAdjustedTimestamp < adjustingIntervalMs) {
99100
return;
100101
}
@@ -105,17 +106,25 @@ public void run() {
105106
: (now - flowControlEvents.getLastFlowControlEvent().getTimestampMs()
106107
<= TimeUnit.MINUTES.toMillis(5));
107108
if (meanLatency > targetLatency * 3) {
109+
// Decrease at 30% of the maximum
108110
decrease(
109111
lastAdjustedTimestamp, now, flowController.getMaxOutstandingElementCount() * 3 / 10);
110112
} else if (meanLatency > targetLatency * 1.2) {
113+
// Decrease at 10% of the maximum
111114
decrease(lastAdjustedTimestamp, now, flowController.getMaxOutstandingElementCount() / 10);
112115
} else if (throttled && meanLatency < targetLatency * 0.8) {
116+
// if latency is low, and there was throttling, then increase the parallelism so that new
117+
// calls will not be throttled.
118+
119+
// Increase parallelism at a slower than we decrease. The lower rate should help the
120+
// system maintain stability.
113121
increase(
114122
lastAdjustedTimestamp, now, flowController.getMaxOutstandingElementCount() * 5 / 100);
115123
} else if (throttled
116124
&& flowController.getCurrentOutstandingElementCount()
117125
< flowController.getMaxOutstandingElementCount() * 0.05
118126
&& meanLatency < 2 * targetLatency) {
127+
// If latency is low and threshold is low, increase slowly.
119128
increase(
120129
lastAdjustedTimestamp, now, flowController.getMaxOutstandingElementCount() * 2 / 100);
121130
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DynamicFlowControlStats.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,11 @@
2727
* last timestamp when the thresholds in {@link FlowController} are updated.
2828
*/
2929
@InternalApi
30-
public class DynamicFlowControlStats {
30+
public final class DynamicFlowControlStats {
3131

3232
private static final double DEFAULT_DECAY_CONSTANT = 0.015; // Biased to the past 5 minutes
3333

3434
private AtomicLong lastAdjustedTimestampMs;
35-
private AtomicLong adjustedCounter;
3635
private DecayingAverage meanLatency;
3736

3837
DynamicFlowControlStats() {
@@ -41,7 +40,6 @@ public class DynamicFlowControlStats {
4140

4241
DynamicFlowControlStats(double decayConstant) {
4342
this.lastAdjustedTimestampMs = new AtomicLong(0);
44-
this.adjustedCounter = new AtomicLong(0);
4543
this.meanLatency = new DecayingAverage(decayConstant);
4644
}
4745

@@ -67,13 +65,8 @@ public long getLastAdjustedTimestampMs() {
6765
return lastAdjustedTimestampMs.get();
6866
}
6967

70-
public long getAdjustedCounter() {
71-
return adjustedCounter.get();
72-
}
73-
7468
boolean setLastAdjustedTimestampMs(long last, long now) {
7569
if (lastAdjustedTimestampMs.compareAndSet(last, now)) {
76-
adjustedCounter.addAndGet(1);
7770
return true;
7871
}
7972
return false;

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.google.api.core.InternalApi;
2020
import com.google.api.gax.batching.Batcher;
2121
import com.google.api.gax.batching.BatcherImpl;
22-
import com.google.api.gax.batching.FlowController;
2322
import com.google.api.gax.batching.FlowControlEventStats;
23+
import com.google.api.gax.batching.FlowController;
2424
import com.google.api.gax.core.BackgroundResource;
2525
import com.google.api.gax.core.FixedCredentialsProvider;
2626
import com.google.api.gax.core.GaxProperties;

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BulkMutateIT.java

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,15 @@
1717

1818
import static com.google.common.truth.Truth.assertThat;
1919

20-
import com.google.api.core.ApiFuture;
21-
import com.google.api.gax.batching.Batcher;
2220
import com.google.api.gax.batching.BatcherImpl;
23-
import com.google.api.gax.batching.BatchingSettings;
24-
import com.google.api.gax.batching.DynamicFlowControlSettings;
2521
import com.google.api.gax.batching.FlowControlEventStats;
26-
import com.google.api.gax.batching.FlowControlSettings;
27-
import com.google.api.gax.batching.FlowController;
2822
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
2923
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
3024
import com.google.cloud.bigtable.data.v2.models.Query;
3125
import com.google.cloud.bigtable.data.v2.models.Row;
3226
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
33-
import com.google.cloud.bigtable.data.v2.stub.DynamicFlowControlStats;
34-
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
3527
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
36-
import com.google.protobuf.ByteString;
3728
import java.io.IOException;
38-
import java.util.ArrayList;
39-
import java.util.Arrays;
40-
import java.util.List;
41-
import java.util.concurrent.TimeUnit;
4229
import org.junit.ClassRule;
4330
import org.junit.Test;
4431
import org.junit.runner.RunWith;
@@ -47,10 +34,9 @@
4734
@RunWith(JUnit4.class)
4835
public class BulkMutateIT {
4936

50-
@ClassRule
51-
public static TestEnvRule testEnvRule = new TestEnvRule();
37+
@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();
5238

53-
@Test (timeout = 20 * 1000)
39+
@Test(timeout = 10 * 1000)
5440
public void test() throws IOException, InterruptedException {
5541
BigtableDataSettings settings = testEnvRule.env().getDataClientSettings();
5642
// Set target latency really low so it'll trigger adjusting thresholds
@@ -61,26 +47,24 @@ public void test() throws IOException, InterruptedException {
6147
(BatcherImpl) client.newBulkMutationBatcher(testEnvRule.env().getTableId());
6248
try {
6349
FlowControlEventStats events = batcher.getFlowControlEventStats();
64-
FlowController flowController = batcher.getFlowController();
65-
// DynamicFlowControlStats dynamicFlowControlStats =
66-
// settings.getStubSettings().bulkMutateRowsSettings().toBuilder()
67-
// .getDynamicFlowControlStats();
68-
69-
assertThat(batcher.getFlowController().getCurrentOutstandingElementCount()).isNotEqualTo(
70-
batcher.getFlowController().getMinOutstandingElementCount());
71-
assertThat(batcher.getFlowController().getCurrentOutstandingElementCount()).isNotEqualTo(
72-
batcher.getFlowController().getMaxOutstandingElementCount());
50+
long initialThreashold = batcher.getFlowController().getCurrentOutstandingElementCount();
51+
assertThat(batcher.getFlowController().getCurrentOutstandingElementCount())
52+
.isNotEqualTo(batcher.getFlowController().getMinOutstandingElementCount());
53+
assertThat(batcher.getFlowController().getCurrentOutstandingElementCount())
54+
.isNotEqualTo(batcher.getFlowController().getMaxOutstandingElementCount());
7355

7456
String familyId = testEnvRule.env().getFamilyId();
7557
long initial = batcher.getFlowController().getCurrentOutstandingElementCount();
7658
for (long i = 0; i < initial * 3; i++) {
7759
String key = "test-key" + i;
78-
batcher.add(
79-
RowMutationEntry.create(key).setCell(familyId, "qualifier", i));
60+
batcher.add(RowMutationEntry.create(key).setCell(familyId, "qualifier", i));
8061
}
8162
batcher.flush();
8263
assertThat(events.getLastFlowControlEvent()).isNotNull();
83-
//assertThat(dynamicFlowControlStats.getAdjustedCounter()).isEqualTo(1);
64+
// Verify that the threshold is adjusted
65+
assertThat(batcher.getFlowController().getCurrentOutstandingElementCount())
66+
.isNotEqualTo(initialThreashold);
67+
// Query a key to make sure the write succeeded
8468
Row row =
8569
testEnvRule
8670
.env()

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ public void settingsAreNotLostTest() {
7878
WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class);
7979
Duration watchdogInterval = Duration.ofSeconds(12);
8080
HeaderTracer headerTracer = Mockito.mock(HeaderTracer.class);
81-
boolean isBatchMutationLatencyBasedThrottlingEnabled = true;
82-
long batchMutationTargetLatency = 10;
8381

8482
EnhancedBigtableStubSettings.Builder builder =
8583
EnhancedBigtableStubSettings.newBuilder()
@@ -103,9 +101,7 @@ public void settingsAreNotLostTest() {
103101
credentialsProvider,
104102
watchdogProvider,
105103
watchdogInterval,
106-
headerTracer,
107-
isBatchMutationLatencyBasedThrottlingEnabled,
108-
batchMutationTargetLatency);
104+
headerTracer);
109105
verifySettings(
110106
builder.build(),
111107
projectId,
@@ -116,9 +112,7 @@ public void settingsAreNotLostTest() {
116112
credentialsProvider,
117113
watchdogProvider,
118114
watchdogInterval,
119-
headerTracer,
120-
isBatchMutationLatencyBasedThrottlingEnabled,
121-
batchMutationTargetLatency);
115+
headerTracer);
122116
verifyBuilder(
123117
builder.build().toBuilder(),
124118
projectId,
@@ -129,9 +123,7 @@ public void settingsAreNotLostTest() {
129123
credentialsProvider,
130124
watchdogProvider,
131125
watchdogInterval,
132-
headerTracer,
133-
isBatchMutationLatencyBasedThrottlingEnabled,
134-
batchMutationTargetLatency);
126+
headerTracer);
135127
}
136128

137129
private void verifyBuilder(
@@ -144,9 +136,7 @@ private void verifyBuilder(
144136
CredentialsProvider credentialsProvider,
145137
WatchdogProvider watchdogProvider,
146138
Duration watchdogInterval,
147-
HeaderTracer headerTracer,
148-
boolean isBatchMutationLatencyBasedThrottlingEnabled,
149-
long batchMutationTargetLatency) {
139+
HeaderTracer headerTracer) {
150140
assertThat(builder.getProjectId()).isEqualTo(projectId);
151141
assertThat(builder.getInstanceId()).isEqualTo(instanceId);
152142
assertThat(builder.getAppProfileId()).isEqualTo(appProfileId);
@@ -156,12 +146,6 @@ private void verifyBuilder(
156146
assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider);
157147
assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval);
158148
assertThat(builder.getHeaderTracer()).isEqualTo(headerTracer);
159-
assertThat(builder.isLatencyBasedThrottlingForBatchMutationEnabled())
160-
.isEqualTo(isBatchMutationLatencyBasedThrottlingEnabled);
161-
assertThat(builder.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled())
162-
.isEqualTo(isBatchMutationLatencyBasedThrottlingEnabled);
163-
assertThat(builder.bulkMutateRowsSettings().getTargetRpcLatencyMs())
164-
.isEqualTo(batchMutationTargetLatency);
165149
}
166150

167151
private void verifySettings(
@@ -174,9 +158,7 @@ private void verifySettings(
174158
CredentialsProvider credentialsProvider,
175159
WatchdogProvider watchdogProvider,
176160
Duration watchdogInterval,
177-
HeaderTracer headerTracer,
178-
boolean isBatchMutationLatencyBasedThrottlingEnabled,
179-
long batchMutationTargetLatency) {
161+
HeaderTracer headerTracer) {
180162
assertThat(settings.getProjectId()).isEqualTo(projectId);
181163
assertThat(settings.getInstanceId()).isEqualTo(instanceId);
182164
assertThat(settings.getAppProfileId()).isEqualTo(appProfileId);
@@ -186,10 +168,6 @@ private void verifySettings(
186168
assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider);
187169
assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval);
188170
assertThat(settings.getHeaderTracer()).isEqualTo(headerTracer);
189-
assertThat(settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled())
190-
.isEqualTo(isBatchMutationLatencyBasedThrottlingEnabled);
191-
assertThat(settings.bulkMutateRowsSettings().getTargetRpcLatencyMs())
192-
.isEqualTo(batchMutationTargetLatency);
193171
}
194172

195173
@Test
@@ -493,7 +471,6 @@ public void bulkMutateRowsSettingsAreNotLostTest() {
493471
.isSameInstanceAs(batchingSettings);
494472
assertThat(builder.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()).isTrue();
495473
assertThat(builder.bulkMutateRowsSettings().getTargetRpcLatencyMs()).isEqualTo(targetLatency);
496-
497474
assertThat(
498475
builder
499476
.bulkMutateRowsSettings()

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,16 @@ public void testBulkMutationFlowControllerConfigured() throws Exception {
183183
EnhancedBigtableStub stub1 = EnhancedBigtableStub.create(settings.build().getStubSettings());
184184
EnhancedBigtableStub stub2 = EnhancedBigtableStub.create(settings.build().getStubSettings());
185185

186+
// Creating 2 batchers from the same stub, they should share the same FlowController and
187+
// FlowControlEventStats
186188
try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1");
187189
BatcherImpl batcher2 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table2")) {
188190
assertThat(batcher1.getFlowController()).isNotNull();
191+
assertThat(batcher1.getFlowControlEventStats()).isNotNull();
189192
assertThat(batcher1).isNotSameInstanceAs(batcher2);
190193
assertThat(batcher1.getFlowController()).isSameInstanceAs(batcher2.getFlowController());
194+
assertThat(batcher1.getFlowControlEventStats())
195+
.isSameInstanceAs(batcher2.getFlowControlEventStats());
191196
// Verify flow controller settings
192197
assertThat(batcher1.getFlowController().getLimitExceededBehavior())
193198
.isEqualTo(LimitExceededBehavior.Block);
@@ -205,9 +210,15 @@ public void testBulkMutationFlowControllerConfigured() throws Exception {
205210
assertThat(batcher1.getFlowController().getMinOutstandingRequestBytes()).isEqualTo(1000L);
206211
}
207212

213+
// Creating 2 batchers from different stubs, they should not share the same FlowController and
214+
// FlowControlEventStats
208215
try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1");
209216
BatcherImpl batcher2 = (BatcherImpl) stub2.newMutateRowsBatcher("my-table2")) {
217+
assertThat(batcher1.getFlowController()).isNotNull();
218+
assertThat(batcher1.getFlowControlEventStats()).isNotNull();
210219
assertThat(batcher1.getFlowController()).isNotSameInstanceAs(batcher2.getFlowController());
220+
assertThat(batcher1.getFlowControlEventStats())
221+
.isNotSameInstanceAs(batcher2.getFlowControlEventStats());
211222
}
212223

213224
stub2 =

0 commit comments

Comments
 (0)