3131import com .google .common .base .Stopwatch ;
3232import com .google .common .util .concurrent .RateLimiter ;
3333import java .util .concurrent .TimeUnit ;
34+ import java .util .concurrent .atomic .AtomicBoolean ;
3435import java .util .concurrent .atomic .AtomicReference ;
35- import java .util .logging .Level ;
3636import java .util .logging .Logger ;
3737import javax .annotation .Nonnull ;
3838import org .threeten .bp .Duration ;
3939import org .threeten .bp .Instant ;
4040
4141class RateLimitingServerStreamingCallable
4242 extends ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > {
43+
4344 private static final Logger logger =
4445 Logger .getLogger (RateLimitingServerStreamingCallable .class .getName ());
4546
@@ -64,16 +65,17 @@ class RateLimitingServerStreamingCallable
6465 // as the server side cap
6566 private static final double MAX_FACTOR = 1.3 ;
6667
67- private final RateLimiter limiter ;
68+ private final ConditionalRateLimiter limiter ;
6869
69- private final AtomicReference <Instant > lastQpsChangeTime = new AtomicReference <>(Instant .now ());
7070 private final ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ;
7171
7272 RateLimitingServerStreamingCallable (
7373 @ Nonnull ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ) {
74- this .limiter = RateLimiter . create (DEFAULT_QPS );
74+ this .limiter = new ConditionalRateLimiter (DEFAULT_QPS );
7575 this .innerCallable = Preconditions .checkNotNull (innerCallable , "Inner callable must be set" );
76- logger .info ("Rate limiting is enabled with initial QPS of " + limiter .getRate ());
76+ logger .info (
77+ "BatchWriteFlowControl: Rate limiting callable is initiated with QPS of "
78+ + limiter .getRate ());
7779 }
7880
7981 @ Override
@@ -88,44 +90,129 @@ public void call(
8890 ((BigtableTracer ) context .getTracer ())
8991 .batchRequestThrottled (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
9092 }
91- RateLimitingResponseObserver innerObserver =
92- new RateLimitingResponseObserver (limiter , lastQpsChangeTime , responseObserver );
93+ RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver (responseObserver );
9394 innerCallable .call (request , innerObserver , context );
9495 }
9596
97+ static class ConditionalRateLimiter {
98+
99+ private AtomicBoolean enabled = new AtomicBoolean (true );
100+
101+ private final RateLimiter limiter ;
102+
103+ // This is the next time allowed to change QPS or disable rate limiting.
104+ private final AtomicReference <Instant > nextQpsChangeTime = new AtomicReference <>(Instant .now ());
105+
106+ public ConditionalRateLimiter (long defaultQps ) {
107+ limiter = RateLimiter .create (defaultQps );
108+ }
109+
110+ public double acquire () {
111+ if (enabled .get ()) {
112+ return limiter .acquire ();
113+ } else {
114+ return 0 ;
115+ }
116+ }
117+
118+ // Enable rate limiting immediately or disable after the QPS update period. Otherwise, no-op.
119+ public void trySetEnabled (boolean enabled ) {
120+ if (enabled ) {
121+ // Always enable immediately.
122+ boolean wasEnabled = this .enabled .getAndSet (true );
123+ if (!wasEnabled ) {
124+ logger .fine ("BatchWriteFlowControl: rate limiter is enabled." );
125+ }
126+ return ;
127+ }
128+ // Only disable after the QPS update period.
129+ Instant nextTime = nextQpsChangeTime .get ();
130+ Instant now = Instant .now ();
131+ if (now .isAfter (nextTime )) {
132+ boolean wasEnabled = this .enabled .getAndSet (false );
133+ if (wasEnabled ) {
134+ logger .fine ("BatchWriteFlowControl: rate limiter is disabled." );
135+ }
136+ }
137+ }
138+
139+ public boolean isEnabled () {
140+ return this .enabled .get ();
141+ }
142+
143+ public double getRate () {
144+ return limiter .getRate ();
145+ }
146+
147+ // Update the rate after the QPS update period. Otherwise, no-op.
148+ public void trySetRate (double rate , Duration period ) {
149+ Instant nextTime = nextQpsChangeTime .get ();
150+ Instant now = Instant .now ();
151+
152+ if (now .isAfter (nextTime )
153+ && nextQpsChangeTime .compareAndSet (nextTime , now .plusSeconds (period .getSeconds ()))) {
154+ logger .fine (
155+ "BatchWriteFlowControl: updated rate from "
156+ + limiter .getRate ()
157+ + " to "
158+ + rate
159+ + " with period "
160+ + period .getSeconds ()
161+ + " seconds." );
162+ limiter .setRate (rate );
163+ }
164+ }
165+
166+ @ VisibleForTesting
167+ void setEnabled (boolean enabled ) {
168+ this .enabled .set (enabled );
169+ }
170+ }
171+
96172 class RateLimitingResponseObserver extends SafeResponseObserver <MutateRowsResponse > {
97- private final ResponseObserver <MutateRowsResponse > outerObserver ;
98- private final RateLimiter rateLimiter ;
99173
100- private final AtomicReference < Instant > lastQpsChangeTime ;
174+ private final ResponseObserver < MutateRowsResponse > outerObserver ;
101175
102- RateLimitingResponseObserver (
103- RateLimiter rateLimiter ,
104- AtomicReference <Instant > lastQpsChangeTime ,
105- ResponseObserver <MutateRowsResponse > observer ) {
176+ RateLimitingResponseObserver (ResponseObserver <MutateRowsResponse > observer ) {
106177 super (observer );
107178 this .outerObserver = observer ;
108- this .rateLimiter = rateLimiter ;
109- this .lastQpsChangeTime = lastQpsChangeTime ;
110179 }
111180
112181 @ Override
113182 protected void onStartImpl (StreamController controller ) {
114183 outerObserver .onStart (controller );
115184 }
116185
186+ private boolean isRateLimitInfoAbsentOrInvalid (MutateRowsResponse response ) {
187+ // RateLimitInfo is an optional field. However, proto3 sub-message field always
188+ // have presence even thought it's marked as "optional". Check the factor and
189+ // period to make sure they're not 0.
190+ if (!response .hasRateLimitInfo ()) {
191+ logger .fine ("BatchWriteFlowControl: response carries no RateLimitInfo" );
192+ return true ;
193+ } else if (response .getRateLimitInfo ().getFactor () == 0
194+ || response .getRateLimitInfo ().getPeriod ().getSeconds () == 0 ) {
195+ logger .fine (
196+ "BatchWriteFlowControl: response carries invalid RateLimitInfo="
197+ + response .getRateLimitInfo ());
198+ return true ;
199+ }
200+ logger .fine (
201+ "BatchWriteFlowControl: response carries valid RateLimitInfo="
202+ + response .getRateLimitInfo ());
203+ return false ;
204+ }
205+
117206 @ Override
118207 protected void onResponseImpl (MutateRowsResponse response ) {
119- if (response .hasRateLimitInfo ()) {
208+ if (isRateLimitInfoAbsentOrInvalid (response )) {
209+ limiter .trySetEnabled (false );
210+ } else {
211+ limiter .trySetEnabled (true );
120212 RateLimitInfo info = response .getRateLimitInfo ();
121- // RateLimitInfo is an optional field. However, proto3 sub-message field always
122- // have presence even thought it's marked as "optional". Check the factor and
123- // period to make sure they're not 0.
124- if (info .getFactor () != 0 && info .getPeriod ().getSeconds () != 0 ) {
125- updateQps (
126- info .getFactor (),
127- Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())));
128- }
213+ updateQps (
214+ info .getFactor (),
215+ Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())));
129216 }
130217 outerObserver .onResponse (response );
131218 }
@@ -148,28 +235,30 @@ protected void onCompleteImpl() {
148235 }
149236
150237 private void updateQps (double factor , Duration period ) {
151- Instant lastTime = lastQpsChangeTime .get ();
152- Instant now = Instant .now ();
153-
154- if (now .minus (period ).isAfter (lastTime ) && lastQpsChangeTime .compareAndSet (lastTime , now )) {
155- double cappedFactor = Math .min (Math .max (factor , MIN_FACTOR ), MAX_FACTOR );
156- double currentRate = limiter .getRate ();
157- limiter .setRate (Math .min (Math .max (currentRate * cappedFactor , MIN_QPS ), MAX_QPS ));
158- logger .log (
159- Level .FINE ,
160- "Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}" ,
161- new Object [] {currentRate , limiter .getRate (), factor , cappedFactor });
162- }
238+ double cappedFactor = Math .min (Math .max (factor , MIN_FACTOR ), MAX_FACTOR );
239+ double currentRate = limiter .getRate ();
240+ double cappedRate = Math .min (Math .max (currentRate * cappedFactor , MIN_QPS ), MAX_QPS );
241+ limiter .trySetRate (cappedRate , period );
163242 }
164243 }
165244
166245 @ VisibleForTesting
167- AtomicReference <Instant > getLastQpsChangeTime () {
168- return lastQpsChangeTime ;
246+ AtomicReference <Instant > getNextQpsChangeTime () {
247+ return limiter . nextQpsChangeTime ;
169248 }
170249
171250 @ VisibleForTesting
172251 double getCurrentRate () {
173252 return limiter .getRate ();
174253 }
254+
255+ @ VisibleForTesting
256+ boolean getLimiterEnabled () {
257+ return limiter .isEnabled ();
258+ }
259+
260+ @ VisibleForTesting
261+ void setLimiterEnabled (boolean enabled ) {
262+ limiter .setEnabled (enabled );
263+ }
175264}
0 commit comments