[SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator#55967
[SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator#55967szehon-ho wants to merge 7 commits into
Conversation
Resolve SQLMetric references once per partition in MergeRowIterator instead of calling longMetric on every row. This avoids repeated metrics map lookups on the interpreted path; the codegen path is unchanged.
Document why metrics are cached per partition and use longMetric directly in MergeRowIterator instead of MergeRowsExec.this.
… cases Use a dedicated benchmark helper with 15s JIT warm-up and a 15s timed window for whole-stage on/off cases, and support running matched-update-only in isolation for local A/B testing.
… MergeRowsExec Use MergeRowsExec.this.longMetric in MergeRowIterator so metric resolution clearly refers to the outer operator instance.
Run all benchmark cases from runBenchmarkSuite without main-args filtering.
| private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied") | ||
| private val numTargetRowsInserted = MergeRowsExec.this.longMetric("numTargetRowsInserted") | ||
| private val numTargetRowsDeleted = MergeRowsExec.this.longMetric("numTargetRowsDeleted") | ||
| private val numTargetRowsUpdated = MergeRowsExec.this.longMetric("numTargetRowsUpdated") | ||
| private val numTargetRowsMatchedUpdated = | ||
| MergeRowsExec.this.longMetric("numTargetRowsMatchedUpdated") | ||
| private val numTargetRowsMatchedDeleted = | ||
| MergeRowsExec.this.longMetric("numTargetRowsMatchedDeleted") | ||
| private val numTargetRowsNotMatchedBySourceUpdated = | ||
| MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceUpdated") | ||
| private val numTargetRowsNotMatchedBySourceDeleted = | ||
| MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceDeleted") |
There was a problem hiding this comment.
Should we make all these lazy to not look them up when not needed?
There was a problem hiding this comment.
Done — metrics are now lazy val so we only resolve (and cache) metrics that are actually incremented in a given partition.
|
|
||
| // Resolve metrics once per partition; longMetric(name) does a map lookup on each call. | ||
| // See SPARK-56933. | ||
| private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied") |
There was a problem hiding this comment.
nit: I believe MergeRowsExec.this.longMetric() can be replaced with just longMetric()
There was a problem hiding this comment.
Done — using longMetric() directly in MergeRowIterator.
| withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f } | ||
| withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f } |
There was a problem hiding this comment.
I'm not familiar with the benchmarking infra, but it seems warmupTime parameter of the Benchmark is controlling this already, making these redundant?
There was a problem hiding this comment.
Agreed — removed the redundant pre-run calls. MergeRowsExecBenchmark now only uses the extended codegenBenchmark helper (warmupTime / minTime); the Benchmark class already warms up each case before timed iterations.
| * Like [[codegenBenchmark]], but with JIT warm-up and a longer timed window so interpreted | ||
| * (whole-stage off) results are more stable when comparing metric caching changes. | ||
| */ | ||
| private def mergeRowsCodegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { |
There was a problem hiding this comment.
I am unsure about the benchmark numbers in the PR description. It says it's comparing origin/master to this PR, but here we are making changes to how these benchmarks are run. Are we doing apples-to-apples comparison here? Maybe it would be better to compare the following two:
- This PR
- This PR without the changes in MergeRowsExec
There was a problem hiding this comment.
yea, it is actually right. i had to modify the benchmark to increase the warmup, in order to get more consistent numbers. But I did run these two cases on the same new benchmark on this pr
- spark code of origin/master
- spark code of this pr (with cached metrics)
That being said, let me see if i need to do the benchmark changes
There was a problem hiding this comment.
Updated the PR description with a new A/B run on the same harness for both sides: MergeRowsExec at 1ad4fa420cd (before the cache) vs this PR (with cache), with only that file swapped between runs. Both used 7s warmup and 7s timed window via extended codegenBenchmark. The earlier table compared against origin/master with mixed harness settings and is replaced.
Resolve each SQLMetric on first use per partition and call longMetric() directly in MergeRowIterator per review feedback.
…enchmark Add warmupTime, minTime, and optional numIters to codegenBenchmark. Use 7s warmup and timed window in MergeRowsExecBenchmark without a separate helper or redundant pre-runs.
What changes were proposed in this pull request?
Cache
SQLMetricreferences inMergeRowIteratorand update them directly in the hot loop. Previously, each row calledlongMetric("…"), which performs ametrics(name)map lookup on every increment (up to 2–3 lookups per delete/update row). Metrics arelazy valfields so a partition only resolves metrics it actually increments.This matches the pattern used elsewhere (e.g.
FilterEvaluatorFactorypasses aSQLMetricinto the partition evaluator). The whole-stage codegen path is unchanged; it already resolves metrics once viametricTerm.codegenBenchmarkinSqlBasedBenchmarknow accepts optionalwarmupTime,minTime, and per-casenumIters.MergeRowsExecBenchmarkuses 7s warmup and a 7s timed window for all whole-stage on/off cases.Why are the changes needed?
MergeRowsExecupdates multiple MERGE metrics per output row on the interpreted path (doExecute/MergeRowIterator). For delete-heavy workloads with little projection work, repeated map lookups were a noticeable fraction of per-row cost. Production MERGE typically runs with whole-stage codegen enabled, but the interpreted path is still used when codegen is disabled or unsupported.Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing
MergeRowsExec/ MERGE tests (CI).Local benchmark (
MergeRowsExecBenchmark, 20M rows, Apple M4 Max, JDK 21). Both sides used the same benchmark harness (7swarmupTime, 7sminTime,wholestageOffNumIters = 0/wholestageOnNumIters = 0via extendedcodegenBenchmark). ComparedMergeRowsExecwithout the cache (1ad4fa420cd, parent of the cache commit) vs with the cache (this PR), checking out only that file between runs.SPARK_LOCAL_HOSTNAME=127.0.0.1 build/sbt -batch \ -Dspark.driver.host=127.0.0.1 -Dspark.driver.bindAddress=127.0.0.1 \ "sql/Test/runMain org.apache.spark.sql.execution.benchmark.MergeRowsExecBenchmark"Whole-stage off (interpreted path) — best time (ms):
Matched-update-only and insert-only are roughly unchanged on the interpreted path in this run; the largest wins are on delete-heavy and multi-metric cases.
Whole-stage on (codegen) — unchanged within noise (e.g. matched delete best ~13 ms; matched update only ~333–338 ms).
Was this patch authored or co-authored using generative AI tooling?
No.