2828package org .apache .hc .client5 .http .impl .async ;
2929
3030import java .io .InterruptedIOException ;
31+ import java .lang .reflect .Proxy ;
32+ import java .util .concurrent .ConcurrentHashMap ;
33+ import java .util .concurrent .ConcurrentMap ;
34+ import java .util .concurrent .RejectedExecutionException ;
35+ import java .util .concurrent .atomic .AtomicBoolean ;
36+ import java .util .concurrent .atomic .AtomicInteger ;
3137import java .util .concurrent .atomic .AtomicReference ;
3238
3339import org .apache .hc .client5 .http .EndpointInfo ;
@@ -59,13 +65,15 @@ static class ReUseData {
5965 final Object state ;
6066 final TimeValue validDuration ;
6167
62- ReUseData (final Object state , final TimeValue validDuration ) {
63- this .state = state ;
64- this .validDuration = validDuration ;
65- }
68+ ReUseData (final Object state , final TimeValue validDuration ) {
69+ this .state = state ;
70+ this .validDuration = validDuration ;
71+ }
6672
6773 }
6874
75+ private static final ConcurrentMap <AsyncClientConnectionManager , AtomicInteger > QUEUE_COUNTERS = new ConcurrentHashMap <>();
76+
6977 private final Logger log ;
7078 private final AsyncClientConnectionManager manager ;
7179 private final ConnectionInitiator connectionInitiator ;
@@ -77,13 +85,25 @@ static class ReUseData {
7785 private final TlsConfig tlsConfig ;
7886 private final AtomicReference <AsyncConnectionEndpoint > endpointRef ;
7987 private final AtomicReference <ReUseData > reuseDataRef ;
88+ private final int maxQueued ;
89+ private final AtomicInteger sharedQueued ;
8090
8191 InternalHttpAsyncExecRuntime (
8292 final Logger log ,
8393 final AsyncClientConnectionManager manager ,
8494 final ConnectionInitiator connectionInitiator ,
8595 final HandlerFactory <AsyncPushConsumer > pushHandlerFactory ,
8696 final TlsConfig tlsConfig ) {
97+ this (log , manager , connectionInitiator , pushHandlerFactory , tlsConfig , -1 );
98+ }
99+
100+ InternalHttpAsyncExecRuntime (
101+ final Logger log ,
102+ final AsyncClientConnectionManager manager ,
103+ final ConnectionInitiator connectionInitiator ,
104+ final HandlerFactory <AsyncPushConsumer > pushHandlerFactory ,
105+ final TlsConfig tlsConfig ,
106+ final int maxQueued ) {
87107 super ();
88108 this .log = log ;
89109 this .manager = manager ;
@@ -92,6 +112,8 @@ static class ReUseData {
92112 this .tlsConfig = tlsConfig ;
93113 this .endpointRef = new AtomicReference <>();
94114 this .reuseDataRef = new AtomicReference <>();
115+ this .maxQueued = maxQueued ;
116+ this .sharedQueued = maxQueued > 0 ? QUEUE_COUNTERS .computeIfAbsent (manager , m -> new AtomicInteger (0 )) : null ;
95117 }
96118
97119 @ Override
@@ -218,8 +240,7 @@ public Cancellable connectEndpoint(
218240 return Operations .nonCancellable ();
219241 }
220242 final RequestConfig requestConfig = context .getRequestConfigOrDefault ();
221- @ SuppressWarnings ("deprecation" )
222- final Timeout connectTimeout = requestConfig .getConnectTimeout ();
243+ @ SuppressWarnings ("deprecation" ) final Timeout connectTimeout = requestConfig .getConnectTimeout ();
223244 if (log .isDebugEnabled ()) {
224245 log .debug ("{} connecting endpoint ({})" , ConnPoolSupport .getId (endpoint ), connectTimeout );
225246 }
@@ -241,7 +262,7 @@ public void completed(final AsyncConnectionEndpoint endpoint) {
241262 }
242263 }
243264
244- }));
265+ }));
245266
246267 }
247268
@@ -282,10 +303,61 @@ public EndpointInfo getEndpointInfo() {
282303 return endpoint != null ? endpoint .getInfo () : null ;
283304 }
284305
306+ private boolean tryAcquireSlot () {
307+ if (sharedQueued == null ) {
308+ return true ;
309+ }
310+ for (; ; ) {
311+ final int q = sharedQueued .get ();
312+ if (q >= maxQueued ) {
313+ return false ;
314+ }
315+ if (sharedQueued .compareAndSet (q , q + 1 )) {
316+ return true ;
317+ }
318+ }
319+ }
320+
321+ private void releaseSlot () {
322+ if (sharedQueued != null ) {
323+ sharedQueued .decrementAndGet ();
324+ }
325+ }
326+
327+ private AsyncClientExchangeHandler guard (final AsyncClientExchangeHandler handler ) {
328+ if (sharedQueued == null ) {
329+ return handler ;
330+ }
331+ final AtomicBoolean released = new AtomicBoolean (false );
332+ return (AsyncClientExchangeHandler ) Proxy .newProxyInstance (
333+ AsyncClientExchangeHandler .class .getClassLoader (),
334+ new Class <?>[]{AsyncClientExchangeHandler .class },
335+ (proxy , method , args ) -> {
336+ if ("releaseResources" .equals (method .getName ())
337+ && method .getParameterCount () == 0 ) {
338+ try {
339+ return method .invoke (handler , args );
340+ } finally {
341+ if (released .compareAndSet (false , true )) {
342+ releaseSlot ();
343+ }
344+ }
345+ }
346+ return method .invoke (handler , args );
347+ });
348+ }
349+
285350 @ Override
286351 public Cancellable execute (
287352 final String id , final AsyncClientExchangeHandler exchangeHandler , final HttpClientContext context ) {
288353 final AsyncConnectionEndpoint endpoint = ensureValid ();
354+ if (sharedQueued != null && !tryAcquireSlot ()) {
355+ exchangeHandler .failed (new RejectedExecutionException (
356+ "Execution pipeline queue limit reached (max=" + maxQueued + ")" ));
357+ return Operations .nonCancellable ();
358+ }
359+ final AsyncClientExchangeHandler actual =
360+ sharedQueued != null ? guard (exchangeHandler ) : exchangeHandler ;
289361 if (endpoint .isConnected ()) {
290362 if (log .isDebugEnabled ()) {
291363 log .debug ("{} start execution {}" , ConnPoolSupport .getId (endpoint ), id );
@@ -295,10 +367,10 @@ public Cancellable execute(
295367 if (responseTimeout != null ) {
296368 endpoint .setSocketTimeout (responseTimeout );
297369 }
298- endpoint .execute (id , exchangeHandler , pushHandlerFactory , context );
370+ endpoint .execute (id , actual , pushHandlerFactory , context );
299371 if (context .getRequestConfigOrDefault ().isHardCancellationEnabled ()) {
300372 return () -> {
301- exchangeHandler .cancel ();
373+ actual .cancel ();
302374 return true ;
303375 };
304376 }
@@ -311,20 +383,20 @@ public void completed(final AsyncExecRuntime runtime) {
311383 log .debug ("{} start execution {}" , ConnPoolSupport .getId (endpoint ), id );
312384 }
313385 try {
314- endpoint .execute (id , exchangeHandler , pushHandlerFactory , context );
386+ endpoint .execute (id , actual , pushHandlerFactory , context );
315387 } catch (final RuntimeException ex ) {
316388 failed (ex );
317389 }
318390 }
319391
320392 @ Override
321393 public void failed (final Exception ex ) {
322- exchangeHandler .failed (ex );
394+ actual .failed (ex );
323395 }
324396
325397 @ Override
326398 public void cancelled () {
327- exchangeHandler .failed (new InterruptedIOException ());
399+ actual .failed (new InterruptedIOException ());
328400 }
329401
330402 });
@@ -344,7 +416,7 @@ public void markConnectionNonReusable() {
344416
345417 @ Override
346418 public AsyncExecRuntime fork () {
347- return new InternalHttpAsyncExecRuntime (log , manager , connectionInitiator , pushHandlerFactory , tlsConfig );
419+ return new InternalHttpAsyncExecRuntime (log , manager , connectionInitiator , pushHandlerFactory , tlsConfig , maxQueued );
348420 }
349421
350- }
422+ }
0 commit comments