Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,21 @@ public class H2Config {
private final boolean pushEnabled;
private final int maxConcurrentStreams;
private final int initialWindowSize;
private final int connectionWindowSize;
private final int maxFrameSize;
private final int maxHeaderListSize;
private final boolean compressionEnabled;
private final int maxContinuations;

H2Config(final int headerTableSize, final boolean pushEnabled, final int maxConcurrentStreams,
final int initialWindowSize, final int maxFrameSize, final int maxHeaderListSize,
final boolean compressionEnabled, final int maxContinuations) {
final int initialWindowSize, final int connectionWindowSize, final int maxFrameSize,
final int maxHeaderListSize, final boolean compressionEnabled, final int maxContinuations) {
super();
this.headerTableSize = headerTableSize;
this.pushEnabled = pushEnabled;
this.maxConcurrentStreams = maxConcurrentStreams;
this.initialWindowSize = initialWindowSize;
this.connectionWindowSize = connectionWindowSize;
this.maxFrameSize = maxFrameSize;
this.maxHeaderListSize = maxHeaderListSize;
this.compressionEnabled = compressionEnabled;
Expand All @@ -82,6 +84,16 @@ public int getInitialWindowSize() {
return initialWindowSize;
}

/**
* Returns the connection-level receive window size. This controls the flow-control
* window for the entire connection as opposed to individual streams.
*
* @since 5.5
*/
public int getConnectionWindowSize() {
return connectionWindowSize;
}

public int getMaxFrameSize() {
return maxFrameSize;
}
Expand All @@ -105,6 +117,7 @@ public String toString() {
.append(", pushEnabled=").append(this.pushEnabled)
.append(", maxConcurrentStreams=").append(this.maxConcurrentStreams)
.append(", initialWindowSize=").append(this.initialWindowSize)
.append(", connectionWindowSize=").append(this.connectionWindowSize)
.append(", maxFrameSize=").append(this.maxFrameSize)
.append(", maxHeaderListSize=").append(this.maxHeaderListSize)
.append(", compressionEnabled=").append(this.compressionEnabled)
Expand All @@ -130,6 +143,7 @@ public static H2Config.Builder initial() {
.setMaxConcurrentStreams(Integer.MAX_VALUE) // no limit
.setMaxFrameSize(INIT_MAX_FRAME_SIZE)
.setInitialWindowSize(INIT_WINDOW_SIZE)
.setConnectionWindowSize(INIT_WINDOW_SIZE)
.setMaxHeaderListSize(Integer.MAX_VALUE); // unlimited
}

Expand All @@ -140,6 +154,7 @@ public static H2Config.Builder copy(final H2Config config) {
.setPushEnabled(config.isPushEnabled())
.setMaxConcurrentStreams(config.getMaxConcurrentStreams())
.setInitialWindowSize(config.getInitialWindowSize())
.setConnectionWindowSize(config.getConnectionWindowSize())
.setMaxFrameSize(config.getMaxFrameSize())
.setMaxHeaderListSize(config.getMaxHeaderListSize())
.setCompressionEnabled(config.isCompressionEnabled());
Expand All @@ -151,6 +166,7 @@ public static class Builder {
private boolean pushEnabled;
private int maxConcurrentStreams;
private int initialWindowSize;
private int connectionWindowSize;
private int maxFrameSize;
private int maxHeaderListSize;
private boolean compressionEnabled;
Expand All @@ -161,6 +177,7 @@ public static class Builder {
this.pushEnabled = INIT_ENABLE_PUSH;
this.maxConcurrentStreams = INIT_CONCURRENT_STREAM;
this.initialWindowSize = INIT_WINDOW_SIZE;
this.connectionWindowSize = Integer.MAX_VALUE;
this.maxFrameSize = FrameConsts.MIN_FRAME_SIZE * 4;
this.maxHeaderListSize = FrameConsts.MAX_FRAME_SIZE;
this.compressionEnabled = true;
Expand Down Expand Up @@ -188,6 +205,19 @@ public Builder setInitialWindowSize(final int initialWindowSize) {
return this;
}

/**
* Sets the connection-level receive window size. This controls the flow-control
* window for the entire connection as opposed to individual streams governed by
* {@link #setInitialWindowSize(int)}.
*
* @since 5.5
*/
public Builder setConnectionWindowSize(final int connectionWindowSize) {
this.connectionWindowSize = Args.checkRange(connectionWindowSize, INIT_WINDOW_SIZE,
Integer.MAX_VALUE, "Connection window size");
return this;
}

public Builder setMaxFrameSize(final int maxFrameSize) {
this.maxFrameSize = Args.checkRange(maxFrameSize, FrameConsts.MIN_FRAME_SIZE, FrameConsts.MAX_FRAME_SIZE,
"Invalid max frame size");
Expand Down Expand Up @@ -222,6 +252,7 @@ public H2Config build() {
pushEnabled,
maxConcurrentStreams,
initialWindowSize,
connectionWindowSize,
maxFrameSize,
maxHeaderListSize,
compressionEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@

abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {

private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;
private static final int DEFAULT_CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024;

enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN }
enum SettingsHandshake { READY, TRANSMITTED, ACKED }
Expand Down Expand Up @@ -132,6 +132,8 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
private SettingsHandshake localSettingState = SettingsHandshake.READY;
private SettingsHandshake remoteSettingState = SettingsHandshake.READY;

private final int connWindowSize;
private final long connWindowLowMark;
private int initInputWinSize;
private int initOutputWinSize;
private int lowMark;
Expand Down Expand Up @@ -178,6 +180,10 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.streams = new H2Streams(idGenerator);
this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
this.connWindowSize = this.localConfig.getConnectionWindowSize();
this.connWindowLowMark = this.connWindowSize == Integer.MAX_VALUE
? DEFAULT_CONNECTION_WINDOW_LOW_MARK
: this.connWindowSize / 2;
this.inputMetrics = new BasicH2TransportMetrics();
this.outputMetrics = new BasicH2TransportMetrics();
this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
Expand Down Expand Up @@ -258,11 +264,15 @@ private int updateWindow(final AtomicInteger window, final int delta) throws Ari
}
}

private int updateWindowMax(final AtomicInteger window) throws ArithmeticException {
private int updateWindowTo(final AtomicInteger window, final int target) throws ArithmeticException {
for (;;) {
final int current = window.get();
if (window.compareAndSet(current, Integer.MAX_VALUE)) {
return Integer.MAX_VALUE - current;
final int delta = target - current;
if (delta <= 0) {
return 0;
}
if (window.compareAndSet(current, target)) {
return delta;
}
}
}
Expand Down Expand Up @@ -446,7 +456,7 @@ public final void onConnect() throws HttpException, IOException {

commitFrame(settingsFrame);
localSettingState = SettingsHandshake.TRANSMITTED;
maximizeWindow(0, connInputWindow);
replenishWindow(0, connInputWindow, connWindowSize);

if (streamListener != null) {
final int initInputWindow = connInputWindow.get();
Expand Down Expand Up @@ -1119,15 +1129,15 @@ private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throw
stream.produceInputCapacityUpdate();
}
final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
maximizeWindow(0, connInputWindow);
if (connWinSize < connWindowLowMark) {
replenishWindow(0, connInputWindow, connWindowSize);
}
}
stream.consumeData(payload, frame.isFlagSet(FrameFlag.END_STREAM));
}

private void maximizeWindow(final int streamId, final AtomicInteger window) throws IOException {
final int delta = updateWindowMax(window);
private void replenishWindow(final int streamId, final AtomicInteger window, final int target) throws IOException {
final int delta = updateWindowTo(window, target);
if (delta > 0) {
final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, delta);
commitFrame(windowUpdateFrame);
Expand Down Expand Up @@ -1594,7 +1604,10 @@ public void push(final List<Header> headers, final AsyncPushProducer pushProduce

@Override
public void update(final int increment) throws IOException {
incrementInputCapacity(0, connInputWindow, increment);
final int connCeiling = connWindowSize - connInputWindow.get();
if (connCeiling > 0) {
incrementInputCapacity(0, connInputWindow, Math.min(increment, connCeiling));
}
incrementInputCapacity(id, inputWindow, increment);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,32 @@
class H2ConfigTest {

@Test
void builder() {
// Create and start requester
final H2Config h2Config = H2Config.custom()
.setPushEnabled(false)
.build();
void defaults() {
final H2Config h2Config = H2Config.custom().build();
assertNotNull(h2Config);
assertEquals(65535, h2Config.getInitialWindowSize());
assertEquals(Integer.MAX_VALUE, h2Config.getConnectionWindowSize());
}

@Test
void checkValues() {
// Create and start requester
final H2Config h2Config = H2Config.custom()
.setHeaderTableSize(1)
.setMaxConcurrentStreams(1)
.setInitialWindowSize(131072)
.setConnectionWindowSize(1048576)
.setMaxFrameSize(16384)
.setMaxHeaderListSize(4096)
.setPushEnabled(true)
.setCompressionEnabled(true)
.build();

assertEquals(1, h2Config.getHeaderTableSize());
assertEquals(1, h2Config.getMaxConcurrentStreams());
assertEquals(131072, h2Config.getInitialWindowSize());
assertEquals(1048576, h2Config.getConnectionWindowSize());
assertEquals(16384, h2Config.getMaxFrameSize());
assertEquals(4096, h2Config.getMaxHeaderListSize());
assertTrue(h2Config.isPushEnabled());
assertTrue(h2Config.isCompressionEnabled());
}
Expand All @@ -70,6 +74,7 @@ void copy() {
.setHeaderTableSize(1)
.setMaxConcurrentStreams(1)
.setMaxFrameSize(16384)
.setConnectionWindowSize(2097152)
.setPushEnabled(true)
.setCompressionEnabled(true)
.build();
Expand All @@ -80,6 +85,7 @@ void copy() {
assertAll(
() -> assertEquals(h2Config.getHeaderTableSize(), h2Config2.getHeaderTableSize()),
() -> assertEquals(h2Config.getInitialWindowSize(), h2Config2.getInitialWindowSize()),
() -> assertEquals(h2Config.getConnectionWindowSize(), h2Config2.getConnectionWindowSize()),
() -> assertEquals(h2Config.getMaxConcurrentStreams(), h2Config2.getMaxConcurrentStreams()),
() -> assertEquals(h2Config.getMaxFrameSize(), h2Config2.getMaxFrameSize()),
() -> assertEquals(h2Config.getMaxHeaderListSize(), h2Config2.getMaxHeaderListSize())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -2017,5 +2018,132 @@ void testHeadersWithPrioritySelfDependencyIsStreamProtocolError() throws Excepti
.consumeHeader(ArgumentMatchers.anyList(), ArgumentMatchers.anyBoolean());
}

@Test
void testConnectionWindowSizeOnConnect() throws Exception {
final int configuredWindowSize = 1048576; // 1 MB
final List<byte[]> writes = new ArrayList<>();
Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
.thenAnswer(inv -> {
final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
final byte[] copy = new byte[b.remaining()];
b.get(copy);
writes.add(copy);
return copy.length;
});
Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());

final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
httpProcessor, CharCodingConfig.DEFAULT,
H2Config.custom()
.setConnectionWindowSize(configuredWindowSize)
.build(),
h2StreamListener, () -> streamHandler);

mux.onConnect();

final List<FrameStub> frames = parseFrames(concat(writes));
final int expectedDelta = configuredWindowSize - 65535;
final boolean found = frames.stream().anyMatch(f ->
f.type == FrameType.WINDOW_UPDATE.getValue()
&& f.streamId == 0
&& f.payload.length == 4
&& ((f.payload[0] & 0xff) << 24
| (f.payload[1] & 0xff) << 16
| (f.payload[2] & 0xff) << 8
| (f.payload[3] & 0xff)) == expectedDelta);
Assertions.assertTrue(found,
"onConnect must send WINDOW_UPDATE(stream=0, delta=" + expectedDelta + ")");

final Field connInputWindowField =
AbstractH2StreamMultiplexer.class.getDeclaredField("connInputWindow");
connInputWindowField.setAccessible(true);
final AtomicInteger connInputWindow = (AtomicInteger) connInputWindowField.get(mux);
Assertions.assertEquals(configuredWindowSize, connInputWindow.get());
}

@Test
void testDefaultConnectionWindowSizeMaximizesOnConnect() throws Exception {
final List<byte[]> writes = new ArrayList<>();
Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
.thenAnswer(inv -> {
final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
final byte[] copy = new byte[b.remaining()];
b.get(copy);
writes.add(copy);
return copy.length;
});
Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());

final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT,
h2StreamListener, () -> streamHandler);

mux.onConnect();

final Field connInputWindowField =
AbstractH2StreamMultiplexer.class.getDeclaredField("connInputWindow");
connInputWindowField.setAccessible(true);
final AtomicInteger connInputWindow = (AtomicInteger) connInputWindowField.get(mux);
Assertions.assertEquals(Integer.MAX_VALUE, connInputWindow.get(),
"Default config must maximize connection input window");
}

@Test
void testConnectionWindowSizeDoesNotAffectSettingsInitialWindowSize() throws Exception {
final int streamWindowSize = 32768;
final int connWindowSize = 1048576;
final List<byte[]> writes = new ArrayList<>();
Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
.thenAnswer(inv -> {
final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
final byte[] copy = new byte[b.remaining()];
b.get(copy);
writes.add(copy);
return copy.length;
});
Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());

final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
httpProcessor, CharCodingConfig.DEFAULT,
H2Config.custom()
.setInitialWindowSize(streamWindowSize)
.setConnectionWindowSize(connWindowSize)
.build(),
h2StreamListener, () -> streamHandler);

mux.onConnect();

final List<FrameStub> frames = parseFrames(concat(writes));
final FrameStub settingsFrame = frames.stream()
.filter(f -> f.type == FrameType.SETTINGS.getValue()
&& f.streamId == 0
&& !f.isAck())
.findFirst()
.orElseThrow(() -> new AssertionError("No SETTINGS frame found"));

final int initialWindowSizeParamCode = 0x4;
int advertisedWindowSize = -1;
for (int i = 0; i + 5 < settingsFrame.payload.length; i += 6) {
final int id = ((settingsFrame.payload[i] & 0xff) << 8)
| (settingsFrame.payload[i + 1] & 0xff);
final int val = ((settingsFrame.payload[i + 2] & 0xff) << 24)
| ((settingsFrame.payload[i + 3] & 0xff) << 16)
| ((settingsFrame.payload[i + 4] & 0xff) << 8)
| (settingsFrame.payload[i + 5] & 0xff);
if (id == initialWindowSizeParamCode) {
advertisedWindowSize = val;
}
}

Assertions.assertEquals(streamWindowSize, advertisedWindowSize,
"SETTINGS INITIAL_WINDOW_SIZE must reflect initialWindowSize, not connectionWindowSize");
}


}
Loading