perf: Add batch coalescing in BufBatchWriter to reduce IPC schema overhead#3441
Draft
andygrove wants to merge 1 commit into apache:main from
Conversation
@EmilyMatt fyi
Which issue does this PR close?
Closes #.
Rationale for this change
In the multi-partition shuffle path, each small batch becomes its own IPC block with full schema metadata overhead (~500-2000 bytes). Small batches arise from:
- spills under memory pressure
- trailing batches: when rows don't divide evenly into `batch_size` (8192), the last chunk is often much smaller
- skewed partitions

The single-partition path (`SinglePartitionShufflePartitioner`) already coalesces batches before writing. The multi-partition path does not.

What changes are included in this PR?
Adds batch coalescing directly to `BufBatchWriter`. Instead of serializing every batch immediately into an IPC block, batches accumulate in a `Vec<RecordBatch>` and get `concat_batches`'d when accumulated rows reach `batch_size`. This way both `shuffle_write_partition` and `PartitionWriter::spill` benefit automatically. `RecordBatch::clone()` is cheap (Arc reference counting only, no data copy), so accumulation has minimal overhead. The single-partition path already feeds large batches, so the coalescing is effectively a no-op there.

Files changed
- `writers/buf_batch_writer.rs`: `pending_batches`, `pending_rows`, `batch_size` fields; coalescing in `write()`; `flush_pending()`; `write_batch_to_buffer()`; updated `flush()` signature
- `writers/partition_writer.rs`: `batch_size` param to `spill()`
- `partitioners/multi_partition.rs`: `batch_size` param to `shuffle_write_partition()`; passed through at all call sites
- `partitioners/single_partition.rs`: updated `BufBatchWriter::new()` and `flush()` call signatures
- `shuffle_writer.rs`

How are these changes tested?
New test `test_batch_coalescing_reduces_size` writes 100 small batches (50 rows × 20 int32 columns) through `BufBatchWriter`:

- `batch_size=8192` (coalescing active) → fewer, larger IPC blocks
- `batch_size=1` (no coalescing) → many small IPC blocks
- output is read back via `read_ipc_compressed`

All 12 existing shuffle tests continue to pass.
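The comparison the test makes can be mimicked with a toy model of the writer. Everything below is an illustrative sketch, not the actual Comet code: `Batch` stands in for Arrow's `RecordBatch`, and `flush_pending` stands in for `concat_batches` plus serialization of one IPC block.

```rust
// Toy model of the coalescing pattern described in this PR.
#[derive(Clone)]
struct Batch {
    rows: usize,
}

struct ToyBufBatchWriter {
    pending: Vec<Batch>,
    pending_rows: usize,
    batch_size: usize,
    blocks_written: usize, // each flush emits one "IPC block"
}

impl ToyBufBatchWriter {
    fn new(batch_size: usize) -> Self {
        Self { pending: Vec::new(), pending_rows: 0, batch_size, blocks_written: 0 }
    }

    // Accumulate until the row threshold is reached, then flush once.
    // Cloning is cheap here, like the Arc bump on a real RecordBatch.
    fn write(&mut self, batch: &Batch) {
        self.pending.push(batch.clone());
        self.pending_rows += batch.rows;
        if self.pending_rows >= self.batch_size {
            self.flush_pending();
        }
    }

    // Stands in for concat_batches'ing the pending batches and
    // serializing the result as a single IPC block.
    fn flush_pending(&mut self) {
        if !self.pending.is_empty() {
            self.blocks_written += 1;
            self.pending.clear();
            self.pending_rows = 0;
        }
    }
}

fn blocks_for(batch_size: usize) -> usize {
    // Same shape as the test scenario: 100 small batches of 50 rows.
    let mut w = ToyBufBatchWriter::new(batch_size);
    for _ in 0..100 {
        w.write(&Batch { rows: 50 });
    }
    w.flush_pending(); // final flush of any trailing rows
    w.blocks_written
}

fn main() {
    println!("batch_size=8192 -> {} block(s)", blocks_for(8192)); // 1
    println!("batch_size=1    -> {} block(s)", blocks_for(1));    // 100
    // Rough savings using the ~500 bytes/block lower bound cited above:
    let saved = (blocks_for(1) - blocks_for(8192)) * 500;
    println!("at least ~{} bytes of schema metadata avoided", saved); // 49500
}
```

With the 8192-row threshold the 5000 total rows never trigger a mid-stream flush, so the final flush emits a single block; with a threshold of 1 every write flushes immediately, giving 100 blocks, which is the size difference the test asserts on.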
🤖 Generated with Claude Code