1919import com .google .api .gax .batching .BatchingCallSettings ;
2020import com .google .api .gax .batching .BatchingDescriptor ;
2121import com .google .api .gax .batching .BatchingSettings ;
22+ import com .google .api .gax .batching .DynamicFlowControlSettings ;
23+ import com .google .api .gax .batching .FlowControlSettings ;
24+ import com .google .api .gax .batching .FlowController ;
2225import com .google .api .gax .retrying .RetrySettings ;
2326import com .google .api .gax .rpc .StatusCode ;
2427import com .google .api .gax .rpc .UnaryCallSettings ;
2528import com .google .cloud .bigtable .data .v2 .models .BulkMutation ;
2629import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
30+ import com .google .common .base .MoreObjects ;
2731import com .google .common .base .Preconditions ;
2832import java .util .Set ;
2933import javax .annotation .Nonnull ;
34+ import javax .annotation .Nullable ;
3035
3136/**
3237 * This settings holds the batching thresholds as well as retry configuration.
4348 * .setDelayThreshold(Duration.ofSeconds(10))
4449 * .build())
4550 * .setRetryableCodes(Code.DEADLINE_EXCEEDED)
51+ * .setLatencyBasedThrottling(true, 1000L)
4652 * .build();
4753 * }</pre>
4854 *
@@ -54,7 +60,11 @@ public final class BigtableBatchingCallSettings extends UnaryCallSettings<BulkMu
5460
5561 // This settings is just a simple wrapper for BatchingCallSettings to allow us to add
5662 // additional functionality.
57- private BatchingCallSettings <RowMutationEntry , Void , BulkMutation , Void > batchingCallSettings ;
63+ private final BatchingCallSettings <RowMutationEntry , Void , BulkMutation , Void >
64+ batchingCallSettings ;
65+ private final boolean isLatencyBasedThrottlingEnabled ;
66+ private final Long targetRpcLatencyMs ;
67+ private final DynamicFlowControlSettings dynamicFlowControlSettings ;
5868
5969 private BigtableBatchingCallSettings (Builder builder ) {
6070 super (builder );
@@ -64,6 +74,9 @@ private BigtableBatchingCallSettings(Builder builder) {
6474 .setRetrySettings (builder .getRetrySettings ())
6575 .setRetryableCodes (builder .getRetryableCodes ())
6676 .build ();
77+ this .isLatencyBasedThrottlingEnabled = builder .isLatencyBasedThrottlingEnabled ;
78+ this .targetRpcLatencyMs = builder .targetRpcLatencyMs ;
79+ this .dynamicFlowControlSettings = builder .dynamicFlowControlSettings ;
6780 }
6881
6982 /** Returns batching settings which contains multiple batch threshold levels. */
@@ -76,6 +89,26 @@ BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> getBatchingDescri
7689 return batchingCallSettings .getBatchingDescriptor ();
7790 }
7891
92+ /** Gets if latency based throttling is enabled. */
93+ public boolean isLatencyBasedThrottlingEnabled () {
94+ return isLatencyBasedThrottlingEnabled ;
95+ }
96+
97+ /** Gets target rpc latency if latency based throttling is enabled. Otherwise returns null. */
98+ @ Nullable
99+ public Long getTargetRpcLatencyMs () {
100+ return targetRpcLatencyMs ;
101+ }
102+
103+ /**
104+ * Gets {@link DynamicFlowControlSettings}.
105+ *
106+ * @see Builder#getDynamicFlowControlSettings()
107+ */
108+ DynamicFlowControlSettings getDynamicFlowControlSettings () {
109+ return dynamicFlowControlSettings ;
110+ }
111+
79112 static Builder newBuilder (
80113 BatchingDescriptor <RowMutationEntry , Void , BulkMutation , Void > batchingDescriptor ) {
81114 return new Builder (batchingDescriptor );
@@ -90,6 +123,16 @@ public final Builder toBuilder() {
90123 return new Builder (this );
91124 }
92125
126+ @ Override
127+ public String toString () {
128+ return MoreObjects .toStringHelper (this )
129+ .add ("batchingCallSettings" , batchingCallSettings )
130+ .add ("isLatencyBasedThrottlingEnabled" , isLatencyBasedThrottlingEnabled )
131+ .add ("targetRpcLatency" , targetRpcLatencyMs )
132+ .add ("dynamicFlowControlSettings" , dynamicFlowControlSettings )
133+ .toString ();
134+ }
135+
93136 /**
94137 * A base builder class for {@link BigtableBatchingCallSettings}. See the class documentation of
95138 * {@link BigtableBatchingCallSettings} for a description of the different values that can be set.
@@ -98,6 +141,9 @@ public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void
98141
99142 private BatchingDescriptor <RowMutationEntry , Void , BulkMutation , Void > batchingDescriptor ;
100143 private BatchingSettings batchingSettings ;
144+ private boolean isLatencyBasedThrottlingEnabled ;
145+ private Long targetRpcLatencyMs ;
146+ private DynamicFlowControlSettings dynamicFlowControlSettings ;
101147
102148 private Builder (
103149 @ Nonnull
@@ -110,6 +156,9 @@ private Builder(@Nonnull BigtableBatchingCallSettings settings) {
110156 super (settings );
111157 this .batchingDescriptor = settings .getBatchingDescriptor ();
112158 this .batchingSettings = settings .getBatchingSettings ();
159+ this .isLatencyBasedThrottlingEnabled = settings .isLatencyBasedThrottlingEnabled ();
160+ this .targetRpcLatencyMs = settings .getTargetRpcLatencyMs ();
161+ this .dynamicFlowControlSettings = settings .getDynamicFlowControlSettings ();
113162 }
114163
115164 /** Sets the batching settings with various thresholds. */
@@ -145,9 +194,137 @@ public Builder setRetrySettings(@Nonnull RetrySettings retrySettings) {
145194 return this ;
146195 }
147196
197+ /**
198+ * Enable latency based throttling. The number of allowed in-flight requests will be adjusted to
199+ * reach the target rpc latency.
200+ */
201+ public Builder enableLatencyBasedThrottling (long targetRpcLatency ) {
202+ Preconditions .checkArgument (
203+ targetRpcLatency > 0 , "target RPC latency must be greater than 0" );
204+ this .isLatencyBasedThrottlingEnabled = true ;
205+ this .targetRpcLatencyMs = targetRpcLatency ;
206+ return this ;
207+ }
208+
209+ /** Disable latency based throttling. */
210+ public Builder disableLatencyBasedThrottling () {
211+ this .isLatencyBasedThrottlingEnabled = false ;
212+ this .targetRpcLatencyMs = null ;
213+ return this ;
214+ }
215+
216+ /** Gets target rpc latency if latency based throttling is enabled. Otherwise returns null. */
217+ @ Nullable
218+ public Long getTargetRpcLatencyMs () {
219+ return isLatencyBasedThrottlingEnabled ? targetRpcLatencyMs : null ;
220+ }
221+
222+ /** Gets if latency based throttling is enabled. */
223+ public boolean isLatencyBasedThrottlingEnabled () {
224+ return this .isLatencyBasedThrottlingEnabled ;
225+ }
226+
227+ /**
228+ * Gets the {@link DynamicFlowControlSettings} that'll be used to set up a {@link
229+ * FlowController} for throttling.
230+ *
231+ * <p>By default, this will allow a maximum of 1000 entries per channel of {@link
232+ * FlowControlSettings.Builder#setMaxOutstandingElementCount request count} and 100MB of {@link
233+ * FlowControlSettings.Builder#setMaxOutstandingRequestBytes accumulated size} in-flight
234+ * requests. Once the limits are reached, pending operations will by default be {@link
235+ * FlowControlSettings.Builder#setLimitExceededBehavior blocked} until some of the in-flight
236+ * requests are resolved.
237+ *
238+ * <p>If latency based throttling is enabled, number of entries allowed by {@link
239+ * FlowController} will be adjusted to reach {@link Builder#getTargetRpcLatencyMs()}.
240+ *
241+ * <ul>
242+ * <li>{@link FlowController} will be set to allow Math.max({@link BatchingSettings.Builder
243+ * #setElementCountThreshold batch size}, {@link
244+ * FlowControlSettings.Builder#setMaxOutstandingElementCount request count} / 4) entries
245+ * to start with.
246+ * <li>If bulk mutation rpc latency is higher than target latency, decrease allowed entries to
247+ * a minimum of Math.max({@link BatchingSettings.Builder#setElementCountThreshold batch
248+ * size}, {@link FlowControlSettings.Builder#setMaxOutstandingElementCount request count}
249+ * / 100).
250+ * <li>If bulk mutation rpc latency is lower than target latency and there was throttling,
251+ * increase allowed entries to a maximum of {@link
252+ * FlowControlSettings.Builder#setMaxOutstandingElementCount request count}.
253+ * </ul>
254+ *
255+ * If latency based throttling is disabled, {@link FlowController} will always allow {@link
256+ * FlowControlSettings.Builder#setMaxOutstandingElementCount request count}.
257+ *
258+ * <p>Latency based throttling only updates outstanding entries count. {@link FlowController}
259+ * will always allow {@link FlowControlSettings.Builder#setMaxOutstandingRequestBytes
260+ * accumulated size}.
261+ */
262+ DynamicFlowControlSettings getDynamicFlowControlSettings () {
263+ return this .dynamicFlowControlSettings ;
264+ }
265+
148266 /** Builds the {@link BigtableBatchingCallSettings} object with provided configuration. */
149267 @ Override
150268 public BigtableBatchingCallSettings build () {
269+ Preconditions .checkState (batchingSettings != null , "batchingSettings must be set" );
270+ FlowControlSettings defaultSettings = batchingSettings .getFlowControlSettings ();
271+ Preconditions .checkState (
272+ defaultSettings .getMaxOutstandingElementCount () != null ,
273+ "maxOutstandingElementCount must be set in BatchingSettings#FlowControlSettings" );
274+ Preconditions .checkState (
275+ defaultSettings .getMaxOutstandingRequestBytes () != null ,
276+ "maxOutstandingRequestBytes must be set in BatchingSettings#FlowControlSettings" );
277+ Preconditions .checkArgument (
278+ batchingSettings .getElementCountThreshold () == null
279+ || defaultSettings .getMaxOutstandingElementCount ()
280+ >= batchingSettings .getElementCountThreshold (),
281+ "if elementCountThreshold is set in BatchingSettings, maxOutstandingElementCount must be >= elementCountThreshold" );
282+ Preconditions .checkArgument (
283+ batchingSettings .getRequestByteThreshold () == null
284+ || defaultSettings .getMaxOutstandingRequestBytes ()
285+ >= batchingSettings .getRequestByteThreshold (),
286+ "if requestByteThreshold is set in BatchingSettings, getMaxOutstandingRequestBytes must be >= getRequestByteThreshold" );
287+ // Combine static FlowControlSettings with latency based throttling settings to create
288+ // DynamicFlowControlSettings.
289+ if (isLatencyBasedThrottlingEnabled ()) {
290+ long maxThrottlingElementCount = defaultSettings .getMaxOutstandingElementCount ();
291+ long maxThrottlingRequestByteCount = defaultSettings .getMaxOutstandingRequestBytes ();
292+ // The maximum in flight element count is pretty high. Set the initial parallelism to 25%
293+ // of the maximum and then work up or down. This reduction should reduce the
294+ // impacts of a bursty job, such as those found in Dataflow.
295+ long initialElementCount = maxThrottlingElementCount / 4 ;
296+ // Decreases are floored at 1% of the maximum so that there is some level of
297+ // throughput.
298+ long minElementCount = maxThrottlingElementCount / 100 ;
299+ // Make sure initialOutstandingElementCount and minOutstandingElementCount element count are
300+ // greater or equal to batch size to avoid deadlocks.
301+ if (batchingSettings .getElementCountThreshold () != null ) {
302+ initialElementCount =
303+ Math .max (initialElementCount , batchingSettings .getElementCountThreshold ());
304+ minElementCount = Math .max (minElementCount , batchingSettings .getElementCountThreshold ());
305+ }
306+ dynamicFlowControlSettings =
307+ DynamicFlowControlSettings .newBuilder ()
308+ .setLimitExceededBehavior (defaultSettings .getLimitExceededBehavior ())
309+ .setInitialOutstandingElementCount (initialElementCount )
310+ .setMaxOutstandingElementCount (maxThrottlingElementCount )
311+ .setMinOutstandingElementCount (minElementCount )
312+ .setInitialOutstandingRequestBytes (maxThrottlingRequestByteCount )
313+ .setMinOutstandingRequestBytes (maxThrottlingRequestByteCount )
314+ .setMaxOutstandingRequestBytes (maxThrottlingRequestByteCount )
315+ .build ();
316+ } else {
317+ dynamicFlowControlSettings =
318+ DynamicFlowControlSettings .newBuilder ()
319+ .setLimitExceededBehavior (defaultSettings .getLimitExceededBehavior ())
320+ .setInitialOutstandingElementCount (defaultSettings .getMaxOutstandingElementCount ())
321+ .setMaxOutstandingElementCount (defaultSettings .getMaxOutstandingElementCount ())
322+ .setMinOutstandingElementCount (defaultSettings .getMaxOutstandingElementCount ())
323+ .setInitialOutstandingRequestBytes (defaultSettings .getMaxOutstandingRequestBytes ())
324+ .setMinOutstandingRequestBytes (defaultSettings .getMaxOutstandingRequestBytes ())
325+ .setMaxOutstandingRequestBytes (defaultSettings .getMaxOutstandingRequestBytes ())
326+ .build ();
327+ }
151328 return new BigtableBatchingCallSettings (this );
152329 }
153330 }
0 commit comments