From 974070450038ffec9423adf5d1ae2eefac03cdd6 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Fri, 3 Apr 2026 12:01:25 +0200 Subject: [PATCH] Add configurable HTTP/2 connection window size --- .../hc/core5/http2/config/H2Config.java | 35 ++++- .../impl/nio/AbstractH2StreamMultiplexer.java | 33 +++-- .../hc/core5/http2/config/H2ConfigTest.java | 18 ++- .../nio/TestAbstractH2StreamMultiplexer.java | 128 ++++++++++++++++++ 4 files changed, 196 insertions(+), 18 deletions(-) diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java index 89b538860d..abd3f48254 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java @@ -47,19 +47,21 @@ public class H2Config { private final boolean pushEnabled; private final int maxConcurrentStreams; private final int initialWindowSize; + private final int connectionWindowSize; private final int maxFrameSize; private final int maxHeaderListSize; private final boolean compressionEnabled; private final int maxContinuations; H2Config(final int headerTableSize, final boolean pushEnabled, final int maxConcurrentStreams, - final int initialWindowSize, final int maxFrameSize, final int maxHeaderListSize, - final boolean compressionEnabled, final int maxContinuations) { + final int initialWindowSize, final int connectionWindowSize, final int maxFrameSize, + final int maxHeaderListSize, final boolean compressionEnabled, final int maxContinuations) { super(); this.headerTableSize = headerTableSize; this.pushEnabled = pushEnabled; this.maxConcurrentStreams = maxConcurrentStreams; this.initialWindowSize = initialWindowSize; + this.connectionWindowSize = connectionWindowSize; this.maxFrameSize = maxFrameSize; this.maxHeaderListSize = maxHeaderListSize; this.compressionEnabled = compressionEnabled; @@ -82,6 +84,16 @@ public int getInitialWindowSize() { return initialWindowSize; } + /** + * Returns the connection-level receive window size. This controls the flow-control + * window for the entire connection as opposed to individual streams. + * + * @since 5.5 + */ + public int getConnectionWindowSize() { + return connectionWindowSize; + } + public int getMaxFrameSize() { return maxFrameSize; } @@ -105,6 +117,7 @@ public String toString() { .append(", pushEnabled=").append(this.pushEnabled) .append(", maxConcurrentStreams=").append(this.maxConcurrentStreams) .append(", initialWindowSize=").append(this.initialWindowSize) + .append(", connectionWindowSize=").append(this.connectionWindowSize) .append(", maxFrameSize=").append(this.maxFrameSize) .append(", maxHeaderListSize=").append(this.maxHeaderListSize) .append(", compressionEnabled=").append(this.compressionEnabled) @@ -130,6 +143,7 @@ public static H2Config.Builder initial() { .setMaxConcurrentStreams(Integer.MAX_VALUE) // no limit .setMaxFrameSize(INIT_MAX_FRAME_SIZE) .setInitialWindowSize(INIT_WINDOW_SIZE) + .setConnectionWindowSize(INIT_WINDOW_SIZE) .setMaxHeaderListSize(Integer.MAX_VALUE); // unlimited } @@ -140,6 +154,7 @@ public static H2Config.Builder copy(final H2Config config) { .setPushEnabled(config.isPushEnabled()) .setMaxConcurrentStreams(config.getMaxConcurrentStreams()) .setInitialWindowSize(config.getInitialWindowSize()) + .setConnectionWindowSize(config.getConnectionWindowSize()) .setMaxFrameSize(config.getMaxFrameSize()) .setMaxHeaderListSize(config.getMaxHeaderListSize()) .setCompressionEnabled(config.isCompressionEnabled()); @@ -151,6 +166,7 @@ public static class Builder { private boolean pushEnabled; private int maxConcurrentStreams; private int initialWindowSize; + private int connectionWindowSize; private int maxFrameSize; private int maxHeaderListSize; private boolean compressionEnabled; @@ -161,6 +177,7 @@ public static class Builder { this.pushEnabled = INIT_ENABLE_PUSH; this.maxConcurrentStreams = INIT_CONCURRENT_STREAM; this.initialWindowSize = INIT_WINDOW_SIZE; + this.connectionWindowSize = Integer.MAX_VALUE; this.maxFrameSize = FrameConsts.MIN_FRAME_SIZE * 4; this.maxHeaderListSize = FrameConsts.MAX_FRAME_SIZE; this.compressionEnabled = true; @@ -188,6 +205,19 @@ public Builder setInitialWindowSize(final int initialWindowSize) { return this; } + /** + * Sets the connection-level receive window size. This controls the flow-control + * window for the entire connection as opposed to individual streams governed by + * {@link #setInitialWindowSize(int)}. + * + * @since 5.5 + */ + public Builder setConnectionWindowSize(final int connectionWindowSize) { + this.connectionWindowSize = Args.checkRange(connectionWindowSize, INIT_WINDOW_SIZE, + Integer.MAX_VALUE, "Connection window size"); + return this; + } + public Builder setMaxFrameSize(final int maxFrameSize) { this.maxFrameSize = Args.checkRange(maxFrameSize, FrameConsts.MIN_FRAME_SIZE, FrameConsts.MAX_FRAME_SIZE, "Invalid max frame size"); @@ -222,6 +252,7 @@ public H2Config build() { pushEnabled, maxConcurrentStreams, initialWindowSize, + connectionWindowSize, maxFrameSize, maxHeaderListSize, compressionEnabled, diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 8e9171b57e..1113b24af2 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -104,7 +104,7 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection { - private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; + private static final int DEFAULT_CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN } enum SettingsHandshake { READY, TRANSMITTED, ACKED } @@ -132,6 +132,8 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private SettingsHandshake localSettingState = SettingsHandshake.READY; private SettingsHandshake remoteSettingState = SettingsHandshake.READY; + private final int connWindowSize; + private final long connWindowLowMark; private int initInputWinSize; private int initOutputWinSize; private int lowMark; @@ -178,6 +180,10 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); this.streams = new H2Streams(idGenerator); this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT; + this.connWindowSize = this.localConfig.getConnectionWindowSize(); + this.connWindowLowMark = this.connWindowSize == Integer.MAX_VALUE + ? DEFAULT_CONNECTION_WINDOW_LOW_MARK + : this.connWindowSize / 2; this.inputMetrics = new BasicH2TransportMetrics(); this.outputMetrics = new BasicH2TransportMetrics(); this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics); @@ -258,11 +264,15 @@ private int updateWindow(final AtomicInteger window, final int delta) throws Ari } } - private int updateWindowMax(final AtomicInteger window) throws ArithmeticException { + private int updateWindowTo(final AtomicInteger window, final int target) throws ArithmeticException { for (;;) { final int current = window.get(); - if (window.compareAndSet(current, Integer.MAX_VALUE)) { - return Integer.MAX_VALUE - current; + final int delta = target - current; + if (delta <= 0) { + return 0; + } + if (window.compareAndSet(current, target)) { + return delta; } } } @@ -446,7 +456,7 @@ public final void onConnect() throws HttpException, IOException { commitFrame(settingsFrame); localSettingState = SettingsHandshake.TRANSMITTED; - maximizeWindow(0, connInputWindow); + replenishWindow(0, connInputWindow, connWindowSize); if (streamListener != null) { final int initInputWindow = connInputWindow.get(); @@ -1119,15 +1129,15 @@ private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throw stream.produceInputCapacityUpdate(); } final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength); - if (connWinSize < CONNECTION_WINDOW_LOW_MARK) { - maximizeWindow(0, connInputWindow); + if (connWinSize < connWindowLowMark) { + replenishWindow(0, connInputWindow, connWindowSize); } } stream.consumeData(payload, frame.isFlagSet(FrameFlag.END_STREAM)); } - private void maximizeWindow(final int streamId, final AtomicInteger window) throws IOException { - final int delta = updateWindowMax(window); + private void replenishWindow(final int streamId, final AtomicInteger window, final int target) throws IOException { + final int delta = updateWindowTo(window, target); if (delta > 0) { final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, delta); commitFrame(windowUpdateFrame); @@ -1594,7 +1604,10 @@ public void push(final List
headers, final AsyncPushProducer pushProduce @Override public void update(final int increment) throws IOException { - incrementInputCapacity(0, connInputWindow, increment); + final int connCeiling = connWindowSize - connInputWindow.get(); + if (connCeiling > 0) { + incrementInputCapacity(0, connInputWindow, Math.min(increment, connCeiling)); + } incrementInputCapacity(id, inputWindow, increment); } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/config/H2ConfigTest.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/config/H2ConfigTest.java index a6a2859d5f..73c535bd5b 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/config/H2ConfigTest.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/config/H2ConfigTest.java @@ -37,28 +37,32 @@ class H2ConfigTest { @Test - void builder() { - // Create and start requester - final H2Config h2Config = H2Config.custom() - .setPushEnabled(false) - .build(); + void defaults() { + final H2Config h2Config = H2Config.custom().build(); assertNotNull(h2Config); + assertEquals(65535, h2Config.getInitialWindowSize()); + assertEquals(Integer.MAX_VALUE, h2Config.getConnectionWindowSize()); } @Test void checkValues() { - // Create and start requester final H2Config h2Config = H2Config.custom() .setHeaderTableSize(1) .setMaxConcurrentStreams(1) + .setInitialWindowSize(131072) + .setConnectionWindowSize(1048576) .setMaxFrameSize(16384) + .setMaxHeaderListSize(4096) .setPushEnabled(true) .setCompressionEnabled(true) .build(); assertEquals(1, h2Config.getHeaderTableSize()); assertEquals(1, h2Config.getMaxConcurrentStreams()); + assertEquals(131072, h2Config.getInitialWindowSize()); + assertEquals(1048576, h2Config.getConnectionWindowSize()); assertEquals(16384, h2Config.getMaxFrameSize()); + assertEquals(4096, h2Config.getMaxHeaderListSize()); assertTrue(h2Config.isPushEnabled()); assertTrue(h2Config.isCompressionEnabled()); } @@ -70,6 +74,7 @@ void copy() { .setHeaderTableSize(1) .setMaxConcurrentStreams(1) .setMaxFrameSize(16384) + .setConnectionWindowSize(2097152) .setPushEnabled(true) .setCompressionEnabled(true) .build(); @@ -80,6 +85,7 @@ void copy() { assertAll( () -> assertEquals(h2Config.getHeaderTableSize(), h2Config2.getHeaderTableSize()), () -> assertEquals(h2Config.getInitialWindowSize(), h2Config2.getInitialWindowSize()), + () -> assertEquals(h2Config.getConnectionWindowSize(), h2Config2.getConnectionWindowSize()), () -> assertEquals(h2Config.getMaxConcurrentStreams(), h2Config2.getMaxConcurrentStreams()), () -> assertEquals(h2Config.getMaxFrameSize(), h2Config2.getMaxFrameSize()), () -> assertEquals(h2Config.getMaxHeaderListSize(), h2Config2.getMaxHeaderListSize()) diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 6c9e88e4f1..fd47cef345 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.stream.IntStream; @@ -2017,5 +2018,132 @@ void testHeadersWithPrioritySelfDependencyIsStreamProtocolError() throws Excepti .consumeHeader(ArgumentMatchers.anyList(), ArgumentMatchers.anyBoolean()); } + @Test + void testConnectionWindowSizeOnConnect() throws Exception { + final int configuredWindowSize = 1048576; // 1 MB + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, + H2Config.custom() + .setConnectionWindowSize(configuredWindowSize) + .build(), + h2StreamListener, () -> streamHandler); + + mux.onConnect(); + + final List frames = parseFrames(concat(writes)); + final int expectedDelta = configuredWindowSize - 65535; + final boolean found = frames.stream().anyMatch(f -> + f.type == FrameType.WINDOW_UPDATE.getValue() + && f.streamId == 0 + && f.payload.length == 4 + && ((f.payload[0] & 0xff) << 24 + | (f.payload[1] & 0xff) << 16 + | (f.payload[2] & 0xff) << 8 + | (f.payload[3] & 0xff)) == expectedDelta); + Assertions.assertTrue(found, + "onConnect must send WINDOW_UPDATE(stream=0, delta=" + expectedDelta + ")"); + + final Field connInputWindowField = + AbstractH2StreamMultiplexer.class.getDeclaredField("connInputWindow"); + connInputWindowField.setAccessible(true); + final AtomicInteger connInputWindow = (AtomicInteger) connInputWindowField.get(mux); + Assertions.assertEquals(configuredWindowSize, connInputWindow.get()); + } + + @Test + void testDefaultConnectionWindowSizeMaximizesOnConnect() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, + h2StreamListener, () -> streamHandler); + + mux.onConnect(); + + final Field connInputWindowField = + AbstractH2StreamMultiplexer.class.getDeclaredField("connInputWindow"); + connInputWindowField.setAccessible(true); + final AtomicInteger connInputWindow = (AtomicInteger) connInputWindowField.get(mux); + Assertions.assertEquals(Integer.MAX_VALUE, connInputWindow.get(), + "Default config must maximize connection input window"); + } + + @Test + void testConnectionWindowSizeDoesNotAffectSettingsInitialWindowSize() throws Exception { + final int streamWindowSize = 32768; + final int connWindowSize = 1048576; + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, + H2Config.custom() + .setInitialWindowSize(streamWindowSize) + .setConnectionWindowSize(connWindowSize) + .build(), + h2StreamListener, () -> streamHandler); + + mux.onConnect(); + + final List frames = parseFrames(concat(writes)); + final FrameStub settingsFrame = frames.stream() + .filter(f -> f.type == FrameType.SETTINGS.getValue() + && f.streamId == 0 + && !f.isAck()) + .findFirst() + .orElseThrow(() -> new AssertionError("No SETTINGS frame found")); + + final int initialWindowSizeParamCode = 0x4; + int advertisedWindowSize = -1; + for (int i = 0; i + 5 < settingsFrame.payload.length; i += 6) { + final int id = ((settingsFrame.payload[i] & 0xff) << 8) + | (settingsFrame.payload[i + 1] & 0xff); + final int val = ((settingsFrame.payload[i + 2] & 0xff) << 24) + | ((settingsFrame.payload[i + 3] & 0xff) << 16) + | ((settingsFrame.payload[i + 4] & 0xff) << 8) + | (settingsFrame.payload[i + 5] & 0xff); + if (id == initialWindowSizeParamCode) { + advertisedWindowSize = val; + } + } + + Assertions.assertEquals(streamWindowSize, advertisedWindowSize, + "SETTINGS INITIAL_WINDOW_SIZE must reflect initialWindowSize, not connectionWindowSize"); + } + } \ No newline at end of file