[VL] Reduce Velox hash shuffle partition buffer memory by evicting large partitions after split#12089
wankunde wants to merge 4 commits into
Conversation
Run Gluten Clickhouse CI on x86
Can you explain a bit more about this? According to the code logic, https://github.com/apache/gluten/blob/main/cpp/velox/shuffle/VeloxHashShuffleWriter.cc#L1389-L1391
When the shuffle writer is in
marin-ma left a comment
Thanks. Some minor comments on the configuration.
.createWithDefault(0.25)
val COLUMNAR_SHUFFLE_EVICT_PARTITION_SIZE =
  buildConf("spark.gluten.sql.columnar.shuffle.evictPartitionSize")
I would suggest using `spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold` for this configuration.
  "For Velox hash shuffle writer, evict partition buffers larger than this threshold " +
  "after splitting an input batch.")
.intConf
.createWithDefault(256 * 1024)
Can you set the default value to -1 to disable this eviction behaviour by default?
| spark.gluten.sql.columnar.shuffle.codecBackend | &lt;undefined&gt; | |
| spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If the number of rows in a batch falls below this threshold, copy all buffers into one buffer before compressing. |
| spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary encoding in hash-based shuffle. |
| spark.gluten.sql.columnar.shuffle.evictPartitionSize | 262144 | For the Velox hash shuffle writer, evict partition buffers larger than this threshold after splitting an input batch. |
Please also update the documentation.
LGTM. cc @FelixYBW Do you have any comments?
What changes are proposed in this pull request?
Why is this PR needed?
In the `doSplit()` method, the VeloxHashShuffleWriter reclaims memory when memory is full, but the `evictPartitionBuffersMinSize()` method is skipped because of the `splitState_ == kInit` condition. As a result, even when there are large shuffle partition buffers, we cannot evict them.

Changes in this PR:
This change adds a configurable threshold for the Velox hash shuffle writer to evict large partition buffers immediately after splitting an input batch.
When enabled, the hash shuffle writer estimates each partition buffer’s allocated capacity after split and evicts partitions whose buffered data exceeds the threshold.
This reduces retained partition buffer memory before processing the next input RowVector.
The new config is:
spark.gluten.sql.columnar.shuffle.evictPartitionSize
Default value: 256 * 1024 bytes.
How was this patch tested?
This change fixed some OOM cases in our production environment.
Was this patch authored or co-authored using generative AI tooling?
NO