Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ src/*.html

# SSL certificates
*.pem

/guest-profiles
/benchmark-results/**
536 changes: 536 additions & 0 deletions OPTIMIZATION.md

Large diffs are not rendered by default.

31 changes: 26 additions & 5 deletions crates/common/src/html_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ use crate::tsjs;
struct HtmlWithPostProcessing {
inner: HtmlRewriterAdapter,
post_processors: Vec<Arc<dyn IntegrationHtmlPostProcessor>>,
/// Accumulated output from intermediate chunks. Only used when
/// `post_processors` is non-empty, because post-processors (e.g. RSC
/// placeholder substitution) need the complete document to operate on.
accumulated_output: Vec<u8>,
origin_host: String,
request_host: String,
request_scheme: String,
Expand All @@ -29,12 +33,27 @@ struct HtmlWithPostProcessing {
impl StreamProcessor for HtmlWithPostProcessing {
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result<Vec<u8>, io::Error> {
let output = self.inner.process_chunk(chunk, is_last)?;
if !is_last || output.is_empty() || self.post_processors.is_empty() {

// No post-processors → stream through immediately (fast path).
if self.post_processors.is_empty() {
return Ok(output);
}

let Ok(output_str) = std::str::from_utf8(&output) else {
return Ok(output);
// Post-processors registered → must accumulate so they can operate on
// the complete document (e.g. RSC placeholder substitution).
self.accumulated_output.extend_from_slice(&output);
if !is_last {
return Ok(Vec::new());
}

// All chunks received — run post-processing on the complete output.
let full_output = std::mem::take(&mut self.accumulated_output);
if full_output.is_empty() {
return Ok(full_output);
}

let Ok(output_str) = std::str::from_utf8(&full_output) else {
return Ok(full_output);
};

let ctx = IntegrationHtmlContext {
Expand All @@ -50,10 +69,10 @@ impl StreamProcessor for HtmlWithPostProcessing {
.iter()
.any(|p| p.should_process(output_str, &ctx))
{
return Ok(output);
return Ok(full_output);
}

let mut html = String::from_utf8(output).map_err(|e| {
let mut html = String::from_utf8(full_output).map_err(|e| {
io::Error::other(format!(
"HTML post-processing expected valid UTF-8 output: {e}"
))
Expand All @@ -79,6 +98,7 @@ impl StreamProcessor for HtmlWithPostProcessing {

fn reset(&mut self) {
// Reset the wrapped rewriter and discard any partially-accumulated
// document so this processor can be reused for a fresh response.
self.inner.reset();
self.accumulated_output.clear();
self.document_state.clear();
}
}
Expand Down Expand Up @@ -464,6 +484,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
HtmlWithPostProcessing {
inner: HtmlRewriterAdapter::new(rewriter_settings),
post_processors,
accumulated_output: Vec::new(),
origin_host: config.origin_host,
request_host: config.request_host,
request_scheme: config.request_scheme,
Expand Down
197 changes: 196 additions & 1 deletion crates/common/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use fastly::{Body, Request, Response};
use crate::backend::BackendConfig;
use crate::http_util::{serve_static_with_etag, RequestInfo};

use crate::constants::{HEADER_X_COMPRESS_HINT, HEADER_X_SYNTHETIC_ID};
use crate::constants::{
ENV_FASTLY_IS_STAGING, ENV_FASTLY_SERVICE_VERSION, HEADER_X_COMPRESS_HINT,
HEADER_X_SYNTHETIC_ID, HEADER_X_TS_ENV, HEADER_X_TS_VERSION,
};
use crate::cookies::set_synthetic_cookie;
use crate::error::TrustedServerError;
use crate::integrations::IntegrationRegistry;
Expand Down Expand Up @@ -307,6 +310,198 @@ pub fn handle_publisher_request(
Ok(response)
}

/// Outcome of a route handler, telling the caller how the response was —
/// or still needs to be — delivered to the client.
// NOTE(review): `Buffered` carries a full `Response`, hence the
// large_enum_variant allow; boxing would change the public interface.
#[allow(clippy::large_enum_variant)]
pub enum RouteResult {
/// Response fully buffered — send via `send_to_client()`
Buffered(Response),
/// Response already streamed to client
Streamed,
}

/// Streaming version of publisher request handling.
///
/// Proxies the request to the publisher origin and, for successful text-like
/// responses, rewrites the body through a `StreamingPipeline` while streaming
/// it to the client via `stream_to_client()`. Everything else (errors,
/// non-text content, missing request host) is returned buffered so the caller
/// can send it via `send_to_client()`.
///
/// # Errors
///
/// Returns an error if the generation of a synthetic ID fails, or if making the backend HTTP request to the origin fails.
pub fn handle_publisher_request_streaming(
settings: &Settings,
integration_registry: &IntegrationRegistry,
mut req: Request,
) -> Result<RouteResult, Report<TrustedServerError>> {
log::debug!("Streaming: Proxying request to publisher_origin");

// Capture the inbound host/scheme before the request is consumed by `send`.
let request_info = RequestInfo::from_request(&req);
let request_host = &request_info.host;
let request_scheme = &request_info.scheme;

let synthetic_id = get_or_generate_synthetic_id(settings, &req)?;

let backend_name = BackendConfig::from_url(
&settings.publisher.origin_url,
settings.proxy.certificate_check,
)?;
let origin_host = settings.publisher.origin_host();

// Rewrite the Host header so the origin sees its own hostname.
req.set_header("host", &origin_host);

let mut response = req
.send(&backend_name)
.change_context(TrustedServerError::Proxy {
message: "Failed to proxy request to origin".to_string(),
})?;

let content_type = response
.get_header(header::CONTENT_TYPE)
.map(|h| h.to_str().unwrap_or_default())
.unwrap_or_default()
.to_string();

// Only text-like payloads are candidates for URL/HTML rewriting.
let should_process = content_type.contains("text/")
|| content_type.contains("application/javascript")
|| content_type.contains("application/json");

// Responses we will not rewrite fall back to the buffered path: returning
// `RouteResult::Buffered` lets the caller send them via `send_to_client()`,
// matching the legacy (non-streaming) handler's behavior for the same gate.

// Check if we will stream
let will_stream =
response.get_status().is_success() && should_process && !request_host.is_empty();

if !will_stream {
log::debug!(
"Falling back to buffered for response - status: {}, should_process: {}, request_host: '{}'",
response.get_status(),
should_process,
request_host
);
response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str());
set_synthetic_cookie(settings, &mut response, synthetic_id.as_str());
return Ok(RouteResult::Buffered(response));
}

let content_encoding = response
.get_header(header::CONTENT_ENCODING)
.map(|h| h.to_str().unwrap_or_default())
.unwrap_or_default()
.to_lowercase();

log::debug!(
"Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}",
content_type, content_encoding, request_host, origin_host
);

// Take the body now; the pipeline below consumes it chunk by chunk.
let body = response.take_body();
let compression = Compression::from_content_encoding(&content_encoding);

// All header mutations must happen before `stream_to_client()` below,
// because that call commits the headers to the wire.
response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str());
set_synthetic_cookie(settings, &mut response, synthetic_id.as_str());

if let Ok(v) = ::std::env::var(ENV_FASTLY_SERVICE_VERSION) {
response.set_header(HEADER_X_TS_VERSION, v);
}
if ::std::env::var(ENV_FASTLY_IS_STAGING).as_deref() == Ok("1") {
response.set_header(HEADER_X_TS_ENV, "staging");
}

// Add global settings headers before streaming since we commit headers
for (key, value) in &settings.response_headers {
response.set_header(key, value);
}

// Remove content-length since we stream and modify size
response.remove_header(header::CONTENT_LENGTH);

// Commit to streaming — headers (including our additions) sent NOW
let streaming_body = response.stream_to_client();
let mut buffered_streaming_body = std::io::BufWriter::with_capacity(8192, streaming_body);

let params = ProcessResponseParams {
content_encoding: &content_encoding,
origin_host: &origin_host,
origin_url: &settings.publisher.origin_url,
request_host,
request_scheme,
settings,
content_type: &content_type,
integration_registry,
};

// Pick the body processor: full HTML rewriting, RSC flight-payload URL
// rewriting, or plain URL replacement for other text content.
let is_html = params.content_type.contains("text/html");
let is_rsc_flight = params.content_type.contains("text/x-component");

// Input and output use the same encoding so the Content-Encoding header
// (already committed above) stays accurate.
let config = PipelineConfig {
input_compression: compression,
output_compression: compression,
chunk_size: 8192,
};

let process_result = if is_html {
match create_html_stream_processor(
params.origin_host,
params.request_host,
params.request_scheme,
params.settings,
params.integration_registry,
) {
Ok(processor) => {
let mut pipeline = StreamingPipeline::new(config, processor);
pipeline.process(body, &mut buffered_streaming_body)
}
Err(e) => {
log::error!("Failed to create html stream processor: {:?}", e);
// We've already sent headers, we can't change the status. Just return.
return Ok(RouteResult::Streamed);
}
}
} else if is_rsc_flight {
let processor = RscFlightUrlRewriter::new(
params.origin_host,
params.origin_url,
params.request_host,
params.request_scheme,
);
let mut pipeline = StreamingPipeline::new(config, processor);
pipeline.process(body, &mut buffered_streaming_body)
} else {
let replacer = create_url_replacer(
params.origin_host,
params.origin_url,
params.request_host,
params.request_scheme,
);
let mut pipeline = StreamingPipeline::new(config, replacer);
pipeline.process(body, &mut buffered_streaming_body)
};

// Headers are already on the wire, so failures past this point can only be
// logged — the client may receive a truncated body.
match process_result {
Ok(()) => match buffered_streaming_body.into_inner() {
Ok(streaming_body) => {
// `finish()` closes the streamed body cleanly.
if let Err(e) = streaming_body.finish() {
log::error!("Failed to finish streaming_body: {:?}", e);
} else {
log::debug!("Completed streaming processing of response body");
}
}
Err(e) => {
// BufWriter flush failed; recover the underlying error for the log.
log::error!("Failed to flush buffered streaming body: {:?}", e.error());
}
},
Err(e) => {
log::error!("Streaming failed mid-flight: {:?}", e);
}
}

Ok(RouteResult::Streamed)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading