 import com.google.api.core.BetaApi;
 import com.google.api.core.InternalApi;
 import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.batching.FlowController.FlowControlException;
+import com.google.api.gax.batching.FlowController.FlowControlRuntimeException;
+import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
 import com.google.api.gax.rpc.UnaryCallable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import javax.annotation.Nullable;
 
 /**
  * Queues up the elements until {@link #flush()} is called; once batching is over, returned future
@@ -87,13 +91,14 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
   private final Future<?> scheduledFuture;
   private volatile boolean isClosed = false;
   private final BatcherStats batcherStats = new BatcherStats();
+  private final FlowController flowController;
 
   /**
    * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
-   *     into wrappers request and response.
-   * @param unaryCallable a {@link UnaryCallable} object.
-   * @param prototype a {@link RequestT} object.
-   * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds.
+   *     into wrappers request and response
+   * @param unaryCallable a {@link UnaryCallable} object
+   * @param prototype a {@link RequestT} object
+   * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
    */
   public BatcherImpl(
       BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
@@ -102,15 +107,56 @@ public BatcherImpl(
       BatchingSettings batchingSettings,
       ScheduledExecutorService executor) {
 
+    this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null);
+  }
+
+  /**
+   * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
+   *     into wrappers request and response
+   * @param unaryCallable a {@link UnaryCallable} object
+   * @param prototype a {@link RequestT} object
+   * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
+   * @param flowController a {@link FlowController} for throttling requests. If it's null, create a
+   *     {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
+   */
+  public BatcherImpl(
+      BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
+      UnaryCallable<RequestT, ResponseT> unaryCallable,
+      RequestT prototype,
+      BatchingSettings batchingSettings,
+      ScheduledExecutorService executor,
+      @Nullable FlowController flowController) {
+
     this.batchingDescriptor =
         Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
     this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
     this.prototype = Preconditions.checkNotNull(prototype, "request prototype cannot be null");
     this.batchingSettings =
         Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
     Preconditions.checkNotNull(executor, "executor cannot be null");
+    if (flowController == null) {
+      flowController = new FlowController(batchingSettings.getFlowControlSettings());
+    }
+    // If throttling is enabled, make sure flow control limits are greater than or equal to batch
+    // sizes to avoid deadlocking: otherwise add() could block on reserve() waiting for permits
+    // that are only freed once the current batch is sent, which may never happen if the batch
+    // cannot reach its flush threshold.
+    if (flowController.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) {
+      Preconditions.checkArgument(
+          flowController.getMaxOutstandingElementCount() == null
+              || batchingSettings.getElementCountThreshold() == null
+              || flowController.getMaxOutstandingElementCount()
+                  >= batchingSettings.getElementCountThreshold(),
+          "If throttling and batching on element count are enabled, FlowController"
+              + "#maxOutstandingElementCount must be greater or equal to elementCountThreshold");
+      Preconditions.checkArgument(
+          flowController.getMaxOutstandingRequestBytes() == null
+              || batchingSettings.getRequestByteThreshold() == null
+              || flowController.getMaxOutstandingRequestBytes()
+                  >= batchingSettings.getRequestByteThreshold(),
+          "If throttling and batching on request bytes are enabled, FlowController"
+              + "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
+    }
+    this.flowController = flowController;
     currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
-
     if (batchingSettings.getDelayThreshold() != null) {
       long delay = batchingSettings.getDelayThreshold().toMillis();
       PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
@@ -127,8 +173,29 @@ public BatcherImpl(
   @Override
   public ApiFuture<ElementResultT> add(ElementT element) {
     Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");
-    SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
+    // This is not the optimal way of throttling. It does not send out partial batches, which
+    // means that the Batcher might not use up all the resources allowed by FlowController.
+    // The more efficient implementation would look like:
+    // if (!flowController.tryReserve(1, bytes)) {
+    //   sendOutstanding();
+    //   reserve(1, bytes);
+    // }
+    // where tryReserve() returns false if there aren't enough resources, or reserves them and
+    // returns true.
+    // However, with the current FlowController implementation, adding a tryReserve() could be
+    // confusing. FlowController would end up having 3 different reserve behaviors: blocking,
+    // non-blocking and try-reserve. We would also need to add a tryAcquire() to the Semaphore64
+    // class, which would make the separate blocking and non-blocking semaphore implementations
+    // seem unnecessary. Some refactoring may be needed for the optimized implementation, so we
+    // defer it until we decide whether refactoring FlowController is necessary.
+    try {
+      flowController.reserve(1, batchingDescriptor.countBytes(element));
+    } catch (FlowControlException e) {
+      // This exception will only be thrown if the FlowController is set to ThrowException behavior
+      throw FlowControlRuntimeException.fromFlowControlException(e);
+    }
 
+    SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
     synchronized (elementLock) {
       currentOpenBatch.add(element, result);
     }
@@ -169,6 +236,7 @@ public void sendOutstanding() {
           @Override
           public void onSuccess(ResponseT response) {
             try {
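+              // Return the permits that add() reserved for this batch's elements and bytes.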
+              flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
               accumulatedBatch.onBatchSuccess(response);
             } finally {
               onBatchCompletion();
@@ -178,6 +246,7 @@ public void onSuccess(ResponseT response) {
           @Override
           public void onFailure(Throwable throwable) {
             try {
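+              // Permits are released on failure as well, so failed batches do not leak flow
+              // control resources.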
+              flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
               accumulatedBatch.onBatchFailure(throwable);
             } finally {
               onBatchCompletion();
@@ -224,6 +293,12 @@ public void close() throws InterruptedException {
     }
   }
 
+  /** Package-private for use in testing. */
+  @VisibleForTesting
+  FlowController getFlowController() {
+    return flowController;
+  }
+
   /**
    * This class represents one logical Batch. It accumulates all the elements and their
    * corresponding future results for one batch.
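For context, a rough sketch of how a caller might exercise the new constructor. The descriptor, callable, prototype, batchingSettings, and executor variables (and the ElementT/ElementResultT type parameters) are placeholders assumed to be set up elsewhere; only the FlowController wiring reflects this change, and the limit values are arbitrary examples that must be at least as large as the corresponding batching thresholds when throttling is enabled:

    // Hypothetical wiring; descriptor, callable, prototype, batchingSettings and executor
    // are assumed to exist. Limits must be >= the batching thresholds unless the
    // LimitExceededBehavior is Ignore, or the constructor throws IllegalArgumentException.
    FlowController flowController =
        new FlowController(
            FlowControlSettings.newBuilder()
                .setMaxOutstandingElementCount(1000L)
                .setMaxOutstandingRequestBytes(10L * 1024 * 1024)
                .setLimitExceededBehavior(LimitExceededBehavior.Block)
                .build());

    Batcher<ElementT, ElementResultT> batcher =
        new BatcherImpl<>(
            descriptor, callable, prototype, batchingSettings, executor, flowController);

    ApiFuture<ElementResultT> result = batcher.add(element); // may block until permits are free
    batcher.close(); // sends any outstanding batch and waits for completion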