[SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator#55967

Open: szehon-ho wants to merge 7 commits into apache:master from szehon-ho:spark-cache-merge-rows-metrics

szehon-ho (Member) commented May 18, 2026

What changes were proposed in this pull request?

Cache SQLMetric references in MergeRowIterator and update them directly in the hot loop. Previously, each row called longMetric("…"), which performs a metrics(name) map lookup on every increment (up to 2–3 lookups per delete/update row). Metrics are lazy val fields so a partition only resolves metrics it actually increments.

This matches the pattern used elsewhere (e.g. FilterEvaluatorFactory passes a SQLMetric into the partition evaluator). The whole-stage codegen path is unchanged; it already resolves metrics once via metricTerm.
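
The difference between a per-row lookup and a cached reference can be sketched without Spark. In the following minimal example, `Metric` and `Operator` are hypothetical stand-ins for `SQLMetric` and the operator's metrics map (they are not Spark's classes), and the `lookups` counter exists only to make the number of map lookups visible.

```scala
// Hypothetical stand-in for SQLMetric: a mutable counter.
final class Metric {
  var value = 0L
  def add(n: Long): Unit = value += n
}

// Hypothetical stand-in for an operator that owns a name -> metric map.
final class Operator(names: Seq[String]) {
  var lookups = 0L // counts map lookups so the two styles can be compared
  private val metrics: Map[String, Metric] = names.map(n => n -> new Metric).toMap

  def longMetric(name: String): Metric = {
    lookups += 1
    metrics(name)
  }

  // Per-row lookup: every increment goes through the map.
  def runUncached(rows: Int): Unit = {
    var i = 0
    while (i < rows) {
      longMetric("numTargetRowsDeleted").add(1)
      i += 1
    }
  }

  // Cached reference: resolve once, then update the metric directly in the loop.
  def runCached(rows: Int): Unit = {
    val deleted = longMetric("numTargetRowsDeleted")
    var i = 0
    while (i < rows) {
      deleted.add(1)
      i += 1
    }
  }
}
```

With 1000 rows, `runUncached` performs 1000 lookups while `runCached` performs one; both leave the metric at the same value.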

codegenBenchmark in SqlBasedBenchmark now accepts optional warmupTime, minTime, and per-case numIters. MergeRowsExecBenchmark uses 7s warmup and a 7s timed window for all whole-stage on/off cases.
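
As a rough illustration of what a warm-up period followed by a minimum timed window means, here is a small self-contained sketch. `MiniBench` and `bestTimeNanos` are hypothetical names; this is not Spark's `Benchmark` class, and its exact iteration policy is an assumption made for the example.

```scala
import scala.concurrent.duration._

object MiniBench {
  // Hypothetical helper: run `f` untimed until `warmupTime` has passed, then time
  // iterations until at least max(minIters, 1) have run and their total time
  // reaches `minTime`. Returns (best iteration time in nanoseconds, iteration count).
  def bestTimeNanos(warmupTime: FiniteDuration, minTime: FiniteDuration, minIters: Int = 0)
      (f: => Unit): (Long, Int) = {
    val warmupDeadline = System.nanoTime() + warmupTime.toNanos
    while (System.nanoTime() < warmupDeadline) {
      f
    }

    var best = Long.MaxValue
    var iters = 0
    var elapsed = 0L
    while (iters < math.max(minIters, 1) || elapsed < minTime.toNanos) {
      val start = System.nanoTime()
      f
      val took = System.nanoTime() - start
      best = math.min(best, took)
      elapsed += took
      iters += 1
    }
    (best, iters)
  }
}
```

A longer warm-up gives the JIT time to compile the hot loop before measurement starts, and reporting the best iteration from a longer window reduces sensitivity to one-off pauses.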

Why are the changes needed?

MergeRowsExec updates multiple MERGE metrics per output row on the interpreted path (doExecute / MergeRowIterator). For delete-heavy workloads with little projection work, repeated map lookups were a noticeable fraction of per-row cost. Production MERGE typically runs with whole-stage codegen enabled, but the interpreted path is still used when codegen is disabled or unsupported.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing MergeRowsExec / MERGE tests (CI).

Local benchmark (MergeRowsExecBenchmark, 20M rows, Apple M4 Max, JDK 21). Both sides used the same benchmark harness (7s warmupTime, 7s minTime, wholestageOffNumIters = 0 / wholestageOnNumIters = 0 via extended codegenBenchmark). Compared MergeRowsExec without the cache (1ad4fa420cd, parent of the cache commit) vs with the cache (this PR), checking out only that file between runs.

SPARK_LOCAL_HOSTNAME=127.0.0.1 build/sbt -batch \
  -Dspark.driver.host=127.0.0.1 -Dspark.driver.bindAddress=127.0.0.1 \
  "sql/Test/runMain org.apache.spark.sql.execution.benchmark.MergeRowsExecBenchmark"

Whole-stage off (interpreted path) — best time (ms):

| Case | Without cache | With cache (this PR) | Change |
| --- | --- | --- | --- |
| matched update only | 5475 | 5238 | −4% |
| not matched insert only | 7612 | 7337 | −4% |
| matched update + not matched insert | 5795 | 4315 | −26% |
| matched delete | 2914 | 546 | −81% |
| conditional clauses | 3872 | 1251 | −68% |
| matched + not matched + not matched by source | 3813 | 1119 | −71% |
| split update (delete + insert) | 1844 | 1400 | −24% |

Matched-update-only and insert-only are roughly unchanged on the interpreted path in this run; the largest wins are on delete-heavy and multi-metric cases.
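
The Change column is the relative difference in best time, rounded to a whole percent. A short sketch of that arithmetic over the numbers reported above (`ChangePct` is a hypothetical helper name, not part of the benchmark):

```scala
object ChangePct {
  // Relative change in best time, rounded to a whole percent (negative means faster).
  def changePct(withoutCache: Long, withCache: Long): Long =
    math.round((withCache - withoutCache) * 100.0 / withoutCache)

  // (case, best ms without cache, best ms with cache), copied from the table above.
  val rows: Seq[(String, Long, Long)] = Seq(
    ("matched update only", 5475L, 5238L),
    ("not matched insert only", 7612L, 7337L),
    ("matched update + not matched insert", 5795L, 4315L),
    ("matched delete", 2914L, 546L),
    ("conditional clauses", 3872L, 1251L),
    ("matched + not matched + not matched by source", 3813L, 1119L),
    ("split update (delete + insert)", 1844L, 1400L)
  )

  // One "case: N%" line per benchmark case.
  def report: Seq[String] =
    rows.map { case (name, a, b) => s"$name: ${changePct(a, b)}%" }
}
```

For example, matched delete is (546 − 2914) / 2914 ≈ −0.813, which rounds to −81%.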

Whole-stage on (codegen) — unchanged within noise (e.g. matched delete best ~13 ms; matched update only ~333–338 ms).

Was this patch authored or co-authored using generative AI tooling?

No.

Resolve SQLMetric references once per partition in MergeRowIterator instead
of calling longMetric on every row. This avoids repeated metrics map lookups
on the interpreted path; the codegen path is unchanged.
szehon-ho changed the title from "[SQL] Cache SQL metrics in MergeRowsExec interpreted iterator" to "[SPARK-56933][SQL] Cache SQL metrics in MergeRowsExec interpreted iterator" on May 18, 2026
szehon-ho added 4 commits May 18, 2026 16:47
Document why metrics are cached per partition and use longMetric directly
in MergeRowIterator instead of MergeRowsExec.this.
… cases

Use a dedicated benchmark helper with 15s JIT warm-up and a 15s timed
window for whole-stage on/off cases, and support running matched-update-only
in isolation for local A/B testing.
… MergeRowsExec

Use MergeRowsExec.this.longMetric in MergeRowIterator so metric resolution
clearly refers to the outer operator instance.
Run all benchmark cases from runBenchmarkSuite without main-args filtering.
Comment on lines +523 to +534
private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied")
private val numTargetRowsInserted = MergeRowsExec.this.longMetric("numTargetRowsInserted")
private val numTargetRowsDeleted = MergeRowsExec.this.longMetric("numTargetRowsDeleted")
private val numTargetRowsUpdated = MergeRowsExec.this.longMetric("numTargetRowsUpdated")
private val numTargetRowsMatchedUpdated =
  MergeRowsExec.this.longMetric("numTargetRowsMatchedUpdated")
private val numTargetRowsMatchedDeleted =
  MergeRowsExec.this.longMetric("numTargetRowsMatchedDeleted")
private val numTargetRowsNotMatchedBySourceUpdated =
  MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceUpdated")
private val numTargetRowsNotMatchedBySourceDeleted =
  MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceDeleted")
Contributor

Should we make all these lazy to not look them up when not needed?

Member Author

Done — metrics are now lazy val so we only resolve (and cache) metrics that are actually incremented in a given partition.
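
The effect of `lazy val` here can be shown with a small standalone sketch. `Counter`, `LazyOp`, and `RowIterator` are hypothetical names used only for illustration, and the `resolved` list is added just to record which metrics were looked up; none of this is the PR's actual code.

```scala
// Hypothetical stand-in for SQLMetric.
final class Counter(val name: String) { var value = 0L }

// Hypothetical stand-in for the operator; records each metric resolution.
final class LazyOp {
  var resolved: List[String] = Nil
  def longMetric(name: String): Counter = {
    resolved = name :: resolved
    new Counter(name)
  }

  final class RowIterator {
    // Each lazy val is resolved on first use and then cached for later rows.
    private lazy val numTargetRowsDeleted = longMetric("numTargetRowsDeleted")
    private lazy val numTargetRowsInserted = longMetric("numTargetRowsInserted")

    def delete(): Unit = numTargetRowsDeleted.value += 1
    def insert(): Unit = numTargetRowsInserted.value += 1
  }
}
```

An iterator that only ever calls `delete()` resolves `numTargetRowsDeleted` once and never touches `numTargetRowsInserted`.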


// Resolve metrics once per partition; longMetric(name) does a map lookup on each call.
// See SPARK-56933.
private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied")
Contributor

nit: I believe MergeRowsExec.this.longMetric() can be replaced with just longMetric()

Member Author

Done — using longMetric() directly in MergeRowIterator.
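
For context on why the two spellings are interchangeable: inside an inner class, an unqualified call resolves to the enclosing instance's method when the inner class defines no member of that name. A minimal sketch with hypothetical class names (`OuterExec`, `InnerIterator`), assuming no shadowing:

```scala
final class OuterExec {
  def longMetric(name: String): String = s"metric:$name"

  final class InnerIterator {
    // Explicitly qualified with the enclosing instance.
    def qualified: String = OuterExec.this.longMetric("numTargetRowsCopied")

    // Unqualified: resolves to the same outer method, because InnerIterator
    // has no longMetric member of its own.
    def unqualified: String = longMetric("numTargetRowsCopied")
  }
}
```

The qualified form only becomes necessary if the inner class or one of its parents declares its own `longMetric`.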

Comment on lines +113 to +114
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f }
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f }
Contributor

I'm not familiar with the benchmarking infra, but it seems the warmupTime parameter of Benchmark already controls this, making these redundant?

Member Author

Agreed — removed the redundant pre-run calls. MergeRowsExecBenchmark now only uses the extended codegenBenchmark helper (warmupTime / minTime); the Benchmark class already warms up each case before timed iterations.

* Like [[codegenBenchmark]], but with JIT warm-up and a longer timed window so interpreted
* (whole-stage off) results are more stable when comparing metric caching changes.
*/
private def mergeRowsCodegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
Contributor

I am unsure about the benchmark numbers in the PR description. It says it's comparing origin/master to this PR, but here we are making changes to how these benchmarks are run. Are we doing apples-to-apples comparison here? Maybe it would be better to compare the following two:

  • This PR
  • This PR without the changes in MergeRowsExec

Member Author

Yes, that is right. I had to modify the benchmark to increase the warmup in order to get more consistent numbers, but I did run these two cases on the same new benchmark on this PR:

  • spark code of origin/master
  • spark code of this PR (with cached metrics)

That being said, let me see whether I need the benchmark changes.

Member Author

Updated the PR description with a new A/B run on the same harness for both sides: MergeRowsExec at 1ad4fa420cd (before the cache) vs this PR (with cache), with only that file swapped between runs. Both used 7s warmup and 7s timed window via extended codegenBenchmark. The earlier table compared against origin/master with mixed harness settings and is replaced.

szehon-ho added 2 commits May 19, 2026 16:32
Resolve each SQLMetric on first use per partition and call longMetric()
directly in MergeRowIterator per review feedback.
…enchmark

Add warmupTime, minTime, and optional numIters to codegenBenchmark. Use
7s warmup and timed window in MergeRowsExecBenchmark without a separate
helper or redundant pre-runs.