1212 */ 
1313package  org .asynchttpclient .extra ;
1414
15- import  org .asynchttpclient .AsyncHandler ;
16- import  org .asynchttpclient .HttpResponseBodyPart ;
17- import  org .asynchttpclient .HttpResponseHeaders ;
18- import  org .asynchttpclient .HttpResponseStatus ;
19- import  org .asynchttpclient .filter .FilterContext ;
20- import  org .asynchttpclient .filter .FilterException ;
21- import  org .asynchttpclient .filter .RequestFilter ;
15+ import  java .util .concurrent .Semaphore ;
16+ import  java .util .concurrent .TimeUnit ;
17+ 
2218import  org .asynchttpclient .filter .FilterContext ;
2319import  org .asynchttpclient .filter .FilterException ;
2420import  org .asynchttpclient .filter .RequestFilter ;
2521import  org .slf4j .Logger ;
2622import  org .slf4j .LoggerFactory ;
2723
28- import  java .util .concurrent .Semaphore ;
29- import  java .util .concurrent .TimeUnit ;
30- 
3124/** 
3225 * A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached, waiting for 
3326 * the response to arrives before executing the next request. 
3427 */ 
3528public  class  ThrottleRequestFilter  implements  RequestFilter  {
36-  private  final  static  Logger  logger  = LoggerFactory .getLogger (ThrottleRequestFilter .class );
37-  @ SuppressWarnings ("unused" )
38-  private  final  int  maxConnections ;
39-  private  final  Semaphore  available ;
40-  private  final  int  maxWait ;
41- 
42-  public  ThrottleRequestFilter (int  maxConnections ) {
43-  this .maxConnections  = maxConnections ;
44-  this .maxWait  = Integer .MAX_VALUE ;
45-  available  = new  Semaphore (maxConnections , true );
46-  }
47- 
48-  public  ThrottleRequestFilter (int  maxConnections , int  maxWait ) {
49-  this .maxConnections  = maxConnections ;
50-  this .maxWait  = maxWait ;
51-  available  = new  Semaphore (maxConnections , true );
52-  }
53- 
54-  /** 
55-  * {@inheritDoc} 
56-  */ 
57-  /* @Override */ 
58-  public  <T > FilterContext <T > filter (FilterContext <T > ctx ) throws  FilterException  {
59- 
60-  try  {
61-  if  (logger .isDebugEnabled ()) {
62-  logger .debug ("Current Throttling Status {}" , available .availablePermits ());
63-  }
64-  if  (!available .tryAcquire (maxWait , TimeUnit .MILLISECONDS )) {
65-  throw  new  FilterException (
66-  String .format ("No slot available for processing Request %s with AsyncHandler %s" ,
67-  ctx .getRequest (), ctx .getAsyncHandler ()));
68-  }
69-  ;
70-  } catch  (InterruptedException  e ) {
71-  throw  new  FilterException (
72-  String .format ("Interrupted Request %s with AsyncHandler %s" , ctx .getRequest (), ctx .getAsyncHandler ()));
73-  }
74- 
75-  return  new  FilterContext .FilterContextBuilder <T >(ctx ).asyncHandler (new  AsyncHandlerWrapper <T >(ctx .getAsyncHandler ())).build ();
76-  }
77- 
78-  private  class  AsyncHandlerWrapper <T > implements  AsyncHandler <T > {
79- 
80-  private  final  AsyncHandler <T > asyncHandler ;
81- 
82-  public  AsyncHandlerWrapper (AsyncHandler <T > asyncHandler ) {
83-  this .asyncHandler  = asyncHandler ;
84-  }
85- 
86-  /** 
87-  * {@inheritDoc} 
88-  */ 
89-  /* @Override */ 
90-  public  void  onThrowable (Throwable  t ) {
91-  try  {
92-  asyncHandler .onThrowable (t );
93-  } finally  {
94-  available .release ();
95-  if  (logger .isDebugEnabled ()) {
96-  logger .debug ("Current Throttling Status after onThrowable {}" , available .availablePermits ());
97-  }
98-  }
99-  }
100- 
101-  /** 
102-  * {@inheritDoc} 
103-  */ 
104-  /* @Override */ 
105-  public  STATE  onBodyPartReceived (HttpResponseBodyPart  bodyPart ) throws  Exception  {
106-  return  asyncHandler .onBodyPartReceived (bodyPart );
107-  }
108- 
109-  /** 
110-  * {@inheritDoc} 
111-  */ 
112-  /* @Override */ 
113-  public  STATE  onStatusReceived (HttpResponseStatus  responseStatus ) throws  Exception  {
114-  return  asyncHandler .onStatusReceived (responseStatus );
115-  }
116- 
117-  /** 
118-  * {@inheritDoc} 
119-  */ 
120-  /* @Override */ 
121-  public  STATE  onHeadersReceived (HttpResponseHeaders  headers ) throws  Exception  {
122-  return  asyncHandler .onHeadersReceived (headers );
123-  }
124- 
125-  /** 
126-  * {@inheritDoc} 
127-  */ 
128-  /* @Override */ 
129-  public  T  onCompleted () throws  Exception  {
130-  available .release ();
131-  if  (logger .isDebugEnabled ()) {
132-  logger .debug ("Current Throttling Status {}" , available .availablePermits ());
133-  }
134-  return  asyncHandler .onCompleted ();
135-  }
136-  }
137- }
29+ private  final  static  Logger  logger  = LoggerFactory .getLogger (ThrottleRequestFilter .class );
30+ private  final  Semaphore  available ;
31+ private  final  int  maxWait ;
32+ 
33+ public  ThrottleRequestFilter (int  maxConnections ) {
34+ this (maxConnections , Integer .MAX_VALUE );
35+ }
36+ 
37+ public  ThrottleRequestFilter (int  maxConnections , int  maxWait ) {
38+ this .maxWait  = maxWait ;
39+ available  = new  Semaphore (maxConnections , true );
40+ }
41+ 
42+ /** 
43+  * {@inheritDoc} 
44+  */ 
45+ /* @Override */ 
46+ public  <T > FilterContext <T > filter (FilterContext <T > ctx ) throws  FilterException  {
47+ 
48+ try  {
49+ if  (logger .isDebugEnabled ()) {
50+ logger .debug ("Current Throttling Status {}" , available .availablePermits ());
51+ }
52+ if  (!available .tryAcquire (maxWait , TimeUnit .MILLISECONDS )) {
53+ throw  new  FilterException (String .format (
54+ "No slot available for processing Request %s with AsyncHandler %s" , ctx .getRequest (),
55+ ctx .getAsyncHandler ()));
56+ }
57+ } catch  (InterruptedException  e ) {
58+ throw  new  FilterException (String .format ("Interrupted Request %s with AsyncHandler %s" , ctx .getRequest (),
59+ ctx .getAsyncHandler ()));
60+ }
61+ 
62+ return  new  FilterContext .FilterContextBuilder <T >(ctx ).asyncHandler (
63+ new  AsyncHandlerWrapper <T >(ctx .getAsyncHandler (), available )).build ();
64+ }
65+ }
0 commit comments