diff --git a/src/listener.ts b/src/listener.ts
index 83603bf..31467dd 100644
--- a/src/listener.ts
+++ b/src/listener.ts
@@ -1,5 +1,5 @@
 import type { IncomingMessage, ServerResponse, OutgoingHttpHeaders } from 'node:http'
-import { Http2ServerRequest } from 'node:http2'
+import { Http2ServerRequest, constants as h2constants } from 'node:http2'
 import type { Http2ServerResponse } from 'node:http2'
 import type { Writable } from 'node:stream'
 import type { IncomingMessageWithWrapBodyStream } from './request'
@@ -23,9 +23,68 @@ import { X_ALREADY_SENT } from './utils/response/constants'
 import './globals'
 
 const outgoingEnded = Symbol('outgoingEnded')
+const incomingDraining = Symbol('incomingDraining')
 type OutgoingHasOutgoingEnded = Http2ServerResponse & {
   [outgoingEnded]?: () => void
 }
+type IncomingHasDrainState = (IncomingMessage | Http2ServerRequest) & {
+  [incomingDraining]?: boolean
+}
+
+const DRAIN_TIMEOUT_MS = 500
+const MAX_DRAIN_BYTES = 64 * 1024 * 1024
+
+const drainIncoming = (incoming: IncomingMessage | Http2ServerRequest): void => {
+  const incomingWithDrainState = incoming as IncomingHasDrainState
+  if (incoming.destroyed || incomingWithDrainState[incomingDraining]) {
+    return
+  }
+  incomingWithDrainState[incomingDraining] = true
+
+  // HTTP/2: streams are multiplexed, so we can close immediately
+  // without risking TCP RST racing the response.
+  if (incoming instanceof Http2ServerRequest) {
+    try {
+      // eslint-disable-next-line @typescript-eslint/no-explicit-any
+      ;(incoming as any).stream?.close?.(h2constants.NGHTTP2_NO_ERROR)
+    } catch {
+      // stream may already be closed
+    }
+    return
+  }
+
+  let bytesRead = 0
+  const cleanup = () => {
+    clearTimeout(timer)
+    incoming.off('data', onData)
+    incoming.off('end', cleanup)
+    incoming.off('error', cleanup)
+  }
+
+  const forceClose = () => {
+    cleanup()
+    const socket = incoming.socket
+    if (socket && !socket.destroyed) {
+      socket.destroySoon()
+    }
+  }
+
+  const timer = setTimeout(forceClose, DRAIN_TIMEOUT_MS)
+  timer.unref?.()
+
+  const onData = (chunk: Buffer) => {
+    bytesRead += chunk.length
+    if (bytesRead > MAX_DRAIN_BYTES) {
+      forceClose()
+    }
+  }
+
+  incoming.on('data', onData)
+  incoming.on('end', cleanup)
+  incoming.on('error', cleanup)
+
+  incoming.resume()
+}
 
 const handleRequestError = (): Response =>
   new Response(null, {
@@ -265,15 +324,21 @@
             // and end is called at this point. At that point, nothing is done.
             if (!incomingEnded) {
               setTimeout(() => {
-                incoming.destroy()
-                // a Http2ServerResponse instance will not terminate without also calling outgoing.destroy()
-                outgoing.destroy()
+                drainIncoming(incoming)
               })
             }
           })
         }
       }
     }
+
+    // Drain incoming as soon as the response is flushed to the OS,
+    // before the socket is closed, to prevent TCP RST racing the response.
+    outgoing.on('finish', () => {
+      if (!incomingEnded) {
+        drainIncoming(incoming)
+      }
+    })
   }
 
   // Detect if request was aborted.
@@ -294,7 +359,7 @@
       // and end is called at this point. At that point, nothing is done.
       if (!incomingEnded) {
         setTimeout(() => {
-          incoming.destroy()
+          drainIncoming(incoming)
         })
       }
     })
diff --git a/test/app.ts b/test/app.ts
index 931f20f..ee41523 100644
--- a/test/app.ts
+++ b/test/app.ts
@@ -52,6 +52,13 @@ app.post('/partially-consumed-and-cancelled', async (c) => {
   reader.cancel()
   return c.text('Partially consumed and cancelled')
 })
+app.post('/early-413', (c) => {
+  if (!c.req.raw.body) {
+    // force create new request object
+    throw new Error('No body consumed')
+  }
+  return c.text('Payload Too Large', 413)
+})
 app.delete('/posts/:id', (c) => {
   return c.text(`DELETE ${c.req.param('id')}`)
 })
