3030import java .io .IOException ;
3131import java .net .InetSocketAddress ;
3232import java .nio .ByteBuffer ;
33+ import java .util .Collections ;
3334import java .util .List ;
35+ import java .util .Map ;
3436import java .util .Set ;
37+ import java .util .WeakHashMap ;
3538import java .util .concurrent .Future ;
39+ import java .util .concurrent .RejectedExecutionException ;
40+ import java .util .concurrent .atomic .AtomicBoolean ;
41+ import java .util .concurrent .atomic .AtomicInteger ;
3642
3743import org .apache .hc .core5 .annotation .Internal ;
3844import org .apache .hc .core5 .concurrent .Cancellable ;
@@ -87,6 +93,14 @@ public class H2MultiplexingRequester extends AsyncRequester {
8793
8894 private final H2ConnPool connPool ;
8995
96+ /**
97+ * Hard cap on per-connection queued / in-flight requests.
98+ * {@code <= 0} disables the cap.
99+ */
100+ private final int maxRequestsPerConnection ;
101+
102+ private final Map <IOSession , AtomicInteger > pendingRequestMap ;
103+
90104 /**
91105 * Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class.
92106 */
@@ -100,11 +114,14 @@ public H2MultiplexingRequester(
100114 final Resolver <HttpHost , InetSocketAddress > addressResolver ,
101115 final TlsStrategy tlsStrategy ,
102116 final IOReactorMetricsListener threadPoolListener ,
103- final IOWorkerSelector workerSelector ) {
117+ final IOWorkerSelector workerSelector ,
118+ final int maxRequestsPerConnection ) {
104119 super (eventHandlerFactory , ioReactorConfig , ioSessionDecorator , exceptionCallback , sessionListener ,
105120 ShutdownCommand .GRACEFUL_IMMEDIATE_CALLBACK , DefaultAddressResolver .INSTANCE ,
106121 threadPoolListener , workerSelector );
107122 this .connPool = new H2ConnPool (this , addressResolver , tlsStrategy );
123+ this .maxRequestsPerConnection = maxRequestsPerConnection ;
124+ this .pendingRequestMap = Collections .synchronizedMap (new WeakHashMap <>());
108125 }
109126
110127 public void closeIdle (final TimeValue idleTime ) {
@@ -166,6 +183,16 @@ public Cancellable execute(
166183 return execute (null , exchangeHandler , null , timeout , context );
167184 }
168185
186+ private AtomicInteger getPendingCounter (final IOSession ioSession ) {
187+ final AtomicInteger counter = pendingRequestMap .get (ioSession );
188+ if (counter != null ) {
189+ return counter ;
190+ }
191+ final AtomicInteger newCounter = new AtomicInteger (0 );
192+ pendingRequestMap .put (ioSession , newCounter );
193+ return newCounter ;
194+ }
195+
169196 private void execute (
170197 final HttpHost target ,
171198 final AsyncClientExchangeHandler exchangeHandler ,
@@ -182,83 +209,54 @@ private void execute(
182209 if (request .getAuthority () == null ) {
183210 request .setAuthority (new URIAuthority (host ));
184211 }
212+ if (request .getScheme () == null ) {
213+ request .setScheme (host .getSchemeName ());
214+ }
185215 connPool .getSession (host , timeout , new FutureCallback <IOSession >() {
186216
187217 @ Override
188218 public void completed (final IOSession ioSession ) {
189- final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler () {
190219
191- @ Override
192- public void releaseResources () {
220+ final int max = maxRequestsPerConnection ;
221+ final AtomicInteger pendingCounter ;
222+ if (max > 0 ) {
223+ pendingCounter = getPendingCounter (ioSession );
224+ final int current = pendingCounter .incrementAndGet ();
225+ if (current > max ) {
226+ pendingCounter .decrementAndGet ();
227+ exchangeHandler .failed (new RejectedExecutionException (
228+ "Maximum number of pending requests per connection reached (max=" + max + ")" ));
193229 exchangeHandler .releaseResources ();
230+ return ;
194231 }
232+ } else {
233+ pendingCounter = null ;
234+ }
195235
196- @ Override
197- public void produceRequest (final RequestChannel channel , final HttpContext httpContext ) throws HttpException , IOException {
198- channel .sendRequest (request , entityDetails , httpContext );
199- }
200-
201- @ Override
202- public int available () {
203- return exchangeHandler .available ();
204- }
205-
206- @ Override
207- public void produce (final DataStreamChannel channel ) throws IOException {
208- exchangeHandler .produce (channel );
209- }
210-
211- @ Override
212- public void consumeInformation (final HttpResponse response , final HttpContext httpContext ) throws HttpException , IOException {
213- exchangeHandler .consumeInformation (response , httpContext );
214- }
215-
216- @ Override
217- public void consumeResponse (
218- final HttpResponse response , final EntityDetails entityDetails , final HttpContext httpContext ) throws HttpException , IOException {
219- exchangeHandler .consumeResponse (response , entityDetails , httpContext );
220- }
221-
222- @ Override
223- public void updateCapacity (final CapacityChannel capacityChannel ) throws IOException {
224- exchangeHandler .updateCapacity (capacityChannel );
225- }
226-
227- @ Override
228- public void consume (final ByteBuffer src ) throws IOException {
229- exchangeHandler .consume (src );
230- }
231-
232- @ Override
233- public void streamEnd (final List <? extends Header > trailers ) throws HttpException , IOException {
234- exchangeHandler .streamEnd (trailers );
235- }
236-
237- @ Override
238- public void cancel () {
239- exchangeHandler .cancel ();
240- }
241-
242- @ Override
243- public void failed (final Exception cause ) {
244- exchangeHandler .failed (cause );
245- }
236+ final AsyncClientExchangeHandler handlerProxy ;
237+ if (pendingCounter != null ) {
238+ handlerProxy = new SlotReleasingExchangeHandler (exchangeHandler , pendingCounter );
239+ } else {
240+ handlerProxy = exchangeHandler ;
241+ }
246242
247- };
248243 final Timeout socketTimeout = ioSession .getSocketTimeout ();
249- ioSession .enqueue (new RequestExecutionCommand (
250- handlerProxy ,
251- pushHandlerFactory ,
252- context ,
253- streamControl -> {
254- cancellableDependency .setDependency (streamControl );
255- if (socketTimeout != null ) {
256- streamControl .setTimeout (socketTimeout );
257- }
258- }),
259- Command .Priority .NORMAL );
244+ final RequestExecutionCommand command = new RequestExecutionCommand (
245+ handlerProxy ,
246+ pushHandlerFactory ,
247+ context ,
248+ streamControl -> {
249+ cancellableDependency .setDependency (streamControl );
250+ if (socketTimeout != null ) {
251+ streamControl .setTimeout (socketTimeout );
252+ }
253+ });
254+
255+ ioSession .enqueue (command , Command .Priority .NORMAL );
256+
260257 if (!ioSession .isOpen ()) {
261- exchangeHandler .failed (new ConnectionClosedException ());
258+ handlerProxy .failed (new ConnectionClosedException ());
259+ handlerProxy .releaseResources ();
262260 }
263261 }
264262
@@ -350,4 +348,106 @@ public H2ConnPool getConnPool() {
350348 return connPool ;
351349 }
352350
351+ private static final class SlotReleasingExchangeHandler implements AsyncClientExchangeHandler {
352+
353+ private final AsyncClientExchangeHandler exchangeHandler ;
354+ private final AtomicInteger pendingCounter ;
355+ private final AtomicBoolean released ;
356+
357+ private SlotReleasingExchangeHandler (final AsyncClientExchangeHandler exchangeHandler , final AtomicInteger pendingCounter ) {
358+ this .exchangeHandler = exchangeHandler ;
359+ this .pendingCounter = pendingCounter ;
360+ this .released = new AtomicBoolean (false );
361+ }
362+
363+ @ Override
364+ public void releaseResources () {
365+ if (released .compareAndSet (false , true )) {
366+ pendingCounter .decrementAndGet ();
367+ }
368+ exchangeHandler .releaseResources ();
369+ }
370+
371+ @ Override
372+ public void produceRequest (final RequestChannel channel , final HttpContext httpContext ) throws HttpException , IOException {
373+ exchangeHandler .produceRequest (channel , httpContext );
374+ }
375+
376+ @ Override
377+ public int available () {
378+ return exchangeHandler .available ();
379+ }
380+
381+ @ Override
382+ public void produce (final DataStreamChannel channel ) throws IOException {
383+ exchangeHandler .produce (channel );
384+ }
385+
386+ @ Override
387+ public void consumeInformation (final HttpResponse response , final HttpContext httpContext ) throws HttpException , IOException {
388+ exchangeHandler .consumeInformation (response , httpContext );
389+ }
390+
391+ @ Override
392+ public void consumeResponse (
393+ final HttpResponse response , final EntityDetails entityDetails , final HttpContext httpContext ) throws HttpException , IOException {
394+ exchangeHandler .consumeResponse (response , entityDetails , httpContext );
395+ }
396+
397+ @ Override
398+ public void updateCapacity (final CapacityChannel capacityChannel ) throws IOException {
399+ exchangeHandler .updateCapacity (capacityChannel );
400+ }
401+
402+ @ Override
403+ public void consume (final ByteBuffer src ) throws IOException {
404+ exchangeHandler .consume (src );
405+ }
406+
407+ @ Override
408+ public void streamEnd (final List <? extends Header > trailers ) throws HttpException , IOException {
409+ exchangeHandler .streamEnd (trailers );
410+ }
411+
412+ @ Override
413+ public void cancel () {
414+ exchangeHandler .cancel ();
415+ }
416+
417+ @ Override
418+ public void failed (final Exception cause ) {
419+ exchangeHandler .failed (cause );
420+ }
421+
422+ }
423+
424+ /**
425+ * Cancellable that can be wired to the stream control once it becomes available.
426+ */
427+ private static final class CancellableExecution implements Cancellable , CancellableDependency {
428+
429+ private volatile Cancellable dependency ;
430+
431+ @ Override
432+ public void setDependency (final Cancellable dependency ) {
433+ this .dependency = dependency ;
434+ }
435+
436+ @ Override
437+ public boolean isCancelled () {
438+ return false ;
439+ }
440+
441+ @ Override
442+ public boolean cancel () {
443+ final Cancellable local = this .dependency ;
444+ if (local != null ) {
445+ local .cancel ();
446+ return true ;
447+ }
448+ return false ;
449+ }
450+
451+ }
452+
353453}
0 commit comments