
perf: Add batch coalescing in BufBatchWriter to reduce IPC schema overhead#3441

Draft
andygrove wants to merge 1 commit into apache:main from andygrove:batch-coalescing

Conversation

@andygrove
Member

Which issue does this PR close?

Closes #.

Rationale for this change

In the multi-partition shuffle path, each small batch becomes its own IPC block with full schema metadata overhead (~500-2000 bytes). Small batches arise from:

  • Spills under memory pressure: with 200 partitions and frequent spills, each spill event may write only ~40 rows per partition, and each of those slices becomes a separate IPC block with a full schema
  • Trailing batches: the iterator chunks rows into batch_size (8192) batches, so the last chunk is often much smaller
  • Small partitions: Skewed data or aggregation results may have very few rows per partition

The single-partition path (SinglePartitionShufflePartitioner) already coalesces batches before writing. The multi-partition path does not.

What changes are included in this PR?

Adds batch coalescing directly to BufBatchWriter. Instead of serializing every batch immediately into its own IPC block, batches accumulate in a Vec<RecordBatch> and are concatenated via concat_batches once the accumulated row count reaches batch_size. Both shuffle_write_partition and PartitionWriter::spill benefit automatically.

RecordBatch::clone() is cheap (Arc reference counting only — no data copy), so accumulation has minimal overhead. The single-partition path already feeds large batches, so the coalescing is effectively a no-op there.
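The shape of the change can be sketched with plain arrow-rs. This is a simplified illustration, not the PR's code: the field names (pending_batches, pending_rows, batch_size) and methods (write, flush_pending, write_batch_to_buffer) follow the description above, but the surrounding struct and the IPC serialization step are stubbed out.

```rust
use arrow::compute::concat_batches;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Simplified stand-in for BufBatchWriter's coalescing state.
struct CoalescingWriter {
    pending_batches: Vec<RecordBatch>,
    pending_rows: usize,
    batch_size: usize,
}

impl CoalescingWriter {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // RecordBatch::clone() only bumps Arc refcounts on the column arrays.
        self.pending_rows += batch.num_rows();
        self.pending_batches.push(batch.clone());
        if self.pending_rows >= self.batch_size {
            self.flush_pending()?;
        }
        Ok(())
    }

    // Also called from the writer's flush() so a partial tail is not lost.
    fn flush_pending(&mut self) -> Result<(), ArrowError> {
        if self.pending_batches.is_empty() {
            return Ok(());
        }
        let schema = self.pending_batches[0].schema();
        // Concatenate the accumulated batches so the IPC writer emits one
        // block (one schema header) instead of one block per small batch.
        let coalesced = concat_batches(&schema, &self.pending_batches)?;
        self.pending_batches.clear();
        self.pending_rows = 0;
        self.write_batch_to_buffer(&coalesced)
    }

    fn write_batch_to_buffer(&mut self, _batch: &RecordBatch) -> Result<(), ArrowError> {
        // In the real writer this serializes the batch as a compressed IPC block.
        Ok(())
    }
}
```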

Files changed

| File | Change |
| --- | --- |
| writers/buf_batch_writer.rs | Add pending_batches, pending_rows, batch_size fields; add coalescing in write(); add flush_pending() and write_batch_to_buffer(); update flush() signature |
| writers/partition_writer.rs | Add batch_size param to spill() |
| partitioners/multi_partition.rs | Add batch_size param to shuffle_write_partition(); pass through at all call sites |
| partitioners/single_partition.rs | Update BufBatchWriter::new() and flush() call signatures |
| shuffle_writer.rs | Add coalescing size comparison test |

How are these changes tested?

New test test_batch_coalescing_reduces_size writes 100 small batches (50 rows × 20 int32 columns) through BufBatchWriter:

  • With batch_size=8192 (coalescing active) → fewer, larger IPC blocks
  • With batch_size=1 (no coalescing) → many small IPC blocks
  • Asserts coalesced output is smaller
  • Verifies both roundtrip correctly via read_ipc_compressed

All 12 existing shuffle tests continue to pass.
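The size effect the test asserts can also be reproduced outside BufBatchWriter with plain arrow-rs IPC streams. The sketch below is a hypothetical standalone measurement, not the PR's test: it uses a single int32 column rather than 20, and models the uncoalesced path as one IPC stream per batch so the schema header is repeated per block.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

// Serializes a slice of batches into one IPC stream and returns its byte length.
fn ipc_len(batches: &[RecordBatch]) -> Result<usize, ArrowError> {
    let mut buf = Vec::new();
    let mut writer = StreamWriter::try_new(&mut buf, batches[0].schema().as_ref())?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.finish()?;
    drop(writer);
    Ok(buf.len())
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from((0..50).collect::<Vec<i32>>()));
    let small = RecordBatch::try_new(schema.clone(), vec![col])?;
    let batches = vec![small; 100];

    // Uncoalesced: one stream per batch, so the schema header repeats 100 times.
    let per_block: usize = batches
        .iter()
        .map(|b| ipc_len(std::slice::from_ref(b)))
        .sum::<Result<usize, ArrowError>>()?;

    // Coalesced: one concatenated batch, one stream, schema header written once.
    let coalesced = concat_batches(&schema, &batches)?;
    let coalesced_len = ipc_len(std::slice::from_ref(&coalesced))?;

    println!("per-block: {per_block} bytes, coalesced: {coalesced_len} bytes");
    Ok(())
}
```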

🤖 Generated with Claude Code

andygrove changed the title from "Add batch coalescing in BufBatchWriter to reduce IPC schema overhead" to "perf: Add batch coalescing in BufBatchWriter to reduce IPC schema overhead" on Feb 8, 2026
andygrove added this to the 0.14.0 milestone on Feb 8, 2026
@andygrove
Member Author

@EmilyMatt fyi

In the multi-partition shuffle path, each small batch becomes its own
IPC block with full schema metadata overhead (~500-2000 bytes). Small
batches arise from spills under memory pressure, trailing batches, and
skewed partitions.

This adds coalescing to BufBatchWriter: instead of serializing every
batch immediately, batches accumulate until reaching batch_size rows,
then get concat_batches'd into a single larger batch before
serialization. RecordBatch::clone() is cheap (Arc reference counting
only), so accumulation has minimal overhead.

The single-partition path already feeds large batches, so coalescing
is effectively a no-op there.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