diff --git a/test/server-socket.test.ts b/test/server-socket.test.ts
index f14a1c1..f50d0d1 100644
--- a/test/server-socket.test.ts
+++ b/test/server-socket.test.ts
@@ -1,9 +1,16 @@
+import { Hono } from 'hono'
+import { bodyLimit } from 'hono/body-limit'
 import fs from 'node:fs'
 import { request as requestHTTP } from 'node:http'
 import type { IncomingMessage } from 'node:http'
-import { connect as connectHTTP2, createSecureServer as createHTTP2Server } from 'node:http2'
+import {
+  connect as connectHTTP2,
+  constants as h2constants,
+  createSecureServer as createHTTP2Server,
+} from 'node:http2'
 import type { ClientHttp2Session } from 'node:http2'
 import { request as requestHTTPS, createServer as createHTTPSServer } from 'node:https'
+import { connect as connectNet } from 'node:net'
 import type { AddressInfo } from 'node:net'
 import { serve } from '../src/server'
 import type { ServerType } from '../src/types'
@@ -11,6 +18,21 @@ import { app } from './app'
 
 const nodeVersionV20OrLater = parseInt(process.version.slice(1).split('.')[0]) >= 20
 
+const createBodyLimitApp = () => {
+  const app = new Hono()
+
+  app.post(
+    '/',
+    bodyLimit({
+      maxSize: 1024 * 1024,
+      onError: (c) => c.text('Payload exceeded', 413),
+    }),
+    (c) => c.text('ok')
+  )
+
+  return app
+}
+
 describe('autoCleanupIncoming: true (default)', () => {
   let address: AddressInfo
   let server: ServerType
@@ -222,6 +244,82 @@ describe('autoCleanupIncoming: true (default)', () => {
       await Promise.all([reqPromise, resPromise])
       expect(responseBody).toBe(expectEmptyBody ? '' : 'Partially consumed and cancelled')
     })
+
+    // Skip on Windows because the client-side ECONNRESET timing for early HTTPS 413
+    // responses is not stable across environments, even when the server response is valid.
+    ;(process.platform === 'win32' ? it.skip : it)(
+      'Should return 413 response without ECONNRESET - POST /early-413',
+      async () => {
+        let responseBody = ''
+        let responseStatus = 0
+        let requestError: Error | null = null
+        let sendTimer: ReturnType<typeof setTimeout> | undefined
+
+        const cleanupSendTimer = () => {
+          if (sendTimer) {
+            clearTimeout(sendTimer)
+            sendTimer = undefined
+          }
+        }
+
+        const req = request(
+          {
+            hostname: address.address,
+            port: address.port,
+            method: 'POST',
+            path: '/early-413',
+            rejectUnauthorized: false,
+          },
+          (res) => {
+            responseStatus = res.statusCode ?? 0
+            res.on('data', (chunk) => {
+              responseBody += chunk.toString()
+            })
+            res.on('close', () => {
+              // For HTTP2, statusCode is set asynchronously via 'response' event
+              if (!responseStatus) {
+                responseStatus = res.statusCode ?? 0
+              }
+              resClose()
+            })
+          }
+        )
+
+        req.on('close', reqClose)
+        req.on('error', (err) => {
+          cleanupSendTimer()
+          requestError = err
+        })
+
+        // Send large body slowly to simulate real network upload
+        const chunkSize = 64 * 1024
+        const totalSize = 1024 * 1024
+        let offset = 0
+        const sendChunk = () => {
+          if (offset >= totalSize) {
+            return
+          }
+          req.write(Buffer.alloc(Math.min(chunkSize, totalSize - offset)))
+          offset += chunkSize
+          sendTimer = setTimeout(sendChunk, 5)
+        }
+        sendChunk()
+
+        await Promise.all([reqPromise, resPromise])
+        cleanupSendTimer()
+        expect(responseStatus).toBe(413)
+        if (!expectEmptyBody) {
+          expect(responseBody).toBe('Payload Too Large')
+        }
+        if ('rstCode' in req) {
+          expect(req.rstCode).toBe(h2constants.NGHTTP2_NO_ERROR)
+        }
+        // Should not get ECONNRESET before receiving the response
+        if (requestError) {
+          expect((requestError as NodeJS.ErrnoException).code).not.toBe('ECONNRESET')
+        }
+      }
+    )
   }
 
   beforeEach(() => {
@@ -313,6 +411,24 @@ describe('autoCleanupIncoming: true (default)', () => {
     server.close()
   })
 
+  // Reconnect HTTP2 client before each test to avoid session-level
+  // flow control window exhaustion from previous tests
+  beforeEach((done) => {
+    if (!client.closed && !client.destroyed) {
+      client.close(() => {
+        client = connectHTTP2(`https://${address.address}:${address.port}`, {
+          rejectUnauthorized: false,
+        })
+        client.once('connect', () => done())
+      })
+    } else {
+      client = connectHTTP2(`https://${address.address}:${address.port}`, {
+        rejectUnauthorized: false,
+      })
+      client.once('connect', () => done())
+    }
+  })
+
   runner(
     ((
       {
@@ -331,6 +447,9 @@ describe('autoCleanupIncoming: true (default)', () => {
           ':path': path,
         })
 
+        req.on('response', (headers) => {
+          ;(req as unknown as { statusCode: number | undefined }).statusCode = headers[':status']
+        })
         callback(req as unknown as IncomingMessage)
         return req
       }) as unknown as typeof requestHTTP,
@@ -339,6 +458,120 @@ describe('autoCleanupIncoming: true (default)', () => {
   })
 })
 
+describe('lingering close for early 413 responses', () => {
+  let address: AddressInfo
+  let server: ServerType
+
+  beforeAll(async () => {
+    const bodyLimitApp = createBodyLimitApp()
+    address = await new Promise<AddressInfo>((resolve) => {
+      server = serve(
+        {
+          hostname: '127.0.0.1',
+          fetch: bodyLimitApp.fetch,
+          port: 0,
+        },
+        (address) => {
+          resolve(address)
+        }
+      )
+    })
+  })
+
+  afterAll(() => {
+    server.close()
+  })
+
+  it('Should keep HTTP/1 connection graceful after body-limit sends 413', async () => {
+    const total = 50 * 1024 * 1024 + 1
+
+    await new Promise<void>((resolve, reject) => {
+      const socket = connectNet(address.port, address.address)
+      let response = ''
+      let saw413 = false
+      let settled = false
+      let writeTimer: ReturnType<typeof setInterval> | undefined
+      let writesAfterResponse = 0
+      let firstWriteAfterResponseSucceeded = false
+
+      const finish = (error?: Error) => {
+        if (settled) {
+          return
+        }
+        settled = true
+        if (writeTimer) {
+          clearInterval(writeTimer)
+        }
+        socket.removeAllListeners()
+        socket.destroy()
+        if (error) {
+          reject(error)
+        } else {
+          resolve()
+        }
+      }
+
+      socket.on('connect', () => {
+        socket.write(
+          'POST / HTTP/1.1\r\n' +
+            'Host: localhost\r\n' +
+            `Content-Length: ${total}\r\n` +
+            'Content-Type: text/plain\r\n' +
+            '\r\n'
+        )
+        socket.write(Buffer.alloc(64 * 1024))
+        writeTimer = setInterval(() => {
+          if (socket.destroyed) {
+            return
+          }
+          socket.write(Buffer.alloc(64 * 1024), (error) => {
+            if (error) {
+              finish(error)
+              return
+            }
+            if (saw413) {
+              writesAfterResponse += 1
+              firstWriteAfterResponseSucceeded = true
+              if (writesAfterResponse >= 4) {
+                if (writeTimer) {
+                  clearInterval(writeTimer)
+                }
+                socket.end()
+              }
+            }
+          })
+        }, 20)
+      })
+
+      socket.on('data', (chunk) => {
+        response += chunk.toString()
+        if (!saw413 && response.includes('413 Payload Too Large')) {
+          saw413 = true
+        }
+      })
+
+      socket.on('error', (error) => {
+        finish(error)
+      })
+
+      socket.on('close', (hadError) => {
+        if (hadError) {
+          return
+        }
+        if (!saw413) {
+          finish(new Error(`Expected 413 response, got: ${response}`))
+          return
+        }
+        if (!firstWriteAfterResponseSucceeded) {
+          finish(new Error('Expected at least one successful write after the 413 response.'))
+          return
+        }
+        finish()
+      })
+    })
+  })
+})
+
 describe('autoCleanupIncoming: false', () => {
   let address: AddressInfo
   let server: ServerType
diff --git a/test/server.test.ts b/test/server.test.ts
index 298eeae..a89ce34 100644
--- a/test/server.test.ts
+++ b/test/server.test.ts
@@ -67,7 +67,7 @@ describe('Basic', () => {
   it('Should return 200 response - POST /partially-consumed', async () => {
     const buffer = Buffer.alloc(1024 * 10) // large buffer
 
-    const res = await new Promise((resolve, reject) => {
+    const res = await new Promise<Response>((resolve, reject) => {
       const req = request(server)
         .post('/partially-consumed')
         .set('Content-Length', buffer.length.toString())
@@ -88,7 +88,7 @@ describe('Basic', () => {
   it('Should return 200 response - POST /partially-consumed-and-cancelled', async () => {
     const buffer = Buffer.alloc(1) // A large buffer will not make the test go far, so keep it small because it won't go far.
 
-    const res = await new Promise((resolve, reject) => {
+    const res = await new Promise<Response>((resolve, reject) => {
       const req = request(server)
         .post('/partially-consumed-and-cancelled')
         .set('Content-Length', buffer.length.toString())